# utils.py
  1. # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
  2. # Copyright: (c) <spug.dev@gmail.com>
  3. # Released under the AGPL-3.0 License.
  4. from django_redis import get_redis_connection
  5. from django.conf import settings
  6. from django.db import close_old_connections
  7. from libs.utils import AttrDict, human_time
  8. from apps.host.models import Host
  9. from apps.config.utils import compose_configs
  10. from apps.repository.models import Repository
  11. from apps.repository.utils import dispatch as build_repository
  12. from apps.deploy.helper import Helper, SpugError
  13. from concurrent import futures
  14. import json
  15. import uuid
  16. import os
  17. REPOS_DIR = settings.REPOS_DIR
  18. def dispatch(req):
  19. rds = get_redis_connection()
  20. rds_key = f'{settings.REQUEST_KEY}:{req.id}'
  21. helper = Helper(rds, rds_key)
  22. try:
  23. api_token = uuid.uuid4().hex
  24. rds.setex(api_token, 60 * 60, f'{req.deploy.app_id},{req.deploy.env_id}')
  25. env = AttrDict(
  26. SPUG_APP_NAME=req.deploy.app.name,
  27. SPUG_APP_KEY=req.deploy.app.key,
  28. SPUG_APP_ID=str(req.deploy.app_id),
  29. SPUG_REQUEST_NAME=req.name,
  30. SPUG_DEPLOY_ID=str(req.deploy.id),
  31. SPUG_REQUEST_ID=str(req.id),
  32. SPUG_ENV_ID=str(req.deploy.env_id),
  33. SPUG_ENV_KEY=req.deploy.env.key,
  34. SPUG_VERSION=req.version,
  35. SPUG_DEPLOY_TYPE=req.type,
  36. SPUG_API_TOKEN=api_token,
  37. SPUG_REPOS_DIR=REPOS_DIR,
  38. )
  39. # append configs
  40. configs = compose_configs(req.deploy.app, req.deploy.env_id)
  41. configs_env = {f'_SPUG_{k.upper()}': v for k, v in configs.items()}
  42. env.update(configs_env)
  43. if req.deploy.extend == '1':
  44. _ext1_deploy(req, helper, env)
  45. else:
  46. _ext2_deploy(req, helper, env)
  47. req.status = '3'
  48. except Exception as e:
  49. req.status = '-3'
  50. raise e
  51. finally:
  52. close_old_connections()
  53. req.save()
  54. helper.clear()
  55. Helper.send_deploy_notify(req)
  56. def _ext1_deploy(req, helper, env):
  57. if not req.repository_id:
  58. rep = Repository(
  59. app_id=req.deploy.app_id,
  60. env_id=req.deploy.env_id,
  61. deploy_id=req.deploy_id,
  62. version=req.version,
  63. spug_version=req.spug_version,
  64. extra=req.extra,
  65. remarks='SPUG AUTO MAKE',
  66. created_by_id=req.created_by_id
  67. )
  68. build_repository(rep, helper)
  69. req.repository = rep
  70. extend = req.deploy.extend_obj
  71. env.update(SPUG_DST_DIR=extend.dst_dir)
  72. extras = json.loads(req.extra)
  73. if extras[0] == 'repository':
  74. extras = extras[1:]
  75. if extras[0] == 'branch':
  76. env.update(SPUG_GIT_BRANCH=extras[1], SPUG_GIT_COMMIT_ID=extras[2])
  77. else:
  78. env.update(SPUG_GIT_TAG=extras[1])
  79. if req.deploy.is_parallel:
  80. threads, latest_exception = [], None
  81. max_workers = max(10, os.cpu_count() * 5)
  82. with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
  83. for h_id in json.loads(req.host_ids):
  84. new_env = AttrDict(env.items())
  85. t = executor.submit(_deploy_ext1_host, req, helper, h_id, new_env)
  86. t.h_id = h_id
  87. threads.append(t)
  88. for t in futures.as_completed(threads):
  89. exception = t.exception()
  90. if exception:
  91. latest_exception = exception
  92. if not isinstance(exception, SpugError):
  93. helper.send_error(t.h_id, f'Exception: {exception}', False)
  94. if latest_exception:
  95. raise latest_exception
  96. else:
  97. host_ids = sorted(json.loads(req.host_ids), reverse=True)
  98. while host_ids:
  99. h_id = host_ids.pop()
  100. new_env = AttrDict(env.items())
  101. try:
  102. _deploy_ext1_host(req, helper, h_id, new_env)
  103. except Exception as e:
  104. helper.send_error(h_id, f'Exception: {e}', False)
  105. for h_id in host_ids:
  106. helper.send_error(h_id, '终止发布', False)
  107. raise e
  108. def _ext2_deploy(req, helper, env):
  109. helper.send_info('local', f'\033[32m完成√\033[0m\r\n')
  110. extend, step = req.deploy.extend_obj, 1
  111. host_actions = json.loads(extend.host_actions)
  112. server_actions = json.loads(extend.server_actions)
  113. env.update({'SPUG_RELEASE': req.version})
  114. if req.version:
  115. for index, value in enumerate(req.version.split()):
  116. env.update({f'SPUG_RELEASE_{index + 1}': value})
  117. for action in server_actions:
  118. helper.send_step('local', step, f'{human_time()} {action["title"]}...\r\n')
  119. helper.local(f'cd /tmp && {action["data"]}', env)
  120. step += 1
  121. helper.send_step('local', 100, '')
  122. tmp_transfer_file = None
  123. for action in host_actions:
  124. if action.get('type') == 'transfer':
  125. if action.get('src_mode') == '1':
  126. break
  127. helper.send_info('local', f'{human_time()} 检测到来源为本地路径的数据传输动作,执行打包... \r\n')
  128. action['src'] = action['src'].rstrip('/ ')
  129. action['dst'] = action['dst'].rstrip('/ ')
  130. if not action['src'] or not action['dst']:
  131. helper.send_error('local', f'invalid path for transfer, src: {action["src"]} dst: {action["dst"]}')
  132. is_dir, exclude = os.path.isdir(action['src']), ''
  133. sp_dir, sd_dst = os.path.split(action['src'])
  134. contain = sd_dst
  135. if action['mode'] != '0' and is_dir:
  136. files = helper.parse_filter_rule(action['rule'], ',')
  137. if files:
  138. if action['mode'] == '1':
  139. contain = ' '.join(f'{sd_dst}/{x}' for x in files)
  140. else:
  141. excludes = []
  142. for x in files:
  143. if x.startswith('/'):
  144. excludes.append(f'--exclude={sd_dst}{x}')
  145. else:
  146. excludes.append(f'--exclude={x}')
  147. exclude = ' '.join(excludes)
  148. tar_gz_file = f'{req.spug_version}.tar.gz'
  149. helper.local(f'cd {sp_dir} && tar -zcf {tar_gz_file} {exclude} {contain}')
  150. helper.send_info('local', f'{human_time()} \033[32m完成√\033[0m\r\n')
  151. tmp_transfer_file = os.path.join(sp_dir, tar_gz_file)
  152. break
  153. if host_actions:
  154. if req.deploy.is_parallel:
  155. threads, latest_exception = [], None
  156. max_workers = max(10, os.cpu_count() * 5)
  157. with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
  158. for h_id in json.loads(req.host_ids):
  159. new_env = AttrDict(env.items())
  160. t = executor.submit(_deploy_ext2_host, helper, h_id, host_actions, new_env, req.spug_version)
  161. t.h_id = h_id
  162. threads.append(t)
  163. for t in futures.as_completed(threads):
  164. exception = t.exception()
  165. if exception:
  166. latest_exception = exception
  167. if not isinstance(exception, SpugError):
  168. helper.send_error(t.h_id, f'Exception: {exception}', False)
  169. if tmp_transfer_file:
  170. os.remove(tmp_transfer_file)
  171. if latest_exception:
  172. raise latest_exception
  173. else:
  174. host_ids = sorted(json.loads(req.host_ids), reverse=True)
  175. while host_ids:
  176. h_id = host_ids.pop()
  177. new_env = AttrDict(env.items())
  178. try:
  179. _deploy_ext2_host(helper, h_id, host_actions, new_env, req.spug_version)
  180. except Exception as e:
  181. helper.send_error(h_id, f'Exception: {e}', False)
  182. for h_id in host_ids:
  183. helper.send_error(h_id, '终止发布', False)
  184. raise e
  185. else:
  186. helper.send_step('local', 100, f'\r\n{human_time()} ** 发布成功 **')
