scheduler.py 3.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
  2. # Copyright: (c) <spug.dev@gmail.com>
  3. # Released under the AGPL-3.0 License.
  4. from apscheduler.schedulers.background import BackgroundScheduler
  5. from apscheduler.executors.pool import ThreadPoolExecutor
  6. from apscheduler.triggers.interval import IntervalTrigger
  7. from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
  8. from django_redis import get_redis_connection
  9. from django.utils.functional import SimpleLazyObject
  10. from django.db import connections
  11. from apps.monitor.models import Detection
  12. from apps.notify.models import Notify
  13. from django.conf import settings
  14. from libs import AttrDict, human_datetime
  15. from datetime import datetime, timedelta
  16. from random import randint
  17. import logging
  18. import json
  19. MONITOR_WORKER_KEY = settings.MONITOR_WORKER_KEY
  20. class Scheduler:
  21. timezone = settings.TIME_ZONE
  22. def __init__(self):
  23. self.scheduler = BackgroundScheduler(timezone=self.timezone, executors={'default': ThreadPoolExecutor(30)})
  24. self.scheduler.add_listener(
  25. self._handle_event,
  26. EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES | EVENT_JOB_EXECUTED
  27. )
  28. def _handle_event(self, event):
  29. obj = SimpleLazyObject(lambda: Detection.objects.filter(pk=event.job_id).first())
  30. if event.code == EVENT_SCHEDULER_SHUTDOWN:
  31. logging.warning(f'EVENT_SCHEDULER_SHUTDOWN: {event}')
  32. Notify.make_notify('monitor', '1', '调度器已关闭', '调度器意外关闭,你可以在github上提交issue', False)
  33. elif event.code == EVENT_JOB_MAX_INSTANCES:
  34. logging.warning(f'EVENT_JOB_MAX_INSTANCES: {event}')
  35. Notify.make_notify('monitor', '1', f'{obj.name} - 达到调度实例上限', '一般为上个周期的执行任务还未结束,请增加调度间隔或减少任务执行耗时')
  36. elif event.code == EVENT_JOB_ERROR:
  37. logging.warning(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}')
  38. Notify.make_notify('monitor', '1', f'{obj.name} - 执行异常', f'{event.exception}')
  39. connections.close_all()
  40. def _dispatch(self, task_id, tp, targets, extra, threshold, quiet):
  41. Detection.objects.filter(pk=task_id).update(latest_run_time=human_datetime())
  42. rds_cli = get_redis_connection()
  43. for t in json.loads(targets):
  44. rds_cli.rpush(MONITOR_WORKER_KEY, json.dumps([task_id, tp, t, extra, threshold, quiet]))
  45. def _init(self):
  46. self.scheduler.start()
  47. for item in Detection.objects.filter(is_active=True):
  48. now = datetime.now()
  49. trigger = IntervalTrigger(minutes=int(item.rate), timezone=self.timezone)
  50. self.scheduler.add_job(
  51. self._dispatch,
  52. trigger,
  53. id=str(item.id),
  54. args=(item.id, item.type, item.targets, item.extra, item.threshold, item.quiet),
  55. next_run_time=now + timedelta(seconds=randint(0, 60))
  56. )
  57. def run(self):
  58. rds_cli = get_redis_connection()
  59. self._init()
  60. rds_cli.delete(settings.MONITOR_KEY)
  61. logging.warning('Running monitor')
  62. while True:
  63. _, data = rds_cli.brpop(settings.MONITOR_KEY)
  64. task = AttrDict(json.loads(data))
  65. if task.action in ('add', 'modify'):
  66. trigger = IntervalTrigger(minutes=int(task.rate), timezone=self.timezone)
  67. self.scheduler.add_job(
  68. self._dispatch,
  69. trigger,
  70. id=str(task.id),
  71. args=(task.id, task.type, task.targets, task.extra, task.threshold, task.quiet),
  72. replace_existing=True
  73. )
  74. elif task.action == 'remove':
  75. job = self.scheduler.get_job(str(task.id))
  76. if job:
  77. job.remove()