# rov_server_update.py

import asyncio
import sys
import time
import requests
import utils
import logging
import os
import docker
from concurrent.futures import ThreadPoolExecutor
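
# The local `utils` module wraps the Aliyun SDK calls used throughout this
# script. Its real signatures are not shown here; the summary below is only
# inferred from the call sites in this file and should be read as an
# assumption, not as the module's documented API:
#   utils.connect_client(access_key_id, access_key_secret, region_id) -> client
#   utils.get_instance_ids(client, slb_id)                -> backend instance ids
#   utils.get_ip_address(client, instance_id)             -> instance IP address
#   utils.set_weight_for_instances(client, slb_id, instance_id_list, weight)
#   utils.send_file_to_ecs(client, instance_id_list, target_dir, name, content)
#   utils.run_command(client, instance_ids, command)
#   utils.create_multiple_instances(amount, client, **instance_config) -> instance ids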

# Ensure the logging.info() progress messages below are visible (a no-op if the
# root logger was already configured elsewhere, e.g. inside utils).
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')

# Instances that have passed the health check: [(instance_id, ip_address), ...]
health_instances = []

slb_id = 'lb-bp1werfophtsjzfr76njm'

# Credentials allowed to modify the load balancer (SLB)
slb_client_params = {
    'access_key_id': 'LTAIuPbTPL3LDDKN',
    'access_key_secret': 'ORcNedKwWuwVtcq4IRFtUDZgS0b1le',
    'region_id': 'cn-hangzhou'
}

# Credentials allowed to purchase (create) ECS instances
create_client_params = {
    'access_key_id': 'LTAI4GBWbFvvXoXsSVBe1o9f',
    'access_key_secret': 'kRAikWitb4kDxaAyBqNrmLmllMEDO3',
    'region_id': 'cn-hangzhou'
}

# ECS instance configuration
instance_config = {
    # Image to launch from
    'image_id': 'm-bp1e5jx8eqhq22l91xw7',
    # Instance type
    'instance_type': 'ecs.ic5.xlarge',
    # VSwitch to use
    'vswitch_id': 'vsw-bp19lpjwtc6j0p0m9mdc2',
    # Security group of the current VPC
    'security_group_id': 'sg-bp1irhrkr4vfj272hk4y',
    # Disk size, unit: GB
    'disk_size': '200',
    # Instance name
    'instance_name': 'ESS-rov-server-[1,2]',
    # Zone the instances are created in
    'zone_id': 'cn-hangzhou-h',
    # Disk category: efficiency cloud disk
    'disk_category': 'cloud_efficiency',
    # Key pair
    'key_pair_name': 'stuuudy'
}

# Service start script, read once at import time and pushed to the instances later
start_sh_dir = os.path.dirname(os.path.realpath(__file__))
start_sh_filename = 'rov_server_start.sh'
with open(file=os.path.join(start_sh_dir, start_sh_filename), mode='r', encoding='utf-8') as rf:
    file_content = rf.read()
start_sh = {
    'target_dir': '/home/piaoquan_server_sh',
    'name': start_sh_filename,
    'content': file_content,
}
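
# The `start_sh` payload above is unpacked into utils.send_file_to_ecs(...) by
# every deploy path below, so the script lands on each target instance at
#     /home/piaoquan_server_sh/rov_server_start.sh
# and is then executed remotely via `sh <that path>`. What the script itself
# does is not visible from this file; given the Docker cleanup performed in
# remove_container_image(), it presumably pulls the new image and starts the
# 'rov-server' container, but that is an assumption.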

def server_health_check(client, instance_id):
    """
    Service health check for a single instance. This is a blocking function,
    intended to be run in a worker thread via loop.run_in_executor().
    :param client: SLB client connection
    :param instance_id: instanceId
    :return:
    """
    global health_instances
    ip_address = utils.get_ip_address(client=client, instance_id=instance_id)
    while True:
        health_check_url = f"http://{ip_address}:5001/healthcheck"
        try:
            http_code = requests.get(health_check_url).status_code
        except Exception:
            logging.info("image is downloading")
            http_code = 0
        if http_code == 200:
            health_instances.append((instance_id, ip_address))
            break
        else:
            # Plain (non-async) function running in a thread, so block with
            # time.sleep rather than awaiting.
            time.sleep(10)
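
# Usage sketch: this is how the deploy paths below fan the blocking check out
# across a thread pool (names such as `slb_client` and `instance_ids` stand in
# for whatever the caller already holds):
#
#     loop = asyncio.get_running_loop()
#     executor = ThreadPoolExecutor(max_workers=4)
#     tasks = [loop.run_in_executor(executor, server_health_check, slb_client, iid)
#              for iid in instance_ids]
#     await asyncio.wait(tasks)
#
# Each worker thread polls http://<ip>:5001/healthcheck every 10 seconds until
# it answers 200, then records (instance_id, ip_address) in health_instances.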

def set_instance_weight_process(client, instance_id_list, weight_list):
    """
    Step through a weight schedule for the given backend servers on the SLB.
    :param client: SLB client connection
    :param instance_id_list: instance id list
    :param weight_list: weight schedule, type-list [(weight, sleep_time), ...]
    :return:
    """
    for weight, sleep_time in weight_list:
        flag = True
        while flag:
            try:
                utils.set_weight_for_instances(client=client,
                                               slb_id=slb_id,
                                               instance_id_list=instance_id_list,
                                               weight=weight)
                time.sleep(sleep_time)
                flag = False
            except Exception as e:
                # The SLB call failed; back off and retry this weight step.
                logging.info(f"set weight {weight} failed, retrying: {e}")
                time.sleep(10)
                continue
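
# Usage sketch: the deploy paths below drain an instance group with a single
# (0, 60) step before updating it, then ramp traffic back up with a schedule
# like the one they hard-code, pausing between steps so each change can settle:
#
#     ramp_up = [(10, 30), (20, 20), (40, 10), (60, 10), (80, 10), (100, 10)]
#     set_instance_weight_process(client=slb_client,
#                                 instance_id_list=instance_ids,
#                                 weight_list=ramp_up)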

async def run_server(create_client, slb_client, instance_ids, max_workers):
    # 1. Push the start script to the instances
    utils.send_file_to_ecs(client=create_client, instance_id_list=instance_ids, **start_sh)
    logging.info(f"send start shell file finished, instances: {instance_ids}")
    # 2. Start the service
    server_start_sh = os.path.join(start_sh['target_dir'], start_sh['name'])
    server_start_command = f"sh {server_start_sh}"
    utils.run_command(client=create_client, instance_ids=instance_ids, command=server_start_command)
    # 3. Probe liveness asynchronously
    global health_instances
    health_instances = []
    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=max_workers)
    tasks = [
        loop.run_in_executor(executor, server_health_check, *args) for args in
        [(slb_client, instance_id) for instance_id in instance_ids]
    ]
    await asyncio.wait(tasks)
    logging.info(f"health instances count: {len(health_instances)}, {health_instances}")
    # 4. Attach traffic
    if len(health_instances) == len(instance_ids):
        # All instances passed the health check
        time.sleep(60)
        add_weight_list = [(10, 30), (20, 20), (40, 10), (60, 10), (80, 10), (100, 10)]
        set_instance_weight_process(client=slb_client, instance_id_list=instance_ids, weight_list=add_weight_list)
        logging.info(f"add weight instances count: {len(health_instances)}")
    else:
        logging.info(f"instances count: {len(instance_ids)},\nhealth instances count: {len(health_instances)}")
        sys.exit()

async def ess_instance(create_client, slb_client, ess_count, max_workers):
    """
    Scale out: purchase new instances and bring the service up on them.
    :param create_client: client used to purchase instances
    :param slb_client: client used to modify the load balancer
    :param ess_count: number of instances to add
    :param max_workers: thread pool size
    :return:
    """
    # 1. Purchase the instances and start them
    ess_instance_ids = utils.create_multiple_instances(
        amount=ess_count,
        client=create_client,
        **instance_config,
    )
    time.sleep(60)
    # 2. Push the start script to the instances
    utils.send_file_to_ecs(client=create_client, instance_id_list=ess_instance_ids, **start_sh)
    logging.info(f"send start shell file finished, instances: {ess_instance_ids}")
    # 3. Start the service
    server_start_sh = os.path.join(start_sh['target_dir'], start_sh['name'])
    server_start_command = f"sh {server_start_sh}"
    utils.run_command(client=create_client, instance_ids=ess_instance_ids, command=server_start_command)
    # 4. Probe liveness asynchronously
    global health_instances
    health_instances = []
    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=max_workers)
    tasks = [
        loop.run_in_executor(executor, server_health_check, *args) for args in
        [(slb_client, instance_id) for instance_id in ess_instance_ids]
    ]
    await asyncio.wait(tasks)
    logging.info(f"health instances count: {len(health_instances)}, {health_instances}")
    # 5. Attach traffic
    if len(health_instances) == len(ess_instance_ids):
        # All instances passed the health check
        time.sleep(60)
        add_weight_list = [(10, 30), (20, 20), (40, 10), (60, 10), (80, 10), (100, 10)]
        set_instance_weight_process(client=slb_client, instance_id_list=ess_instance_ids, weight_list=add_weight_list)
        logging.info(f"ess count: {ess_count}, "
                     f"create count: {len(ess_instance_ids)}, "
                     f"finished count: {len(health_instances)}")
    else:
        logging.info(f"ess count: {ess_count}, "
                     f"create count: {len(ess_instance_ids)}, "
                     f"health count: {len(health_instances)}")
        sys.exit()

def remove_container_image(client, instance_id, container_name):
    """
    Remove the old container and delete the old images on one instance.
    :param client: client connection (used to resolve the instance IP)
    :param instance_id: instanceId type-string
    :param container_name: container name type-string
    :return:
    """
    ip_address = utils.get_ip_address(client=client, instance_id=instance_id)
    logging.info(f"instance info: {instance_id}/{ip_address}")
    docker_client = docker.DockerClient(base_url=f'tcp://{ip_address}:2375', timeout=60)
    # Remove the old container
    container_remove_retry = 3
    i = 0
    while i < container_remove_retry:
        try:
            container = docker_client.containers.get(container_name)
            container.remove(force=True)
            break
        except Exception:
            i += 1
            logging.info("container does not exist or could not be removed")
    # Delete the old images
    images_remove_retry = 3
    j = 0
    while j < images_remove_retry:
        try:
            images = docker_client.images.list()
            for image in images:
                docker_client.images.remove(force=True, image=image.tags[0])
                time.sleep(2)
            break
        except Exception:
            j += 1
            logging.info("image does not exist, could not get image id")
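
# remove_container_image() talks to the Docker Engine API directly over TCP, so
# every target instance is assumed to expose the daemon on tcp://<ip>:2375
# without TLS (e.g. dockerd started with `-H tcp://0.0.0.0:2375`). A quick,
# hedged way to verify that assumption from the control host before an update:
#
#     docker.DockerClient(base_url=f'tcp://{ip_address}:2375', timeout=10).ping()
#
# ping() raises if the daemon is unreachable, which would otherwise surface
# only as three silent retry failures inside remove_container_image().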

async def update_instance(create_client, slb_client, instance_ids, max_workers):
    """
    Rolling update of the online instances, half of the fleet at a time.
    :param create_client: ECS client connection
    :param slb_client: SLB client connection
    :param instance_ids: instanceId type-list
    :param max_workers: thread pool size
    :return:
    """
    middle_index = len(instance_ids) // 2
    instance_ids_group = [instance_ids[:middle_index], instance_ids[middle_index:]]
    for instance_id_list in instance_ids_group:
        # 1. Drain traffic
        set_instance_weight_process(client=slb_client, instance_id_list=instance_id_list, weight_list=[(0, 60)])
        logging.info(f"set weight with 0 finished, instances: {instance_id_list}")
        # 2. Remove the old containers and images asynchronously
        container_name = 'rov-server'
        loop = asyncio.get_running_loop()
        executor = ThreadPoolExecutor(max_workers=max_workers)
        tasks = [
            loop.run_in_executor(executor, remove_container_image, *args) for args in
            [(slb_client, instance_id, container_name) for instance_id in instance_id_list]
        ]
        await asyncio.wait(tasks)
        logging.info(f"remove container & images finished, instances: {instance_id_list}")
        # 3. Push the start script to the instances
        utils.send_file_to_ecs(client=create_client, instance_id_list=instance_id_list, **start_sh)
        logging.info(f"send start shell file finished, instances: {instance_id_list}")
        # 4. Start the service
        server_start_sh = os.path.join(start_sh['target_dir'], start_sh['name'])
        server_start_command = f"sh {server_start_sh}"
        utils.run_command(client=create_client, instance_ids=instance_id_list, command=server_start_command)
        # 5. Probe liveness asynchronously
        global health_instances
        health_instances = []
        loop = asyncio.get_running_loop()
        executor = ThreadPoolExecutor(max_workers=max_workers)
        tasks = [
            loop.run_in_executor(executor, server_health_check, *args) for args in
            [(slb_client, instance_id) for instance_id in instance_id_list]
        ]
        await asyncio.wait(tasks)
        logging.info(f"health instances count: {len(health_instances)}, {health_instances}")
        # 6. Attach traffic
        if len(health_instances) == len(instance_id_list):
            # All instances passed the health check
            time.sleep(60)
            add_weight_list = [(10, 30), (20, 20), (40, 10), (60, 10), (80, 10), (100, 10)]
            set_instance_weight_process(client=slb_client, instance_id_list=instance_id_list,
                                        weight_list=add_weight_list)
            logging.info(f"finished count: {len(health_instances)}")
        else:
            logging.info(f"health count: {len(health_instances)}")
            sys.exit()

def main():
    slb_client = utils.connect_client(access_key_id=slb_client_params['access_key_id'],
                                      access_key_secret=slb_client_params['access_key_secret'],
                                      region_id=slb_client_params['region_id'])
    create_client = utils.connect_client(access_key_id=create_client_params['access_key_id'],
                                         access_key_secret=create_client_params['access_key_secret'],
                                         region_id=create_client_params['region_id'])
    # 1. Get all instances behind the SLB
    online_instance_ids = utils.get_instance_ids(client=slb_client, slb_id=slb_id)
    online_instance_count = len(online_instance_ids)
    logging.info(f"online instance count: {online_instance_count}.")
    logging.info(f"online instance ids: {online_instance_ids}")
    # 2. Scale out and start the new service; scale-out size = online instance count / 2
    ess_instance_count = online_instance_count // 2
    logging.info(f"ess instance count: {ess_instance_count}")
    asyncio.run(ess_instance(create_client=create_client, slb_client=slb_client,
                             ess_count=ess_instance_count, max_workers=2))
    # 3. Update the existing instances