def _deploy_ext1_host(req, helper, h_id, env):
    """Deploy an extend-1 release package onto one host over SSH.

    Steps: prepare/verify the release directories, upload & unpack the
    build tarball (skipped when req.type == '2' — presumably a rollback,
    where the target version already exists on the host; confirm against
    caller), run the pre-host hook, switch the dst_dir symlink to the new
    release, then run the post-host hook.

    NOTE(review): helper.send_error appears to abort execution (raise) —
    otherwise the missing-host path below would fall through; confirm in
    apps.deploy.helper.
    """
    extend = req.deploy.extend_obj
    helper.send_step(h_id, 1, f'\033[32m就绪√\033[0m\r\n{human_time()} 数据准备... ')
    host = Host.objects.filter(pk=h_id).first()
    if not host:
        helper.send_error(h_id, 'no such host')
    env.update({'SPUG_HOST_ID': h_id, 'SPUG_HOST_NAME': host.hostname})
    with host.get_ssh(default_env=env) as ssh:
        base_dst_dir = os.path.dirname(extend.dst_dir)
        # Exit code 0 means dst_dir exists AND is not a symlink: a real
        # directory there likely holds data Spug did not create — refuse to
        # take it over for safety.
        code, _ = ssh.exec_command_raw(
            f'mkdir -p {extend.dst_repo} {base_dst_dir} && [ -e {extend.dst_dir} ] && [ ! -L {extend.dst_dir} ]')
        if code == 0:
            helper.send_error(host.id, f'检测到该主机的发布目录 {extend.dst_dir!r} 已存在,为了数据安全请自行备份后删除该目录,Spug 将会创建并接管该目录。')
        if req.type == '2':
            helper.send_step(h_id, 1, '\033[33m跳过√\033[0m\r\n')
        else:
            # clean: keep only the newest `extend.versions` releases of this deploy
            clean_command = f'ls -d {extend.deploy_id}_* 2> /dev/null | sort -t _ -rnk2 | tail -n +{extend.versions + 1} | xargs rm -rf'
            helper.remote_raw(host.id, ssh, f'cd {extend.dst_repo} && {clean_command}')
            # transfer files: upload the build tarball and unpack it in dst_repo
            tar_gz_file = f'{req.spug_version}.tar.gz'
            try:
                ssh.put_file(os.path.join(REPOS_DIR, 'build', tar_gz_file), os.path.join(extend.dst_repo, tar_gz_file))
            except Exception as e:
                helper.send_error(host.id, f'Exception: {e}')
            command = f'cd {extend.dst_repo} && rm -rf {req.spug_version} && tar xf {tar_gz_file} && rm -f {req.deploy_id}_*.tar.gz'
            helper.remote_raw(host.id, ssh, command)
            helper.send_step(h_id, 1, '\033[32m完成√\033[0m\r\n')
        # pre host hook, run inside the unpacked release directory
        repo_dir = os.path.join(extend.dst_repo, req.spug_version)
        if extend.hook_pre_host:
            helper.send_step(h_id, 2, f'{human_time()} 发布前任务... \r\n')
            command = f'cd {repo_dir} && {extend.hook_pre_host}'
            helper.remote(host.id, ssh, command)
        # do deploy: repoint the dst_dir symlink at the new release
        helper.send_step(h_id, 3, f'{human_time()} 执行发布... ')
        helper.remote_raw(host.id, ssh, f'rm -f {extend.dst_dir} && ln -sfn {repo_dir} {extend.dst_dir}')
        helper.send_step(h_id, 3, '\033[32m完成√\033[0m\r\n')
        # post host hook, run inside the live dst_dir
        if extend.hook_post_host:
            helper.send_step(h_id, 4, f'{human_time()} 发布后任务... \r\n')
            command = f'cd {extend.dst_dir} && {extend.hook_post_host}'
            helper.remote(host.id, ssh, command)
        helper.send_step(h_id, 100, f'\r\n{human_time()} ** \033[32m发布成功\033[0m **')
