rov_server_update.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. import asyncio
  2. import sys
  3. import time
  4. import requests
  5. import utils
  6. import logging
  7. import os
  8. import docker
  9. from concurrent.futures import ThreadPoolExecutor
  10. health_instances = []
  11. ess_instances = []
  12. slb_id = 'lb-bp1werfophtsjzfr76njm'
  13. # 修改负载均衡权限
  14. slb_client_params = {
  15. 'access_key_id': 'LTAIuPbTPL3LDDKN',
  16. 'access_key_secret': 'ORcNedKwWuwVtcq4IRFtUDZgS0b1le',
  17. 'region_id': 'cn-hangzhou'
  18. }
  19. # 购买机器权限
  20. create_client_params = {
  21. 'access_key_id': 'LTAI4GBWbFvvXoXsSVBe1o9f',
  22. 'access_key_secret': 'kRAikWitb4kDxaAyBqNrmLmllMEDO3',
  23. 'region_id': 'cn-hangzhou'
  24. }
  25. # 机器配置
  26. instance_config = {
  27. # 使用的镜像信息
  28. 'image_id': 'm-bp1e5jx8eqhq22l91xw7',
  29. # 设置实例规格
  30. 'instance_type': 'ecs.ic5.xlarge',
  31. # 选择的交换机
  32. 'vswitch_id': 'vsw-bp19lpjwtc6j0p0m9mdc2',
  33. # 当前VPC类型的安全组
  34. 'security_group_id': 'sg-bp1irhrkr4vfj272hk4y',
  35. # 硬盘的大小,单位:G
  36. 'disk_size': '200',
  37. # 服务器命名
  38. 'instance_name': 'ESS-rov-server-[1,2]',
  39. # 服务器所在区域
  40. 'zone_id': 'cn-hangzhou-h',
  41. # 磁盘类型:云盘
  42. 'disk_category': 'cloud_efficiency',
  43. # 密钥
  44. 'key_pair_name': 'stuuudy'
  45. }
  46. # 服务启动脚本
  47. start_sh_dir = os.path.dirname(os.path.realpath(__file__))
  48. start_sh_filename = 'rov_server_start.sh'
  49. with open(file=os.path.join(start_sh_dir, start_sh_filename), mode='r', encoding='utf-8') as rf:
  50. file_content = rf.read()
  51. start_sh = {
  52. 'target_dir': '/home/piaoquan_server_sh',
  53. 'name': start_sh_filename,
  54. 'content': file_content,
  55. }
  56. def server_health_check(client, instance_id):
  57. """
  58. 服务健康检查
  59. :param client: 客户端连接
  60. :param instance_id: instanceId
  61. :return:
  62. """
  63. global health_instances
  64. ip_address = utils.get_ip_address(client=client, instance_id=instance_id)
  65. while True:
  66. health_check_url = f"http://{ip_address}:5001/healthcheck"
  67. try:
  68. http_code = requests.get(health_check_url).status_code
  69. except:
  70. logging.info("images is downloading")
  71. http_code = 0
  72. if http_code == 200:
  73. health_instances.append((instance_id, ip_address))
  74. break
  75. else:
  76. # time.sleep(20)
  77. await asyncio.sleep(10)
  78. def set_instance_weight_process(client, instance_id_list, weight_list):
  79. """
  80. 修改服务器的权重值
  81. :param client: slb客户端连接
  82. :param instance_id_list: instance id list
  83. :param weight_list: 权重修改列表 type-list [(weight, sleep_time), ...]
  84. :return:
  85. """
  86. for weight, sleep_time in weight_list:
  87. flag = True
  88. while flag:
  89. try:
  90. utils.set_weight_for_instances(client=client,
  91. slb_id=slb_id,
  92. instance_id_list=instance_id_list,
  93. weight=weight)
  94. time.sleep(sleep_time)
  95. flag = False
  96. except Exception as e:
  97. time.sleep(10)
  98. continue
  99. async def run_server(create_client, slb_client, instance_ids, max_workers):
  100. # 1. 发送启动脚本到机器上
  101. utils.send_file_to_ecs(client=create_client, instance_id_list=instance_ids, **start_sh)
  102. logging.info(f"send start shell file finished, instances: {instance_ids}")
  103. # 2. 启动服务
  104. server_start_sh = os.path.join(start_sh['target_dir'], start_sh['name'])
  105. server_start_commend = f"sh {server_start_sh}"
  106. utils.run_command(client=create_client, instance_ids=instance_ids, command=server_start_commend)
  107. # 3. 异步探活
  108. global health_instances
  109. health_instances = []
  110. loop = asyncio.get_running_loop()
  111. executor = ThreadPoolExecutor(max_workers=max_workers)
  112. tasks = [
  113. loop.run_in_executor(executor, server_health_check, *args) for args in
  114. [(slb_client, instance_id) for instance_id in instance_ids]
  115. ]
  116. await asyncio.wait(tasks)
  117. logging.info(f"health instances count: {len(health_instances)}, {health_instances}")
  118. # 4. 挂载流量
  119. if len(health_instances) == len(instance_ids):
  120. # 所有机器探活成功
  121. time.sleep(60)
  122. add_weight_list = [(10, 30), (20, 20), (40, 10), (60, 10), (80, 10), (100, 10)]
  123. set_instance_weight_process(client=slb_client, instance_id_list=instance_ids, weight_list=add_weight_list)
  124. logging.info(f"add weight instances count: {len(health_instances)}")
  125. else:
  126. logging.info(f"instances count: {len(instance_ids)},\nhealth instances count: {len(health_instances)}")
  127. sys.exit()
  128. async def ess_instance(create_client, slb_client, ess_count, max_workers):
  129. """
  130. 扩容机器并运行新服务
  131. :param create_client: 购买机器客户端连接
  132. :param slb_client: 修改负载均衡权限
  133. :param ess_count: 扩容数量
  134. :param max_workers: 线程数
  135. :return:
  136. """
  137. # 1. 购买机器并启动
  138. ess_instance_ids = utils.create_multiple_instances(
  139. amount=ess_count,
  140. client=create_client,
  141. **instance_config,
  142. )
  143. time.sleep(60)
  144. # 2. 发送启动脚本到机器上
  145. utils.send_file_to_ecs(client=create_client, instance_id_list=ess_instance_ids, **start_sh)
  146. logging.info(f"send start shell file finished, instances: {ess_instance_ids}")
  147. # 3. 启动服务
  148. server_start_sh = os.path.join(start_sh['target_dir'], start_sh['name'])
  149. server_start_commend = f"sh {server_start_sh}"
  150. utils.run_command(client=create_client, instance_ids=ess_instance_ids, command=server_start_commend)
  151. # 4. 异步探活
  152. global health_instances
  153. health_instances = []
  154. loop = asyncio.get_running_loop()
  155. executor = ThreadPoolExecutor(max_workers=max_workers)
  156. tasks = [
  157. loop.run_in_executor(executor, server_health_check, *args) for args in
  158. [(slb_client, instance_id) for instance_id in ess_instance_ids]
  159. ]
  160. await asyncio.wait(tasks)
  161. logging.info(f"health instances count: {len(health_instances)}, {health_instances}")
  162. # 5. 挂载流量
  163. if len(health_instances) == len(ess_instance_ids):
  164. # 所有机器探活成功
  165. time.sleep(60)
  166. add_weight_list = [(10, 30), (20, 20), (40, 10), (60, 10), (80, 10), (100, 10)]
  167. set_instance_weight_process(client=slb_client, instance_id_list=ess_instance_ids, weight_list=add_weight_list)
  168. global ess_instances
  169. ess_instances.extend(ess_instance_ids)
  170. logging.info(f"ess count: {ess_count}, "
  171. f"create count: {len(ess_instance_ids)}, "
  172. f"finished count: {len(health_instances)}")
  173. else:
  174. logging.info(f"ess count: {ess_count}, "
  175. f"create count: {len(ess_instance_ids)}, "
  176. f"health count: {len(health_instances)}")
  177. sys.exit()
  178. def remove_container_image(client, instance_id, container_name):
  179. """
  180. 移除旧容器并删除旧镜像
  181. :param client: 客户端连接
  182. :param instance_id: instanceId type-string
  183. :param container_name: 容器名称 type-string
  184. :return:
  185. """
  186. ip_address = utils.get_ip_address(client=client, instance_id=instance_id)
  187. logging.info(f"服务器信息:{instance_id}/{ip_address}")
  188. client = docker.DockerClient(base_url=f'tcp://{ip_address}:2375', timeout=60)
  189. # 移除旧的容器
  190. container_remove_retry = 3
  191. i = 0
  192. while i < container_remove_retry:
  193. try:
  194. container_id = client.containers.get(container_name)
  195. container_id.remove(force=True)
  196. break
  197. except Exception as e:
  198. i += 1
  199. print("容器不存在或者无法删除当前容器")
  200. # 删除旧镜像
  201. images_remove_retry = 3
  202. j = 0
  203. while j < images_remove_retry:
  204. try:
  205. images = client.images.list()
  206. for image in images:
  207. client.images.remove(force=True, image=image.tags[0])
  208. time.sleep(2)
  209. except Exception as e:
  210. i += 1
  211. print("镜像不存在,无法获取到镜像ID")
  212. async def update_instance(create_client, slb_client, instance_ids, max_workers):
  213. """
  214. 线上机器更新
  215. :param create_client:
  216. :param slb_client: slb客户端连接
  217. :param instance_ids: instanceId type-list
  218. :param max_workers:
  219. :return:
  220. """
  221. media_index = len(instance_ids)//2
  222. instance_ids_group = [instance_ids[:media_index], instance_ids[media_index:]]
  223. for instance_id_list in instance_ids_group:
  224. # 1. 摘流量
  225. set_instance_weight_process(client=slb_client, instance_id_list=instance_id_list, weight_list=[(0, 60)])
  226. logging.info(f"set weight with 0 finished, instances: {instance_id_list}")
  227. # 2. 异步移除旧容器并删除旧镜像
  228. container_name = 'rov-server'
  229. loop = asyncio.get_running_loop()
  230. executor = ThreadPoolExecutor(max_workers=max_workers)
  231. tasks = [
  232. loop.run_in_executor(executor, remove_container_image, *args) for args in
  233. [(slb_client, instance_id, container_name) for instance_id in instance_id_list]
  234. ]
  235. await asyncio.wait(tasks)
  236. logging.info(f"remove container & images finished, instances: {instance_id_list}")
  237. # 3. 发送启动脚本到机器上
  238. utils.send_file_to_ecs(client=create_client, instance_id_list=instance_id_list, **start_sh)
  239. logging.info(f"send start shell file finished, instances: {instance_id_list}")
  240. # 4. 启动服务
  241. server_start_sh = os.path.join(start_sh['target_dir'], start_sh['name'])
  242. server_start_commend = f"sh {server_start_sh}"
  243. utils.run_command(client=create_client, instance_ids=instance_id_list, command=server_start_commend)
  244. # 5. 异步探活
  245. global health_instances
  246. health_instances = []
  247. loop = asyncio.get_running_loop()
  248. executor = ThreadPoolExecutor(max_workers=max_workers)
  249. tasks = [
  250. loop.run_in_executor(executor, server_health_check, *args) for args in
  251. [(slb_client, instance_id) for instance_id in instance_id_list]
  252. ]
  253. await asyncio.wait(tasks)
  254. logging.info(f"health instances count: {len(health_instances)}, {health_instances}")
  255. # 6. 挂载流量
  256. if len(health_instances) == len(instance_id_list):
  257. # 所有机器探活成功
  258. time.sleep(60)
  259. add_weight_list = [(10, 30), (20, 20), (40, 10), (60, 10), (80, 10), (100, 10)]
  260. set_instance_weight_process(client=slb_client, instance_id_list=instance_id_list,
  261. weight_list=add_weight_list)
  262. logging.info(f"finished count: {len(health_instances)}")
  263. else:
  264. logging.info(f"health count: {len(health_instances)}")
  265. sys.exit()
  266. def remove_instances(create_client, slb_client, instance_ids):
  267. """
  268. 停止并释放机器
  269. :param create_client:
  270. :param slb_client:
  271. :param instance_ids: instanceId type-list
  272. :return: None
  273. """
  274. # 1. 摘流量
  275. set_instance_weight_process(client=slb_client, instance_id_list=instance_ids, weight_list=[(0, 60)])
  276. logging.info(f"set weight = 0 finished, instances: {instance_ids}")
  277. time.sleep(10)
  278. # 2. 停止机器
  279. utils.stop_instances(client=create_client, instance_ids=instance_ids)
  280. logging.info(f"instances stop finished, instances: {instance_ids}")
  281. # 3. 判断机器运行状态是否为Stopped
  282. response = utils.get_instances_status(client=create_client, instance_ids=instance_ids)
  283. if response.get('Code') is None:
  284. instances_list = response.get('InstanceStatuses').get('InstanceStatus')
  285. stopped_instances = [instance.get('InstanceId') for instance in instances_list
  286. if instance.get('Status') == 'Stopped']
  287. if len(stopped_instances) == len(instance_ids):
  288. logging.info(f"instances stopped status set success, instances: {stopped_instances}")
  289. else:
  290. logging.info(f"stopped instances count = {len(instance_ids)}, instances: {stopped_instances}")
  291. sys.exit()
  292. else:
  293. logging.error(response)
  294. sys.exit()
  295. # 4. 释放机器
  296. response = utils.release_instances(client=create_client, instance_ids=stopped_instances)
  297. if response.get('Code') is None:
  298. logging.info(f"release instances finished, instances: {stopped_instances}")
  299. else:
  300. logging.error(f"release instances fail!!!")
  301. sys.exit()
  302. def main():
  303. slb_client = utils.connect_client(access_key_id=slb_client_params['access_key_id'],
  304. access_key_secret=slb_client_params['access_key_secret'],
  305. region_id=slb_client_params['region_id'])
  306. create_client = utils.connect_client(access_key_id=create_client_params['access_key_id'],
  307. access_key_secret=create_client_params['access_key_secret'],
  308. region_id=create_client_params['region_id'])
  309. # 1. 获取slb下所有机器
  310. online_instance_ids = utils.get_instance_ids(client=slb_client, slb_id=slb_id)
  311. online_instance_count = len(online_instance_ids)
  312. logging.info(f"online instance count: {online_instance_count}.")
  313. logging.info(f"online instance ids: {online_instance_ids}")
  314. # 2. 扩容机器并启动新服务 扩容数量:线上机器数量/2
  315. logging.info(f"ess instances start ...")
  316. ess_instance_count = online_instance_count / 2
  317. logging.info(f"ess instance count: {ess_instance_count}")
  318. asyncio.run(ess_instance(create_client=create_client, slb_client=slb_client,
  319. ess_count=ess_instance_count, max_workers=2))
  320. logging.info(f"ess instances end!")
  321. # 3. 原有机器进行更新
  322. logging.info(f"update online instances start ...")
  323. asyncio.run(update_instance(create_client=create_client, slb_client=slb_client,
  324. instance_ids=online_instance_ids, max_workers=2))
  325. logging.info(f"update online instances end!")
  326. # 4. 停止并释放扩容机器
  327. logging.info(f"stop & release instances start ...")
  328. remove_instances(create_client=create_client, slb_client=slb_client, instance_ids=ess_instances)
  329. logging.info(f"stop & release instances end!")
  330. if __name__ == '__main__':
  331. main()