123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
- # Copyright: (c) <spug.dev@gmail.com>
- # Released under the AGPL-3.0 License.
- from django.views.generic import View
- from django_redis import get_redis_connection
- from apscheduler.schedulers.background import BackgroundScheduler
- from apscheduler.triggers.cron import CronTrigger
- from apps.schedule.scheduler import Scheduler
- from apps.schedule.models import Task, History
- from apps.schedule.executors import local_executor, host_executor
- from apps.host.models import Host
- from django.conf import settings
- from libs import json_response, JsonParser, Argument, human_datetime
- import json
class Schedule(View):
    """Django view exposing list / create-or-update / toggle / delete for tasks.

    State changes that affect a running task are published as JSON messages
    onto the Redis list named by ``settings.SCHEDULE_KEY`` (presumably consumed
    by the scheduler process -- the consumer is not visible here).
    """

    def get(self, request):
        """Return all tasks together with the distinct, sorted task types."""
        tasks = Task.objects.all()
        types = [x['type'] for x in tasks.order_by('type').values('type').distinct()]
        return json_response({'types': types, 'tasks': [x.to_dict() for x in tasks]})

    def post(self, request):
        """Create a task, or update the task identified by ``id`` when given.

        For ``cron`` triggers the rule must consist of exactly five fields and
        be accepted by ``CronTrigger``; otherwise an error response is returned
        and nothing is written.
        """
        form, error = JsonParser(
            Argument('id', type=int, required=False),
            Argument('type', help='请输入任务类型'),
            Argument('name', help='请输入任务名称'),
            Argument('command', help='请输入任务内容'),
            Argument('rst_notify', type=dict, help='请选择执行失败通知方式'),
            Argument('targets', type=list, filter=lambda x: len(x), help='请选择执行对象'),
            Argument('trigger', filter=lambda x: x in dict(Task.TRIGGERS), help='请选择触发器类型'),
            Argument('trigger_args', help='请输入触发器参数'),
            Argument('desc', required=False),
        ).parse(request.body)
        if error is None:
            # Both fields are persisted as JSON text.
            form.targets = json.dumps(form.targets)
            form.rst_notify = json.dumps(form.rst_notify)
            if form.trigger == 'cron':
                try:
                    args = json.loads(form.trigger_args)['rule'].split()
                except (ValueError, KeyError, TypeError, AttributeError):
                    # Malformed JSON, missing 'rule', or a non-string rule is
                    # simply an invalid rule, not a server error.
                    return json_response(error='无效的执行规则,请更正后再试')
                if len(args) != 5:
                    return json_response(error='无效的执行规则,请更正后再试')
                minute, hour, day, month, week = args
                # NOTE(review): next_run_time() normalises the week field with
                # Scheduler.covert_week() instead -- confirm both agree.
                week = '0' if week == '7' else week
                try:
                    # Constructed only to validate the rule; the instance is discarded.
                    CronTrigger(minute=minute, hour=hour, day=day, month=month, day_of_week=week)
                except ValueError:
                    return json_response(error='无效的执行规则,请更正后再试')
            if form.id:
                Task.objects.filter(pk=form.id).update(
                    updated_at=human_datetime(),
                    updated_by=request.user,
                    **form
                )
                task = Task.objects.filter(pk=form.id).first()
                if task and task.is_active:
                    # An active task must be re-registered with its new settings.
                    form.action = 'modify'
                    form.targets = json.loads(form.targets)
                    rds_cli = get_redis_connection()
                    rds_cli.lpush(settings.SCHEDULE_KEY, json.dumps(form))
            else:
                Task.objects.create(created_by=request.user, **form)
        return json_response(error=error)

    def patch(self, request):
        """Activate or deactivate a task and publish the matching add/remove message.

        Returns an error response when the task id does not exist.
        """
        form, error = JsonParser(
            Argument('id', type=int, help='请指定操作对象'),
            Argument('is_active', type=bool, required=False)
        ).parse(request.body, True)
        if error is None:
            # filter().first() instead of get(): an unknown id must yield an
            # error response rather than an unhandled DoesNotExist.
            task = Task.objects.filter(pk=form.id).first()
            if not task:
                return json_response(error='未找到指定任务')
            if form.get('is_active') is not None:
                task.is_active = form.is_active
                task.latest_id = None
                if form.is_active:
                    message = {'id': form.id, 'action': 'add'}
                    message.update(task.to_dict(selects=('trigger', 'trigger_args', 'command', 'targets')))
                else:
                    message = {'id': form.id, 'action': 'remove'}
                rds_cli = get_redis_connection()
                rds_cli.lpush(settings.SCHEDULE_KEY, json.dumps(message))
            task.save()
        return json_response(error=error)

    def delete(self, request):
        """Delete an inactive task along with its execution history.

        Active tasks are refused; a missing task is treated as already deleted.
        """
        form, error = JsonParser(
            Argument('id', type=int, help='请指定操作对象')
        ).parse(request.GET)
        if error is None:
            task = Task.objects.filter(pk=form.id).first()
            if task:
                if task.is_active:
                    return json_response(error='该任务在运行中,请先停止任务再尝试删除')
                # Remove history first: Model.delete() resets the instance's
                # primary key, so filtering by task.id afterwards would match
                # nothing and leave orphaned history rows.
                History.objects.filter(task_id=form.id).delete()
                task.delete()
        return json_response(error=error)
