utils.py 11 KB


  1. # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
  2. # Copyright: (c) <spug.dev@gmail.com>
  3. # Released under the AGPL-3.0 License.
  4. from django_redis import get_redis_connection
  5. from libs.helper import make_ali_request, make_tencent_request
  6. from libs.ssh import SSH, AuthenticationException
  7. from libs.utils import AttrDict, human_datetime
  8. from libs.validators import ip_validator
  9. from apps.host.models import HostExtend
  10. from apps.setting.utils import AppSetting
  11. from collections import defaultdict
  12. from datetime import datetime, timezone
  13. from concurrent import futures
  14. import ipaddress
  15. import json
  16. import math
  17. import os
  18. def check_os_type(os_name):
  19. os_name = os_name.lower()
  20. types = ('centos', 'coreos', 'debian', 'suse', 'ubuntu', 'windows', 'freebsd', 'tencent', 'alibaba')
  21. for t in types:
  22. if t in os_name:
  23. return t
  24. return 'unknown'
  25. def check_instance_charge_type(value, supplier):
  26. if supplier == 'ali':
  27. if value in ('PrePaid', 'PostPaid'):
  28. return value
  29. else:
  30. return 'Other'
  31. if supplier == 'tencent':
  32. if value == 'PREPAID':
  33. return 'PrePaid'
  34. if value == 'POSTPAID_BY_HOUR':
  35. return 'PostPaid'
  36. return 'Other'
  37. def check_internet_charge_type(value, supplier):
  38. if supplier == 'ali':
  39. if value in ('PayByTraffic', 'PayByBandwidth'):
  40. return value
  41. else:
  42. return 'Other'
  43. if supplier == 'tencent':
  44. if value == 'TRAFFIC_POSTPAID_BY_HOUR':
  45. return 'PayByTraffic'
  46. if value in ('BANDWIDTH_PREPAID', 'BANDWIDTH_POSTPAID_BY_HOUR'):
  47. return 'PayByBandwidth'
  48. return 'Other'
  49. def parse_utc_date(value):
  50. if not value:
  51. return None
  52. s_format = '%Y-%m-%dT%H:%M:%SZ'
  53. if len(value) == 17:
  54. s_format = '%Y-%m-%dT%H:%MZ'
  55. date = datetime.strptime(value, s_format).replace(tzinfo=timezone.utc)
  56. return date.astimezone().strftime('%Y-%m-%d %H:%M:%S')
  57. def fetch_ali_regions(ak, ac):
  58. params = dict(Action='DescribeRegions')
  59. res = make_ali_request(ak, ac, 'http://ecs.aliyuncs.com', params)
  60. if 'Regions' in res:
  61. return res['Regions']['Region']
  62. else:
  63. raise Exception(res)
  64. def fetch_ali_disks(ak, ac, region_id, page_number=1):
  65. data, page_size = defaultdict(list), 20
  66. params = dict(
  67. Action='DescribeDisks',
  68. RegionId=region_id,
  69. PageNumber=page_number,
  70. PageSize=page_size
  71. )
  72. res = make_ali_request(ak, ac, 'http://ecs.aliyuncs.com', params)
  73. if 'Disks' in res:
  74. for item in res['Disks']['Disk']:
  75. data[item['InstanceId']].append(item['Size'])
  76. if len(res['Disks']['Disk']) == page_size:
  77. page_number += 1
  78. new_data = fetch_ali_disks(ak, ac, region_id, page_number)
  79. data.update(new_data)
  80. return data
  81. else:
  82. raise Exception(res)
  83. def fetch_ali_instances(ak, ac, region_id, page_number=1):
  84. data, page_size = {}, 20
  85. params = dict(
  86. Action='DescribeInstances',
  87. RegionId=region_id,
  88. PageNumber=page_number,
  89. PageSize=page_size
  90. )
  91. res = make_ali_request(ak, ac, 'http://ecs.aliyuncs.com', params)
  92. if 'Instances' not in res:
  93. raise Exception(res)
  94. for item in res['Instances']['Instance']:
  95. if 'NetworkInterfaces' in item:
  96. network_interface = item['NetworkInterfaces']['NetworkInterface']
  97. else:
  98. network_interface = []
  99. data[item['InstanceId']] = dict(
  100. instance_id=item['InstanceId'],
  101. instance_name=item['InstanceName'],
  102. os_name=item['OSName'],
  103. os_type=check_os_type(item['OSName']),
  104. cpu=item['Cpu'],
  105. memory=item['Memory'] / 1024,
  106. created_time=parse_utc_date(item['CreationTime']),
  107. expired_time=parse_utc_date(item['ExpiredTime']),
  108. instance_charge_type=check_instance_charge_type(item['InstanceChargeType'], 'ali'),
  109. internet_charge_type=check_internet_charge_type(item['InternetChargeType'], 'ali'),
  110. public_ip_address=item['PublicIpAddress']['IpAddress'],
  111. private_ip_address=list(map(lambda x: x['PrimaryIpAddress'], network_interface)),
  112. zone_id=item['ZoneId']
  113. )
  114. if len(res['Instances']['Instance']) == page_size:
  115. new_data = fetch_ali_instances(ak, ac, region_id, page_number + 1)
  116. data.update(new_data)
  117. if page_number != 1:
  118. return data
  119. for instance_id, disk in fetch_ali_disks(ak, ac, region_id).items():
  120. if instance_id in data:
  121. data[instance_id]['disk'] = disk
  122. return list(data.values())
  123. def fetch_tencent_regions(ak, ac):
  124. params = dict(Action='DescribeRegions')
  125. res = make_tencent_request(ak, ac, 'cvm.tencentcloudapi.com', params)
  126. if 'RegionSet' in res['Response']:
  127. return res['Response']['RegionSet']
  128. else:
  129. raise Exception(res)
  130. def fetch_tencent_instances(ak, ac, region_id, page_number=1):
  131. data, page_size = [], 20
  132. params = dict(
  133. Action='DescribeInstances',
  134. Region=region_id,
  135. Offset=(page_number - 1) * page_size,
  136. Limit=page_size
  137. )
  138. res = make_tencent_request(ak, ac, 'cvm.tencentcloudapi.com', params)
  139. if 'InstanceSet' not in res['Response']:
  140. raise Exception(res)
  141. for item in res['Response']['InstanceSet']:
  142. data_disks = list(map(lambda x: x['DiskSize'], item['DataDisks']))
  143. internet_charge_type = item['InternetAccessible']['InternetChargeType']
  144. data.append(dict(
  145. instance_id=item['InstanceId'],
  146. instance_name=item['InstanceName'],
  147. os_name=item['OsName'],
  148. os_type=check_os_type(item['OsName']),
  149. cpu=item['CPU'],
  150. memory=item['Memory'],
  151. disk=[item['SystemDisk']['DiskSize']] + data_disks,
  152. created_time=parse_utc_date(item['CreatedTime']),
  153. expired_time=parse_utc_date(item['ExpiredTime']),
  154. instance_charge_type=check_instance_charge_type(item['InstanceChargeType'], 'tencent'),
  155. internet_charge_type=check_internet_charge_type(internet_charge_type, 'tencent'),
  156. public_ip_address=item['PublicIpAddresses'],
  157. private_ip_address=item['PrivateIpAddresses'],
  158. zone_id=item['Placement']['Zone']
  159. ))
  160. if len(res['Response']['InstanceSet']) == page_size:
  161. page_number += 1
  162. new_data = fetch_tencent_instances(ak, ac, region_id, page_number)
  163. data.extend(new_data)
  164. return data
  165. def fetch_host_extend(ssh):
  166. public_ip_address = set()
  167. private_ip_address = set()
  168. response = {'disk': []}
  169. with ssh:
  170. code, out = ssh.exec_command_raw('nproc')
  171. if code != 0:
  172. code, out = ssh.exec_command_raw("grep -c 'model name' /proc/cpuinfo")
  173. if code == 0:
  174. response['cpu'] = int(out.strip())
  175. code, out = ssh.exec_command_raw("cat /etc/os-release | grep PRETTY_NAME | awk -F \\\" '{print $2}'")
  176. if '/etc/os-release' in out:
  177. code, out = ssh.exec_command_raw("cat /etc/issue | head -1 | awk '{print $1,$2,$3}'")
  178. if code == 0:
  179. response['os_name'] = out.strip()
  180. code, out = ssh.exec_command_raw('hostname -I')
  181. if code == 0:
  182. for ip in out.strip().split():
  183. if ipaddress.ip_address(ip).is_global:
  184. public_ip_address.add(ip)
  185. elif len(private_ip_address) < 10:
  186. private_ip_address.add(ip)
  187. ssh_hostname = ssh.arguments.get('hostname')
  188. if ip_validator(ssh_hostname):
  189. if ipaddress.ip_address(ssh_hostname).is_global:
  190. public_ip_address.add(ssh_hostname)
  191. else:
  192. private_ip_address.add(ssh_hostname)
  193. code, out = ssh.exec_command_raw('lsblk -dbn -o SIZE -e 11 2> /dev/null')
  194. if code == 0:
  195. for item in out.strip().splitlines():
  196. item = item.strip()
  197. response['disk'].append(math.ceil(int(item) / 1024 / 1024 / 1024))
  198. code, out = ssh.exec_command_raw("dmidecode -t 17 | grep -E 'Size: [0-9]+' | awk '{s+=$2} END {print s,$3}'")
  199. if code == 0:
  200. fields = out.strip().split()
  201. if len(fields) == 2 and fields[1] in ('GB', 'MB'):
  202. size, unit = out.strip().split()
  203. if unit == 'GB':
  204. response['memory'] = size
  205. else:
  206. response['memory'] = round(int(size) / 1024, 0)
  207. if 'memory' not in response:
  208. code, out = ssh.exec_command_raw("free -m | awk 'NR==2{print $2}'")
  209. if code == 0:
  210. response['memory'] = math.ceil(int(out) / 1024)
  211. response['public_ip_address'] = list(public_ip_address)
  212. response['private_ip_address'] = list(private_ip_address)
  213. return response
  214. def batch_sync_host(token, hosts, password):
  215. private_key, public_key = AppSetting.get_ssh_key()
  216. threads, latest_exception, rds = [], None, get_redis_connection()
  217. max_workers = max(10, os.cpu_count() * 5)
  218. with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
  219. for host in hosts:
  220. t = executor.submit(_sync_host_extend, host, private_key, public_key, password)
  221. t.host = host
  222. threads.append(t)
  223. for t in futures.as_completed(threads):
  224. exception = t.exception()
  225. if exception:
  226. rds.rpush(token, json.dumps({'key': t.host.id, 'status': 'fail', 'message': f'{exception}'}))
  227. else:
  228. rds.rpush(token, json.dumps({'key': t.host.id, 'status': 'ok'}))
  229. t.host.is_verified = True
  230. t.host.save()
  231. rds.expire(token, 60)
  232. def _sync_host_extend(host, private_key=None, public_key=None, password=None, ssh=None):
  233. if not ssh:
  234. kwargs = host.to_dict(selects=('hostname', 'port', 'username'))
  235. ssh = _get_ssh(kwargs, host.pkey, private_key, public_key, password)
  236. form = AttrDict(fetch_host_extend(ssh))
  237. form.disk = json.dumps(form.disk)
  238. form.public_ip_address = json.dumps(form.public_ip_address)
  239. form.private_ip_address = json.dumps(form.private_ip_address)
  240. form.updated_at = human_datetime()
  241. form.os_type = check_os_type(form.os_name)
  242. if hasattr(host, 'hostextend'):
  243. extend = host.hostextend
  244. extend.update_by_dict(form)
  245. else:
  246. extend = HostExtend.objects.create(host=host, **form)
  247. return extend
  248. def _get_ssh(kwargs, pkey=None, private_key=None, public_key=None, password=None):
  249. try:
  250. ssh = SSH(pkey=pkey or private_key, **kwargs)
  251. ssh.get_client()
  252. return ssh
  253. except AuthenticationException as e:
  254. if password:
  255. with SSH(password=str(password), **kwargs) as ssh:
  256. ssh.add_public_key(public_key)
  257. return _get_ssh(kwargs, private_key)
  258. raise e