| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 |
# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License.
- from django.core.management.base import BaseCommand
- from django.conf import settings
- from django.db import connections
- from django_redis import get_redis_connection
- from concurrent.futures import ThreadPoolExecutor
- from apps.schedule.executors import schedule_worker_handler
- from apps.monitor.executors import monitor_worker_handler
- from apps.exec.executors import exec_worker_handler
- import logging
- import os
# Names of the Redis keys the worker pops jobs from, as configured in the
# Django settings module.
EXEC_WORKER_KEY, MONITOR_WORKER_KEY, SCHEDULE_WORKER_KEY = (
    settings.EXEC_WORKER_KEY,
    settings.MONITOR_WORKER_KEY,
    settings.SCHEDULE_WORKER_KEY,
)

# Root logger: WARNING and above, each record prefixed with a timestamp.
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.WARNING)
class Worker:
    """Pop jobs from the Redis worker keys and run them on a thread pool.

    Each popped job is handed to the handler that matches the key it came
    from: schedule, monitor, or (for anything else) exec.
    """

    def __init__(self):
        """Open the Redis connection and create the thread pool."""
        self.rds = get_redis_connection()
        self._executor = ThreadPoolExecutor(max_workers=self._pool_size(os.cpu_count()))

    @staticmethod
    def _pool_size(cpu_count):
        """Return the number of pool threads for *cpu_count* CPUs.

        The result is 20 threads per CPU with a floor of 50.
        ``os.cpu_count()`` returns ``None`` when the count cannot be
        determined; that case (and 0) is treated as a single CPU instead of
        raising ``TypeError`` on ``None * 20``.
        """
        return max(50, (cpu_count or 1) * 20)

    def job_done(self, future):
        """Done-callback for submitted jobs: report failures, close DB connections.

        :param future: the finished ``concurrent.futures.Future`` of a job.
        """
        try:
            # Without this check an exception raised by a handler would stay
            # stored in the future and never be reported anywhere.
            if not future.cancelled():
                error = future.exception()
                if error is not None:
                    logging.error('Worker job failed: %r', error, exc_info=error)
        finally:
            # Always release Django DB connections, even if logging fails.
            connections.close_all()

    def run(self):
        """Clear the worker keys, then block forever dispatching popped jobs."""
        logging.warning('Running worker')
        # Anything already stored under the worker keys is discarded on startup.
        self.rds.delete(EXEC_WORKER_KEY, MONITOR_WORKER_KEY, SCHEDULE_WORKER_KEY)
        while True:
            # Blocks until one of the keys has an item to pop.
            key, job = self.rds.blpop([EXEC_WORKER_KEY, SCHEDULE_WORKER_KEY, MONITOR_WORKER_KEY])
            key = key.decode()
            if key == SCHEDULE_WORKER_KEY:
                handler = schedule_worker_handler
            elif key == MONITOR_WORKER_KEY:
                handler = monitor_worker_handler
            else:
                handler = exec_worker_handler
            future = self._executor.submit(handler, job)
            future.add_done_callback(self.job_done)
class Command(BaseCommand):
    """Management command that starts the worker loop."""

    help = 'Start worker process'

    def handle(self, *args, **options):
        """Create a ``Worker`` and hand control to its ``run`` loop."""
        Worker().run()
|