Browse Source

add rec model update

liqian 1 năm trước cách đây
mục cha
commit
57498f1f2e
2 tập tin đã thay đổi với 175 bổ sung0 xóa
  1. 23 0
      rec_model_config.py
  2. 152 0
      rec_model_update.py

+ 23 - 0
rec_model_config.py

@@ -0,0 +1,23 @@
+# OSS配置
+OSS_CONFIG = {
+    'endpoint': 'http://oss-cn-hangzhou-internal.aliyuncs.com',
+    'accessKeyId': 'LTAIP6x1l3DXfSxm',
+    'accessKeySecret': 'KbTaM9ars4OX3PMS6Xm7rtxGr1FLon',
+    'bucket_name': ''
+}
+
+slb_id = ''
+
+# 修改负载均衡权限
+slb_client_params = {
+    'access_key_id': 'LTAIuPbTPL3LDDKN',
+    'access_key_secret': 'ORcNedKwWuwVtcq4IRFtUDZgS0b1le',
+    'region_id': 'cn-hangzhou'
+}
+
+# 购买机器权限
+create_client_params = {
+    'access_key_id': 'LTAI4GBWbFvvXoXsSVBe1o9f',
+    'access_key_secret': 'kRAikWitb4kDxaAyBqNrmLmllMEDO3',
+    'region_id': 'cn-hangzhou'
+}

+ 152 - 0
rec_model_update.py

@@ -0,0 +1,152 @@
+"""
+推荐模型进行日更新
+1. 检查oss中模型文件夹是否准备好
+    y: 下一步
+    n: 等待5分钟后再次检查
+2. 获取slb下所有机器
+3. 逐台更新
+    a. 从oss下载模型文件夹到服务器 /data/offline_dir
+    b. 检查模型文件
+        ok: 下一步
+        fail: 发送报警到飞书
+    c. 摘流量
+    d. 重启服务
+    e. 健康检查
+        ok: 下一步
+        fail: 发送报警到飞书
+    f. 挂流量
+"""
+import datetime
+import logging
+import time
+import docker
+import oss2
+import os
+import requests
+
+import rec_model_config
+import utils
+from threading import Timer
+
+logging.basicConfig(level=logging.INFO,
+                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
+                    datefmt='%a, %d %b %Y %H:%M:%S')
+
+
+def rec_server_health_check(client, instance_id):
+    """
+    服务健康检查
+    :param client: 客户端连接
+    :param instance_id: instanceId
+    :return:
+    """
+    global health_instances
+    ip_address = utils.get_ip_address(client=client, instance_id=instance_id)
+    while True:
+        health_check_url = f"http://{ip_address}:5001/healthcheck"
+        try:
+            http_code = requests.get(health_check_url).status_code
+        except:
+            logging.info("images is downloading")
+            http_code = 0
+
+        if http_code == 200:
+            health_instances.append((instance_id, ip_address))
+            break
+        else:
+            time.sleep(10)
+
+
+def model_oss_check(model_file, local_path):
+    """判断oss是否有该文件并下载"""
+    auth = oss2.Auth(access_key_id=rec_model_config.OSS_CONFIG['accessKeyId'],
+                     access_key_secret=rec_model_config.OSS_CONFIG['accessKeySecret'])
+    bucket = oss2.Bucket(auth, endpoint=rec_model_config.OSS_CONFIG['endpoint'],
+                         bucket_name=rec_model_config.OSS_CONFIG['bucket_name'])
+    exist = bucket.object_exists(model_file)
+    # 返回值为true表示文件存在,false表示文件不存在。
+    if exist:
+        # 下载文件到本地服务器
+        if not os.path.exists(local_path):
+            os.makedirs(local_path)
+        local_file = os.path.join(local_path, model_file)
+        bucket.get_object_to_file(model_file, local_file)
+        return True
+    else:
+        # 模型文件不存在,5分钟后重新检查
+        Timer(60, model_oss_check, args=[model_file]).start()
+
+
+def server_restart(slb_client, instance_id, ip_address, image_name):
+    try:
+        # 1. 检查模型文件
+
+        # 2. 摘流量
+        utils.set_instance_weight_process(client=slb_client,
+                                          slb_id=rec_model_config.slb_id,
+                                          instance_id_list=[instance_id],
+                                          weight_list=[(0, 60)])
+        logging.info(f"set weight with 0 finished.")
+        # 3. 重启容器
+        docker_client = docker.DockerClient(base_url=f'tcp://{ip_address}:2375', timeout=60)
+        image_id = docker_client.containers.get(image_name)
+        image_id.restart()
+        logging.info(f"docker restart finished.")
+        time.sleep(5)
+        # 4. 探活
+        rec_server_health_check(slb_client, instance_id)
+        time.sleep(30)
+        logging.info(f"health check finished.")
+        # 5. 挂流量
+        add_weight_list = [(10, 30), (20, 20), (40, 10), (60, 10), (80, 10), (100, 10)]
+        utils.set_instance_weight_process(client=slb_client,
+                                          slb_id=rec_model_config.slb_id,
+                                          instance_id_list=[instance_id],
+                                          weight_list=add_weight_list)
+        logging.info(f"server restart finished, instance: {instance_id}/{ip_address}")
+    except Exception as e:
+        logging.error(f"server restart fail, instance: {instance_id}")
+        logging.error(e)
+
+
+def main():
+    try:
+        # 1. 检查oss中模型文件夹是否准备好并下载
+        now_date = datetime.datetime.today()
+        dt = datetime.datetime.strftime(now_date, '%Y%m%d')
+        model_file = f"{dt}.tar"
+        local_path = '/data/rec_model'
+        check_res = model_oss_check(model_file=model_file, local_path=local_path)
+        if check_res is True:
+            # 2. 获取slb下所有机器
+            slb_client = utils.connect_client(access_key_id=rec_model_config.slb_client_params['access_key_id'],
+                                              access_key_secret=rec_model_config.slb_client_params['access_key_secret'],
+                                              region_id=rec_model_config.slb_client_params['region_id'])
+
+            online_instance_ids = utils.get_instance_ids(client=slb_client, slb_id=rec_model_config.slb_id)
+            online_instance_count = len(online_instance_ids)
+            logging.info(f"online instance count: {online_instance_count}.")
+            logging.info(f"online instance ids: {online_instance_ids}")
+
+            # 3. 逐台更新
+            image_name = ''
+            for i, instance_id in enumerate(online_instance_ids):
+                logging.info(f"instance:{instance_id}")
+                # 3.1. 获取ip
+                ip_address = utils.get_ip_address(client=slb_client, instance_id=instance_id)
+                logging.info(f"ip address:{instance_id}/{ip_address}")
+                # 3.2. scp 模型文件到服务器
+                ecs_model_path = '/data/offline_dir'
+                scp_command = f"scp {local_path}/{model_file} {ip_address}:{ecs_model_path}"
+                os.system(command=scp_command)
+                # 3.3 服务重启
+                server_restart(slb_client=slb_client, instance_id=instance_id,
+                               ip_address=ip_address, image_name=image_name)
+                logging.info(f"重启进度: {i+1}/{online_instance_count}")
+            logging.info(f"server restart finished!")
+    except Exception as e:
+        logging.error(e)
+
+
+if __name__ == '__main__':
+    main()