rov_server_update.py 14 KB


  1. import asyncio
  2. import sys
  3. import time
  4. import requests
  5. import utils
  6. import logging
  7. import os
  8. import docker
  9. from concurrent.futures import ThreadPoolExecutor
  10. health_instances = []
  11. ess_instances = []
  12. slb_id = 'lb-bp1werfophtsjzfr76njm'
  13. # 修改负载均衡权限
  14. slb_client_params = {
  15. 'access_key_id': 'LTAIuPbTPL3LDDKN',
  16. 'access_key_secret': 'ORcNedKwWuwVtcq4IRFtUDZgS0b1le',
  17. 'region_id': 'cn-hangzhou'
  18. }
  19. # 购买机器权限
  20. create_client_params = {
  21. 'access_key_id': 'LTAI4GBWbFvvXoXsSVBe1o9f',
  22. 'access_key_secret': 'kRAikWitb4kDxaAyBqNrmLmllMEDO3',
  23. 'region_id': 'cn-hangzhou'
  24. }
  25. # 机器配置
  26. instance_config = {
  27. # 使用的镜像信息
  28. 'image_id': 'm-bp1e5jx8eqhq22l91xw7',
  29. # 设置实例规格
  30. 'instance_type': 'ecs.ic5.xlarge',
  31. # 选择的交换机
  32. 'vswitch_id': 'vsw-bp19lpjwtc6j0p0m9mdc2',
  33. # 当前VPC类型的安全组
  34. 'security_group_id': 'sg-bp1irhrkr4vfj272hk4y',
  35. # 硬盘的大小,单位:G
  36. 'disk_size': '200',
  37. # 服务器命名
  38. 'instance_name': 'ESS-rov-server-[1,2]',
  39. # 服务器所在区域
  40. 'zone_id': 'cn-hangzhou-h',
  41. # 磁盘类型:云盘
  42. 'disk_category': 'cloud_efficiency',
  43. # 密钥
  44. 'key_pair_name': 'stuuudy'
  45. }
  46. # 服务启动脚本
  47. start_sh_dir = os.path.dirname(os.path.realpath(__file__))
  48. start_sh_filename = 'rov_server_start.sh'
  49. with open(file=os.path.join(start_sh_dir, start_sh_filename), mode='r', encoding='utf-8') as rf:
  50. file_content = rf.read()
  51. start_sh = {
  52. 'target_dir': '/home/piaoquan_server_sh',
  53. 'name': start_sh_filename,
  54. 'content': file_content,
  55. }
  56. def server_health_check(client, instance_id):
  57. """
  58. 服务健康检查
  59. :param client: 客户端连接
  60. :param instance_id: instanceId
  61. :return:
  62. """
  63. global health_instances
  64. ip_address = utils.get_ip_address(client=client, instance_id=instance_id)
  65. while True:
  66. health_check_url = f"http://{ip_address}:5001/healthcheck"
  67. try:
  68. http_code = requests.get(health_check_url).status_code
  69. except:
  70. logging.info("images is downloading")
  71. http_code = 0
  72. if http_code == 200:
  73. health_instances.append((instance_id, ip_address))
  74. break
  75. else:
  76. # time.sleep(20)
  77. await asyncio.sleep(10)
  78. def set_instance_weight_process(client, instance_id_list, weight_list):
  79. """
  80. 修改服务器的权重值
  81. :param client: slb客户端连接
  82. :param instance_id_list: instance id list
  83. :param weight_list: 权重修改列表 type-list [(weight, sleep_time), ...]
  84. :return:
  85. """
  86. for weight, sleep_time in weight_list:
  87. flag = True
  88. while flag:
  89. try:
  90. utils.set_weight_for_instances(client=client,
  91. slb_id=slb_id,
  92. instance_id_list=instance_id_list,
  93. weight=weight)
  94. time.sleep(sleep_time)
  95. flag = False
  96. except Exception as e:
  97. time.sleep(10)
  98. continue
  99. async def run_server(create_client, slb_client, instance_ids, max_workers):
  100. # 1. 发送启动脚本到机器上
  101. utils.send_file_to_ecs(client=create_client, instance_id_list=instance_ids, **start_sh)
  102. logging.info(f"send start shell file finished, instances: {instance_ids}")
  103. # 2. 启动服务
  104. server_start_sh = os.path.join(start_sh['target_dir'], start_sh['name'])
  105. server_start_commend = f"sh {server_start_sh}"
  106. utils.run_command(client=create_client, instance_ids=instance_ids, command=server_start_commend)
  107. # 3. 异步探活
  108. global health_instances
  109. health_instances = []
  110. loop = asyncio.get_running_loop()
  111. executor = ThreadPoolExecutor(max_workers=max_workers)
  112. tasks = [
  113. loop.run_in_executor(executor, server_health_check, *args) for args in
  114. [(slb_client, instance_id) for instance_id in instance_ids]
  115. ]
  116. await asyncio.wait(tasks)
  117. logging.info(f"health instances count: {len(health_instances)}, {health_instances}")
  118. # 4. 挂载流量
  119. if len(health_instances) == len(instance_ids):
  120. # 所有机器探活成功
  121. time.sleep(60)
  122. add_weight_list = [(10, 30), (20, 20), (40, 10), (60, 10), (80, 10), (100, 10)]
  123. set_instance_weight_process(client=slb_client, instance_id_list=instance_ids, weight_list=add_weight_list)
  124. logging.info(f"add weight instances count: {len(health_instances)}")
  125. else:
  126. logging.info(f"instances count: {len(instance_ids)},\nhealth instances count: {len(health_instances)}")
  127. sys.exit()
  128. async def ess_instance(create_client, slb_client, ess_count, max_workers):
  129. """
  130. 扩容机器并运行新服务
  131. :param create_client: 购买机器客户端连接
  132. :param slb_client: 修改负载均衡权限
  133. :param ess_count: 扩容数量
  134. :param max_workers: 线程数
  135. :return:
  136. """
  137. # 1. 购买机器并启动
  138. ess_instance_ids = utils.create_multiple_instances(
  139. amount=ess_count,
  140. client=create_client,
  141. **instance_config,
  142. )
  143. time.sleep(60)
  144. # 2. 发送启动脚本到机器上
  145. utils.send_file_to_ecs(client=create_client, instance_id_list=ess_instance_ids, **start_sh)
  146. logging.info(f"send start shell file finished, instances: {ess_instance_ids}")
  147. # 3. 启动服务
  148. server_start_sh = os.path.join(start_sh['target_dir'], start_sh['name'])
  149. server_start_commend = f"sh {server_start_sh}"
  150. utils.run_command(client=create_client, instance_ids=ess_instance_ids, command=server_start_commend)
  151. # 4. 异步探活
  152. global health_instances
  153. health_instances = []
  154. loop = asyncio.get_running_loop()
  155. executor = ThreadPoolExecutor(max_workers=max_workers)
  156. tasks = [
  157. loop.run_in_executor(executor, server_health_check, *args) for args in
  158. [(slb_client, instance_id) for instance_id in ess_instance_ids]
  159. ]
  160. await asyncio.wait(tasks)
  161. logging.info(f"health instances count: {len(health_instances)}, {health_instances}")
  162. # 5. 挂载流量
  163. if len(health_instances) == len(ess_instance_ids):
  164. # 所有机器探活成功
  165. time.sleep(60)
  166. add_weight_list = [(10, 30), (20, 20), (40, 10), (60, 10), (80, 10), (100, 10)]
  167. set_instance_weight_process(client=slb_client, instance_id_list=ess_instance_ids, weight_list=add_weight_list)
  168. global ess_instances
  169. ess_instances.extend(ess_instance_ids)
  170. logging.info(f"ess count: {ess_count}, "
  171. f"create count: {len(ess_instance_ids)}, "
  172. f"finished count: {len(health_instances)}")
  173. else:
  174. logging.info(f"ess count: {ess_count}, "
  175. f"create count: {len(ess_instance_ids)}, "
  176. f"health count: {len(health_instances)}")
  177. sys.exit()
  178. def remove_container_image(client, instance_id, container_name):
  179. """
  180. 移除旧容器并删除旧镜像
  181. :param client: 客户端连接
  182. :param instance_id: instanceId type-string
  183. :param container_name: 容器名称 type-string
  184. :return:
  185. """
  186. ip_address = utils.get_ip_address(client=client, instance_id=instance_id)
  187. logging.info(f"服务器信息:{instance_id}/{ip_address}")
  188. client = docker.DockerClient(base_url=f'tcp://{ip_address}:2375', timeout=60)
  189. # 移除旧的容器
  190. container_remove_retry = 3
  191. i = 0
  192. while i < container_remove_retry:
  193. try:
  194. container_id = client.containers.get(container_name)
  195. container_id.remove(force=True)
  196. break
  197. except Exception as e:
  198. i += 1
  199. print("容器不存在或者无法删除当前容器")
  200. # 删除旧镜像
  201. images_remove_retry = 3
  202. j = 0
  203. while j < images_remove_retry:
  204. try:
  205. images = client.images.list()
  206. for image in images:
  207. client.images.remove(force=True, image=image.tags[0])
  208. time.sleep(2)
  209. except Exception as e:
  210. i += 1
  211. print("镜像不存在,无法获取到镜像ID")
  212. async def update_instance(create_client, slb_client, instance_ids, max_workers):
  213. """
  214. 线上机器更新
  215. :param create_client:
  216. :param slb_client: slb客户端连接
  217. :param instance_ids: instanceId type-list
  218. :param max_workers:
  219. :return:
  220. """
  221. media_index = len(instance_ids)//2
  222. instance_ids_group = [instance_ids[:media_index], instance_ids[media_index:]]
  223. for instance_id_list in instance_ids_group:
  224. # 1. 摘流量
  225. set_instance_weight_process(client=slb_client, instance_id_list=instance_id_list, weight_list=[(0, 60)])
  226. logging.info(f"set weight with 0 finished, instances: {instance_id_list}")
  227. # 2. 异步移除旧容器并删除旧镜像
  228. container_name = 'rov-server'
  229. loop = asyncio.get_running_loop()
  230. executor = ThreadPoolExecutor(max_workers=max_workers)
  231. tasks = [
  232. loop.run_in_executor(executor, remove_container_image, *args) for args in
  233. [(slb_client, instance_id, container_name) for instance_id in instance_id_list]
  234. ]
  235. await asyncio.wait(tasks)
  236. logging.info(f"remove container & images finished, instances: {instance_id_list}")
  237. # 3. 发送启动脚本到机器上
  238. utils.send_file_to_ecs(client=create_client, instance_id_list=instance_id_list, **start_sh)
  239. logging.info(f"send start shell file finished, instances: {instance_id_list}")
  240. # 4. 启动服务
  241. server_start_sh = os.path.join(start_sh['target_dir'], start_sh['name'])
  242. server_start_commend = f"sh {server_start_sh}"
  243. utils.run_command(client=create_client, instance_ids=instance_id_list, command=server_start_commend)
  244. # 5. 异步探活
  245. global health_instances
  246. health_instances = []
  247. loop = asyncio.get_running_loop()
  248. executor = ThreadPoolExecutor(max_workers=max_workers)
  249. tasks = [
  250. loop.run_in_executor(executor, server_health_check, *args) for args in
  251. [(slb_client, instance_id) for instance_id in instance_id_list]
  252. ]
  253. await asyncio.wait(tasks)
  254. logging.info(f"health instances count: {len(health_instances)}, {health_instances}")
  255. # 6. 挂载流量
  256. if len(health_instances) == len(instance_id_list):
  257. # 所有机器探活成功
  258. time.sleep(60)
  259. add_weight_list = [(10, 30), (20, 20), (40, 10), (60, 10), (80, 10), (100, 10)]
  260. set_instance_weight_process(client=slb_client, instance_id_list=instance_id_list,
  261. weight_list=add_weight_list)
  262. logging.info(f"finished count: {len(health_instances)}")
  263. else:
  264. logging.info(f"health count: {len(health_instances)}")
  265. sys.exit()
  266. def remove_instances(create_client, slb_client, instance_ids):
  267. """
  268. 停止并释放机器
  269. :param create_client:
  270. :param slb_client:
  271. :param instance_ids: instanceId type-list
  272. :return: None
  273. """
  274. # 1. 摘流量
  275. set_instance_weight_process(client=slb_client, instance_id_list=instance_ids, weight_list=[(0, 60)])
  276. logging.info(f"set weight = 0 finished, instances: {instance_ids}")
  277. time.sleep(10)
  278. # 2. 停止机器
  279. utils.stop_instances(client=create_client, instance_ids=instance_ids)
  280. logging.info(f"instances stop finished, instances: {instance_ids}")
  281. # 3. 判断机器运行状态是否为Stopped
  282. response = utils.get_instances_status(client=create_client, instance_ids=instance_ids)
  283. if response.get('Code') is None:
  284. instances_list = response.get('InstanceStatuses').get('InstanceStatus')
  285. stopped_instances = [instance.get('InstanceId') for instance in instances_list
  286. if instance.get('Status') == 'Stopped']
  287. if len(stopped_instances) == len(instance_ids):
  288. logging.info(f"instances stopped status set success, instances: {stopped_instances}")
  289. else:
  290. logging.info(f"stopped instances count = {len(instance_ids)}, instances: {stopped_instances}")
  291. sys.exit()
  292. else:
  293. logging.error(response)
  294. sys.exit()
  295. # 4. 释放机器
  296. response = utils.release_instances(client=create_client, instance_ids=stopped_instances)
  297. if response.get('Code') is None:
  298. logging.info(f"release instances finished, instances: {stopped_instances}")
  299. else:
  300. logging.error(f"release instances fail!!!")
  301. sys.exit()
  302. def main():
  303. slb_client = utils.connect_client(access_key_id=slb_client_params['access_key_id'],
  304. access_key_secret=slb_client_params['access_key_secret'],
  305. region_id=slb_client_params['region_id'])
  306. create_client = utils.connect_client(access_key_id=create_client_params['access_key_id'],
  307. access_key_secret=create_client_params['access_key_secret'],
  308. region_id=create_client_params['region_id'])
  309. # 1. 获取slb下所有机器
  310. online_instance_ids = utils.get_instance_ids(client=slb_client, slb_id=slb_id)
  311. online_instance_count = len(online_instance_ids)
  312. logging.info(f"online instance count: {online_instance_count}.")
  313. logging.info(f"online instance ids: {online_instance_ids}")
  314. # 2. 扩容机器并启动新服务 扩容数量:线上机器数量/2
  315. logging.info(f"ess instances start ...")
  316. ess_instance_count = online_instance_count / 2
  317. logging.info(f"ess instance count: {ess_instance_count}")
  318. asyncio.run(ess_instance(create_client=create_client, slb_client=slb_client,
  319. ess_count=ess_instance_count, max_workers=2))
  320. logging.info(f"ess instances end!")
  321. # 3. 原有机器进行更新
  322. logging.info(f"update online instances start ...")
  323. asyncio.run(update_instance(create_client=create_client, slb_client=slb_client,
  324. instance_ids=online_instance_ids, max_workers=2))
  325. logging.info(f"update online instances end!")
  326. # 4. 停止并释放扩容机器
  327. logging.info(f"stop & release instances start ...")
  328. remove_instances(create_client=create_client, slb_client=slb_client, instance_ids=ess_instances)
  329. logging.info(f"stop & release instances end!")
  330. if __name__ == '__main__':
  331. main()