#!/usr/bin/env python3
# coding=utf-8
"""Rolling redeploy of the gateway container on every instance of an ALB server group.

Usage: script.py <image_version>

Flow: pre-pull the image on all instances in parallel, then for each instance in
turn set its ALB weight to 0, replace the container, wait for its health check
and ramp the ALB weight back up.
"""
import docker
import sys
import requests
import queue
import threading
import time
import gateway_config
import alb_utils
# NOTE(review): no longer referenced below (the drain step now uses
# gateway_config.server_group_id_list); kept so import-time behaviour is unchanged.
from longvideoapi.longvideoapi_config import server_group_id_list

# Application (container) name and image locations, read from the config module.
apps = gateway_config.apps
repository = gateway_config.repository
registry = gateway_config.registry
version = sys.argv[1]  # image version/tag, taken from the first command-line argument


class MyThread(threading.Thread):
    """Thread that runs a zero-argument callable and stores its return value.

    The return value is available as ``self.result`` after ``join()``
    (``None`` until the callable has finished).
    """

    def __init__(self, func):
        super().__init__()
        self.func = func
        self.result = None

    def run(self):
        self.result = self.func()


def checkHealth(ipadd, interval=3, timeout=5, max_attempts=None):
    """Block until ``http://<ipadd>:9000/healthcheck`` answers HTTP 200.

    Args:
        ipadd: IP address of the instance to probe.
        interval: seconds to sleep between failed attempts (avoids a busy loop).
        timeout: per-request timeout in seconds, so a hung endpoint cannot block forever.
        max_attempts: give up after this many attempts; ``None`` retries indefinitely
            (the original behaviour).

    Returns:
        False once the check passes (kept for backward compatibility; callers ignore it).

    Raises:
        RuntimeError: if ``max_attempts`` is set and every attempt failed.
    """
    health_url = 'http://%s:9000/healthcheck' % (ipadd)
    attempt = 0
    while True:
        attempt += 1
        try:
            health_code = requests.get(health_url, timeout=timeout).status_code
        except requests.RequestException:
            # Container still booting / port not open yet: treat as "not healthy".
            health_code = None
        if health_code == 200:
            print("httpcode 200,开始挂载流量")
            return False
        if max_attempts is not None and attempt >= max_attempts:
            raise RuntimeError(
                "health check failed for %s after %d attempts" % (ipadd, attempt))
        time.sleep(interval)


def update(instance_id, port):
    """Replace the application container on one instance and restore its ALB weight.

    Steps: wait 10 s, resolve the instance IP, force-remove the old container,
    log in to the registry, start the new image version, wait for the health
    check, then update the instance's weights in every server group listed in
    ``gateway_config.server_group_id_list``.

    Relies on module globals set in ``__main__``: ``ecs_client``, ``alb_client``,
    ``success_count`` (incremented here) and ``total``.

    Exits the process with status 1 if starting the container or updating the
    ALB weights fails, so the rollout stops and the failure is visible to CI.
    """
    global success_count
    time.sleep(10)  # presumably lets in-flight requests finish after the weight-0 drain
    ipadd = alb_utils.get_ip_address(ecs_client, instance_id)
    print("服务器信息:" + "%s/%s" % (instance_id, ipadd))
    client = docker.DockerClient(base_url='tcp://%s:2375' % (ipadd), timeout=60)
    try:
        # Remove the old container; a missing container is tolerated (best effort).
        try:
            old_container = client.containers.get(apps)
            old_container.remove(force=True)
        except Exception as e:
            print("容器不存在或者无法删除当前容器", e)

        # Log in and start the new container.
        try:
            docker_config = gateway_config.docker_config
            client.login(username=docker_config['username'],
                         password=docker_config['password'],
                         registry=docker_config['registry'])
            client.containers.run(registry.format(apps, version),
                                  detach=True,
                                  cap_add=['SYS_PTRACE'],  # docker-py expects a list of str
                                  network_mode='host',
                                  name=apps,
                                  volumes={'/datalog/': {'bind': '/datalog/', 'mode': 'rw'}})
            print("开始健康检查")
            checkHealth(ipadd)
            print("%s :权重修改中......" % (ipadd))
            # Sequence of (weight, value) steps for the gradual weight ramp; the
            # meaning of the second element is defined by alb_utils (presumably a
            # wait in seconds — confirm).
            weight_list = [(10, 5), (20, 5), (40, 5), (60, 5), (80, 5), (100, 5)]
            alb_utils.update_server_group_servers_attribute(
                alb_client,
                server_group_id_list=gateway_config.server_group_id_list,
                instance_id_list=[instance_id],
                weight_list=weight_list,
                port=port)
            success_count += 1
            print("更新进度" + "%s/%s" % (success_count, total))
        except Exception as e:
            print(e)
            # Non-zero exit status so a failed rollout is not reported as success.
            sys.exit(1)
    finally:
        client.close()


def pull_image():
    """Take one instance ID from ``q1`` and pull the target image version onto it.

    Returns:
        True if the pull succeeded, False otherwise (the error is printed).
    """
    instanceId = q1.get()
    ipaddr = alb_utils.get_ip_address(ecs_client, instanceId)
    client = docker.DockerClient(base_url="tcp://{}:2375".format(ipaddr), timeout=30)
    try:
        client.images.pull(repository.format(apps), tag=version)
        return True
    except Exception as e:
        print(e, "images pull fail")
        return False
    finally:
        client.close()


if __name__ == '__main__':
    # ECS client (used to resolve instance IDs to IP addresses).
    ecs_client = alb_utils.connect_client(
        access_key_id=gateway_config.ecs_client_params['access_key_id'],
        access_key_secret=gateway_config.ecs_client_params['access_key_secret'],
        region_id=gateway_config.ecs_client_params['region_id'])
    # ALB client (used to list servers and change their weights).
    alb_client = alb_utils.connect_alb_client(
        gateway_config.alb_client_params['access_key_id'],
        gateway_config.alb_client_params['access_key_secret'],
        endpoint=gateway_config.alb_client_params['endpoint'])

    success_count = 0
    threads = []
    total = 0
    InstanceIDs = []
    q1 = queue.Queue()

    # Instance IDs registered in the first gateway server group.
    res = alb_utils.list_server_group_servers(
        alb_client=alb_client,
        server_group_id=gateway_config.server_group_id_list[0])
    total += len(res)
    print(f"获取 ALB 下服务器组的实例 ID = {res} total = {total}")
    InstanceIDs.extend(res)
    print(InstanceIDs)

    # One queue entry per instance; each pull thread consumes exactly one.
    for instance_id in InstanceIDs:
        q1.put(instance_id)

    # Pre-pull the image on all instances in parallel.
    for _ in InstanceIDs:
        thread = MyThread(pull_image)
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
    failed_pulls = sum(1 for thread in threads if not thread.result)
    if failed_pulls:
        print("warning: %d/%d image pre-pulls failed" % (failed_pulls, len(threads)))

    # Roll through the instances one by one: drain, then replace.
    for instanceID in InstanceIDs:
        # Drain the same gateway server groups whose weights update() restores.
        # NOTE(review): a bare instance ID is passed as instance_id_list here,
        # whereas update() passes a list — confirm what alb_utils expects.
        alb_utils.update_server_group_server_weight(
            alb_client=alb_client,
            server_group_id_list=gateway_config.server_group_id_list,
            instance_id_list=instanceID,
            weight=0,
            port=gateway_config.port)
        update(instanceID, port=gateway_config.port)