qingqu-git 2 лет назад
Родитель
Сommit
18d093324b
3 измененных файлов с 121 добавлено и 20 удалено
  1. 32 3
      rec_model_config.py
  2. 51 17
      rec_model_update.py
  3. 38 0
      utils.py

+ 32 - 3
rec_model_config.py

@@ -1,8 +1,9 @@
+import os
 # OSS配置
 OSS_CONFIG = {
     'endpoint': 'http://oss-cn-hangzhou-internal.aliyuncs.com',
-    'accessKeyId': 'LTAIP6x1l3DXfSxm',
-    'accessKeySecret': 'KbTaM9ars4OX3PMS6Xm7rtxGr1FLon',
+    'accessKeyId': 'LTAI5tFFF44gSGQTvmozG3zK',
+    'accessKeySecret': 'oQlax6KAEENCQRbWkbSFiW9z2KSre8',
     'bucket_name': 'recommend-sort-models'
 }
 
@@ -20,4 +21,32 @@ create_client_params = {
     'access_key_id': 'LTAI4GBWbFvvXoXsSVBe1o9f',
     'access_key_secret': 'kRAikWitb4kDxaAyBqNrmLmllMEDO3',
     'region_id': 'cn-hangzhou'
-}
+}
+start_sh_dir = os.path.dirname(os.path.realpath(__file__))
+start_sh_filename = 'deep_model_start.sh'
+with open(file=os.path.join(start_sh_dir, start_sh_filename), mode='r', encoding='utf-8') as rf:
+    file_content = rf.read()
+    print(file_content)
+    #logging.info(f"start sh file content: {file_content}")
+
+
+start_sh = {
+    'target_dir': '/data/SortService',
+    'name': start_sh_filename,
+    'content': file_content,
+}
+
+
+check_sh_filename = 'deep_model_check.sh'
+check_sh_dir = os.path.dirname(os.path.realpath(__file__))
+with open(file=os.path.join(check_sh_dir, check_sh_filename), mode='r', encoding='utf-8') as rf:
+    check_file_content = rf.read()
+    print(check_file_content)
+
+
+check_sh = {
+    'target_dir': '/data/SortService',
+    'name': check_sh_filename,
+    'content': check_file_content,
+}
+

+ 51 - 17
rec_model_update.py

@@ -31,21 +31,22 @@ from threading import Timer
 logging.basicConfig(level=logging.INFO,
                     format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                     datefmt='%a, %d %b %Y %H:%M:%S')
+health_instances=[]
 
-
-def rec_server_health_check(client, instance_id):
+def rec_server_health_check(client, instance_id, ip_address):
     """
     服务健康检查
     :param client: 客户端连接
     :param instance_id: instanceId
     :return:
     """
-    global health_instances
-    ip_address = utils.get_ip_address(client=client, instance_id=instance_id)
+    #health_instances = []
+    #ip_address = utils.get_ip_address(client=client, instance_id=instance_id)
     while True:
         health_check_url = f"http://{ip_address}:8501/v1/models/deepfm"
         try:
             http_code = requests.get(health_check_url).status_code
+            print(http_code)
         except:
             logging.info("images is downloading")
             http_code = 0
@@ -77,24 +78,50 @@ def model_oss_check(model_file, local_path):
         Timer(60, model_oss_check, args=[model_file]).start()
 
 
-def server_restart(slb_client, instance_id, ip_address, image_name):
+def server_restart(slb_client, instance_id, ip_address, ins_name, dt):
     try:
         # 1. 检查模型文件
-
+        #ip_address = 
         # 2. 摘流量
         utils.set_instance_weight_process(client=slb_client,
                                           slb_id=rec_model_config.slb_id,
                                           instance_id_list=[instance_id],
                                           weight_list=[(0, 60)])
         logging.info(f"set weight with 0 finished.")
-        # 3. 重启容器
+        #3.scp start.sh 
+        instance_id_list=[]
+        instance_id_list.append(instance_id)
+        #print(rec_model_config)
+        utils.send_file_to_ecs(client=slb_client, instance_id_list=instance_id_list, **rec_model_config.start_sh)
+        #utils.send_file_to_ecs(client=slb_client, instance_id_list=instance_id_list, **rec_model_config.check_sh)
+
+        #server_check_sh = os.path.join(rec_model_config.check_sh['target_dir'], rec_model_config.check_sh['name'])
+        #server_check_commend = f"sh {server_check_sh} {dt}"
+        #print(server_check_commend)
+        #print(instance_id)
+        #command_status=utils.run_per_command(client=slb_client, instance=instance_id, command=server_check_commend)
+        #print("command_status:",command_status)
+        # 4. 重启容器
         docker_client = docker.DockerClient(base_url=f'tcp://{ip_address}:2375', timeout=60)
