consumers.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
  2. # Copyright: (c) <spug.dev@gmail.com>
  3. # Released under the AGPL-3.0 License.
  4. from channels.generic.websocket import WebsocketConsumer
  5. from django.conf import settings
  6. from django_redis import get_redis_connection
  7. from asgiref.sync import async_to_sync
  8. from apps.host.models import Host
  9. from apps.account.utils import has_host_perm
  10. from threading import Thread
  11. import time
  12. import json
  13. class ExecConsumer(WebsocketConsumer):
  14. def __init__(self, *args, **kwargs):
  15. super().__init__(*args, **kwargs)
  16. self.token = self.scope['url_route']['kwargs']['token']
  17. self.rds = get_redis_connection()
  18. def connect(self):
  19. self.accept()
  20. def disconnect(self, code):
  21. self.rds.close()
  22. def get_response(self):
  23. response = self.rds.brpop(self.token, timeout=5)
  24. return response[1] if response else None
  25. def receive(self, **kwargs):
  26. response = self.get_response()
  27. while response:
  28. data = response.decode()
  29. self.send(text_data=data)
  30. response = self.get_response()
  31. self.send(text_data='pong')
  32. class ComConsumer(WebsocketConsumer):
  33. def __init__(self, *args, **kwargs):
  34. super().__init__(*args, **kwargs)
  35. token = self.scope['url_route']['kwargs']['token']
  36. module = self.scope['url_route']['kwargs']['module']
  37. if module == 'build':
  38. self.key = f'{settings.BUILD_KEY}:{token}'
  39. elif module == 'request':
  40. self.key = f'{settings.REQUEST_KEY}:{token}'
  41. elif module == 'host':
  42. self.key = token
  43. else:
  44. raise TypeError(f'unknown module for {module}')
  45. self.rds = get_redis_connection()
  46. def connect(self):
  47. self.accept()
  48. def disconnect(self, code):
  49. self.rds.close()
  50. def get_response(self, index):
  51. counter = 0
  52. while counter < 30:
  53. response = self.rds.lindex(self.key, index)
  54. if response:
  55. return response.decode()
  56. counter += 1
  57. time.sleep(0.2)
  58. def receive(self, text_data='', **kwargs):
  59. if text_data.isdigit():
  60. index = int(text_data)
  61. response = self.get_response(index)
  62. while response:
  63. index += 1
  64. self.send(text_data=response)
  65. response = self.get_response(index)
  66. self.send(text_data='pong')
  67. class SSHConsumer(WebsocketConsumer):
  68. def __init__(self, *args, **kwargs):
  69. super().__init__(*args, **kwargs)
  70. self.user = self.scope['user']
  71. self.id = self.scope['url_route']['kwargs']['id']
  72. self.chan = None
  73. self.ssh = None
  74. def loop_read(self):
  75. while True:
  76. data = self.chan.recv(32 * 1024)
  77. # print('read: {!r}'.format(data))
  78. if not data:
  79. self.close(3333)
  80. break
  81. self.send(bytes_data=data)
  82. def receive(self, text_data=None, bytes_data=None):
  83. data = text_data or bytes_data
  84. if data:
  85. data = json.loads(data)
  86. # print('write: {!r}'.format(data))
  87. resize = data.get('resize')
  88. if resize and len(resize) == 2:
  89. self.chan.resize_pty(*resize)
  90. else:
  91. self.chan.send(data['data'])
  92. def disconnect(self, code):
  93. self.chan.close()
  94. self.ssh.close()
  95. # print('Connection close')
  96. def connect(self):
  97. if has_host_perm(self.user, self.id):
  98. self.accept()
  99. self._init()
  100. else:
  101. self.close()
  102. def _init(self):
  103. self.send(bytes_data=b'Connecting ...\r\n')
  104. host = Host.objects.filter(pk=self.id).first()
  105. if not host:
  106. self.send(text_data='Unknown host\r\n')
  107. self.close()
  108. try:
  109. self.ssh = host.get_ssh().get_client()
  110. except Exception as e:
  111. self.send(bytes_data=f'Exception: {e}\r\n'.encode())
  112. self.close()
  113. return
  114. self.chan = self.ssh.invoke_shell(term='xterm')
  115. self.chan.transport.set_keepalive(30)
  116. Thread(target=self.loop_read).start()
  117. class NotifyConsumer(WebsocketConsumer):
  118. def connect(self):
  119. async_to_sync(self.channel_layer.group_add)('notify', self.channel_name)
  120. self.accept()
  121. def disconnect(self, code):
  122. async_to_sync(self.channel_layer.group_discard)('notify', self.channel_name)
  123. def receive(self, **kwargs):
  124. self.send(text_data='pong')
  125. def notify_message(self, event):
  126. self.send(text_data=json.dumps(event))