4 Commits df70c02eeb ... e5ad537225

Tác giả SHA1 Thông báo Ngày
  xuekailun e5ad537225 merge 11 tháng trước cách đây
  xuekailun 535f79de69 Merge branch 'pre' 11 tháng trước cách đây
  xuekailun 9fddfc6bc7 update instance_name 11 tháng trước cách đây
  xuekailun c07c64e003 add distribution/ 11 tháng trước cách đây

+ 593 - 0
distribution/alb_utils.py

@@ -0,0 +1,593 @@
+import logging
+import json
+import sys
+import time
+from asyncio import wait_for
+
+import requests
+import asyncio
+import time
+
+from alibabacloud_tea_util.client import Client as UtilClient
+from aliyunsdkcore.client import AcsClient
+from aliyunsdkecs.request.v20140526.RunInstancesRequest import RunInstancesRequest
+from aliyunsdkecs.request.v20140526.DescribeInstancesRequest import DescribeInstancesRequest
+from aliyunsdkecs.request.v20140526.DescribeNetworkInterfacesRequest import DescribeNetworkInterfacesRequest
+from aliyunsdkecs.request.v20140526.RunCommandRequest import RunCommandRequest
+from aliyunsdkecs.request.v20140526.SendFileRequest import SendFileRequest
+from aliyunsdkecs.request.v20140526.StopInstancesRequest import StopInstancesRequest
+from aliyunsdkecs.request.v20140526.DeleteInstancesRequest import DeleteInstancesRequest
+from aliyunsdkecs.request.v20140526.DescribeInstanceStatusRequest import DescribeInstanceStatusRequest
+from aliyunsdkecs.request.v20140526.ModifySecurityGroupRuleRequest import ModifySecurityGroupRuleRequest
+from alibabacloud_alb20200616.client import Client as Alb20200616Client
+from alibabacloud_tea_openapi import models as open_api_models
+from alibabacloud_alb20200616 import models as alb_models
+from alibabacloud_alb20200616 import models as alb_20200616_models
+from alibabacloud_tea_util import models as util_models
+from urllib3 import request
+
+logging.basicConfig(level=logging.INFO,  # configure the root logger at import time: INFO and above
+                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
+                    datefmt='%a, %d %b %Y %H:%M:%S')
+
+
+def send_msg_to_feishu(webhook, key_word, msg_text):
+    """发送消息到飞书"""
+    headers = {'Content-Type': 'application/json'}
+    payload_message = {
+        "msg_type": "text",
+        "content": {
+            "text": '{}: {}'.format(key_word, msg_text)
+        }
+    }
+    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
+    logging.info(response.text)
+
+
+def connect_client(access_key_id, access_key_secret, region_id):
+    """
+    初始化账号,连接客户端
+    :param access_key_id: access key Id, type-string
+    :param access_key_secret: access key secret, type-string
+    :param region_id: region_id
+    :return: clt
+    """
+    try:
+        clt = AcsClient(ak=access_key_id, secret=access_key_secret, region_id=region_id)
+        return clt
+    except Exception as e:
+        # 失败,记录报错信息,发送通知,停止并退出
+        logging.error(e)
+        sys.exit()
+
+
+def connect_alb_client(access_key_id, access_key_secret, endpoint):
+    """
+    初始化ALB客户端
+    :param access_key_id: access key Id, type-string
+    :param access_key_secret: access key secret, type-string
+    :return: alb_client
+    """
+    config = open_api_models.Config(
+        access_key_id=access_key_id,
+        access_key_secret=access_key_secret,
+        endpoint=endpoint
+    )
+    alb_client = Alb20200616Client(config)
+    return alb_client
+
+
+def build_create_instances_request(image_id, vswitch_id, security_group_id, zone_id, instance_type, instance_name,
+                                   disk_size, disk_category, key_pair_name, tags):
+    """
+    购买服务器参数配置
+    :param image_id: 使用的镜像信息 type-string
+    :param vswitch_id: 选择的交换机 type-string
+    :param security_group_id: 当前vpc类型的安全组 type-string
+    :param zone_id: 服务器所在区域 type-string
+    :param instance_type: 实例规格 type-string
+    :param instance_name: 实例命名 type-string
+    :param disk_size: 磁盘大小,单位:G,type-string
+    :param disk_category: 磁盘类型 type-string
+    :param key_pair_name: 密钥对名称 type-string
+    :param tags: 标签 type-list, eg: [{"Key": "ecs", "Value": "rov-server.prod"}, ...]
+    :return: request
+    """
+    request = RunInstancesRequest()
+    request.set_ImageId(image_id)
+    request.set_VSwitchId(vswitch_id)
+    request.set_SecurityGroupId(security_group_id)
+    request.set_ZoneId(zone_id)
+    request.set_InstanceType(instance_type)
+    request.set_InstanceName(instance_name)
+    request.set_SystemDiskSize(disk_size)
+    request.set_SystemDiskCategory(disk_category)
+    request.set_KeyPairName(key_pair_name)
+    request.set_Tags(tags)
+    return request
+
+
+def send_req(client, request):
+    """
+    发送API请求
+    :param client: 客户端连接
+    :param request: 请求配置
+    :return: response
+    """
+    request.set_accept_format('json')
+    response = client.do_action_with_exception(request)
+    # print(response)
+    response = json.loads(response)
+    print(response)
+    # logging.info(response)
+    print(response.get('Code'))
+    return response
+    # except Exception as e:
+    # 失败,记录报错信息,发送通知,停止并退出
+    # logging.error(e)
+    # sys.exit()
+
+
+def check_instance_running(ecs_client, instance_ids):
+    """
+    检查服务器运行状态
+    :param ecs_client: 客户端连接
+    :param instance_ids: 实例id列表, type-list
+    :return: running_count,Status为Running的实例数
+    """
+    try:
+        request = DescribeInstancesRequest()
+        request.set_InstanceIds(json.dumps(instance_ids))
+        request.set_PageSize(100)
+        response = send_request(ecs_client=ecs_client, request=request)
+        if response.get('Code') is None:
+            instances_list = response.get('Instances').get('Instance')
+            running_count = 0
+            running_instances = []
+            for instance_detail in instances_list:
+                if instance_detail.get('Status') == "Running":
+                    running_count += 1
+                    running_instances.append(instance_detail.get('InstanceId'))
+            return running_count, running_instances
+        else:
+            # 失败,记录报错信息,发送通知,停止并退出
+            logging.error(response)
+            sys.exit()
+    except Exception as e:
+        # 失败,记录报错信息,发送通知,停止并退出
+        logging.error(e)
+        sys.exit()
+
+
+def get_ip_address(ecs_client, instance_id):
+    """
+    获取实例IP地址
+    :param ecs_client: 客户端连接
+    :param instance_id: 实例id, type-string
+    :return: ip_address, type-string
+    """
+    request = DescribeNetworkInterfacesRequest()
+    request.set_accept_format('json')
+    request.set_InstanceId(instance_id)
+    response = send_request(ecs_client=ecs_client, request=request)
+    ip_address = response['NetworkInterfaceSets']['NetworkInterfaceSet'][0]['PrivateIpAddress']
+    return ip_address
+
+
+def create_multiple_instances(amount, ecs_client,
+                              image_id, vswitch_id, security_group_id, zone_id, instance_type, instance_name,
+                              disk_size, disk_category, key_pair_name, tags):
+    """
+    创建多个ECS实例
+    :param amount: 创建实例数 type-int 取值范围:[1, 100]
+    :param ecs_client: 购买机器客户端连接
+    :param image_id: 使用的镜像信息 type-string
+    :param vswitch_id: 选择的交换机 type-string
+    :param security_group_id: 当前vpc类型的安全组 type-string
+    :param zone_id: 服务器所在区域 type-string
+    :param instance_type: 实例规格 type-string
+    :param instance_name: 实例命名 type-string
+    :param disk_size: 磁盘大小,单位:G,type-string
+    :param disk_category: 磁盘类型 type-string
+    :param key_pair_name: 密钥对名称 type-string
+    :param tags: 标签 type-list, eg: [{"Key": "ecs", "Value": "rov-server.prod"}, ...]
+    :return:
+    """
+    logging.info(f"create instances start, request amount: {amount}.")
+    # 1. 连接客户端
+    # create_instances_clt = connect_client(
+    #     access_key_id=access_key_id, access_key_secret=access_key_secret, region_id=region_id
+    # )
+    # 2. 请求参数配置
+    instance_ids = []
+    remain = amount
+    while True:
+        if remain <= 0:
+            break
+        if remain > 50:
+            sub_amount = 50
+            remain = remain - sub_amount
+        else:
+            sub_amount = remain
+            remain = 0
+        request = build_create_instances_request(
+            image_id=image_id, vswitch_id=vswitch_id, security_group_id=security_group_id, zone_id=zone_id,
+            instance_type=instance_type, instance_name=instance_name, disk_size=disk_size, disk_category=disk_category,
+            key_pair_name=key_pair_name, tags=tags
+        )
+        request.set_Amount(sub_amount)
+        # 3. 发送API请求,购买机器并启动
+        response = send_request(ecs_client=ecs_client, request=request)
+        if response.get('Code') is None:
+            sub_instance_ids = response.get('InstanceIdSets').get('InstanceIdSet')
+            logging.info(f"success amount: {len(sub_instance_ids)}, instance ids: {sub_instance_ids}.")
+            # 获取机器运行状态
+            running_amount = 0
+            while running_amount < sub_amount:
+                time.sleep(20)
+                running_amount, running_instances = check_instance_running(ecs_client=ecs_client,
+                                                                           instance_ids=sub_instance_ids)
+                logging.info(f"running amount: {running_amount}, running instances: {running_instances}.")
+            # return instance_ids
+            instance_ids.extend(sub_instance_ids)
+        else:
+            # 失败,记录报错信息,发送通知,停止并退出
+            logging.error(response)
+            sys.exit()
+    return instance_ids
+
+
+def release_instances(ecs_client, instance_ids, force=False):
+    """
+    释放实例
+    :param ecs_client:
+    :param instance_ids: instance_id, type-list
+    :param force: 是否强制释放, True-强制释放, False-正常释放, type-bool
+    :return:
+    """
+    instance_id_list_array = [instance_ids[i:i + 100] for i in range(0, len(instance_ids), 100)]
+    for instance_id_sub_array in instance_id_list_array:
+        request = DeleteInstancesRequest()
+        request.set_InstanceIds(instance_id_sub_array)
+        request.set_Force(force)
+        response = send_request(ecs_client=ecs_client, request=request)
+        # return response
+        if response.get('Code') is None:
+            logging.info(f"Release instances finished, count = {len(instance_id_sub_array)} instances: {instance_id_sub_array}")
+        else:
+            logging.error(f"Release instances fail!!!")
+            sys.exit()
+        time.sleep(3)
+
+
+def get_instances_status(ecs_client, instance_ids):
+    """
+    获取实例运行状态
+    :param ecs_client:
+    :param instance_ids: instance_id, type-list
+    :return:
+    """
+    stopped_instances = []
+    instance_id_list_array = [instance_ids[i:i + 50] for i in range(0, len(instance_ids), 50)]
+    for instance_id_sub_array in instance_id_list_array:
+        while True:
+            request = DescribeInstanceStatusRequest()
+            request.set_InstanceIds(instance_id_sub_array)
+            request.set_PageSize(50)
+            response = send_request(ecs_client=ecs_client, request=request)
+            # return response
+            if response.get('Code') is None:
+                instances_list = response.get('InstanceStatuses').get('InstanceStatus')
+                stopped_instances_sub = [instance.get('InstanceId') for instance in instances_list if
+                                     instance.get('Status') == 'Stopped']
+                if len(stopped_instances_sub) == len(instance_id_sub_array):
+                    logging.info(f"Instances stopped status set success, count:{len(stopped_instances_sub)} instances: {stopped_instances_sub}")
+                    stopped_instances.extend(stopped_instances_sub)
+                    break
+                else:
+                    logging.info(f"Stopped instances count = {len(stopped_instances_sub)}, instances: {stopped_instances_sub}")
+                    time.sleep(5)
+            else:
+                logging.error(response)
+                sys.exit()
+            time.sleep(3)
+    return stopped_instances
+
+def stop_instances(ecs_client, instance_ids, force_stop=False):
+    """
+    停止实例
+    :param ecs_client:
+    :param instance_ids: 实例ID, type-list
+    :param force_stop: 是否强制关机, True-强制关机, False-正常关机, type-bool
+    :return:
+    """
+    instance_id_list_array = [instance_ids[i:i + 100] for i in range(0, len(instance_ids), 100)]
+    for instance_id_sub_array in instance_id_list_array:
+        request = StopInstancesRequest()
+        request.set_InstanceIds(instance_id_sub_array)
+        request.set_ForceStop(force_stop)
+        response = send_request(ecs_client=ecs_client, request=request)
+        # return response
+        if response.get('Code') is None:
+            logging.info(f"Instances stop finished, count:{len(instance_id_sub_array)} instances: {instance_id_sub_array}")
+        else:
+            logging.error(f"Failed to stop instances: {response}")
+            sys.exit()
+        time.sleep(3)
+
+
+def send_request(ecs_client, request):
+    """
+    发送API请求
+    :param ecs_client: 客户端连接
+    :param request: 请求配置
+    :return: response
+    """
+    request.set_accept_format('json')
+    try:
+        response = ecs_client.do_action_with_exception(request)
+        response = json.loads(response)
+        # logging.info(response)
+        return response
+    except Exception as e:
+        # 失败,记录报错信息,发送通知,停止并退出
+        logging.error(e)
+        sys.exit()
+
+
+def run_command(ecs_client, instance_ids, command):
+    """
+    批量执行命令
+    :param ecs_client: 客户端连接
+    :param instance_ids: 实例id列表, type-list, 最多能指定50台ECS实例ID
+    :param command: 命令 type-string
+    :return:
+    """
+    for i in range(len(instance_ids) // 50 + 1):
+        instance_id_list = instance_ids[i * 50:(i + 1) * 50]
+        if len(instance_id_list) == 0:
+            return
+        request = RunCommandRequest()
+        request.set_accept_format('json')
+        request.set_Type("RunShellScript")
+        request.set_CommandContent(command)
+        request.set_InstanceIds(instance_id_list)
+        request.set_Timeout(180)
+        response = send_request(ecs_client=ecs_client, request=request)
+        logging.info(f"run_command count:{len(instance_id_list)} instance_id_list:{instance_id_list} response:{response}")
+
+
+def send_file_to_ecs(ecs_client, instance_id_list, target_dir, name, content):
+    """
+    发送文件到ecs;alb应用,区分上方clb
+    :param ecs_client:
+    :param instance_id_list: 最多能指定50台ECS实例ID
+    :param target_dir: 文件存放目录 type-string
+    :param name: 文件名 type-string
+    :param content: 文件内容 type-string
+    :return:
+    """
+    if not instance_id_list:
+        logging.warning("实例ID列表为空,无法发送文件。")
+        return
+
+    for i in range(len(instance_id_list) // 50 + 1):
+        instance_ids = instance_id_list[i * 50:(i + 1) * 50]
+        if len(instance_ids) == 0:
+            logging.info("没有更多的实例ID需要发送文件,退出。")
+            return
+        request = SendFileRequest()
+        request.set_Content(content)
+        request.set_TargetDir(target_dir)
+        request.set_Name(name)
+        request.set_Overwrite(True)
+        request.set_InstanceIds(instance_ids)
+        try:
+            logging.info(f"正在向实例 {instance_ids} 发送文件 '{name}' 到目录 '{target_dir}'")
+            response = send_request(ecs_client=ecs_client, request=request)
+            logging.info(f"成功发送文件到实例 {instance_ids},响应: {response}")
+        except Exception as e:
+            logging.error(f"发送文件到实例 {instance_ids} 失败,错误: {str(e)}")
+
+
+def add_servers_to_server_group(alb_client, server_group_ids, instance_ids, weight, port):
+    """
+    添加服务器到ALB服务器组
+    :param alb_client: ALB客户端连接
+    :param server_group_ids: 服务器组ID
+    :param instance_ids: 实例ID
+    :param weight: 权重
+    :param port: 后端服务器使用的端口
+    """
+    instance_ids_array = [instance_ids[i:i + 200] for i in range(0, len(instance_ids), 200)]
+    for instance_ids_sub_array in instance_ids_array:
+        servers = []
+        for i in range(len(instance_ids_sub_array)):
+            server = alb_models.AddServersToServerGroupRequestServers(
+                server_id=instance_ids_sub_array[i],
+                server_type='ecs',
+                weight=weight,
+                port=port
+            )
+            servers.append(server)
+
+        # server = alb_models.AddServersToServerGroupRequestServers(
+        #     server_id=instance_id,
+        #     server_type='ecs',
+        #     weight=weight,
+        #     port=port
+        # )
+        for server_group_id in server_group_ids:
+            request = alb_models.AddServersToServerGroupRequest(
+                server_group_id=server_group_id,
+                servers=servers
+            )
+            runtime = util_models.RuntimeOptions(
+                connect_timeout=5000,
+                read_timeout=60000
+            )
+            try:
+                alb_client.add_servers_to_server_group_with_options(request, runtime)
+                logging.info(
+                    f"Successfully added count:{len(instance_ids_sub_array)} instance_ids: {instance_ids_sub_array} to server group {server_group_id} with weight {weight}.")
+            except Exception as e:
+                logging.error(f"Failed to add count:{len(instance_ids_sub_array)} instance_ids: {instance_ids_sub_array} to server group {server_group_id}: {str(e)}")
+                sys.exit()
+        time.sleep(5)
+
+def remove_servers_from_server_group(alb_client, server_group_id_list, instance_ids, port):
+    """
+    从ALB服务器组中移除服务器
+    :param alb_client: ALB客户端连接
+    :param server_group_id_list: 服务器组ID list
+    :param instance_ids: 实例ID list
+    :param port: 后端服务器使用的端口
+    """
+    instance_ids_array = [instance_ids[i:i + 200] for i in range(0, len(instance_ids), 200)]
+    for instance_ids_sub_array in instance_ids_array:
+        servers = []
+        for instance_id in instance_ids_sub_array:
+            server = alb_models.RemoveServersFromServerGroupRequestServers(
+                port=port,
+                server_id=instance_id,
+                server_type='ecs'
+            )
+            servers.append(server)
+        for server_group_id in server_group_id_list:
+            request = alb_models.RemoveServersFromServerGroupRequest(
+                server_group_id=server_group_id,
+                servers=servers
+            )
+            runtime = util_models.RuntimeOptions(
+                connect_timeout=5000,
+                read_timeout=60000
+            )
+            try:
+                alb_client.remove_servers_from_server_group_with_options(request, runtime)
+                logging.info(f"Successfully removed count: {len(instance_ids_sub_array)} instance_ids: {instance_ids_sub_array} from server group {server_group_id}.")
+            except Exception as e:
+                logging.error(f"Failed to remove count: {len(instance_ids_sub_array)} instance_ids: {instance_ids_sub_array} from server group {server_group_id}: {str(e)}")
+                sys.exit()
+        time.sleep(5)
+
+
+def list_server_group_servers(alb_client, server_group_id):
+    """
+    列出服务器组中的服务器并返回实例ID列表
+    @param alb_client: ALB客户端
+    @param server_group_id: 服务器组ID
+    @return: 实例ID列表
+    """
+    instance_ids = []
+    next_token = None
+    while True:
+        try:
+            list_server_group_servers_request = alb_20200616_models.ListServerGroupServersRequest(
+                server_group_id=server_group_id,
+                max_results=100,
+                next_token=next_token
+            )
+            runtime = util_models.RuntimeOptions(
+                connect_timeout=5000,
+                read_timeout=60000
+            )
+            response = alb_client.list_server_group_servers_with_options(list_server_group_servers_request, runtime)
+            next_token = UtilClient.to_map(response.body).get('NextToken')
+            sub_instance_ids = [server.server_id for server in response.body.servers]
+            if len(sub_instance_ids) > 0:
+                instance_ids.extend(sub_instance_ids)
+            if next_token is None:
+                break
+        except Exception as error:
+            logging.error(error)
+        time.sleep(3)
+    return instance_ids
+
+
+def update_server_group_server_weight(alb_client, server_group_id_list, instance_id_list, weight, port):
+    """
+    更指定服务器在服务器组中的权重
+    :param alb_client: ALB客户端
+    :param server_group_id_list: 服务器组ID list
+    :param instance_id_list: 实例ID list
+    :param weight: 权重值
+    :param port: 后端服务器使用的端口
+    """
+    instance_id_list_array = [instance_id_list[i:i + 40] for i in range(0, len(instance_id_list), 40)]
+    for instance_ids_sub_array in instance_id_list_array:
+        servers = []
+        for i in range(len(instance_ids_sub_array)):
+            server = alb_20200616_models.UpdateServerGroupServersAttributeRequestServers(
+                server_type='Ecs',
+                server_id=instance_ids_sub_array[i],
+                weight=weight,
+                port=port
+            )
+            servers.append(server)
+        for server_group_id in server_group_id_list:
+            request = alb_20200616_models.UpdateServerGroupServersAttributeRequest(
+                servers=servers,
+                server_group_id=server_group_id
+            )
+            # logging.info(f"servers = {servers}")
+            runtime = util_models.RuntimeOptions(
+                connect_timeout=5000,
+                read_timeout=60000
+            )
+            try:
+                # logging.info(f"instance_id_list = {instance_id_list} request = {request}")
+                alb_client.update_server_group_servers_attribute_with_options(request, runtime)
+                logging.info(
+                    f"Successfully updated count = {len(instance_ids_sub_array)} instance_ids: {instance_ids_sub_array} in group {server_group_id} to weight {weight}.")
+            except Exception as e:
+                logging.error(e)
+                sys.exit()
+        time.sleep(5)
+
+
+def update_server_group_servers_attribute(alb_client, server_group_id_list, instance_id_list, weight_list, port):
+    """
+    更新服务器组中的服务器权重
+    :param alb_client: ALB客户端
+    :param server_group_id_list: 服务器组ID列表
+    :param instance_id_list: 实例ID列表
+    :param weight_list: 权重修改列表 type-list [(weight, sleep_time), ...]
+    :param port: 后端服务器使用的端口
+    """
+    # for server_group_id in server_group_id_list:
+    #     for instance_id in instance_id_list:
+    for weight, sleep_time in weight_list:
+        update_server_group_server_weight(alb_client, server_group_id_list, instance_id_list, weight, port)
+        time.sleep(sleep_time)
+        # check_server_group_status(alb_client, server_group_id_list)
+
+
+def check_server_group_status(alb_client, server_group_id_list):
+    list_server_groups_request = alb_20200616_models.ListServerGroupsRequest(
+        server_group_ids=server_group_id_list,
+        max_results=100
+    )
+    flag = False
+    runtime = util_models.RuntimeOptions(
+        connect_timeout=5000,
+        read_timeout=60000
+    )
+    for i in range(10):
+        try:
+            response = alb_client.list_server_groups_with_options(list_server_groups_request, runtime)
+            count = 0
+            if response.body:
+                server_groups = UtilClient.to_map(response.body).get("ServerGroups")
+                if server_groups:
+                    for server_group in server_groups:
+                        if server_group.get("ServerGroupStatus") == "Available":
+                            logging.info(f"Server group {server_group} is available.")
+                            count += 1
+            if count == len(server_group_id_list):
+                flag = True
+                break
+            time.sleep(2)
+        except Exception as e:
+            logging.error(e)
+    if not flag:
+        sys.exit()

+ 80 - 0
distribution/distribution_config.py

@@ -0,0 +1,80 @@
+import os
+import logging
+import sys
+
+logging.basicConfig(level=logging.INFO,  # configure the root logger at import time: INFO and above
+                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
+                    datefmt='%a, %d %b %Y %H:%M:%S')
+
+
+# ALB backend server group used for debugging
+# server_group_id_list = ["sgp-ec4gopoclruofsfmxu"]
+
+
+# Production environment: video-distribution-alb-ecs server group
+server_group_id_list = ["sgp-aepczwiwxe4ncwi9p7"]
+
+# Port used by the backend servers
+port = "8080"
+
+# 修改负载均衡权限
+clb_client_params = {
+    'access_key_id': 'LTAIuPbTPL3LDDKN',
+    'access_key_secret': 'ORcNedKwWuwVtcq4IRFtUDZgS0b1le',
+    'region_id': 'cn-hangzhou'
+}
+
+alb_client_params = {
+    'access_key_id': 'LTAI5tASD5yEZLeC8ffmNebY',
+    'access_key_secret': '1PtsFRdp8viJmI78lEhNZR8MezWZBq',
+    'endpoint': 'alb.cn-hangzhou.aliyuncs.com',
+    'region_id': 'cn-hangzhou'
+}
+# 购买机器权限
+ecs_client_params = {
+    'access_key_id': 'LTAI4GBWbFvvXoXsSVBe1o9f',
+    'access_key_secret': 'kRAikWitb4kDxaAyBqNrmLmllMEDO3',
+    'region_id': 'cn-hangzhou'
+}
+
+docker_config = {
+    'username': 'stuuudys',
+    'password': 'Qingqu@2019',
+    'registry': 'registry-vpc.cn-hangzhou.aliyuncs.com'
+}
+
+# Machine configuration; the keys match the keyword arguments of alb_utils.create_multiple_instances
+instance_config_b = {
+    # Image to boot from
+    'image_id': 'm-bp15xqcuacm4zw2h2gi6',
+    # Instance specification
+    'instance_type': 'ecs.sn1ne.xlarge',
+    # Vswitch to attach to
+    'vswitch_id': 'vsw-bp1x47zx13vqkfrp9ugc4',
+    # Security group of the VPC
+    'security_group_id': 'sg-bp1irhrkr4vfj272hk4y',
+    # Disk size, unit: G
+    'disk_size': '200',
+    # Server name
+    'instance_name': 'video-distribution-alb-[1,2]',
+    # Zone the server is created in
+    'zone_id': 'cn-hangzhou-b',
+    # Disk category: cloud disk
+    'disk_category': 'cloud_efficiency',
+    # Key pair
+    'key_pair_name': 'stuuudy',
+    # Tags
+    'tags': [{"Key": "ecs", "Value": "distribution.prod"}]
+}
+
+
+# Service start script: read from the directory containing this file at import time
+start_sh_dir = os.path.dirname(os.path.realpath(__file__))
+start_sh_filename = 'distribution_start.sh'
+with open(file=os.path.join(start_sh_dir, start_sh_filename), mode='r', encoding='utf-8') as rf:
+    file_content = rf.read()
+start_sh = {  # keys match the target_dir/name/content arguments of alb_utils.send_file_to_ecs
+    'target_dir': '/home/distribution_server_sh',
+    'name': start_sh_filename,
+    'content': file_content,
+}

+ 2 - 0
distribution/distribution_start.sh

@@ -0,0 +1,2 @@
+docker login --username=stuuudys --password=Qingqu@2019   registry-vpc.cn-hangzhou.aliyuncs.com
+docker run --cap-add=SYS_PTRACE  -d -it   --name distribution  --restart=always   --network  host   registry-vpc.cn-hangzhou.aliyuncs.com/stuuudy/video-distribution:$1

+ 63 - 0
distribution/distribution_unittest.py

@@ -0,0 +1,63 @@
+import os
+import unittest
+import alb_utils
+import distribution_config
+import distribution_update_b
+import logging
+
+class MyTestCase(unittest.TestCase):
+
+    def test_create_multiple_instances(self):
+        ecs_client = alb_utils.connect_client(access_key_id=distribution_config.ecs_client_params['access_key_id'],
+                                              access_key_secret=distribution_config.ecs_client_params['access_key_secret'],
+                                              region_id=distribution_config.ecs_client_params['region_id'])
+
+        ess_instance_ids = alb_utils.create_multiple_instances(
+            amount=1,
+            ecs_client=ecs_client,
+            **distribution_config.instance_config_b,
+        )
+        logging.info(ess_instance_ids)
+
+
+    def test_send_file_to_ecs(self):
+        ecs_client = alb_utils.connect_client(access_key_id=distribution_config.ecs_client_params['access_key_id'],
+                                              access_key_secret=distribution_config.ecs_client_params['access_key_secret'],
+                                              region_id=distribution_config.ecs_client_params['region_id'])
+        instance_ids = ["i-bp1gg4mac4z31sjgq7ec"]
+        alb_utils.send_file_to_ecs(ecs_client,
+                                   instance_id_list=instance_ids,
+                                   **distribution_config.start_sh)
+
+    def test_run_command(self):
+        ecs_client = alb_utils.connect_client(access_key_id=distribution_config.ecs_client_params['access_key_id'],
+                                              access_key_secret=distribution_config.ecs_client_params['access_key_secret'],
+                                              region_id=distribution_config.ecs_client_params['region_id'])
+        instance_ids = ["i-bp1gg4mac4z31sjgq7ec"]
+        start_sh_param = "latest"
+        server_start_sh = os.path.join(distribution_config.start_sh['target_dir'], distribution_config.start_sh['name'])
+        server_start_commend = f"sh {server_start_sh} {start_sh_param}"
+        alb_utils.run_command(ecs_client, instance_ids, command=server_start_commend)
+
+    def test_distribution_health_check(self):
+        ecs_client = alb_utils.connect_client(access_key_id=distribution_config.ecs_client_params['access_key_id'],
+                                              access_key_secret=distribution_config.ecs_client_params['access_key_secret'],
+                                              region_id=distribution_config.ecs_client_params['region_id'])
+        instance_ids = "i-bp1gg4mac4z31sjgq7ec"
+        distribution_update_b.server_health_check(ecs_client,
+                                                  instance_id=instance_ids)
+
+
+    def test_remove_container_image(self):
+        ecs_client = alb_utils.connect_client(access_key_id=distribution_config.ecs_client_params['access_key_id'],
+                                              access_key_secret=distribution_config.ecs_client_params[
+                                                  'access_key_secret'],
+                                              region_id=distribution_config.ecs_client_params['region_id'])
+        instance_id = "i-bp1gg4mac4z31sjgq7ec"
+        container_name_list = ['distribution']
+        distribution_update_b.remove_container_image(ecs_client,
+                                                         instance_id,
+                                                         container_name_list)
+
+if __name__ == '__main__':  # run the test cases when this file is executed directly
+    unittest.main()

+ 351 - 0
distribution/distribution_update_b.py

@@ -0,0 +1,351 @@
+import asyncio
+import sys
+import time
+import requests
+import alb_utils
+import logging
+import os
+import docker
+import distribution_config
+
+from concurrent.futures import ThreadPoolExecutor
+
+# Module-level result collectors, filled by worker functions and read by the orchestrating coroutines.
+# (instance_id, ip_address) tuples appended by server_health_check once the health endpoint returns 200.
+health_instances = []
+# Ids of the scale-out instances that ess_instance attached to the ALB; main() passes them to remove_instances.
+ess_instances = []
+# Instance ids appended by remove_container_image after the old container and images were removed.
+remove_container_instances = []
+
+
+def server_health_check(ecs_client, instance_id):
+    """
+    服务健康检查
+    :param ecs_client: 客户端连接
+    :param instance_id: instanceId
+    :return:
+    """
+    global health_instances
+    ip_address = alb_utils.get_ip_address(ecs_client=ecs_client, instance_id=instance_id)
+    while True:
+        health_check_url = f"http://{ip_address}:8080/healthcheck"
+        try:
+            http_code = requests.get(health_check_url).status_code
+        except:
+            logging.info(f"images is downloading ip:{ip_address}")
+            http_code = 0
+
+        if http_code == 200:
+            health_instances.append((instance_id, ip_address))
+            logging.info(f"health check success, instance: {instance_id}/{ip_address}")
+            break
+        else:
+            time.sleep(10)
+
+
+async def ess_instance(ecs_client, alb_client, ess_count, max_workers, version, port):
+    """
+    扩容机器并运行新服务
+    :param ecs_client: 购买机器客户端连接
+    :param alb_client: 修改负载均衡权限
+    :param ess_count: 扩容数量
+    :param max_workers: 线程数
+    :param version: 版本标记
+    :param port: 后端服务器使用的端口
+    :return:
+    """
+    # 1. 购买机器并启动
+    ess_instance_ids = alb_utils.create_multiple_instances(
+        amount=ess_count,
+        ecs_client=ecs_client,
+        **distribution_config.instance_config_b,
+    )
+    time.sleep(60)
+
+    # 2. 发送启动脚本到机器上
+    alb_utils.send_file_to_ecs(ecs_client=ecs_client, instance_id_list=ess_instance_ids, **distribution_config.start_sh)
+    logging.info(f"send start shell file finished, count: {len(ess_instance_ids)} instances: {ess_instance_ids}")
+    # 3. 启动服务
+    server_start_sh = os.path.join(distribution_config.start_sh['target_dir'], distribution_config.start_sh['name'])
+    server_start_commend = f"sh {server_start_sh} {version}"
+    alb_utils.run_command(ecs_client=ecs_client, instance_ids=ess_instance_ids, command=server_start_commend)
+    logging.info(f"start server finish, count: {len(ess_instance_ids)} instances: {ess_instance_ids}")
+    # 4. 异步探活
+    global health_instances
+    health_instances = []
+    loop = asyncio.get_running_loop()
+    executor = ThreadPoolExecutor(max_workers=max_workers)
+    tasks = [
+        loop.run_in_executor(executor, server_health_check, *args) for args in
+        [(ecs_client, instance_id) for instance_id in ess_instance_ids]
+    ]
+    await asyncio.wait(tasks)
+    logging.info(f"health instances count: {len(health_instances)}, instances: {health_instances}")
+    # 5. 挂载流量
+    if len(health_instances) == len(ess_instance_ids):
+        # 所有机器探活成功
+        time.sleep(20)
+        health_instance_ids = [instance_id for instance_id, _ in health_instances]
+
+        alb_utils.add_servers_to_server_group(alb_client, distribution_config.server_group_id_list, health_instance_ids, weight=0, port=port)
+        logging.info(f"Successfully added count: {len(health_instance_ids)} health_instance_ids {health_instance_ids} to server groups {distribution_config.server_group_id_list}.")
+
+        time.sleep(20)
+        logging.info(f"start update weight count: {len(health_instance_ids)} instances: {health_instance_ids} server groups: {distribution_config.server_group_id_list}.")
+        add_weight_list = [(10, 5), (20, 5), (40, 5), (60, 5), (80, 5), (100, 5)]
+        alb_utils.update_server_group_servers_attribute(alb_client,
+                                                    server_group_id_list=distribution_config.server_group_id_list,
+                                                    instance_id_list=health_instance_ids,
+                                                    weight_list=add_weight_list,
+                                                    port=port)
+        global ess_instances
+        ess_instances.extend(health_instance_ids)
+        logging.info(f"ess count: {ess_count}, "
+                     f"create count: {len(ess_instance_ids)}, "
+                     f"finished count: {len(health_instance_ids)}")
+    else:
+        logging.info(f"ess count: {ess_count}, "
+                     f"create count: {len(ess_instance_ids)}, "
+                     f"health count: {len(health_instances)}")
+        sys.exit()
+
+
+def remove_container_image(ecs_client, instance_id, container_name_list):
+    """
+    移除旧容器并删除旧镜像
+    :param ecs_client: 客户端连接
+    :param instance_id: instanceId type-string
+    :param container_name: 容器名称 type-string
+    :return:
+    """
+    ip_address = alb_utils.get_ip_address(ecs_client=ecs_client, instance_id=instance_id)
+    logging.info(f"服务器信息:{instance_id}/{ip_address}")
+    client = docker.DockerClient(base_url=f'tcp://{ip_address}:2375', timeout=60)
+    # 移除旧的容器
+    container_remove_retry = 3
+    i = 0
+    while True:
+        if i >= container_remove_retry:
+            logging.error(f"容器不存在或者无法删除当前容器, instance = {instance_id}/{ip_address}")
+            sys.exit()
+        try:
+            flag = False
+            for container_name in container_name_list:
+                try:
+                    container_id = client.containers.get(container_name)
+                    container_id.remove(force=True)
+                    flag = True
+                    break
+                except:
+                    continue
+            if flag:
+                break
+        except Exception as e:
+            i += 1
+
+    # 删除旧镜像
+    images_remove_retry = 3
+    j = 0
+    while True:
+        if j >= images_remove_retry:
+            logging.error(f"镜像不存在,无法获取到镜像ID, instance = {instance_id}/{ip_address}")
+            sys.exit()
+        try:
+            images = client.images.list()
+            for image in images:
+                client.images.remove(force=True, image=image.tags[0])
+                time.sleep(2)
+            global remove_container_instances
+            remove_container_instances.append(instance_id)
+            break
+        except Exception as e:
+            i += 1
+
+
+async def update_instance(ecs_client, alb_client, instance_ids, max_workers, version, port):
+    """
+    线上机器更新
+    :param ecs_client:
+    :param alb_client: alb客户端连接
+    :param instance_ids: instanceId type-list
+    :param max_workers:
+    :param version: 版本标记
+    :param port: 后端服务器使用的端口
+    :return:
+    """
+    sub_index = len(instance_ids)//2
+    instance_ids_group = [instance_ids[:sub_index], instance_ids[sub_index:]]
+    update_finished_count = 0
+    for instance_id_list in instance_ids_group:
+        logging.info(f"update instances: {instance_id_list}")
+        # 1. 摘流量
+        alb_utils.update_server_group_servers_attribute(alb_client=alb_client,
+                                                    server_group_id_list=distribution_config.server_group_id_list,
+                                                    instance_id_list=instance_id_list,
+                                                    weight_list=[(0, 20)],
+                                                    port=port)
+        logging.info(f"set weight with 0 finished, count: {len(instance_id_list)} instances: {instance_id_list}")
+        # 2. 异步移除旧容器并删除旧镜像
+        global remove_container_instances
+        remove_container_instances = []
+        container_name_list = ['distribution']
+        loop = asyncio.get_running_loop()
+        executor = ThreadPoolExecutor(max_workers=max_workers)
+        tasks = [
+            loop.run_in_executor(executor, remove_container_image, *args) for args in
+            [(ecs_client, instance_id, container_name_list) for instance_id in instance_id_list]
+        ]
+        await asyncio.wait(tasks)
+        logging.info(f"remove container & images finished, instances: {remove_container_instances},"
+                     f" count: {len(remove_container_instances)}")
+        if len(remove_container_instances) < len(instance_id_list):
+            logging.error(f"remove container image failed| "
+                          f"request count: {len(instance_id_list)}, removed count: {len(remove_container_instances)}")
+            sys.exit()
+        # 3. 发送启动脚本到机器上
+        alb_utils.send_file_to_ecs(ecs_client=ecs_client, instance_id_list=instance_id_list, **distribution_config.start_sh)
+        logging.info(f"send start shell file finished, instances: {instance_id_list}, count: {len(instance_id_list)}")
+        # 4. 启动服务
+        server_start_sh = os.path.join(distribution_config.start_sh['target_dir'], distribution_config.start_sh['name'])
+        server_start_commend = f"sh {server_start_sh} {version}"
+        alb_utils.run_command(ecs_client=ecs_client, instance_ids=instance_id_list, command=server_start_commend)
+        # 5. 异步探活
+        global health_instances
+        health_instances = []
+        loop = asyncio.get_running_loop()
+        executor = ThreadPoolExecutor(max_workers=max_workers)
+        tasks = [
+            loop.run_in_executor(executor, server_health_check, *args) for args in
+            [(ecs_client, instance_id) for instance_id in instance_id_list]
+        ]
+        await asyncio.wait(tasks)
+        logging.info(f"health instances: {health_instances}, count: {len(health_instances)}")
+        # 6. 挂载流量
+        if len(health_instances) == len(instance_id_list):
+            # 所有机器探活成功
+            time.sleep(10)
+            health_instance_ids = [instance_id for instance_id, _ in health_instances]
+
+            add_weight_list = [(10, 5), (20, 5), (40, 5), (60, 5), (80, 5), (100, 5)]
+            # add_weight_list = [(10, 10), (20, 10), (40, 10), (60, 10), (80, 10), (100, 10)]
+            alb_utils.update_server_group_servers_attribute(alb_client,
+                                                        server_group_id_list=distribution_config.server_group_id_list,
+                                                        instance_id_list=health_instance_ids,
+                                                        weight_list=add_weight_list,
+                                                        port=port)
+            logging.info(f"finished instances: {health_instances}, count: {len(health_instances)}")
+            update_finished_count += len(health_instances)
+            logging.info(f"update finished: {update_finished_count}/{len(instance_ids)}")
+        else:
+            logging.info(f"health instances: {health_instances}, count: {len(health_instances)}")
+            sys.exit()
+
+
+def remove_instances(ecs_client, alb_client, instance_ids, port):
+    """
+    停止并释放机器
+    :param ecs_client:
+    :param alb_client:
+    :param instance_ids: instanceId type-list
+    :param port: 后端服务器使用的端口
+    :return: None
+    """
+    # 1. 摘流量
+    weight_list = [(0, 20)]  # 设置权重为0,等待20秒
+    try:
+        alb_utils.update_server_group_servers_attribute(alb_client=alb_client,
+                                                    server_group_id_list=distribution_config.server_group_id_list,
+                                                    instance_id_list=instance_ids,
+                                                    weight_list=weight_list,
+                                                    port=port)
+    except Exception as e:
+        logging.error(f"Failed to set instance weight: {e}")
+        sys.exit()
+
+    time.sleep(10)
+    # 2.移除alb
+    # for server_group_id in distribution_config.server_group_id_list:
+    try:
+        alb_utils.remove_servers_from_server_group(alb_client=alb_client, server_group_id_list=distribution_config.server_group_id_list,
+                                               instance_ids=instance_ids, port=port)
+        logging.info(f"Successfully removed instances: {instance_ids} from server group {distribution_config.server_group_id_list}.")
+    except Exception as e:
+        logging.error(f"Failed to remove instances: {instance_ids} from server group {distribution_config.server_group_id_list}: {e}")
+
+    logging.info(f"Remove from ALB finished, instances: {instance_ids}")
+
+    # 3. 停止机器
+    alb_utils.stop_instances(ecs_client=ecs_client, instance_ids=instance_ids)
+    # stop_response = alb_utils.stop_instances(ecs_client=ecs_client, instance_ids=instance_ids)
+    # if stop_response.get('Code') is None:
+    #     logging.info(f"Instances stop finished, instances: {instance_ids}")
+    # else:
+    #     logging.error(f"Failed to stop instances: {stop_response}")
+    #     sys.exit()
+
+    # 4. 判断机器运行状态是否为Stopped
+    # while True:
+    stopped_instances = alb_utils.get_instances_status(ecs_client=ecs_client, instance_ids=instance_ids)
+        # response = alb_utils.get_instances_status(ecs_client=ecs_client, instance_ids=instance_ids)
+        # if response.get('Code') is None:
+        #     instances_list = response.get('InstanceStatuses').get('InstanceStatus')
+        #     stopped_instances = [instance.get('InstanceId') for instance in instances_list if
+        #                          instance.get('Status') == 'Stopped']
+        #     if len(stopped_instances) == len(instance_ids):
+        #         logging.info(f"Instances stopped status set success, instances: {stopped_instances}")
+        #         break
+        #     else:
+        #         logging.info(f"Stopped instances count = {len(stopped_instances)}, instances: {stopped_instances}")
+        #         time.sleep(5)
+        # else:
+        #     logging.error(response)
+        #     sys.exit()
+    # 5. 释放机器
+    alb_utils.release_instances(ecs_client=ecs_client, instance_ids=stopped_instances)
+    # release_response = alb_utils.release_instances(ecs_client=ecs_client, instance_ids=stopped_instances)
+    # if release_response.get('Code') is None:
+    #     logging.info(f"Release instances finished, instances: {stopped_instances}")
+    # else:
+    #     logging.error(f"Release instances fail!!!")
+    #     sys.exit()
+
+
+def main():
+    try:
+        version = sys.argv[1]
+        alb_client = alb_utils.connect_alb_client(
+            access_key_id=distribution_config.alb_client_params['access_key_id'],
+            access_key_secret=distribution_config.alb_client_params['access_key_secret'],
+            endpoint=distribution_config.alb_client_params['endpoint']
+        )
+        ecs_client = alb_utils.connect_client(access_key_id=distribution_config.ecs_client_params['access_key_id'],
+                                          access_key_secret=distribution_config.ecs_client_params['access_key_secret'],
+                                          region_id=distribution_config.ecs_client_params['region_id'])
+
+        # 1. 获取alb下所有机器
+        online_instance_ids = alb_utils.list_server_group_servers(alb_client=alb_client, server_group_id=distribution_config.server_group_id_list[0])
+        online_instance_count = len(online_instance_ids)
+        logging.info(f"online instance count: {online_instance_count} instance_ids: {online_instance_ids}")
+
+        # 2. 扩容机器并启动新服务 扩容数量:线上机器数量//2
+        logging.info(f"ess instances start ...")
+        ess_instance_count = online_instance_count // 2
+        logging.info(f"ess instance count: {ess_instance_count}")
+        asyncio.run(ess_instance(ecs_client=ecs_client, alb_client=alb_client,
+                                 ess_count=ess_instance_count, max_workers=2, version=version, port=distribution_config.port))
+        logging.info(f"ess instances end!")
+
+        # 3. 原有机器进行更新
+        logging.info(f"update online instances start ...")
+        asyncio.run(update_instance(ecs_client=ecs_client, alb_client=alb_client,
+                                    instance_ids=online_instance_ids, max_workers=8, version=version, port=distribution_config.port))
+        logging.info(f"update online instances end!")
+
+        # 4. 停止并释放扩容机器
+        logging.info(f"stop & release instances start ...")
+        remove_instances(ecs_client=ecs_client, alb_client=alb_client, instance_ids=ess_instances, port=distribution_config.port)
+        logging.info(f"stop & release instances end!")
+    except Exception as e:
+        logging.error(e)
+        sys.exit()
+
+
+# Script entry point: run the rollout when the file is executed directly.
+if __name__ == '__main__':
+    main()