class HistoryView(View):
    """Django view for a task's execution history and for running it on demand."""

    def get(self, request, t_id):
        """Return the history list of task ``t_id``, or one record's detail.

        When the ``id`` query parameter is present it selects a single record;
        the special value ``latest`` resolves to ``task.latest_id``.
        """
        task = Task.objects.filter(pk=t_id).first()
        if not task:
            return json_response(error='未找到指定任务')

        h_id = request.GET.get('id')
        if h_id:
            h_id = task.latest_id if h_id == 'latest' else h_id
            detail = self._fetch_detail(h_id)
            if detail is None:
                # e.g. 'latest' requested while task.latest_id is still None.
                return json_response(error='未找到指定执行记录')
            return json_response(detail)
        histories = History.objects.filter(task_id=t_id)
        return json_response([x.to_list() for x in histories])

    def post(self, request, t_id):
        """Run the task's command on every target now and store a History row.

        Returns the id of the created history record. The overall status is 1
        when every target exits with code 0, otherwise 2.
        """
        task = Task.objects.filter(pk=t_id).first()
        if not task:
            return json_response(error='未找到指定任务')

        outputs, status = {}, 1
        for host_id in json.loads(task.targets):
            if host_id == 'local':
                code, duration, out = local_executor(task.command)
            else:
                host = Host.objects.filter(pk=host_id).first()
                if not host:
                    # Record the failure for this target instead of aborting the run.
                    code, duration, out = 1, 0, f'unknown host id for {host_id!r}'
                else:
                    code, duration, out = host_executor(host, task.command)
            if code != 0:
                status = 2
            outputs[host_id] = [code, duration, out]

        history = History.objects.create(
            task_id=task.id,
            status=status,
            run_time=human_datetime(),
            output=json.dumps(outputs)
        )
        return json_response(history.id)

    def _fetch_detail(self, h_id):
        """Build the detail payload for history record ``h_id``.

        Returns ``None`` when no such record exists. Otherwise returns a dict
        with the run time, success/failure counts, the duration averaged over
        all recorded targets (formatted to three decimals) and the per-target
        outputs.
        """
        record = History.objects.filter(pk=h_id).first()
        if record is None:
            return None

        outputs = json.loads(record.output)
        host_ids = [x for x in outputs if x != 'local']
        # JSON object keys are strings, so index host names by str(id).
        hosts_info = {str(x.id): x.name for x in Host.objects.filter(id__in=host_ids)}
        data = {'run_time': record.run_time, 'success': 0, 'failure': 0, 'duration': 0, 'outputs': []}
        for host_id, value in outputs.items():
            if not value:
                # Targets without a recorded result are skipped.
                continue
            code, duration, out = value
            key = 'success' if code == 0 else 'failure'
            data[key] += 1
            data['duration'] += duration
            data['outputs'].append({
                'name': hosts_info.get(host_id, '本机'),
                'code': code,
                'duration': duration,
                'output': out})
        # Guard against an empty output map, which would divide by zero.
        average = data['duration'] / len(outputs) if outputs else 0
        data['duration'] = f"{average:.3f}"
        return data
def next_run_time(request):
    """Report when a five-field cron rule would fire next.

    Reads ``rule`` (required) plus optional ``start`` / ``stop`` bounds from
    the request body. Responds with ``{'success': True, 'msg': <time>}`` using
    the ``%Y-%m-%d %H:%M:%S`` format, or ``{'success': False, 'msg': <reason>}``
    when the rule is invalid or would never fire.
    """
    form, error = JsonParser(
        Argument('rule', help='参数错误'),
        Argument('start', required=False),
        Argument('stop', required=False)
    ).parse(request.body)
    if error is None:
        try:
            # A wrong field count raises ValueError on unpacking.
            minute, hour, day, month, week = form.rule.split()
            week = Scheduler.covert_week(week)
            trigger = CronTrigger(minute=minute, hour=hour, day=day, month=month, day_of_week=week,
                                  start_date=form.start, end_date=form.stop)
        except (ValueError, KeyError):
            return json_response({'success': False, 'msg': '无效的执行规则'})

        # A throw-away scheduler is started only so the added job gets its
        # next_run_time computed.
        scheduler = BackgroundScheduler(timezone=settings.TIME_ZONE)
        scheduler.start()
        try:
            job = scheduler.add_job(lambda: None, trigger)
            run_time = job.next_run_time
        finally:
            # Always stop the scheduler so a failing add_job cannot leak its thread.
            scheduler.shutdown()

        if run_time:
            return json_response({'success': True, 'msg': run_time.strftime('%Y-%m-%d %H:%M:%S')})
        else:
            return json_response({'success': False, 'msg': '无法被触发'})
    return json_response(error=error)
|