xueyiming 3 周之前
父节点
当前提交
cee77fd4fc
共有 1 个文件被更改,包括 7 次插入和 17 次删除
  1. 7 17
      ad/pai_flow_operator2.py

+ 7 - 17
ad/pai_flow_operator2.py

@@ -49,21 +49,10 @@ def get_data_from_odps(project, table, num):
         sql = f'select * from {table} limit {num}'
         # 执行 SQL 查询
         with odps.execute_sql(sql).open_reader() as reader:
+            df = reader.to_pandas()
             # 查询数量小于目标数量时 返回空
-            if reader.count < num:
+            if len(df) < num:
                 return None
-            # 获取字段名称
-            column_names = reader.schema.names
-            # 获取查询结果数据
-            data = []
-            for record in reader:
-                record_list = list(record)
-                numbers = []
-                for item in record_list:
-                    numbers.append(item[1])
-                data.append(numbers)
-            # 将数据和字段名称组合成 DataFrame
-            df = pd.DataFrame(data, columns=column_names)
             return df
     except Exception as e:
         print(f"发生错误: {e}")
@@ -536,10 +525,11 @@ def update_online_model():
 
 def validate_model_data_accuracy(start_time):
     node_dict = get_node_dict()
-    train_node_id = node_dict['虚拟起始节点']
-    execute_type = 'EXECUTE_FROM_HERE'
-    validate_res = PAIClient.create_job(experiment_id, train_node_id, execute_type)
-    validate_job_id = validate_res['JobId']
+    # train_node_id = node_dict['虚拟起始节点']
+    # execute_type = 'EXECUTE_FROM_HERE'
+    # validate_res = PAIClient.create_job(experiment_id, train_node_id, execute_type)
+    # validate_job_id = validate_res['JobId']
+    validate_job_id = 'job-5h6k4wioq74ow5apuw'
     validate_job_detail = wait_job_end(validate_job_id)
     if validate_job_detail['Status'] == 'Succeeded':
         pipeline_run_id = validate_job_detail['RunId']