浏览代码

增加更新工作流函数

xueyiming 2 周之前
父节点
当前提交
0e568cca1a
共有 1 个文件被更改,包括 44 次插入44 次删除
  1. 44 44
      ad/pai_flow_operator2.py

+ 44 - 44
ad/pai_flow_operator2.py

@@ -394,49 +394,6 @@ def get_online_version_dt(service_name: str):
     return online_date
 
 
-def update_online_flow():
-    try:
-        online_version_dt = get_online_version_dt('ad_rank_dnn_v11_easyrec')
-        draft = PAIClient.get_work_flow_draft(experiment_id)
-        print(json.dumps(draft, ensure_ascii=False))
-        content = draft['Content']
-        version = draft['Version']
-        print(content)
-        content_json = json.loads(content)
-        nodes = content_json.get('nodes')
-        global_params = content_json.get('globalParams')
-        bizdate = get_previous_days_date(1)
-        for global_param in global_params:
-            try:
-                if global_param['name'] == 'bizdate':
-                    global_param['value'] = bizdate
-                if global_param['name'] == 'online_version_dt':
-                    global_param['value'] = online_version_dt
-                if global_param['name'] == 'eval_date':
-                    global_param['value'] = bizdate
-            except KeyError:
-                raise Exception("在处理全局参数时,字典中缺少必要的键")
-        for node in nodes:
-            try:
-                name = node['name']
-                if name == '样本shuffle':
-                    properties = node['properties']
-                    for property in properties:
-                        if property['name'] == 'sql':
-                            value = property['value']
-                            new_value = update_train_tables(value)
-                            if new_value is None:
-                                print("error")
-                            property['value'] = new_value
-            except KeyError:
-                raise Exception("在处理节点属性时,字典中缺少必要的键")
-        new_content = json.dumps(content_json, ensure_ascii=False)
-        PAIClient.update_experiment_content(experiment_id, new_content, version)
-    except json.JSONDecodeError:
-        raise Exception("JSON 解析错误,可能是草稿内容格式不正确")
-    except Exception as e:
-        raise Exception(f"发生未知错误: {e}")
-
 
 def update_shuffle_flow(table):
     draft = PAIClient.get_work_flow_draft(experiment_id)
@@ -531,6 +488,49 @@ def get_job_dict():
                 job_dict[name] = job_detail['JobId']
     return job_dict
 
+@retry
+def update_online_flow():
+    try:
+        online_version_dt = get_online_version_dt('ad_rank_dnn_v11_easyrec')
+        draft = PAIClient.get_work_flow_draft(experiment_id)
+        print(json.dumps(draft, ensure_ascii=False))
+        content = draft['Content']
+        version = draft['Version']
+        print(content)
+        content_json = json.loads(content)
+        nodes = content_json.get('nodes')
+        global_params = content_json.get('globalParams')
+        bizdate = get_previous_days_date(1)
+        for global_param in global_params:
+            try:
+                if global_param['name'] == 'bizdate':
+                    global_param['value'] = bizdate
+                if global_param['name'] == 'online_version_dt':
+                    global_param['value'] = online_version_dt
+                if global_param['name'] == 'eval_date':
+                    global_param['value'] = bizdate
+            except KeyError:
+                raise Exception("在处理全局参数时,字典中缺少必要的键")
+        for node in nodes:
+            try:
+                name = node['name']
+                if name == '样本shuffle':
+                    properties = node['properties']
+                    for property in properties:
+                        if property['name'] == 'sql':
+                            value = property['value']
+                            new_value = update_train_tables(value)
+                            if new_value is None:
+                                print("error")
+                            property['value'] = new_value
+            except KeyError:
+                raise Exception("在处理节点属性时,字典中缺少必要的键")
+        new_content = json.dumps(content_json, ensure_ascii=False)
+        PAIClient.update_experiment_content(experiment_id, new_content, version)
+    except json.JSONDecodeError:
+        raise Exception("JSON 解析错误,可能是草稿内容格式不正确")
+    except Exception as e:
+        raise Exception(f"发生未知错误: {e}")
 
 @retry
 def shuffle_table():
@@ -707,7 +707,7 @@ def validate_model_data_accuracy():
 
 if __name__ == '__main__':
     start_time = int(time.time())
-    functions = [shuffle_table, shuffle_train_model, export_model, get_validate_model_data]
+    functions = [update_online_flow, shuffle_table, shuffle_train_model, export_model, get_validate_model_data]
     function_names = [func.__name__ for func in functions]
 
     start_function = None