liqian 2 vuotta sitten
vanhempi
commit
2155a8b6d8
1 muutettua tiedostoa jossa 160 lisäystä ja 10 poistoa
  1. 160 10
      rov_server_update.py

+ 160 - 10
rov_server_update.py

@@ -1,8 +1,16 @@
+import asyncio
+import sys
 import time
 import requests
 import utils
 import logging
 import os
+import docker
+
+from concurrent.futures import ThreadPoolExecutor
+
+
+health_instances = []
 
 slb_id = 'lb-bp1werfophtsjzfr76njm'
 # 修改负载均衡权限
@@ -52,16 +60,17 @@ start_sh = {
 }
 
 
-def server_health_check(instance_id, health_check_url):
+def server_health_check(client, instance_id):
     """
     服务健康检查
+    :param client: 客户端连接
     :param instance_id: instanceId
-    :param health_check_url: 服务健康检查url
     :return:
     """
-    ip_address = get_ip_address(client=client, instance_id=instance_id)
+    global health_instances
+    ip_address = utils.get_ip_address(client=client, instance_id=instance_id)
     while True:
-        health_url = f"http://{ip_address}:5001/healthcheck"
+        health_check_url = f"http://{ip_address}:5001/healthcheck"
         try:
             http_code = requests.get(health_check_url).status_code
         except:
@@ -69,18 +78,43 @@ def server_health_check(instance_id, health_check_url):
             http_code = 0
 
         if http_code == 200:
+            health_instances.append((instance_id, ip_address))
             break
         else:
-            time.sleep(20)
+            # time.sleep(20)
+            await asyncio.sleep(10)
 
 
+def set_instance_weight_process(client, instance_id_list, weight_list):
+    """
+    修改服务器的权重值
+    :param client: slb客户端连接
+    :param instance_id_list: instance id list
+    :param weight_list: 权重修改列表 type-list [(weight, sleep_time), ...]
+    :return:
+    """
+    for weight, sleep_time in weight_list:
+        flag = True
+        while flag:
+            try:
+                utils.set_weight_for_instances(client=client,
+                                               slb_id=slb_id,
+                                               instance_id_list=instance_id_list,
+                                               weight=weight)
+                time.sleep(sleep_time)
+                flag = False
+            except Exception as e:
+                time.sleep(10)
+                continue
+
 
-def ess_instance(create_client, slb_client, ess_count):
+async def ess_instance(create_client, slb_client, ess_count, max_workers):
     """
     扩容机器并运行新服务
     :param create_client: 购买机器客户端连接
     :param slb_client: 修改负载均衡权限
     :param ess_count: 扩容数量
+    :param max_workers: 线程数
     :return:
     """
     # 1. 购买机器并启动
@@ -96,8 +130,123 @@ def ess_instance(create_client, slb_client, ess_count):
     server_start_sh = os.path.join(start_sh['target_dir'], start_sh['name'])
     server_start_commend = f"sh {server_start_sh}"
     utils.run_command(client=create_client, instance_ids=ess_instance_ids, command=server_start_commend)
