views.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
  2. # Copyright: (c) <spug.dev@gmail.com>
  3. # Released under the AGPL-3.0 License.
  4. from django.views.generic import View
  5. from django_redis import get_redis_connection
  6. from apscheduler.schedulers.background import BackgroundScheduler
  7. from apscheduler.triggers.cron import CronTrigger
  8. from apps.schedule.scheduler import Scheduler
  9. from apps.schedule.models import Task, History
  10. from apps.schedule.executors import local_executor, host_executor
  11. from apps.host.models import Host
  12. from django.conf import settings
  13. from libs import json_response, JsonParser, Argument, human_datetime
  14. import json
  15. class Schedule(View):
  16. def get(self, request):
  17. tasks = Task.objects.all()
  18. types = [x['type'] for x in tasks.order_by('type').values('type').distinct()]
  19. return json_response({'types': types, 'tasks': [x.to_dict() for x in tasks]})
  20. def post(self, request):
  21. form, error = JsonParser(
  22. Argument('id', type=int, required=False),
  23. Argument('type', help='请输入任务类型'),
  24. Argument('name', help='请输入任务名称'),
  25. Argument('command', help='请输入任务内容'),
  26. Argument('rst_notify', type=dict, help='请选择执行失败通知方式'),
  27. Argument('targets', type=list, filter=lambda x: len(x), help='请选择执行对象'),
  28. Argument('trigger', filter=lambda x: x in dict(Task.TRIGGERS), help='请选择触发器类型'),
  29. Argument('trigger_args', help='请输入触发器参数'),
  30. Argument('desc', required=False),
  31. ).parse(request.body)
  32. if error is None:
  33. form.targets = json.dumps(form.targets)
  34. form.rst_notify = json.dumps(form.rst_notify)
  35. if form.trigger == 'cron':
  36. args = json.loads(form.trigger_args)['rule'].split()
  37. if len(args) != 5:
  38. return json_response(error='无效的执行规则,请更正后再试')
  39. minute, hour, day, month, week = args
  40. week = '0' if week == '7' else week
  41. try:
  42. CronTrigger(minute=minute, hour=hour, day=day, month=month, day_of_week=week)
  43. except ValueError:
  44. return json_response(error='无效的执行规则,请更正后再试')
  45. if form.id:
  46. Task.objects.filter(pk=form.id).update(
  47. updated_at=human_datetime(),
  48. updated_by=request.user,
  49. **form
  50. )
  51. task = Task.objects.filter(pk=form.id).first()
  52. if task and task.is_active:
  53. form.action = 'modify'
  54. form.targets = json.loads(form.targets)
  55. rds_cli = get_redis_connection()
  56. rds_cli.lpush(settings.SCHEDULE_KEY, json.dumps(form))
  57. else:
  58. Task.objects.create(created_by=request.user, **form)
  59. return json_response(error=error)
  60. def patch(self, request):
  61. form, error = JsonParser(
  62. Argument('id', type=int, help='请指定操作对象'),
  63. Argument('is_active', type=bool, required=False)
  64. ).parse(request.body, True)
  65. if error is None:
  66. task = Task.objects.get(pk=form.id)
  67. if form.get('is_active') is not None:
  68. task.is_active = form.is_active
  69. task.latest_id = None
  70. if form.is_active:
  71. message = {'id': form.id, 'action': 'add'}
  72. message.update(task.to_dict(selects=('trigger', 'trigger_args', 'command', 'targets')))
  73. else:
  74. message = {'id': form.id, 'action': 'remove'}
  75. rds_cli = get_redis_connection()
  76. rds_cli.lpush(settings.SCHEDULE_KEY, json.dumps(message))
  77. task.save()
  78. return json_response(error=error)
  79. def delete(self, request):
  80. form, error = JsonParser(
  81. Argument('id', type=int, help='请指定操作对象')
  82. ).parse(request.GET)
  83. if error is None:
  84. task = Task.objects.filter(pk=form.id).first()
  85. if task:
  86. if task.is_active:
  87. return json_response(error='该任务在运行中,请先停止任务再尝试删除')
  88. task.delete()
  89. History.objects.filter(task_id=task.id).delete()
  90. return json_response(error=error)
  91. class HistoryView(View):
  92. def get(self, request, t_id):
  93. task = Task.objects.filter(pk=t_id).first()
  94. if not task:
  95. return json_response(error='未找到指定任务')
  96. h_id = request.GET.get('id')
  97. if h_id:
  98. h_id = task.latest_id if h_id == 'latest' else h_id
  99. return json_response(self._fetch_detail(h_id))
  100. histories = History.objects.filter(task_id=t_id)
  101. return json_response([x.to_list() for x in histories])
  102. def post(self, request, t_id):
  103. task = Task.objects.filter(pk=t_id).first()
  104. if not task:
  105. return json_response(error='未找到指定任务')
  106. outputs, status = {}, 1
  107. for host_id in json.loads(task.targets):
  108. if host_id == 'local':
  109. code, duration, out = local_executor(task.command)
  110. else:
  111. host = Host.objects.filter(pk=host_id).first()
  112. if not host:
  113. code, duration, out = 1, 0, f'unknown host id for {host_id!r}'
  114. else:
  115. code, duration, out = host_executor(host, task.command)
  116. if code != 0:
  117. status = 2
  118. outputs[host_id] = [code, duration, out]
  119. history = History.objects.create(
  120. task_id=task.id,
  121. status=status,
  122. run_time=human_datetime(),
  123. output=json.dumps(outputs)
  124. )
  125. return json_response(history.id)
  126. def _fetch_detail(self, h_id):
  127. record = History.objects.filter(pk=h_id).first()
  128. outputs = json.loads(record.output)
  129. host_ids = (x for x in outputs.keys() if x != 'local')
  130. hosts_info = {str(x.id): x.name for x in Host.objects.filter(id__in=host_ids)}
  131. data = {'run_time': record.run_time, 'success': 0, 'failure': 0, 'duration': 0, 'outputs': []}
  132. for host_id, value in outputs.items():
  133. if not value:
  134. continue
  135. code, duration, out = value
  136. key = 'success' if code == 0 else 'failure'
  137. data[key] += 1
  138. data['duration'] += duration
  139. data['outputs'].append({
  140. 'name': hosts_info.get(host_id, '本机'),
  141. 'code': code,
  142. 'duration': duration,
  143. 'output': out})
  144. data['duration'] = f"{data['duration'] / len(outputs):.3f}"
  145. return data
  146. def next_run_time(request):
  147. form, error = JsonParser(
  148. Argument('rule', help='参数错误'),
  149. Argument('start', required=False),
  150. Argument('stop', required=False)
  151. ).parse(request.body)
  152. if error is None:
  153. try:
  154. minute, hour, day, month, week = form.rule.split()
  155. week = Scheduler.covert_week(week)
  156. trigger = CronTrigger(minute=minute, hour=hour, day=day, month=month, day_of_week=week,
  157. start_date=form.start, end_date=form.stop)
  158. except (ValueError, KeyError):
  159. return json_response({'success': False, 'msg': '无效的执行规则'})
  160. scheduler = BackgroundScheduler(timezone=settings.TIME_ZONE)
  161. scheduler.start()
  162. job = scheduler.add_job(lambda: None, trigger)
  163. run_time = job.next_run_time
  164. scheduler.shutdown()
  165. if run_time:
  166. return json_response({'success': True, 'msg': run_time.strftime('%Y-%m-%d %H:%M:%S')})
  167. else:
  168. return json_response({'success': False, 'msg': '无法被触发'})
  169. return json_response(error=error)