scheduler.py

# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License.
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from django_redis import get_redis_connection
from django.utils.functional import SimpleLazyObject
from django.db import connections
from apps.schedule.models import Task, History
from apps.schedule.utils import send_fail_notify
from apps.notify.models import Notify
from apps.schedule.builtin import auto_run_by_day, auto_run_by_minute
from django.conf import settings
from libs import AttrDict, human_datetime
import logging
import json

SCHEDULE_WORKER_KEY = settings.SCHEDULE_WORKER_KEY


class Scheduler:
    timezone = settings.TIME_ZONE
    # Maps standard cron weekday numbers (0/7 = Sunday) to APScheduler's
    # day_of_week values (0 = Monday ... 6 = Sunday).
    week_map = {
        '-': '-',
        '*': '*',
        '7': '6',
        '0': '6',
        '1': '0',
        '2': '1',
        '3': '2',
        '4': '3',
        '5': '4',
        '6': '5',
    }

    def __init__(self):
        self.scheduler = BackgroundScheduler(timezone=self.timezone, executors={'default': ThreadPoolExecutor(30)})
        self.scheduler.add_listener(
            self._handle_event,
            EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES | EVENT_JOB_EXECUTED
        )

    @classmethod
    def covert_week(cls, week_str):
        return ''.join(map(lambda x: cls.week_map[x], week_str))

    @classmethod
    def parse_trigger(cls, trigger, trigger_args):
        if trigger == 'interval':
            return IntervalTrigger(seconds=int(trigger_args), timezone=cls.timezone)
        elif trigger == 'date':
            return DateTrigger(run_date=trigger_args, timezone=cls.timezone)
        elif trigger == 'cron':
            args = json.loads(trigger_args) if not isinstance(trigger_args, dict) else trigger_args
            minute, hour, day, month, week = args['rule'].split()
            week = cls.covert_week(week)
            return CronTrigger(minute=minute, hour=hour, day=day, month=month, day_of_week=week,
                               start_date=args['start'], end_date=args['stop'])
        else:
            raise TypeError(f'unknown schedule policy: {trigger!r}')

    def _handle_event(self, event):
        obj = SimpleLazyObject(lambda: Task.objects.filter(pk=event.job_id).first())
        if event.code == EVENT_SCHEDULER_SHUTDOWN:
            logging.warning(f'EVENT_SCHEDULER_SHUTDOWN: {event}')
            Notify.make_notify('schedule', '1', 'Scheduler stopped', 'The scheduler shut down unexpectedly; you can open an issue on GitHub')
        elif event.code == EVENT_JOB_MAX_INSTANCES:
            logging.warning(f'EVENT_JOB_MAX_INSTANCES: {event}')
            send_fail_notify(obj, 'Reached the maximum number of running instances, usually because the previous run has not finished yet. Increase the schedule interval or shorten the task runtime')
        elif event.code == EVENT_JOB_ERROR:
            logging.warning(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}')
            send_fail_notify(obj, f'Execution error: {event.exception}')
        connections.close_all()

    def _init_builtin_jobs(self):
        self.scheduler.add_job(auto_run_by_day, 'cron', hour=1, minute=20)
        self.scheduler.add_job(auto_run_by_minute, 'interval', minutes=1)

    def _dispatch(self, task_id, command, targets):
        # Record a new History entry, then push one work item per target onto
        # the Redis worker queue for the schedule workers to execute.
        output = {x: None for x in targets}
        history = History.objects.create(
            task_id=task_id,
            status='0',
            run_time=human_datetime(),
            output=json.dumps(output)
        )
        Task.objects.filter(pk=task_id).update(latest_id=history.id)
        rds_cli = get_redis_connection()
        for t in targets:
            rds_cli.rpush(SCHEDULE_WORKER_KEY, json.dumps([history.id, t, command]))

    def _init(self):
        self.scheduler.start()
        self._init_builtin_jobs()
        for task in Task.objects.filter(is_active=True):
            trigger = self.parse_trigger(task.trigger, task.trigger_args)
            self.scheduler.add_job(
                self._dispatch,
                trigger,
                id=str(task.id),
                args=(task.id, task.command, json.loads(task.targets)),
            )

    def run(self):
        rds_cli = get_redis_connection()
        self._init()
        rds_cli.delete(settings.SCHEDULE_KEY)
        logging.warning('Running scheduler')
        while True:
            # Block until a task-change message arrives, then keep the
            # APScheduler jobs in sync with the requested action.
            _, data = rds_cli.brpop(settings.SCHEDULE_KEY)
            task = AttrDict(json.loads(data))
            if task.action in ('add', 'modify'):
                trigger = self.parse_trigger(task.trigger, task.trigger_args)
                self.scheduler.add_job(
                    self._dispatch,
                    trigger,
                    id=str(task.id),
                    args=(task.id, task.command, task.targets),
                    replace_existing=True
                )
            elif task.action == 'remove':
                job = self.scheduler.get_job(str(task.id))
                if job:
                    job.remove()
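# --- Usage sketch (editorial addition, not part of the upstream file) --------
# run() above consumes JSON messages from the Redis list at settings.SCHEDULE_KEY
# and keeps the APScheduler jobs in sync. The helper below is a minimal, hedged
# illustration of what a producer of such a message could look like: the field
# names mirror the AttrDict attributes read in run(), but the function name,
# the example values, and the choice of lpush are assumptions, not the
# project's actual producer code.
def push_schedule_change_example(action='add'):
    """Push a hypothetical task-change message for the scheduler loop to pick up."""
    message = {
        'action': action,                # 'add', 'modify' or 'remove'
        'id': 42,                        # hypothetical Task primary key; becomes the APScheduler job id
        'trigger': 'cron',
        'trigger_args': {'rule': '0 3 * * 1', 'start': None, 'stop': None},
        'command': 'echo hello',
        'targets': ['local'],            # passed straight through to _dispatch, so it must already be a list
    }
    get_redis_connection().lpush(settings.SCHEDULE_KEY, json.dumps(message))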