-    # 4. 探活
-    health_instance_ids = []
+    # 4. 异步探活
+    global health_instances
+    health_instances = []
+    loop = asyncio.get_running_loop()
+    executor = ThreadPoolExecutor(max_workers=max_workers)
+    tasks = [
+        loop.run_in_executor(executor, server_health_check, *args) for args in
+        [(slb_client, instance_id) for instance_id in ess_instance_ids]
+    ]
+    await asyncio.wait(tasks)
+    logging.info(f"health instances count: {len(health_instances)}, {health_instances}")
+    # 挂载流量
+    if len(health_instances) == len(ess_instance_ids):
+        # 所有机器探活成功
+        time.sleep(60)
+        add_weight_list = [(10, 30), (20, 20), (40, 10), (60, 10), (80, 10), (100, 10)]
+        set_instance_weight_process(client=slb_client, instance_id_list=ess_instance_ids, weight_list=add_weight_list)
+        logging.info(f"ess count: {ess_count}, "
+                     f"create count: {len(ess_instance_ids)}, "
+                     f"finished count: {len(health_instances)}")
+    else:
+        logging.info(f"ess count: {ess_count}, "
+                     f"create count: {len(ess_instance_ids)}, "
+                     f"health count: {len(health_instances)}")
+        sys.exit()
+
+
+def remove_container_image(client, instance_id, container_name):
+    """
+    移除旧容器并删除旧镜像
+    :param client: 客户端连接
+    :param instance_id: instanceId type-string
+    :param container_name: 容器名称 type-string
+    :return:
+    """
+    ip_address = utils.get_ip_address(client=client, instance_id=instance_id)
+    logging.info(f"服务器信息:{instance_id}/{ip_address}")
+    client = docker.DockerClient(base_url=f'tcp://{ip_address}:2375', timeout=60)
+    # 移除旧的容器
+    container_remove_retry = 3
+    i = 0
+    while i < container_remove_retry:
+        try:
+            container_id = client.containers.get(container_name)
+            container_id.remove(force=True)
+            break
+        except Exception as e:
+            i += 1
+            print("容器不存在或者无法删除当前容器")
+    # 删除旧镜像
+    images_remove_retry = 3
+    j = 0
+    while j < images_remove_retry:
+        try:
+            images = client.images.list()
+            for image in images:
+                client.images.remove(force=True, image=image.tags[0])
+                time.sleep(2)
+        except Exception as e:
+            i += 1
+            print("镜像不存在,无法获取到镜像ID")
+
+
+async def update_instance(create_client, slb_client, instance_ids, max_workers):
+    """
+    线上机器更新
+    :param create_client:
+    :param slb_client: slb客户端连接
+    :param instance_ids: instanceId type-list
+    :param max_workers:
+    :return:
+    """
+    media_index = len(instance_ids)//2
+    instance_ids_group = [instance_ids[:media_index], instance_ids[media_index:]]
+    for instance_id_list in instance_ids_group:
+        # 1. 摘流量
+        set_instance_weight_process(client=slb_client, instance_id_list=instance_id_list, weight_list=[(0, 60)])
+        logging.info(f"set weight with 0 finished, instances: {instance_id_list}")
+        # 2. 异步移除旧容器并删除旧镜像
+        container_name = 'rov-server'
+        loop = asyncio.get_running_loop()
+        executor = ThreadPoolExecutor(max_workers=max_workers)
+        tasks = [
+            loop.run_in_executor(executor, remove_container_image, *args) for args in
+            [(slb_client, instance_id, container_name) for instance_id in instance_id_list]
+        ]
+        await asyncio.wait(tasks)
+        logging.info(f"remove container & images finished, instances: {instance_id_list}")
+        # 3. 发送启动脚本到机器上
+        utils.send_file_to_ecs(client=create_client, instance_id_list=instance_id_list, **start_sh)
+        logging.info(f"send start shell file finished, instances: {instance_id_list}")
+        # 4. 启动服务
+        server_start_sh = os.path.join(start_sh['target_dir'], start_sh['name'])
+        server_start_commend = f"sh {server_start_sh}"
+        utils.run_command(client=create_client, instance_ids=instance_id_list, command=server_start_commend)
+        # 5. 异步探活
+        global health_instances
+        health_instances = []
+        loop = asyncio.get_running_loop()
+        executor = ThreadPoolExecutor(max_workers=max_workers)
+        tasks = [
+            loop.run_in_executor(executor, server_health_check, *args) for args in
+            [(slb_client, instance_id) for instance_id in instance_id_list]
+        ]
+        await asyncio.wait(tasks)
+        logging.info(f"health instances count: {len(health_instances)}, {health_instances}")
+        # 挂载流量
+        if len(health_instances) == len(instance_id_list):
+            # 所有机器探活成功
+            time.sleep(60)
+            add_weight_list = [(10, 30), (20, 20), (40, 10), (60, 10), (80, 10), (100, 10)]
+            set_instance_weight_process(client=slb_client, instance_id_list=instance_id_list,
+                                        weight_list=add_weight_list)
+            logging.info(f"finished count: {len(health_instances)}")
+        else:
+            logging.info(f"health count: {len(health_instances)}")
+            sys.exit()
 
 
 
@@ -119,6 +268,7 @@ def main():
     # 2. 扩容机器并启动新服务 扩容数量:线上机器数量/2
     ess_instance_count = online_instance_count / 2
     logging.info(f"ess instance count: {ess_instance_count}")
+    asyncio.run(ess_instance(create_client=create_client, slb_client=slb_client,
+                             ess_count=ess_instance_count, max_workers=2))
 
-
-
+    # 3. 原有机器进行更新