runworker.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
  2. # Copyright: (c) <spug.dev@gmail.com>
  3. # Released under the AGPL-3.0 License.
  4. from django.core.management.base import BaseCommand
  5. from django.conf import settings
  6. from django.db import connections
  7. from django_redis import get_redis_connection
  8. from concurrent.futures import ThreadPoolExecutor
  9. from apps.schedule.executors import schedule_worker_handler
  10. from apps.monitor.executors import monitor_worker_handler
  11. from apps.exec.executors import exec_worker_handler
  12. import logging
  13. import os
  14. EXEC_WORKER_KEY = settings.EXEC_WORKER_KEY
  15. MONITOR_WORKER_KEY = settings.MONITOR_WORKER_KEY
  16. SCHEDULE_WORKER_KEY = settings.SCHEDULE_WORKER_KEY
  17. logging.basicConfig(level=logging.WARNING, format='%(asctime)s %(message)s')
  18. class Worker:
  19. def __init__(self):
  20. self.rds = get_redis_connection()
  21. self._executor = ThreadPoolExecutor(max_workers=max(50, os.cpu_count() * 20))
  22. def job_done(self, future):
  23. connections.close_all()
  24. def run(self):
  25. logging.warning('Running worker')
  26. self.rds.delete(EXEC_WORKER_KEY, MONITOR_WORKER_KEY, SCHEDULE_WORKER_KEY)
  27. while True:
  28. key, job = self.rds.blpop([EXEC_WORKER_KEY, SCHEDULE_WORKER_KEY, MONITOR_WORKER_KEY])
  29. key = key.decode()
  30. if key == SCHEDULE_WORKER_KEY:
  31. future = self._executor.submit(schedule_worker_handler, job)
  32. elif key == MONITOR_WORKER_KEY:
  33. future = self._executor.submit(monitor_worker_handler, job)
  34. else:
  35. future = self._executor.submit(exec_worker_handler, job)
  36. future.add_done_callback(self.job_done)
  37. class Command(BaseCommand):
  38. help = 'Start worker process'
  39. def handle(self, *args, **options):
  40. w = Worker()
  41. w.run()