""" 推荐模型进行日更新 1. 检查oss中模型文件夹是否准备好 y: 下一步 n: 等待5分钟后再次检查 2. 获取slb下所有机器 3. 逐台更新 a. 从oss下载模型文件夹到服务器 /data/offline_dir b. 检查模型文件 ok: 下一步 fail: 发送报警到飞书 c. 摘流量 d. 重启服务 e. 健康检查 ok: 下一步 fail: 发送报警到飞书 f. 挂流量 """ import datetime import logging import time import docker import oss2 import os import requests import rec_model_config import utils from threading import Timer logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%a, %d %b %Y %H:%M:%S') health_instances=[] def rec_server_health_check(client, instance_id, ip_address): """ 服务健康检查 :param client: 客户端连接 :param instance_id: instanceId :return: """ #health_instances = [] #ip_address = utils.get_ip_address(client=client, instance_id=instance_id) while True: health_check_url = f"http://{ip_address}:8501/v1/models/deepfm" try: http_code = requests.get(health_check_url).status_code print(http_code) except: logging.info("images is downloading") http_code = 0 if http_code == 200: health_instances.append((instance_id, ip_address)) break else: time.sleep(10) def model_oss_check(model_file, local_path): """判断oss是否有该文件并下载""" auth = oss2.Auth(access_key_id=rec_model_config.OSS_CONFIG['accessKeyId'], access_key_secret=rec_model_config.OSS_CONFIG['accessKeySecret']) bucket = oss2.Bucket(auth, endpoint=rec_model_config.OSS_CONFIG['endpoint'], bucket_name=rec_model_config.OSS_CONFIG['bucket_name']) exist = bucket.object_exists(model_file) # 返回值为true表示文件存在,false表示文件不存在。 if exist: # 下载文件到本地服务器 if not os.path.exists(local_path): os.makedirs(local_path) local_file = os.path.join(local_path, model_file) bucket.get_object_to_file(model_file, local_file) return True else: # 模型文件不存在,5分钟后重新检查 Timer(60, model_oss_check, args=[model_file]).start() def server_restart(slb_client, instance_id, ip_address, ins_name, dt): try: # 1. 检查模型文件 #ip_address = # 2. 摘流量 utils.set_instance_weight_process(client=slb_client, slb_id=rec_model_config.slb_id, instance_id_list=[instance_id], weight_list=[(0, 60)]) logging.info(f"set weight with 0 finished.") #3.scp start.sh instance_id_list=[] instance_id_list.append(instance_id) #print(rec_model_config) utils.send_file_to_ecs(client=slb_client, instance_id_list=instance_id_list, **rec_model_config.start_sh) #utils.send_file_to_ecs(client=slb_client, instance_id_list=instance_id_list, **rec_model_config.check_sh) #server_check_sh = os.path.join(rec_model_config.check_sh['target_dir'], rec_model_config.check_sh['name']) #server_check_commend = f"sh {server_check_sh} {dt}" #print(server_check_commend) #print(instance_id) #command_status=utils.run_per_command(client=slb_client, instance=instance_id, command=server_check_commend) #print("command_status:",command_status) # 4. 重启容器 docker_client = docker.DockerClient(base_url=f'tcp://{ip_address}:2375', timeout=60) logging.info(docker_client) #print(docker_client.containers.list) image_id = 0 try: image_id = docker_client.containers.get(ins_name) #print(image_id) image_id.stop() image_id.remove() except: image_id = 0 #image_id.sop() #print("image_id",image_id) server_start_sh = os.path.join(rec_model_config.start_sh['target_dir'], rec_model_config.start_sh['name']) server_start_commend = f"sh {server_start_sh} {dt}" utils.run_command(client=slb_client, instance_ids=instance_id_list, command=server_start_commend) logging.info(f"docker restart finished.") time.sleep(5) # 4. 探活 rec_server_health_check(slb_client, instance_id, ip_address) time.sleep(30) logging.info(f"health check finished.") # 5. 挂流量 add_weight_list = [(10, 30), (20, 20), (40, 10), (60, 10), (80, 10), (100, 10)] utils.set_instance_weight_process(client=slb_client, slb_id=rec_model_config.slb_id, instance_id_list=[instance_id], weight_list=add_weight_list) logging.info(f"server restart finished, instance: {instance_id}/{ip_address}") except Exception as e: print(e) logging.error(f"server restart fail, instance: {instance_id}") logging.error(e) def main(): try: # 1. 检查oss中模型文件夹是否准备好并下载 now_date = datetime.datetime.today()- datetime.timedelta(days=1) print("model update date",now_date) dt = datetime.datetime.strftime(now_date, '%Y%m%d') model_file = f"{dt}.tar" local_path = '/data/rec_model' #print(rec_model_config.start_sh) check_res = model_oss_check(model_file=model_file, local_path=local_path) check_res = True if check_res is True: # 2. 获取slb下所有机器 slb_client = utils.connect_client(access_key_id=rec_model_config.slb_client_params['access_key_id'], access_key_secret=rec_model_config.slb_client_params['access_key_secret'], region_id=rec_model_config.slb_client_params['region_id']) online_instance_ids = utils.get_instance_ids(client=slb_client, slb_id=rec_model_config.slb_id) online_instance_count = len(online_instance_ids) #online_instance_count=1 logging.info(f"online instance count: {online_instance_count}.") logging.info(f"online instance ids: {online_instance_ids}") #online_instance_ids=['i-bp13mx85rzardscc89z3'] # 3. 逐台更新 #image_name = 'tensorflow/serving' docker_ins_name = 'deepfm' for i, instance_id in enumerate(online_instance_ids): logging.info(f"instance:{instance_id}") # 3.1. 获取ip ip_address = utils.get_ip_address(client=slb_client, instance_id=instance_id) logging.info(f"ip address:{instance_id}/{ip_address}") # 3.2. scp 模型文件到服务器 ecs_model_path = '/data/offline_dir' scp_command = f"scp {local_path}/{model_file} {ip_address}:{ecs_model_path}" #os.system(command=scp_command) # 3.3 服务重启 server_restart(slb_client=slb_client, instance_id=instance_id, ip_address=ip_address, ins_name=docker_ins_name, dt=dt) logging.info(f"重启进度: {i+1}/{online_instance_count}") #break logging.info(f"server restart finished!") except Exception as e: logging.error(e) if __name__ == '__main__': main()