import asyncio
import json
import logging
import sys
import time
from asyncio import wait_for

import requests
from alibabacloud_alb20200616 import models as alb_20200616_models
from alibabacloud_alb20200616 import models as alb_models
from alibabacloud_alb20200616.client import Client as Alb20200616Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
from aliyunsdkcore.client import AcsClient
from aliyunsdkecs.request.v20140526.DeleteInstancesRequest import DeleteInstancesRequest
from aliyunsdkecs.request.v20140526.DescribeInstancesRequest import DescribeInstancesRequest
from aliyunsdkecs.request.v20140526.DescribeInstanceStatusRequest import DescribeInstanceStatusRequest
from aliyunsdkecs.request.v20140526.DescribeNetworkInterfacesRequest import DescribeNetworkInterfacesRequest
from aliyunsdkecs.request.v20140526.ModifySecurityGroupRuleRequest import ModifySecurityGroupRuleRequest
from aliyunsdkecs.request.v20140526.RunCommandRequest import RunCommandRequest
from aliyunsdkecs.request.v20140526.RunInstancesRequest import RunInstancesRequest
from aliyunsdkecs.request.v20140526.SendFileRequest import SendFileRequest
from aliyunsdkecs.request.v20140526.StopInstancesRequest import StopInstancesRequest

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    datefmt='%a, %d %b %Y %H:%M:%S',
)

# The batch helpers below split instance IDs into groups of this size
# (the original docstrings state a limit of 50 instance IDs per call).
_ECS_BATCH_SIZE = 50


