executors.py 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
  2. # Copyright: (c) <spug.dev@gmail.com>
  3. # Released under the AGPL-3.0 License.
  4. from django_redis import get_redis_connection
  5. from libs.ssh import SSH
  6. import threading
  7. import socket
  8. import json
  9. def exec_worker_handler(job):
  10. job = Job(**json.loads(job))
  11. threading.Thread(target=job.run).start()
  12. class Job:
  13. def __init__(self, key, name, hostname, port, username, pkey, command, token=None):
  14. self.ssh = SSH(hostname, port, username, pkey)
  15. self.key = key
  16. self.command = command
  17. self.token = token
  18. self.rds_cli = None
  19. self.env = dict(
  20. SPUG_HOST_ID=str(self.key),
  21. SPUG_HOST_NAME=name,
  22. SPUG_HOST_HOSTNAME=hostname,
  23. SPUG_SSH_PORT=str(port),
  24. SPUG_SSH_USERNAME=username,
  25. )
  26. def _send(self, message, with_expire=False):
  27. if self.rds_cli is None:
  28. self.rds_cli = get_redis_connection()
  29. self.rds_cli.lpush(self.token, json.dumps(message))
  30. if with_expire:
  31. self.rds_cli.expire(self.token, 300)
  32. def send(self, data):
  33. message = {'key': self.key, 'data': data}
  34. self._send(message)
  35. def send_status(self, code):
  36. message = {'key': self.key, 'status': code}
  37. self._send(message, True)
  38. def run(self):
  39. if not self.token:
  40. with self.ssh:
  41. return self.ssh.exec_command(self.command, self.env)
  42. self.send('\x1b[36m### Executing ...\x1b[0m\r\n')
  43. code = -1
  44. try:
  45. with self.ssh:
  46. for code, out in self.ssh.exec_command_with_stream(self.command, self.env):
  47. self.send(out)
  48. except socket.timeout:
  49. code = 130
  50. self.send('\r\n\x1b[31m### Time out\x1b[0m')
  51. except Exception as e:
  52. code = 131
  53. self.send(f'\r\n\x1b[31m### Exception {e}\x1b[0m')
  54. raise e
  55. finally:
  56. self.send_status(code)