xuekailun 4 hónapja
szülő
commit
6e82650b57
1 módosított fájl, 546 hozzáadás és 0 törlés
  1. 546 0
      gateway/utils.py

+ 546 - 0
gateway/utils.py

@@ -0,0 +1,546 @@
+import logging
+import json
+import sys
+import time
+from asyncio import wait_for
+
+import requests
+import asyncio
+import time
+
+from alibabacloud_tea_util.client import Client as UtilClient
+from aliyunsdkcore.client import AcsClient
+from aliyunsdkecs.request.v20140526.RunInstancesRequest import RunInstancesRequest
+from aliyunsdkecs.request.v20140526.DescribeInstancesRequest import DescribeInstancesRequest
+from aliyunsdkecs.request.v20140526.DescribeNetworkInterfacesRequest import DescribeNetworkInterfacesRequest
+from aliyunsdkecs.request.v20140526.RunCommandRequest import RunCommandRequest
+from aliyunsdkecs.request.v20140526.SendFileRequest import SendFileRequest
+from aliyunsdkecs.request.v20140526.StopInstancesRequest import StopInstancesRequest
+from aliyunsdkecs.request.v20140526.DeleteInstancesRequest import DeleteInstancesRequest
+from aliyunsdkecs.request.v20140526.DescribeInstanceStatusRequest import DescribeInstanceStatusRequest
+from aliyunsdkecs.request.v20140526.ModifySecurityGroupRuleRequest import ModifySecurityGroupRuleRequest
+from alibabacloud_alb20200616.client import Client as Alb20200616Client
+from alibabacloud_tea_openapi import models as open_api_models
+from alibabacloud_alb20200616 import models as alb_models
+from alibabacloud_alb20200616 import models as alb_20200616_models
+from alibabacloud_tea_util import models as util_models
+
+
+logging.basicConfig(level=logging.INFO,
+                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
+                    datefmt='%a, %d %b %Y %H:%M:%S')
+
+
+def send_msg_to_feishu(webhook, key_word, msg_text):
+    """发送消息到飞书"""
+    headers = {'Content-Type': 'application/json'}
+    payload_message = {
+        "msg_type": "text",
+        "content": {
+            "text": '{}: {}'.format(key_word, msg_text)
+        }
+    }
+    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
+    logging.info(response.text)
+
+
+def connect_client(access_key_id, access_key_secret, region_id):
+    """
+    初始化账号,连接客户端
+    :param access_key_id: access key Id, type-string
+    :param access_key_secret: access key secret, type-string
+    :param region_id: region_id
+    :return: clt
+    """
+    try:
+        clt = AcsClient(ak=access_key_id, secret=access_key_secret, region_id=region_id)
+        return clt
+    except Exception as e:
+        # 失败,记录报错信息,发送通知,停止并退出
+        logging.error(e)
+        sys.exit()
+
+
+def connect_alb_client(access_key_id, access_key_secret, endpoint):
+    """
+    初始化ALB客户端
+    :param access_key_id: access key Id, type-string
+    :param access_key_secret: access key secret, type-string
+    :return: alb_client
+    """
+    config = open_api_models.Config(
+        access_key_id=access_key_id,
+        access_key_secret=access_key_secret,
+        endpoint=endpoint
+    )
+    alb_client = Alb20200616Client(config)
+    return alb_client
+
+
+
+
+
+def build_create_instances_request(image_id, vswitch_id, security_group_id, zone_id, instance_type, instance_name,
+                                   disk_size, disk_category, key_pair_name, tags):
+    """
+    购买服务器参数配置
+    :param image_id: 使用的镜像信息 type-string
+    :param vswitch_id: 选择的交换机 type-string
+    :param security_group_id: 当前vpc类型的安全组 type-string
+    :param zone_id: 服务器所在区域 type-string
+    :param instance_type: 实例规格 type-string
+    :param instance_name: 实例命名 type-string
+    :param disk_size: 磁盘大小,单位:G,type-string
+    :param disk_category: 磁盘类型 type-string
+    :param key_pair_name: 密钥对名称 type-string
+    :param tags: 标签 type-list, eg: [{"Key": "ecs", "Value": "rov-server.prod"}, ...]
+    :return: request
+    """
+    request = RunInstancesRequest()
+    request.set_ImageId(image_id)
+    request.set_VSwitchId(vswitch_id)
+    request.set_SecurityGroupId(security_group_id)
+    request.set_ZoneId(zone_id)
+    request.set_InstanceType(instance_type)
+    request.set_InstanceName(instance_name)
+    request.set_SystemDiskSize(disk_size)
+    request.set_SystemDiskCategory(disk_category)
+    request.set_KeyPairName(key_pair_name)
+    request.set_Tags(tags)
+    return request
+
+
+def send_req(client, request):
+    """
+    发送API请求
+    :param client: 客户端连接
+    :param request: 请求配置
+    :return: response
+    """
+    request.set_accept_format('json')
+    response = client.do_action_with_exception(request)
+    #print(response)
+    response = json.loads(response)
+    print(response)
+        # logging.info(response)
+    print(response.get('Code'))
+    return response
+    #except Exception as e:
+        # 失败,记录报错信息,发送通知,停止并退出
+    #logging.error(e)
+    #sys.exit()
+
+
+def check_instance_running(ecs_client, instance_ids):
+    """
+    检查服务器运行状态
+    :param ecs_client: 客户端连接
+    :param instance_ids: 实例id列表, type-list
+    :return: running_count,Status为Running的实例数
+    """
+    try:
+        request = DescribeInstancesRequest()
+        request.set_InstanceIds(json.dumps(instance_ids))
+        request.set_PageSize(100)
+        response = send_request(ecs_client=ecs_client, request=request)
+        if response.get('Code') is None:
+            instances_list = response.get('Instances').get('Instance')
+            running_count = 0
+            running_instances = []
+            for instance_detail in instances_list:
+                if instance_detail.get('Status') == "Running":
+                    running_count += 1
+                    running_instances.append(instance_detail.get('InstanceId'))
+            return running_count, running_instances
+        else:
+            # 失败,记录报错信息,发送通知,停止并退出
+            logging.error(response)
+            sys.exit()
+    except Exception as e:
+        # 失败,记录报错信息,发送通知,停止并退出
+        logging.error(e)
+        sys.exit()
+
+
+def get_ip_address(ecs_client, instance_id):
+    """
+    获取实例IP地址
+    :param ecs_client: 客户端连接
+    :param instance_id: 实例id, type-string
+    :return: ip_address, type-string
+    """
+    request = DescribeNetworkInterfacesRequest()
+    request.set_accept_format('json')
+    request.set_InstanceId(instance_id)
+    response = send_request(ecs_client=ecs_client, request=request)
+    ip_address = response['NetworkInterfaceSets']['NetworkInterfaceSet'][0]['PrivateIpAddress']
+    return ip_address
+
+
+
+def create_multiple_instances(amount, ecs_client,
+                              image_id, vswitch_id, security_group_id, zone_id, instance_type, instance_name,
+                              disk_size, disk_category, key_pair_name, tags):
+    """
+    创建多个ECS实例
+    :param amount: 创建实例数 type-int 取值范围:[1, 100]
+    :param ecs_client: 购买机器客户端连接
+    :param image_id: 使用的镜像信息 type-string
+    :param vswitch_id: 选择的交换机 type-string
+    :param security_group_id: 当前vpc类型的安全组 type-string
+    :param zone_id: 服务器所在区域 type-string
+    :param instance_type: 实例规格 type-string
+    :param instance_name: 实例命名 type-string
+    :param disk_size: 磁盘大小,单位:G,type-string
+    :param disk_category: 磁盘类型 type-string
+    :param key_pair_name: 密钥对名称 type-string
+    :param tags: 标签 type-list, eg: [{"Key": "ecs", "Value": "rov-server.prod"}, ...]
+    :return:
+    """
+    logging.info(f"create instances start, request amount: {amount}.")
+    # 1. 连接客户端
+    # create_instances_clt = connect_client(
+    #     access_key_id=access_key_id, access_key_secret=access_key_secret, region_id=region_id
+    # )
+    # 2. 请求参数配置
+    request = build_create_instances_request(
+        image_id=image_id, vswitch_id=vswitch_id, security_group_id=security_group_id, zone_id=zone_id,
+        instance_type=instance_type, instance_name=instance_name, disk_size=disk_size, disk_category=disk_category,
+        key_pair_name=key_pair_name, tags=tags
+    )
+    request.set_Amount(amount)
+    # 3. 发送API请求,购买机器并启动
+    response = send_request(ecs_client=ecs_client, request=request)
+    if response.get('Code') is None:
+        instance_ids = response.get('InstanceIdSets').get('InstanceIdSet')
+        logging.info(f"success amount: {len(instance_ids)}, instance ids: {instance_ids}.")
+        # 获取机器运行状态
+        running_amount = 0
+        while running_amount < amount:
+            time.sleep(10)
+            running_amount, running_instances = check_instance_running(ecs_client=ecs_client, instance_ids=instance_ids)
+            logging.info(f"running amount: {running_amount}, running instances: {running_instances}.")
+        return instance_ids
+    else:
+        # 失败,记录报错信息,发送通知,停止并退出
+        logging.error(response)
+        sys.exit()
+
+
+
+def release_instances(ecs_client, instance_ids, force=False):
+    """
+    释放实例
+    :param ecs_client:
+    :param instance_ids: instance_id, type-list
+    :param force: 是否强制释放, True-强制释放, False-正常释放, type-bool
+    :return:
+    """
+    request = DeleteInstancesRequest()
+    request.set_InstanceIds(instance_ids)
+    request.set_Force(force)
+    response = send_request(ecs_client=ecs_client, request=request)
+    return response
+
+
+
+def get_instances_status(ecs_client, instance_ids):
+    """
+    获取实例运行状态
+    :param ecs_client:
+    :param instance_ids: instance_id, type-list
+    :return:
+    """
+    request = DescribeInstanceStatusRequest()
+    request.set_InstanceIds(instance_ids)
+    request.set_PageSize(50)
+    response = send_request(ecs_client=ecs_client, request=request)
+    return response
+
+
+
+def stop_instances(ecs_client, instance_ids, force_stop=False):
+    """
+    停止实例
+    :param ecs_client:
+    :param instance_ids: 实例ID, type-list
+    :param force_stop: 是否强制关机, True-强制关机, False-正常关机, type-bool
+    :return:
+    """
+    request = StopInstancesRequest()
+    request.set_InstanceIds(instance_ids)
+    request.set_ForceStop(force_stop)
+    response = send_request(ecs_client=ecs_client, request=request)
+    return response
+
+
+
+
+def send_request(ecs_client, request):
+    """
+    发送API请求
+    :param ecs_client: 客户端连接
+    :param request: 请求配置
+    :return: response
+    """
+    print(2222222222)
+    print(ecs_client, request)
+    print(3333333333)
+
+    request.set_accept_format('json')
+    try:
+        response = ecs_client.do_action_with_exception(request)
+        response = json.loads(response)
+        # logging.info(response)
+        return response
+    except Exception as e:
+        # 失败,记录报错信息,发送通知,停止并退出
+        logging.error(e)
+        sys.exit()
+
+
+
+def run_command(ecs_client, instance_ids, command):
+    """
+    批量执行命令
+    :param ecs_client: 客户端连接
+    :param instance_ids: 实例id列表, type-list, 最多能指定50台ECS实例ID
+    :param command: 命令 type-string
+    :return:
+    """
+    print(11111)
+    print(ecs_client, instance_ids, command)
+    for i in range(len(instance_ids) // 50 + 1):
+        instance_id_list = instance_ids[i * 50:(i + 1) * 50]
+        if len(instance_id_list) == 0:
+            return
+        request = RunCommandRequest()
+        request.set_accept_format('json')
+        request.set_Type("RunShellScript")
+        request.set_CommandContent(command)
+        request.set_InstanceIds(instance_id_list)
+        response = send_request(ecs_client=ecs_client, request=request)
+        logging.info(response)
+
+
+def send_file_to_ecs(ecs_client, instance_id_list, target_dir, name, content):
+    """
+    发送文件到ecs;alb应用,区分上方clb
+    :param ecs_client:
+    :param instance_id_list: 最多能指定50台ECS实例ID
+    :param target_dir: 文件存放目录 type-string
+    :param name: 文件名 type-string
+    :param content: 文件内容 type-string
+    :return:
+    """
+    if not instance_id_list:
+        logging.warning("实例ID列表为空,无法发送文件。")
+        return
+
+    for i in range(len(instance_id_list) // 50 + 1):
+        instance_ids = instance_id_list[i * 50:(i + 1) * 50]
+        if len(instance_ids) == 0:
+            logging.info("没有更多的实例ID需要发送文件,退出。")
+            return
+        request = SendFileRequest()
+        request.set_Content(content)
+        request.set_TargetDir(target_dir)
+        request.set_Name(name)
+        request.set_Overwrite(True)
+        request.set_InstanceIds(instance_ids)
+        try:
+            logging.info(f"正在向实例 {instance_ids} 发送文件 '{name}' 到目录 '{target_dir}'")
+            response = send_request(ecs_client=ecs_client, request=request)
+            logging.info(f"成功发送文件到实例 {instance_ids},响应: {response}")
+        except Exception as e:
+            logging.error(f"发送文件到实例 {instance_ids} 失败,错误: {str(e)}")
+
+
+
+def add_servers_to_server_group(alb_client, server_group_id, instance_id, weight, port):
+    """
+    添加服务器到ALB服务器组
+    :param alb_client: ALB客户端连接
+    :param server_group_id: 服务器组ID
+    :param instance_id: 实例ID
+    :param weight: 权重
+    :param port: 后端服务器使用的端口
+    """
+    server = alb_models.AddServersToServerGroupRequestServers(
+        server_id=instance_id,
+        server_type='ecs',
+        weight=weight,
+        port=port
+    )
+    request = alb_models.AddServersToServerGroupRequest(
+        server_group_id=server_group_id,
+        servers=[server]
+    )
+    runtime = util_models.RuntimeOptions()
+    try:
+        alb_client.add_servers_to_server_group_with_options(request, runtime)
+        logging.info(f"Successfully added server {instance_id} to server group {server_group_id} with weight {weight}.")
+    except Exception as e:
+        logging.error(f"Failed to add server {instance_id} to server group {server_group_id}: {str(e)}")
+
+
+
+def remove_servers_from_server_group(alb_client, server_group_id, instance_ids, port):
+    """
+    从ALB服务器组中移除服务器
+    :param alb_client: ALB客户端连接
+    :param server_group_id: 服务器组ID
+    :param instance_ids: 实例ID
+    :param port: 后端服务器使用的端口
+    """
+    for instance_id in instance_ids:
+        server = alb_models.RemoveServersFromServerGroupRequestServers(
+            port=port,
+            server_id=instance_id,
+            server_type='ecs'
+        )
+        request = alb_models.RemoveServersFromServerGroupRequest(
+            server_group_id=server_group_id,
+            servers=[server]
+        )
+        runtime = util_models.RuntimeOptions()
+        try:
+            alb_client.remove_servers_from_server_group_with_options(request, runtime)
+            logging.info(f"Successfully removed server {instance_id} from server group {server_group_id}.")
+        except Exception as e:
+            logging.error(f"Failed to remove server {instance_id} from server group {server_group_id}: {str(e)}")
+
+
+
+
+def list_server_group_servers(alb_client, server_group_id):
+    """
+    列出服务器组中的服务器并返回实例ID列表
+    @param alb_client: ALB客户端
+    @param server_group_id: 服务器组ID
+    @return: 实例ID列表
+    """
+    list_server_group_servers_request = alb_20200616_models.ListServerGroupServersRequest(
+        server_group_id=server_group_id
+    )
+    runtime = util_models.RuntimeOptions()
+
+    try:
+        response = alb_client.list_server_group_servers_with_options(list_server_group_servers_request, runtime)
+        instance_ids = [server.server_id for server in response.body.servers]
+        return instance_ids
+    except Exception as error:
+        print(str(error))
+        UtilClient.assert_as_string(str(error))
+        return []
+
+
+async def list_server_group_servers_async(alb_client, server_group_id):
+    """
+    异步列出指定服务器组中的服务器并返回实例ID列表
+    @param alb_client: ALB客户端
+    @param server_group_id: 服务器组ID
+    @return: 实例ID列表
+    """
+    list_server_group_servers_request = alb_20200616_models.ListServerGroupServersRequest(
+        server_group_id=server_group_id
+    )
+    runtime = util_models.RuntimeOptions()
+
+    try:
+        response = await alb_client.list_server_group_servers_with_options_async(list_server_group_servers_request, runtime)
+        instance_ids = [server.server_id for server in response.body.servers]
+        return instance_ids
+    except Exception as error:
+        print(str(error))
+        UtilClient.assert_as_string(str(error))
+        return []
+
+
+def update_server_group_server_weight(alb_client, server_group_id, instance_id, weight, port):
+    """
+    更指定服务器在服务器组中的权重
+    :param alb_client: ALB客户端
+    :param server_group_id: 服务器组ID
+    :param instance_id: 实例ID
+    :param weight: 权重值
+    :param port: 后端服务器使用的端口
+    """
+    server = alb_20200616_models.UpdateServerGroupServersAttributeRequestServers(
+        server_type='Ecs',
+        server_id=instance_id,
+        weight=weight,
+        port=port
+    )
+    request = alb_20200616_models.UpdateServerGroupServersAttributeRequest(
+        servers=[server],
+        server_group_id=server_group_id
+    )
+    runtime = util_models.RuntimeOptions()
+    try:
+        alb_client.update_server_group_servers_attribute_with_options(request, runtime)
+        print(f"Successfully updated server {instance_id} in group {server_group_id} to weight {weight}.")
+    except Exception as error:
+        print(str(error))
+        UtilClient.assert_as_string(str(error))
+
+def update_server_group_servers_attribute(alb_client, server_group_id_list, instance_id_list, weight_list, port):
+    """
+    更新服务器组中的服务器权重
+    :param alb_client: ALB客户端
+    :param server_group_id_list: 服务器组ID列表
+    :param instance_id_list: 实例ID列表
+    :param weight_list: 权重修改列表 type-list [(weight, sleep_time), ...]
+    :param port: 后端服务器使用的端口
+    """
+    for server_group_id in server_group_id_list:
+        for instance_id in instance_id_list:
+            for weight, sleep_time in weight_list:
+                update_server_group_server_weight(alb_client, server_group_id, instance_id, weight, port)
+                time.sleep(sleep_time)
+
+
+
+
+async def update_server_group_server_weight_async(alb_client, server_group_id, instance_id, weight, port):
+    """
+    异步更新特定服务器在服务器组中的权重
+    :param alb_client: ALB客户端
+    :param server_group_id: 服务器组ID
+    :param instance_id: 实例ID
+    :param weight: 权重值
+    :param port: 后端服务器使用的端口
+    """
+    server = alb_20200616_models.UpdateServerGroupServersAttributeRequestServers(
+        server_type='Ecs',
+        server_id=instance_id,
+        weight=weight,
+        port=port
+    )
+    request = alb_20200616_models.UpdateServerGroupServersAttributeRequest(
+        servers=[server],
+        server_group_id=server_group_id
+    )
+    runtime = util_models.RuntimeOptions()
+    try:
+        await alb_client.update_server_group_servers_attribute_with_options_async(request, runtime)
+        print(f"Successfully updated server {instance_id} in group {server_group_id} to weight {weight} asynchronously.")
+    except Exception as error:
+        print(str(error))
+        UtilClient.assert_as_string(str(error))
+
+async def update_server_group_servers_attribute_async(alb_client, server_group_id_list, instance_ids, weight_list, port):
+    """
+    异步更新服务器组中的服务器属性
+    :param alb_client: ALB客户端
+    :param server_group_id_list: 服务器组ID列表
+    :param instance_ids: 实例ID列表
+    :param weight_list: 权重修改列表 type-list [(weight, sleep_time), ...]
+    """
+    tasks = []
+    for server_group_id in server_group_id_list:
+        for instance_id in instance_ids:
+            for weight, sleep_time in weight_list:
+                tasks.append(update_server_group_server_weight_async(alb_client, server_group_id, instance_id, weight, port))
+                await asyncio.sleep(sleep_time)
+    await asyncio.gather(*tasks)