-        image_id = docker_client.containers.get(image_name)
-        image_id.restart()
+        logging.info(docker_client)
+        #print(docker_client.containers.list)
+        image_id = 0
+        try:
+            image_id = docker_client.containers.get(ins_name)
+            #print(image_id)
+            image_id.stop()
+            image_id.remove()
+        except:
+            image_id = 0
+        #image_id.sop()
+        #print("image_id",image_id)
+        server_start_sh = os.path.join(rec_model_config.start_sh['target_dir'], rec_model_config.start_sh['name'])
+        server_start_commend = f"sh {server_start_sh}"
+        utils.run_command(client=slb_client, instance_ids=instance_id_list, command=server_start_commend)
         logging.info(f"docker restart finished.")
         time.sleep(5)
         # 4. 探活
-        rec_server_health_check(slb_client, instance_id)
+        rec_server_health_check(slb_client, instance_id, ip_address)
         time.sleep(30)
         logging.info(f"health check finished.")
         # 5. 挂流量
@@ -102,9 +129,10 @@ def server_restart(slb_client, instance_id, ip_address, image_name):
         utils.set_instance_weight_process(client=slb_client,
                                           slb_id=rec_model_config.slb_id,
                                           instance_id_list=[instance_id],
-                                          weight_list=add_weight_list)
+                                           weight_list=add_weight_list)
         logging.info(f"server restart finished, instance: {instance_id}/{ip_address}")
     except Exception as e:
+        print(e)
         logging.error(f"server restart fail, instance: {instance_id}")
         logging.error(e)
 
@@ -112,11 +140,14 @@ def server_restart(slb_client, instance_id, ip_address, image_name):
 def main():
     try:
         # 1. 检查oss中模型文件夹是否准备好并下载
-        now_date = datetime.datetime.today()
+        now_date = datetime.datetime.today()- datetime.timedelta(days=1)
+        print("model update date",now_date)
         dt = datetime.datetime.strftime(now_date, '%Y%m%d')
         model_file = f"{dt}.tar"
         local_path = '/data/rec_model'
+        #print(rec_model_config.start_sh)
         check_res = model_oss_check(model_file=model_file, local_path=local_path)
+        check_res = True
         if check_res is True:
             # 2. 获取slb下所有机器
             slb_client = utils.connect_client(access_key_id=rec_model_config.slb_client_params['access_key_id'],
@@ -125,11 +156,13 @@ def main():
 
             online_instance_ids = utils.get_instance_ids(client=slb_client, slb_id=rec_model_config.slb_id)
             online_instance_count = len(online_instance_ids)
+            #online_instance_count=1
             logging.info(f"online instance count: {online_instance_count}.")
             logging.info(f"online instance ids: {online_instance_ids}")
-
+            #online_instance_ids=['i-bp13mx85rzardscc89z3']
             # 3. 逐台更新
-            image_name = 'tensorflow/serving'
+            #image_name = 'tensorflow/serving'
+            docker_ins_name = 'deepfm'
             for i, instance_id in enumerate(online_instance_ids):
                 logging.info(f"instance:{instance_id}")
                 # 3.1. 获取ip
@@ -138,15 +171,16 @@ def main():
                 # 3.2. scp 模型文件到服务器
                 ecs_model_path = '/data/offline_dir'
                 scp_command = f"scp {local_path}/{model_file} {ip_address}:{ecs_model_path}"
-                os.system(command=scp_command)
+                #os.system(command=scp_command)
                 # 3.3 服务重启
                 server_restart(slb_client=slb_client, instance_id=instance_id,
-                               ip_address=ip_address, image_name=image_name)
+                               ip_address=ip_address, ins_name=docker_ins_name, dt=dt)
                 logging.info(f"重启进度: {i+1}/{online_instance_count}")
+                #break
             logging.info(f"server restart finished!")
     except Exception as e:
         logging.error(e)
 
 
 if __name__ == '__main__':
-    main()
+    main()

+ 38 - 0
utils.py

@@ -103,6 +103,26 @@ def send_request(client, request):
         sys.exit()
 
 
+def send_req(client, request):
+    """
+    发送API请求
+    :param client: 客户端连接
+    :param request: 请求配置
+    :return: response
+    """
+    request.set_accept_format('json')
+    response = client.do_action_with_exception(request)
+    #print(response)
+    response = json.loads(response)
+    print(response)
+        # logging.info(response)
+    print(response.get('Code'))
+    return response
+    #except Exception as e:
+        # 失败,记录报错信息,发送通知,停止并退出
+    #logging.error(e)
+    #sys.exit()
+
 def check_instance_running(client, instance_ids):
     """
     检查服务器运行状态
@@ -203,6 +223,24 @@ def run_command(client, instance_ids, command):
         response = send_request(client=client, request=request)
         logging.info(response)
 
+def run_per_command(client, instance, command):
+    """
+    批量执行命令
+    :param client: 客户端连接
+    :param instance_ids: 实例id列表, type-list, 最多能指定50台ECS实例ID
+    :param command: 命令 type-string
+    :return:
+    """
+    #for i in range(len(instance_ids) // 50 + 1)
+    request = RunCommandRequest()
+    request.set_accept_format('json')
+    request.set_Type("RunShellScript")
+    request.set_CommandContent(command)
+    request.set_InstanceIds([instance])
+    response = send_req(client=client, request=request)
+    logging.info(response)
+    return response
+
 
 def get_instance_ids(client, slb_id):
     """