middleware.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
  2. # Copyright: (c) <spug.dev@gmail.com>
  3. # Released under the AGPL-3.0 License.
  4. from django.db import close_old_connections
  5. from channels.security.websocket import WebsocketDenier
  6. from apps.account.models import User
  7. from libs.utils import get_request_real_ip
  8. from urllib.parse import parse_qs
  9. import time
  10. class AuthMiddleware:
  11. def __init__(self, application):
  12. self.application = application
  13. def __call__(self, scope):
  14. # Make sure the scope is of type websocket
  15. if scope["type"] != "websocket":
  16. raise ValueError(
  17. "You cannot use AuthMiddleware on a non-WebSocket connection"
  18. )
  19. headers = dict(scope.get('headers', []))
  20. is_ok, message = self.verify_user(scope, headers)
  21. if is_ok:
  22. return self.application(scope)
  23. else:
  24. print(message)
  25. return WebsocketDenier(scope)
  26. def get_real_ip(self, headers):
  27. decode_headers = {
  28. 'x-forwarded-for': headers.get(b'x-forwarded-for', b'').decode(),
  29. 'x-real-ip': headers.get(b'x-real-ip', b'').decode()
  30. }
  31. return get_request_real_ip(decode_headers)
  32. def verify_user(self, scope, headers):
  33. close_old_connections()
  34. query_string = scope['query_string'].decode()
  35. x_real_ip = self.get_real_ip(headers)
  36. token = parse_qs(query_string).get('x-token', [''])[0]
  37. if token and len(token) == 32:
  38. user = User.objects.filter(access_token=token).first()
  39. if user and x_real_ip == user.last_ip and user.token_expired >= time.time() and user.is_active:
  40. scope['user'] = user
  41. return True, None
  42. return False, f'IP verify failed: {x_real_ip} <> {user.last_ip}'
  43. return False, 'Token is invalid'