def _deploy_ext2_host(helper, h_id, actions, env, spug_version):
    """Execute the extend-2 host actions on one host over SSH.

    Each action is either a 'transfer' (upload data to action['dst']) or a
    shell command run in /tmp. Transfer with src_mode == '1' uploads a file
    straight from REPOS_DIR; otherwise the tarball packed by _ext2_deploy
    is uploaded to /tmp, unpacked, and moved into place.

    NOTE(review): helper.send_error appears to abort execution (raise) —
    otherwise the missing-host path below would fall through; confirm in
    apps.deploy.helper.
    """
    helper.send_info(h_id, '\033[32m就绪√\033[0m\r\n')
    host = Host.objects.filter(pk=h_id).first()
    if not host:
        helper.send_error(h_id, 'no such host')
    env.update({'SPUG_HOST_ID': h_id, 'SPUG_HOST_NAME': host.hostname})
    with host.get_ssh(default_env=env) as ssh:
        for index, action in enumerate(actions):
            helper.send_step(h_id, 1 + index, f'{human_time()} {action["title"]}...\r\n')
            if action.get('type') == 'transfer':
                if action.get('src_mode') == '1':
                    # Direct upload of the stored file; no remote command needed.
                    try:
                        ssh.put_file(os.path.join(REPOS_DIR, env.SPUG_DEPLOY_ID, spug_version), action['dst'])
                    except Exception as e:
                        helper.send_error(host.id, f'Exception: {e}')
                    helper.send_info(host.id, 'transfer completed\r\n')
                    continue
                else:
                    # Upload the pre-packed tarball, unpack in /tmp, then swap
                    # it into dst and clean up the temporary files.
                    sp_dir, sd_dst = os.path.split(action['src'])
                    tar_gz_file = f'{spug_version}.tar.gz'
                    try:
                        ssh.put_file(os.path.join(sp_dir, tar_gz_file), f'/tmp/{tar_gz_file}')
                    except Exception as e:
                        helper.send_error(host.id, f'Exception: {e}')
                    command = f'mkdir -p /tmp/{spug_version} && tar xf /tmp/{tar_gz_file} -C /tmp/{spug_version}/ '
                    command += f'&& rm -rf {action["dst"]} && mv /tmp/{spug_version}/{sd_dst} {action["dst"]} '
                    command += f'&& rm -rf /tmp/{spug_version}* && echo "transfer completed"'
            else:
                command = f'cd /tmp && {action["data"]}'
            helper.remote(host.id, ssh, command)
        helper.send_step(h_id, 100, f'\r\n{human_time()} ** \033[32m发布成功\033[0m **')