pai_flow_operator.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. # -*- coding: utf-8 -*-
  2. import os
  3. import re
  4. import sys
  5. from typing import List
  6. from datetime import datetime, timedelta
  7. import time
  8. from alibabacloud_paistudio20210202.client import Client as PaiStudio20210202Client
  9. from alibabacloud_tea_openapi import models as open_api_models
  10. from alibabacloud_paistudio20210202 import models as pai_studio_20210202_models
  11. from alibabacloud_tea_util import models as util_models
  12. from alibabacloud_tea_util.client import Client as UtilClient
  13. from alibabacloud_eas20210701.client import Client as eas20210701Client
  14. from alibabacloud_paiflow20210202 import models as paiflow_20210202_models
  15. from alibabacloud_paiflow20210202.client import Client as PAIFlow20210202Client
  16. from odps import ODPS
  # Display names of the workflow nodes whose ids get_node_dict() collects
  # from the experiment draft.
  target_names = set((
      '模型训练-自定义',
      '模型增量训练',
      '模型导出-2',
      '更新EAS服务(Beta)-1',
      '虚拟起始节点',
      '二分类评估-1',
      '二分类评估-2',
      '预测结果对比',
  ))
  27. import json
  def get_odps_instance(project, access_id=None, secret_access_key=None,
                        endpoint='http://service.cn.maxcompute.aliyun.com/api'):
      """Create a PyODPS entry object for a MaxCompute project.

      Credentials are not stored in source code. Pass them explicitly, or export
      ALIBABA_CLOUD_ACCESS_KEY_ID / ALIBABA_CLOUD_ACCESS_KEY_SECRET.
      NOTE: the AccessKey that was previously hard-coded in this function must
      be treated as leaked and rotated.

      :param project: MaxCompute project name.
      :param access_id: AccessKey ID; defaults to $ALIBABA_CLOUD_ACCESS_KEY_ID.
      :param secret_access_key: AccessKey secret; defaults to
          $ALIBABA_CLOUD_ACCESS_KEY_SECRET.
      :param endpoint: MaxCompute API endpoint.
      :return: an ``ODPS`` instance bound to ``project``.
      :raises RuntimeError: if no credentials are supplied or configured.
      """
      access_id = access_id or os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID')
      secret_access_key = secret_access_key or os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
      if not access_id or not secret_access_key:
          raise RuntimeError(
              'MaxCompute credentials missing: pass access_id/secret_access_key or set '
              'ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET.'
          )
      return ODPS(
          access_id=access_id,
          secret_access_key=secret_access_key,
          project=project,
          endpoint=endpoint,
      )
  def get_dates_between(start_date_str, end_date_str):
      """List every calendar day from start to end, both inclusive, as 'YYYYMMDD'.

      Both arguments are 'YYYYMMDD' strings. If the end date is before the
      start date, the result is an empty list.
      """
      fmt = '%Y%m%d'
      first_day = datetime.strptime(start_date_str, fmt)
      span_days = (datetime.strptime(end_date_str, fmt) - first_day).days
      return [
          (first_day + timedelta(days=offset)).strftime(fmt)
          for offset in range(span_days + 1)
      ]
  def read_file_to_list(file_name='holidays.txt'):
      """Read an exclusion list (one entry per line) from the current working directory.

      get_train_tables() removes these entries from its 'YYYYMMDD' date list.
      Surrounding whitespace is stripped and blank lines are skipped. Without
      that, a padded line or the trailing newline yields entries that never
      match a date.

      :param file_name: file name, resolved against ``os.getcwd()``.
      :return: list of non-blank, stripped lines. Returns ``[]`` when the file
          is missing or cannot be read; the error is printed (best effort).
      """
      # Pre-bind so the error message works even if os.getcwd() itself fails.
      file_path = file_name
      try:
          file_path = os.path.join(os.getcwd(), file_name)
          with open(file_path, 'r', encoding='utf-8') as file:
              stripped = (line.strip() for line in file)
              return [entry for entry in stripped if entry]
      except FileNotFoundError:
          print(f"错误:未找到 {file_path} 文件。")
      except Exception as e:
          print(f"错误:发生了一个未知错误: {e}")
      return []
  def get_previous_days_date(days):
      """Return the local date ``days`` days before now, formatted as 'YYYYMMDD'."""
      return (datetime.now() - timedelta(days=days)).strftime('%Y%m%d')
  def remove_elements(lst1, lst2):
      """Return the items of ``lst1`` that do not occur in ``lst2``.

      Order and duplicates of ``lst1`` are preserved. ``lst2`` is materialised
      once and turned into a set, so each membership test is O(1). The
      previous version scanned the whole list for every item (O(n*m)).
      Unhashable items fall back to a linear scan.
      """
      # Materialise first so one-shot iterables (generators) are consumed once.
      candidates = list(lst2)
      try:
          excluded = set(candidates)
      except TypeError:
          # Unhashable items (e.g. lists/dicts): keep equality-based scanning.
          excluded = candidates
      return [element for element in lst1 if element not in excluded]
  def process_list(lst, append_str):
      """Prefix each element of ``lst`` with ``append_str`` and join the results with commas.

      Despite its name, ``append_str`` is *prepended*. get_train_tables passes
      a table-path prefix here and partition dates as the elements.
      """
      return ','.join(append_str + item for item in lst)
  def get_train_tables(start_date='20250223', end_days_ago=2,
                       address='odps://loghubods/tables/ad_easyrec_train_data_v3_sampled/dt='):
      """Build the comma-separated list of training-table partitions.

      Covers every day from ``start_date`` through the day ``end_days_ago``
      days before today. Entries listed in holidays.txt (see read_file_to_list)
      are removed.

      :param start_date: first partition date, 'YYYYMMDD'.
      :param end_days_ago: how many days before today the last partition is.
      :param address: partition path prefix prepended to each date.
      :return: e.g. ``'<address>20250223,<address>20250224,...'``.
      """
      end_date = get_previous_days_date(end_days_ago)
      date_list = get_dates_between(start_date, end_date)
      date_list = remove_elements(date_list, read_file_to_list())
      return process_list(date_list, address)
  def update_train_tables(old_str, train_tables=None):
      """Replace the value of the ``-Dtrain_tables="..."`` option inside ``old_str``.

      :param old_str: command/SQL text. In update_online_flow this is the
          'sql' property of the training node.
      :param train_tables: replacement value. When omitted, get_train_tables()
          is called, but only if the option is actually present.
      :return: the updated text, or ``None`` if the option or its closing
          quote is missing.
      """
      marker = '-Dtrain_tables="'
      start_index = old_str.find(marker)
      if start_index == -1:
          return None
      # First character of the quoted value, i.e. just after the opening quote.
      value_start = start_index + len(marker)
      # The closing quote of the value.
      value_end = old_str.find('"', value_start)
      if value_end == -1:
          return None
      if train_tables is None:
          train_tables = get_train_tables()
      return old_str[:value_start] + train_tables + old_str[value_end:]
  class PAIClient:
      """Static wrappers around the PAI Studio, PAI-EAS and PAIFlow OpenAPI SDK clients.

      Credentials come from the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID
      and ALIBABA_CLOUD_ACCESS_KEY_SECRET. They are no longer hard-coded.
      NOTE: the AccessKey previously embedded in this class must be treated as
      leaked and rotated.

      Every API wrapper returns ``resp.body.to_map()``. When a call fails, the
      error message and diagnosis URL are printed and the exception is
      re-raised. Previously the error was swallowed and callers received
      ``None``, which then crashed on subscripting.
      """

      # Region used for all endpoints and for EAS service lookups.
      REGION = 'cn-hangzhou'

      def __init__(self):
          pass

      @staticmethod
      def _build_config(endpoint: str):
          """Return an OpenAPI ``Config`` for ``endpoint`` using credentials from the environment.

          :raises RuntimeError: if either credential variable is unset or empty.
          """
          access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID')
          access_key_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
          if not access_key_id or not access_key_secret:
              raise RuntimeError(
                  'PAI credentials missing: set ALIBABA_CLOUD_ACCESS_KEY_ID and '
                  'ALIBABA_CLOUD_ACCESS_KEY_SECRET.'
              )
          config = open_api_models.Config(
              access_key_id=access_key_id,
              access_key_secret=access_key_secret,
          )
          config.endpoint = endpoint
          return config

      @staticmethod
      def _report_error(error: Exception) -> None:
          """Print an API error's message and diagnosis URL without failing on plain exceptions."""
          # SDK errors carry .message and a .data dict with a 'Recommend' URL;
          # other exceptions (network errors, KeyError, ...) may have neither.
          print(getattr(error, 'message', error))
          data = getattr(error, 'data', None)
          if isinstance(data, dict):
              print(data.get('Recommend'))

      @staticmethod
      def _call(api, *args):
          """Call an SDK ``*_with_options`` method and return its response body as a dict.

          ``api`` is invoked as ``api(*args, headers, runtime)`` with empty
          headers and default runtime options. Errors are reported via
          ``_report_error`` and re-raised.
          """
          headers = {}
          runtime = util_models.RuntimeOptions()
          try:
              resp = api(*args, headers, runtime)
          except Exception as error:
              PAIClient._report_error(error)
              raise
          return resp.body.to_map()

      @staticmethod
      def create_client() -> 'PaiStudio20210202Client':
          """Create a PAI Studio client for ``REGION``.

          :raises RuntimeError: if credentials are not configured.
          """
          config = PAIClient._build_config(f'pai.{PAIClient.REGION}.aliyuncs.com')
          return PaiStudio20210202Client(config)

      @staticmethod
      def create_eas_client() -> 'eas20210701Client':
          """Create a PAI-EAS client for ``REGION``.

          :raises RuntimeError: if credentials are not configured.
          """
          config = PAIClient._build_config(f'pai-eas.{PAIClient.REGION}.aliyuncs.com')
          return eas20210701Client(config)

      @staticmethod
      def create_flow_client() -> 'PAIFlow20210202Client':
          """Create a PAIFlow client for ``REGION``.

          :raises RuntimeError: if credentials are not configured.
          """
          config = PAIClient._build_config(f'paiflow.{PAIClient.REGION}.aliyuncs.com')
          return PAIFlow20210202Client(config)

      @staticmethod
      def get_work_flow_draft_list(workspace_id: str):
          """List the experiments (workflow drafts) of a PAI workspace."""
          client = PAIClient.create_client()
          request = pai_studio_20210202_models.ListExperimentsRequest(workspace_id=workspace_id)
          return PAIClient._call(client.list_experiments_with_options, request)

      @staticmethod
      def get_work_flow_draft(experiment_id: str):
          """Fetch one experiment draft; the caller reads its 'Content' and 'Version' keys."""
          client = PAIClient.create_client()
          return PAIClient._call(client.get_experiment_with_options, experiment_id)

      @staticmethod
      def get_describe_service(service_name: str):
          """Describe an EAS service in ``REGION``; the caller reads its 'ServiceConfig' key."""
          client = PAIClient.create_eas_client()
          return PAIClient._call(client.describe_service_with_options, PAIClient.REGION, service_name)

      @staticmethod
      def update_experiment_content(experiment_id: str, content: str, version: int):
          """Overwrite an experiment draft's content; ``version`` is the draft's current 'Version'.

          The response body is printed, as before, and now also returned.
          """
          client = PAIClient.create_client()
          request = pai_studio_20210202_models.UpdateExperimentContentRequest(
              content=content,
              version=version,
          )
          body = PAIClient._call(client.update_experiment_content_with_options, experiment_id, request)
          print(body)
          return body

      @staticmethod
      def create_job(experiment_id: str, node_id: str, execute_type: str):
          """Submit a job that runs ``node_id`` of an experiment with ``execute_type``.

          In this file ``execute_type`` is 'EXECUTE_ONE' or 'EXECUTE_FROM_HERE'.
          """
          client = PAIClient.create_client()
          request = pai_studio_20210202_models.CreateJobRequest()
          request.experiment_id = experiment_id
          request.node_id = node_id
          request.execute_type = execute_type
          return PAIClient._call(client.create_job_with_options, request)

      @staticmethod
      def get_job_detail(job_id: str):
          """Fetch a job's (non-verbose) detail; the caller reads its 'Status' key."""
          client = PAIClient.create_client()
          request = pai_studio_20210202_models.GetJobRequest(verbose=False)
          return PAIClient._call(client.get_job_with_options, job_id, request)

      @staticmethod
      def get_flow_out_put(pipeline_run_id: str, node_id: str, depth: int):
          """List the outputs of a pipeline-run node (and sub-nodes down to ``depth``)."""
          client = PAIClient.create_flow_client()
          request = paiflow_20210202_models.ListPipelineRunNodeOutputsRequest(depth=depth)
          return PAIClient._call(
              client.list_pipeline_run_node_outputs_with_options,
              pipeline_run_id,
              node_id,
              request,
          )
  def extract_date_yyyymmdd(input_string):
      """Return the first run of eight consecutive digits in ``input_string``, or ``None``.

      get_online_version_dt uses this to pull a YYYYMMDD stamp out of a model
      path. The digits are not validated as a real calendar date.
      """
      found = re.search(r'\d{8}', input_string)
      return found.group(0) if found else None
  def get_online_version_dt(service_name: str):
      """Return the 8-digit date embedded in an EAS service's ``model_path``.

      Describes the service via PAIClient, parses its 'ServiceConfig' JSON
      string, and extracts the first 8-digit run from 'model_path'. Returns
      ``None`` when the path contains no such run.
      """
      service_config = json.loads(PAIClient.get_describe_service(service_name)['ServiceConfig'])
      return extract_date_yyyymmdd(service_config['model_path'])
  def update_online_flow(experiment_id="draft-7u3e9v1uc5pohjl0t6",
                         service_name='ad_rank_dnn_v11_easyrec'):
      """Refresh the training workflow draft before a new run.

      1. Read the model date currently deployed on EAS ``service_name``.
      2. Set global params: ``bizdate`` and ``eval_date`` to yesterday, and
         ``online_version_dt`` to the deployed model date. Previously the
         fetched date was ignored and '20250323' was hard-coded.
      3. Rewrite ``-Dtrain_tables`` in the 'sql' property of the
         '模型训练-自定义' node.
      4. Save the draft using its current version.

      :param experiment_id: PAI Studio experiment (draft) id.
      :param service_name: EAS service whose model path carries the online date.
      :raises RuntimeError: if the online date cannot be determined, or the
          training SQL has no ``-Dtrain_tables="..."`` option. In both cases
          the draft is not modified. Previously the SQL was saved as ``None``.
      """
      online_version_dt = get_online_version_dt(service_name)
      if not online_version_dt:
          raise RuntimeError(f'No YYYYMMDD date found in model_path of EAS service {service_name!r}')
      draft = PAIClient.get_work_flow_draft(experiment_id)
      print(json.dumps(draft, ensure_ascii=False))
      version = draft['Version']
      content_json = json.loads(draft['Content'])
      _apply_global_params(content_json.get('globalParams') or [],
                           get_previous_days_date(1), online_version_dt)
      _update_train_sql(content_json.get('nodes') or [])
      new_content = json.dumps(content_json, ensure_ascii=False)
      PAIClient.update_experiment_content(experiment_id, new_content, version)


  def _apply_global_params(global_params, bizdate, online_version_dt):
      """Set the bizdate/eval_date/online_version_dt entries of ``global_params`` in place."""
      new_values = {
          'bizdate': bizdate,
          'eval_date': bizdate,
          'online_version_dt': online_version_dt,
      }
      for global_param in global_params:
          if global_param['name'] in new_values:
              global_param['value'] = new_values[global_param['name']]


  def _update_train_sql(nodes):
      """Rewrite -Dtrain_tables in the 'sql' property of each '模型训练-自定义' node, in place."""
      for node in nodes:
          if node['name'] != '模型训练-自定义':
              continue
          for prop in node['properties']:
              if prop['name'] != 'sql':
                  continue
              new_value = update_train_tables(prop['value'])
              if new_value is None:
                  # TODO: hook up alerting here.
                  raise RuntimeError('Training SQL has no -Dtrain_tables="..." option to update')
              prop['value'] = new_value
  def wait_job_end(job_id: str, poll_interval: int = 300):
      """Poll a PAI job until it reaches a terminal state and return its final detail.

      In-progress states:
      - Initialized, Starting, WorkflowServiceStarting, Running
      - ReadyToSchedule (waiting on upstream nodes)

      Terminal states:
      - Failed, Terminating, Terminated, Unknown
      - Skipped (an upstream node failed)
      - Succeeded

      'Terminated' used to match neither list, which made the loop re-poll
      without sleeping, forever. Any unrecognised status is now reported and
      polled again after the normal interval.

      :param job_id: id returned by PAIClient.create_job.
      :param poll_interval: seconds to sleep between polls.
      :return: the job detail dict whose 'Status' is terminal.
      """
      running_states = {'Initialized', 'Starting', 'WorkflowServiceStarting',
                        'Running', 'ReadyToSchedule'}
      terminal_states = {'Failed', 'Terminating', 'Terminated', 'Unknown',
                         'Skipped', 'Succeeded'}
      while True:
          job_detail = PAIClient.get_job_detail(job_id)
          print(job_detail)
          status = job_detail['Status']
          if status in terminal_states:
              return job_detail
          if status not in running_states:
              print(f'Unrecognized status {status!r} for job {job_id}; polling again.')
          time.sleep(poll_interval)
  def get_node_dict(experiment_id="draft-7u3e9v1uc5pohjl0t6", names=None):
      """Map workflow node display names to node ids for an experiment draft.

      :param experiment_id: PAI Studio experiment (draft) id.
      :param names: node names to collect; defaults to module-level ``target_names``.
      :return: ``{name: node_id}`` for each draft node whose name is in
          ``names``. If several nodes share a name, the last one wins.
      """
      wanted = target_names if names is None else set(names)
      draft = PAIClient.get_work_flow_draft(experiment_id)
      # Tolerate a draft whose content has no 'nodes' entry.
      nodes = json.loads(draft['Content']).get('nodes') or []
      return {node['name']: node['id'] for node in nodes if node['name'] in wanted}
  def train_model():
      """Run the training node and then the export node of the workflow draft.

      Each node is submitted with 'EXECUTE_ONE' (that node only) and awaited
      with wait_job_end. The export step runs only if training succeeded.

      :return: ``True`` if both jobs succeed, otherwise ``False``. Previously
          failure returned an implicit ``None``.
      """
      experiment_id = "draft-7u3e9v1uc5pohjl0t6"
      node_dict = get_node_dict()
      for node_name in ('模型训练-自定义', '模型导出-2'):
          job = PAIClient.create_job(experiment_id, node_dict[node_name], 'EXECUTE_ONE')
          job_detail = wait_job_end(job['JobId'])
          if job_detail['Status'] != 'Succeeded':
              print(f'{node_name} job {job["JobId"]} ended with status {job_detail["Status"]}')
              return False
      return True
  def validate_model_data_accuracy():
      """Run the evaluation branch of the workflow and collect its output tables.

      Starts the draft from '虚拟起始节点' with 'EXECUTE_FROM_HERE' and waits for
      the job. It then lists the pipeline-run outputs (depth 3) and picks the
      tables produced by '二分类评估-1', '二分类评估-2' and '预测结果对比'.

      :return: ``{node_name: table_name}`` if the job succeeded. The dict was
          previously only printed. Returns ``None`` if the job did not succeed.
      """
      experiment_id = "draft-7u3e9v1uc5pohjl0t6"
      node_dict = get_node_dict()
      start_node_id = node_dict['虚拟起始节点']
      validate_res = PAIClient.create_job(experiment_id, start_node_id, 'EXECUTE_FROM_HERE')
      print(validate_res)
      validate_job_id = validate_res['JobId']
      print(validate_job_id)
      validate_job_detail = wait_job_end(validate_job_id)
      print('res')
      print(validate_job_detail)
      if validate_job_detail['Status'] != 'Succeeded':
          return None
      flow_out_put_detail = PAIClient.get_flow_out_put(
          validate_job_detail['RunId'], validate_job_detail['PaiflowNodeId'], 3)
      print(flow_out_put_detail)
      tabel_dict = _collect_output_tables(flow_out_put_detail['Outputs'], node_dict)
      print(tabel_dict)
      return tabel_dict


  def _collect_output_tables(outputs, node_dict):
      """Return ``{node_name: table}`` for the evaluation outputs found in ``outputs``.

      An output matches when its 'Producer' is the node's id and its 'Name' is
      the expected port. The table is read from the JSON in ``Info['value']``
      at ``location.table``.
      """
      wanted = (
          ('二分类评估-1', 'outputMetricTable'),
          ('二分类评估-2', 'outputMetricTable'),
          ('预测结果对比', 'outputTable'),
      )
      tables = {}
      for output in outputs:
          for node_name, port_name in wanted:
              if output["Producer"] == node_dict[node_name] and output["Name"] == port_name:
                  tables[node_name] = json.loads(output["Info"]['value'])['location']['table']
      return tables
  if __name__ == '__main__':
      # Pipeline steps, enabled manually as needed:
      #   1. update the workflow draft:   update_online_flow()
      #   2. train and export the model:  train_model()
      #   3. validate the model data:     validate_model_data_accuracy()
      #
      # Debug harness: re-read the outputs of an already finished validation
      # job (captured sample below) instead of launching a new one.
      node_dict = get_node_dict()
      # Must not be called `str`: at module level that would shadow the built-in
      # for every function in this file.
      sample_job_detail_json = '{"Creator": "204034041838504386", "ExecuteType": "EXECUTE_FROM_HERE", "ExperimentId": "draft-7u3e9v1uc5pohjl0t6", "GmtCreateTime": "2025-04-01T03:17:42.000+00:00", "JobId": "job-8u3ev2uf5ncoexj9p9", "PaiflowNodeId": "node-9wtveoz1tu89tqfoox", "RequestId": "6ED5FFB1-346B-5075-ACC9-029EB77E9F09", "RunId": "flow-lchat027733ttstdc0", "Status": "Succeeded", "WorkspaceId": "96094"}'
      validate_job_detail = json.loads(sample_job_detail_json)
      if validate_job_detail['Status'] == 'Succeeded':
          flow_out_put_detail = PAIClient.get_flow_out_put(
              validate_job_detail['RunId'], validate_job_detail['PaiflowNodeId'], 3)
          print(flow_out_put_detail)
          # (node display name, output port) pairs whose table location is reported.
          wanted_outputs = (
              ('二分类评估-1', 'outputMetricTable'),
              ('二分类评估-2', 'outputMetricTable'),
              ('预测结果对比', 'outputTable'),
          )
          tabel_dict = {}
          for out_put in flow_out_put_detail['Outputs']:
              for node_name, port_name in wanted_outputs:
                  if out_put["Producer"] == node_dict[node_name] and out_put["Name"] == port_name:
                      location = json.loads(out_put["Info"]['value'])['location']
                      tabel_dict[node_name] = location['table']
          print(tabel_dict)