def _chunked(items, size):
    """Yield consecutive slices of ``items`` holding at most ``size`` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def _alb_runtime_options():
    """Build the runtime options (connect/read timeouts) shared by all ALB calls."""
    return util_models.RuntimeOptions(connect_timeout=5000, read_timeout=60000)


def send_msg_to_feishu(webhook, key_word, msg_text, timeout=10):
    """
    Send a text message to a Feishu webhook.

    :param webhook: webhook URL, type-string
    :param key_word: prefix placed before the message text, type-string
    :param msg_text: message body, type-string
    :param timeout: seconds to wait for the HTTP request; without a timeout
        an unresponsive endpoint would block the caller forever
    """
    headers = {'Content-Type': 'application/json'}
    payload_message = {
        "msg_type": "text",
        "content": {
            "text": '{}: {}'.format(key_word, msg_text)
        }
    }
    response = requests.request(
        'POST', url=webhook, headers=headers,
        data=json.dumps(payload_message), timeout=timeout,
    )
    logging.info(response.text)


def connect_client(access_key_id, access_key_secret, region_id):
    """
    Create the ECS API client.

    Exits the process with status 1 if the client cannot be constructed.

    :param access_key_id: access key id, type-string
    :param access_key_secret: access key secret, type-string
    :param region_id: region id, type-string
    :return: clt, the AcsClient instance
    """
    try:
        return AcsClient(ak=access_key_id, secret=access_key_secret, region_id=region_id)
    except Exception as e:
        # Failure: log it and stop with a non-zero status so that the
        # calling shell/scheduler sees the run as failed.
        logging.exception(e)
        sys.exit(1)


def connect_alb_client(access_key_id, access_key_secret, endpoint):
    """
    Create the ALB API client.

    :param access_key_id: access key id, type-string
    :param access_key_secret: access key secret, type-string
    :param endpoint: ALB service endpoint, type-string
    :return: alb_client
    """
    config = open_api_models.Config(
        access_key_id=access_key_id,
        access_key_secret=access_key_secret,
        endpoint=endpoint
    )
    return Alb20200616Client(config)


def build_create_instances_request(image_id, vswitch_id, security_group_id, zone_id, instance_type,
                                   instance_name, disk_size, disk_category, key_pair_name, tags):
    """
    Build the RunInstances request used to purchase servers.

    :param image_id: image to use, type-string
    :param vswitch_id: vswitch to use, type-string
    :param security_group_id: security group of the VPC, type-string
    :param zone_id: zone the server is placed in, type-string
    :param instance_type: instance specification, type-string
    :param instance_name: instance name, type-string
    :param disk_size: system disk size in GB, type-string
    :param disk_category: system disk category, type-string
    :param key_pair_name: key pair name, type-string
    :param tags: tags, type-list, eg: [{"Key": "ecs", "Value": "rov-server.prod"}, ...]
    :return: request
    """
    request = RunInstancesRequest()
    request.set_ImageId(image_id)
    request.set_VSwitchId(vswitch_id)
    request.set_SecurityGroupId(security_group_id)
    request.set_ZoneId(zone_id)
    request.set_InstanceType(instance_type)
    request.set_InstanceName(instance_name)
    request.set_SystemDiskSize(disk_size)
    request.set_SystemDiskCategory(disk_category)
    request.set_KeyPairName(key_pair_name)
    request.set_Tags(tags)
    return request


def send_req(client, request):
    """
    Send an API request and print the decoded response (debug variant).

    Unlike ``send_request`` this function does not catch anything: errors
    raised by the client or by JSON decoding propagate to the caller.

    :param client: client connection
    :param request: request configuration
    :return: response, the decoded JSON body
    """
    request.set_accept_format('json')
    response = json.loads(client.do_action_with_exception(request))
    print(response)
    print(response.get('Code'))
    return response


def check_instance_running(ecs_client, instance_ids):
    """
    Check which of the given instances are in the Running state.

    Exits the process with status 1 when the response carries an error
    ``Code`` or when the response cannot be processed.

    :param ecs_client: client connection
    :param instance_ids: instance id list, type-list
    :return: (running_count, running_instances) - number of instances whose
        Status is Running, and their instance ids
    """
    try:
        request = DescribeInstancesRequest()
        # DescribeInstances receives the ids as one JSON-array string.
        request.set_InstanceIds(json.dumps(instance_ids))
        request.set_PageSize(100)
        response = send_request(ecs_client=ecs_client, request=request)
        if response.get('Code') is not None:
            logging.error(response)
            sys.exit(1)
        running_instances = [
            detail.get('InstanceId')
            for detail in response.get('Instances').get('Instance')
            if detail.get('Status') == "Running"
        ]
        return len(running_instances), running_instances
    except Exception as e:
        # SystemExit is not an Exception subclass, so the exit above is not
        # intercepted here; this only handles malformed responses.
        logging.exception(e)
        sys.exit(1)


def get_ip_address(ecs_client, instance_id):
    """
    Get the private IP address of the first network interface of an instance.

    :param ecs_client: client connection
    :param instance_id: instance id, type-string
    :return: ip_address, type-string
    """
    request = DescribeNetworkInterfacesRequest()
    request.set_InstanceId(instance_id)
    response = send_request(ecs_client=ecs_client, request=request)
    return response['NetworkInterfaceSets']['NetworkInterfaceSet'][0]['PrivateIpAddress']


def create_multiple_instances(amount, ecs_client, image_id, vswitch_id, security_group_id, zone_id,
                              instance_type, instance_name, disk_size, disk_category, key_pair_name, tags):
    """
    Create several ECS instances and block until all of them are Running.

    Exits the process with status 1 when the create request is rejected.

    :param amount: number of instances to create, type-int, range: [1, 100]
    :param ecs_client: client connection used for the purchase
    :param image_id: image to use, type-string
    :param vswitch_id: vswitch to use, type-string
    :param security_group_id: security group of the VPC, type-string
    :param zone_id: zone the server is placed in, type-string
    :param instance_type: instance specification, type-string
    :param instance_name: instance name, type-string
    :param disk_size: system disk size in GB, type-string
    :param disk_category: system disk category, type-string
    :param key_pair_name: key pair name, type-string
    :param tags: tags, type-list, eg: [{"Key": "ecs", "Value": "rov-server.prod"}, ...]
    :return: instance_ids, the ids of the created instances
    """
    logging.info(f"create instances start, request amount: {amount}.")
    # 1. Build the request.
    request = build_create_instances_request(
        image_id=image_id, vswitch_id=vswitch_id, security_group_id=security_group_id,
        zone_id=zone_id, instance_type=instance_type, instance_name=instance_name,
        disk_size=disk_size, disk_category=disk_category, key_pair_name=key_pair_name, tags=tags
    )
    request.set_Amount(amount)

    # 2. Send it: purchase and start the machines.
    response = send_request(ecs_client=ecs_client, request=request)
    if response.get('Code') is not None:
        logging.error(response)
        sys.exit(1)

    instance_ids = response.get('InstanceIdSets').get('InstanceIdSet')
    logging.info(f"success amount: {len(instance_ids)}, instance ids: {instance_ids}.")

    # 3. Poll until every created instance is Running. The target is the
    # number of ids actually returned, so a response with fewer ids than
    # requested cannot make the loop wait for instances that do not exist.
    expected = len(instance_ids)
    running_amount = 0
    while running_amount < expected:
        time.sleep(10)
        running_amount, running_instances = check_instance_running(
            ecs_client=ecs_client, instance_ids=instance_ids
        )
        logging.info(f"running amount: {running_amount}, running instances: {running_instances}.")
    return instance_ids


def release_instances(ecs_client, instance_ids, force=False):
    """
    Release (delete) instances.

    :param ecs_client: client connection
    :param instance_ids: instance ids, type-list
    :param force: True - force release, False - normal release, type-bool
    :return: response
    """
    request = DeleteInstancesRequest()
    request.set_InstanceIds(instance_ids)
    request.set_Force(force)
    return send_request(ecs_client=ecs_client, request=request)


def get_instances_status(ecs_client, instance_ids):
    """
    Query the status of instances.

    :param ecs_client: client connection
    :param instance_ids: instance ids, type-list
    :return: response
    """
    request = DescribeInstanceStatusRequest()
    request.set_InstanceIds(instance_ids)
    request.set_PageSize(50)
    return send_request(ecs_client=ecs_client, request=request)


def stop_instances(ecs_client, instance_ids, force_stop=False):
    """
    Stop instances.

    :param ecs_client: client connection
    :param instance_ids: instance ids, type-list
    :param force_stop: True - force shutdown, False - normal shutdown, type-bool
    :return: response
    """
    request = StopInstancesRequest()
    request.set_InstanceIds(instance_ids)
    request.set_ForceStop(force_stop)
    return send_request(ecs_client=ecs_client, request=request)


def send_request(ecs_client, request):
    """
    Send an API request and decode the JSON response.

    Any failure (client error or undecodable body) is logged with its
    traceback and the process exits with status 1.

    :param ecs_client: client connection
    :param request: request configuration
    :return: response, the decoded JSON body
    """
    request.set_accept_format('json')
    try:
        return json.loads(ecs_client.do_action_with_exception(request))
    except Exception as e:
        logging.exception(e)
        sys.exit(1)


def run_command(ecs_client, instance_ids, command):
    """
    Run a shell command on many instances, in batches of 50 instance ids.

    :param ecs_client: client connection
    :param instance_ids: instance id list, type-list
    :param command: command content, type-string
    :return: None
    """
    for instance_id_list in _chunked(instance_ids, _ECS_BATCH_SIZE):
        request = RunCommandRequest()
        request.set_accept_format('json')
        request.set_Type("RunShellScript")
        request.set_CommandContent(command)
        request.set_InstanceIds(instance_id_list)
        response = send_request(ecs_client=ecs_client, request=request)
        logging.info(response)


def send_file_to_ecs(ecs_client, instance_id_list, target_dir, name, content):
    """
    Send a file to ECS instances, in batches of 50 instance ids.

    An existing file with the same name is overwritten.

    :param ecs_client: client connection
    :param instance_id_list: instance id list, type-list
    :param target_dir: directory the file is stored in, type-string
    :param name: file name, type-string
    :param content: file content, type-string
    :return: None
    """
    if not instance_id_list:
        logging.warning("实例ID列表为空,无法发送文件。")
        return

    for instance_ids in _chunked(instance_id_list, _ECS_BATCH_SIZE):
        request = SendFileRequest()
        request.set_Content(content)
        request.set_TargetDir(target_dir)
        request.set_Name(name)
        request.set_Overwrite(True)
        request.set_InstanceIds(instance_ids)
        try:
            logging.info(f"正在向实例 {instance_ids} 发送文件 '{name}' 到目录 '{target_dir}'")
            response = send_request(ecs_client=ecs_client, request=request)
            logging.info(f"成功发送文件到实例 {instance_ids},响应: {response}")
        except Exception as e:
            logging.error(f"发送文件到实例 {instance_ids} 失败,错误: {str(e)}")


def add_servers_to_server_group(alb_client, server_group_id, instance_id, weight, port):
    """
    Add one ECS instance to an ALB server group. Failures are logged, not raised.

    :param alb_client: ALB client connection
    :param server_group_id: server group id
    :param instance_id: instance id
    :param weight: weight
    :param port: port used by the backend server
    """
    server = alb_models.AddServersToServerGroupRequestServers(
        server_id=instance_id,
        server_type='ecs',
        weight=weight,
        port=port
    )
    request = alb_models.AddServersToServerGroupRequest(
        server_group_id=server_group_id,
        servers=[server]
    )
    runtime = _alb_runtime_options()
    try:
        alb_client.add_servers_to_server_group_with_options(request, runtime)
        logging.info(f"Successfully added server {instance_id} to server group {server_group_id} with weight {weight}.")
    except Exception as e:
        logging.error(f"Failed to add server {instance_id} to server group {server_group_id}: {str(e)}")


def remove_servers_from_server_group(alb_client, server_group_id, instance_ids, port):
    """
    Remove ECS instances from an ALB server group, one request per instance.

    A failure for one instance is logged and the remaining ones are still
    processed.

    :param alb_client: ALB client connection
    :param server_group_id: server group id
    :param instance_ids: instance ids, type-list
    :param port: port used by the backend server
    """
    runtime = _alb_runtime_options()
    for instance_id in instance_ids:
        server = alb_models.RemoveServersFromServerGroupRequestServers(
            port=port,
            server_id=instance_id,
            server_type='ecs'
        )
        request = alb_models.RemoveServersFromServerGroupRequest(
            server_group_id=server_group_id,
            servers=[server]
        )
        try:
            alb_client.remove_servers_from_server_group_with_options(request, runtime)
            logging.info(f"Successfully removed server {instance_id} from server group {server_group_id}.")
        except Exception as e:
            logging.error(f"Failed to remove server {instance_id} from server group {server_group_id}: {str(e)}")


def list_server_group_servers(alb_client, server_group_id):
    """
    List the servers of a server group, following pagination.

    If a request fails, the error is logged and the ids collected so far
    are returned (the list may then be incomplete).

    :param alb_client: ALB client
    :param server_group_id: server group id
    :return: list of instance ids
    """
    instance_ids = []
    next_token = None
    runtime = _alb_runtime_options()
    while True:
        try:
            request = alb_20200616_models.ListServerGroupServersRequest(
                server_group_id=server_group_id,
                max_results=100,
                next_token=next_token
            )
            response = alb_client.list_server_group_servers_with_options(request, runtime)
            next_token = UtilClient.to_map(response.body).get('NextToken')
            # ``servers`` may be absent for an empty group.
            instance_ids.extend(server.server_id for server in response.body.servers or [])
        except Exception as error:
            # Stop here: retrying the same page in a tight loop would spin
            # forever on a persistent error.
            logging.error(error)
            break
        if not next_token:
            break
    return instance_ids


async def list_server_group_servers_async(alb_client, server_group_id):
    """
    Asynchronously list the servers of a server group, following pagination.

    :param alb_client: ALB client
    :param server_group_id: server group id
    :return: list of instance ids; an empty list if any request fails
    """
    runtime = _alb_runtime_options()
    instance_ids = []
    next_token = None
    try:
        while True:
            request = alb_20200616_models.ListServerGroupServersRequest(
                server_group_id=server_group_id,
                max_results=100,
                next_token=next_token
            )
            response = await alb_client.list_server_group_servers_with_options_async(request, runtime)
            instance_ids.extend(server.server_id for server in response.body.servers or [])
            next_token = UtilClient.to_map(response.body).get('NextToken')
            if not next_token:
                return instance_ids
    except Exception as error:
        logging.error(error)
        return []


def update_server_group_server_weight(alb_client, server_group_id, instance_id, weight, port):
    """
    Update the weight of one server in a server group. Failures are logged, not raised.

    :param alb_client: ALB client
    :param server_group_id: server group id
    :param instance_id: instance id
    :param weight: weight value
    :param port: port used by the backend server
    """
    server = alb_20200616_models.UpdateServerGroupServersAttributeRequestServers(
        server_type='Ecs',
        server_id=instance_id,
        weight=weight,
        port=port
    )
    request = alb_20200616_models.UpdateServerGroupServersAttributeRequest(
        servers=[server],
        server_group_id=server_group_id
    )
    runtime = _alb_runtime_options()
    try:
        alb_client.update_server_group_servers_attribute_with_options(request, runtime)
        logging.info(f"Successfully updated server {instance_id} in group {server_group_id} to weight {weight}.")
    except Exception as error:
        logging.error(f"Failed to update server {instance_id} in group {server_group_id}: {error}")


def update_server_group_servers_attribute(alb_client, server_group_id_list, instance_id_list, weight_list, port):
    """
    Step the weight of every server in every server group through ``weight_list``.

    Servers are processed one after another; after each update the function
    sleeps for the ``sleep_time`` paired with that weight.

    :param alb_client: ALB client
    :param server_group_id_list: server group id list
    :param instance_id_list: instance id list
    :param weight_list: weight steps, type-list [(weight, sleep_time), ...]
    :param port: port used by the backend server
    """
    for server_group_id in server_group_id_list:
        for instance_id in instance_id_list:
            for weight, sleep_time in weight_list:
                update_server_group_server_weight(alb_client, server_group_id, instance_id, weight, port)
                time.sleep(sleep_time)


async def update_server_group_server_weight_async(alb_client, server_group_id, instance_id, weight, port):
    """
    Asynchronously update the weight of one server in a server group.

    Failures are logged, not raised.

    :param alb_client: ALB client
    :param server_group_id: server group id
    :param instance_id: instance id
    :param weight: weight value
    :param port: port used by the backend server
    """
    server = alb_20200616_models.UpdateServerGroupServersAttributeRequestServers(
        server_type='Ecs',
        server_id=instance_id,
        weight=weight,
        port=port
    )
    request = alb_20200616_models.UpdateServerGroupServersAttributeRequest(
        servers=[server],
        server_group_id=server_group_id
    )
    runtime = _alb_runtime_options()
    try:
        await alb_client.update_server_group_servers_attribute_with_options_async(request, runtime)
        logging.info(
            f"Successfully updated server {instance_id} in group {server_group_id} "
            f"to weight {weight} asynchronously."
        )
    except Exception as error:
        logging.error(f"Failed to update server {instance_id} in group {server_group_id}: {error}")


async def update_server_group_servers_attribute_async(alb_client, server_group_id_list, instance_ids,
                                                      weight_list, port):
    """
    Asynchronously step server weights through ``weight_list``.

    For each ``(weight, sleep_time)`` step, the weight is applied to every
    server of every group concurrently, then the coroutine sleeps for
    ``sleep_time`` before the next step. Each step finishes before the next
    one starts, so the last entry of ``weight_list`` is the final weight.

    :param alb_client: ALB client
    :param server_group_id_list: server group id list
    :param instance_ids: instance id list
    :param weight_list: weight steps, type-list [(weight, sleep_time), ...]
    :param port: port used by the backend server
    """
    for weight, sleep_time in weight_list:
        # Coroutines only start once awaited, so each step must be gathered
        # here; collecting them all for a single final gather would send
        # every weight at once, after all the sleeping was already done.
        await asyncio.gather(*(
            update_server_group_server_weight_async(alb_client, server_group_id, instance_id, weight, port)
            for server_group_id in server_group_id_list
            for instance_id in instance_ids
        ))
        await asyncio.sleep(sleep_time)