# pai_flow_operator2.py
  1. # -*- coding: utf-8 -*-
  2. import os
  3. import re
  4. import time
  5. import json
  6. import pandas as pd
  7. from alibabacloud_paistudio20210202.client import Client as PaiStudio20210202Client
  8. from alibabacloud_tea_openapi import models as open_api_models
  9. from alibabacloud_paistudio20210202 import models as pai_studio_20210202_models
  10. from alibabacloud_tea_util import models as util_models
  11. from alibabacloud_tea_util.client import Client as UtilClient
  12. from alibabacloud_eas20210701.client import Client as eas20210701Client
  13. from alibabacloud_paiflow20210202 import models as paiflow_20210202_models
  14. from alibabacloud_paiflow20210202.client import Client as PAIFlow20210202Client
  15. from datetime import datetime, timedelta
  16. from odps import ODPS
  17. from ad_monitor_util import _monitor
  18. target_names = {
  19. '样本shuffle',
  20. '模型训练-样本shufle',
  21. '模型训练-自定义',
  22. '模型增量训练',
  23. '模型导出-2',
  24. '更新EAS服务(Beta)-1',
  25. '虚拟起始节点',
  26. '二分类评估-1',
  27. '二分类评估-2',
  28. '预测结果对比'
  29. }
  30. experiment_id = "draft-wqgkag89sbh9v1zvut"
  31. def get_odps_instance(project):
  32. odps = ODPS(
  33. access_id='LTAIWYUujJAm7CbH',
  34. secret_access_key='RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
  35. project=project,
  36. endpoint='http://service.cn.maxcompute.aliyun.com/api',
  37. )
  38. return odps
  39. def get_data_from_odps(project, table, num):
  40. odps = get_odps_instance(project)
  41. try:
  42. # 要查询的 SQL 语句
  43. sql = f'select * from {table} limit {num}'
  44. # 执行 SQL 查询
  45. with odps.execute_sql(sql).open_reader() as reader:
  46. # 查询数量小于目标数量时 返回空
  47. if reader.count < num:
  48. return None
  49. # 获取字段名称
  50. column_names = reader.schema.names
  51. # 获取查询结果数据
  52. data = []
  53. for record in reader:
  54. record_list = list(record)
  55. numbers = []
  56. for item in record_list:
  57. numbers.append(item[1])
  58. data.append(numbers)
  59. # 将数据和字段名称组合成 DataFrame
  60. df = pd.DataFrame(data, columns=column_names)
  61. return df
  62. except Exception as e:
  63. print(f"发生错误: {e}")
  64. def get_dict_from_odps(project, table):
  65. odps = get_odps_instance(project)
  66. try:
  67. # 要查询的 SQL 语句
  68. sql = f'select * from {table}'
  69. # 执行 SQL 查询
  70. with odps.execute_sql(sql).open_reader() as reader:
  71. data = {}
  72. for record in reader:
  73. record_list = list(record)
  74. key = record_list[0][1]
  75. value = record_list[1][1]
  76. data[key] = value
  77. return data
  78. except Exception as e:
  79. print(f"发生错误: {e}")
  80. def get_dates_between(start_date_str, end_date_str):
  81. start_date = datetime.strptime(start_date_str, '%Y%m%d')
  82. end_date = datetime.strptime(end_date_str, '%Y%m%d')
  83. dates = []
  84. current_date = start_date
  85. while current_date <= end_date:
  86. dates.append(current_date.strftime('%Y%m%d'))
  87. current_date += timedelta(days=1)
  88. return dates
  89. def read_file_to_list():
  90. try:
  91. current_dir = os.getcwd()
  92. file_path = os.path.join(current_dir, 'ad', 'holidays.txt')
  93. with open(file_path, 'r', encoding='utf-8') as file:
  94. content = file.read()
  95. return content.split('\n')
  96. except FileNotFoundError:
  97. raise Exception(f"错误:未找到 {file_path} 文件。")
  98. except Exception as e:
  99. raise Exception(f"错误:发生了一个未知错误: {e}")
  100. return []
  101. def get_previous_days_date(days):
  102. current_date = datetime.now()
  103. previous_date = current_date - timedelta(days=days)
  104. return previous_date.strftime('%Y%m%d')
  105. def remove_elements(lst1, lst2):
  106. return [element for element in lst1 if element not in lst2]
  107. def process_list(lst, append_str):
  108. # 给列表中每个元素拼接相同的字符串
  109. appended_list = [append_str + element for element in lst]
  110. # 将拼接后的列表元素用逗号拼接成一个字符串
  111. result_str = ','.join(appended_list)
  112. return result_str
  113. def get_train_data_list():
  114. start_date = '20250223'
  115. end_date = get_previous_days_date(2)
  116. date_list = get_dates_between(start_date, end_date)
  117. filter_date_list = read_file_to_list()
  118. date_list = remove_elements(date_list, filter_date_list)
  119. return date_list
  120. def update_train_tables(old_str):
  121. date_list = get_train_data_list()
  122. train_list = ["'" + item + "'" for item in date_list]
  123. result = ','.join(train_list)
  124. start_index = old_str.find('where dt in (')
  125. if start_index != -1:
  126. equal_sign_index = start_index + len('where dt in (')
  127. # 找到下一个双引号的位置
  128. next_quote_index = old_str.find(')', equal_sign_index)
  129. if next_quote_index != -1:
  130. # 进行替换
  131. new_value = old_str[:equal_sign_index] + result + old_str[next_quote_index:]
  132. return new_value
  133. return None
  134. def update_train_table(old_str, table):
  135. address = 'odps://pai_algo/tables/'
  136. train_table = address + table
  137. start_index = old_str.find('-Dtrain_tables="')
  138. if start_index != -1:
  139. # 确定等号的位置
  140. equal_sign_index = start_index + len('-Dtrain_tables="')
  141. # 找到下一个双引号的位置
  142. next_quote_index = old_str.find('"', equal_sign_index)
  143. if next_quote_index != -1:
  144. # 进行替换
  145. new_value = old_str[:equal_sign_index] + train_table + old_str[next_quote_index:]
  146. return new_value
  147. return None
  148. class PAIClient:
  149. def __init__(self):
  150. pass
  151. @staticmethod
  152. def create_client() -> PaiStudio20210202Client:
  153. """
  154. 使用AK&SK初始化账号Client
  155. @return: Client
  156. @throws Exception
  157. """
  158. # 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。
  159. # 建议使用更安全的 STS 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378659.html。
  160. config = open_api_models.Config(
  161. access_key_id="LTAI5tFGqgC8f3mh1fRCrAEy",
  162. access_key_secret="XhOjK9XmTYRhVAtf6yii4s4kZwWzvV"
  163. )
  164. # Endpoint 请参考 https://api.aliyun.com/product/PaiStudio
  165. config.endpoint = f'pai.cn-hangzhou.aliyuncs.com'
  166. return PaiStudio20210202Client(config)
  167. @staticmethod
  168. def create_eas_client() -> eas20210701Client:
  169. """
  170. 使用AK&SK初始化账号Client
  171. @return: Client
  172. @throws Exception
  173. """
  174. # 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。
  175. # 建议使用更安全的 STS 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378659.html。
  176. config = open_api_models.Config(
  177. access_key_id="LTAI5tFGqgC8f3mh1fRCrAEy",
  178. access_key_secret="XhOjK9XmTYRhVAtf6yii4s4kZwWzvV"
  179. )
  180. # Endpoint 请参考 https://api.aliyun.com/product/PaiStudio
  181. config.endpoint = f'pai-eas.cn-hangzhou.aliyuncs.com'
  182. return eas20210701Client(config)
  183. @staticmethod
  184. def create_flow_client() -> PAIFlow20210202Client:
  185. """
  186. 使用AK&SK初始化账号Client
  187. @return: Client
  188. @throws Exception
  189. """
  190. # 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。
  191. # 建议使用更安全的 STS 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378659.html。
  192. config = open_api_models.Config(
  193. # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。,
  194. access_key_id="LTAI5tFGqgC8f3mh1fRCrAEy",
  195. # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
  196. access_key_secret="XhOjK9XmTYRhVAtf6yii4s4kZwWzvV"
  197. )
  198. # Endpoint 请参考 https://api.aliyun.com/product/PAIFlow
  199. config.endpoint = f'paiflow.cn-hangzhou.aliyuncs.com'
  200. return PAIFlow20210202Client(config)
  201. @staticmethod
  202. def get_work_flow_draft_list(workspace_id: str):
  203. client = PAIClient.create_client()
  204. list_experiments_request = pai_studio_20210202_models.ListExperimentsRequest(
  205. workspace_id=workspace_id
  206. )
  207. runtime = util_models.RuntimeOptions()
  208. headers = {}
  209. try:
  210. resp = client.list_experiments_with_options(list_experiments_request, headers, runtime)
  211. return resp.body.to_map()
  212. except Exception as error:
  213. print(error.message)
  214. print(error.data.get("Recommend"))
  215. UtilClient.assert_as_string(error.message)
  216. @staticmethod
  217. def get_work_flow_draft(experiment_id: str):
  218. client = PAIClient.create_client()
  219. runtime = util_models.RuntimeOptions()
  220. headers = {}
  221. try:
  222. # 复制代码运行请自行打印 API 的返回值
  223. resp = client.get_experiment_with_options(experiment_id, headers, runtime)
  224. return resp.body.to_map()
  225. except Exception as error:
  226. # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
  227. # 错误 message
  228. print(error.message)
  229. # 诊断地址
  230. print(error.data.get("Recommend"))
  231. UtilClient.assert_as_string(error.message)
  232. @staticmethod
  233. def get_describe_service(service_name: str):
  234. client = PAIClient.create_eas_client()
  235. runtime = util_models.RuntimeOptions()
  236. headers = {}
  237. try:
  238. # 复制代码运行请自行打印 API 的返回值
  239. resp = client.describe_service_with_options('cn-hangzhou', service_name, headers, runtime)
  240. return resp.body.to_map()
  241. except Exception as error:
  242. # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
  243. # 错误 message
  244. print(error.message)
  245. # 诊断地址
  246. print(error.data.get("Recommend"))
  247. UtilClient.assert_as_string(error.message)
  248. @staticmethod
  249. def update_experiment_content(experiment_id: str, content: str, version: int):
  250. client = PAIClient.create_client()
  251. update_experiment_content_request = pai_studio_20210202_models.UpdateExperimentContentRequest(content=content,
  252. version=version)
  253. runtime = util_models.RuntimeOptions()
  254. headers = {}
  255. try:
  256. # 复制代码运行请自行打印 API 的返回值
  257. resp = client.update_experiment_content_with_options(experiment_id, update_experiment_content_request,
  258. headers, runtime)
  259. print(resp.body.to_map())
  260. except Exception as error:
  261. # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
  262. # 错误 message
  263. print(error.message)
  264. # 诊断地址
  265. print(error.data.get("Recommend"))
  266. UtilClient.assert_as_string(error.message)
  267. @staticmethod
  268. def create_job(experiment_id: str, node_id: str, execute_type: str):
  269. client = PAIClient.create_client()
  270. create_job_request = pai_studio_20210202_models.CreateJobRequest()
  271. create_job_request.experiment_id = experiment_id
  272. create_job_request.node_id = node_id
  273. create_job_request.execute_type = execute_type
  274. runtime = util_models.RuntimeOptions()
  275. headers = {}
  276. try:
  277. # 复制代码运行请自行打印 API 的返回值
  278. resp = client.create_job_with_options(create_job_request, headers, runtime)
  279. return resp.body.to_map()
  280. except Exception as error:
  281. # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
  282. # 错误 message
  283. print(error.message)
  284. # 诊断地址
  285. print(error.data.get("Recommend"))
  286. UtilClient.assert_as_string(error.message)
  287. @staticmethod
  288. def get_job_detail(job_id: str):
  289. client = PAIClient.create_client()
  290. get_job_request = pai_studio_20210202_models.GetJobRequest(
  291. verbose=False
  292. )
  293. runtime = util_models.RuntimeOptions()
  294. headers = {}
  295. try:
  296. # 复制代码运行请自行打印 API 的返回值
  297. resp = client.get_job_with_options(job_id, get_job_request, headers, runtime)
  298. return resp.body.to_map()
  299. except Exception as error:
  300. # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
  301. # 错误 message
  302. print(error.message)
  303. # 诊断地址
  304. print(error.data.get("Recommend"))
  305. UtilClient.assert_as_string(error.message)
  306. @staticmethod
  307. def get_flow_out_put(pipeline_run_id: str, node_id: str, depth: int):
  308. client = PAIClient.create_flow_client()
  309. list_pipeline_run_node_outputs_request = paiflow_20210202_models.ListPipelineRunNodeOutputsRequest(
  310. depth=depth
  311. )
  312. runtime = util_models.RuntimeOptions()
  313. headers = {}
  314. try:
  315. # 复制代码运行请自行打印 API 的返回值
  316. resp = client.list_pipeline_run_node_outputs_with_options(pipeline_run_id, node_id,
  317. list_pipeline_run_node_outputs_request, headers,
  318. runtime)
  319. return resp.body.to_map()
  320. except Exception as error:
  321. # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
  322. # 错误 message
  323. print(error.message)
  324. # 诊断地址
  325. print(error.data.get("Recommend"))
  326. UtilClient.assert_as_string(error.message)
  327. def extract_date_yyyymmdd(input_string):
  328. pattern = r'\d{8}'
  329. matches = re.findall(pattern, input_string)
  330. if matches:
  331. return matches[0]
  332. return None
  333. def get_online_version_dt(service_name: str):
  334. model_detail = PAIClient.get_describe_service(service_name)
  335. service_config_str = model_detail['ServiceConfig']
  336. service_config = json.loads(service_config_str)
  337. model_path = service_config['model_path']
  338. online_date = extract_date_yyyymmdd(model_path)
  339. return online_date
  340. def update_online_flow():
  341. online_version_dt = get_online_version_dt('ad_rank_dnn_v11_easyrec')
  342. draft = PAIClient.get_work_flow_draft(experiment_id)
  343. print(json.dumps(draft, ensure_ascii=False))
  344. content = draft['Content']
  345. version = draft['Version']
  346. print(content)
  347. content_json = json.loads(content)
  348. nodes = content_json.get('nodes')
  349. global_params = content_json.get('globalParams')
  350. bizdate = get_previous_days_date(1)
  351. for global_param in global_params:
  352. if global_param['name'] == 'bizdate':
  353. global_param['value'] = bizdate
  354. if global_param['name'] == 'online_version_dt':
  355. global_param['value'] = online_version_dt
  356. if global_param['name'] == 'eval_date':
  357. global_param['value'] = bizdate
  358. for node in nodes:
  359. name = node['name']
  360. if name == '样本shuffle':
  361. properties = node['properties']
  362. for property in properties:
  363. if property['name'] == 'sql':
  364. value = property['value']
  365. new_value = update_train_tables(value)
  366. if new_value is None:
  367. print("error")
  368. property['value'] = new_value
  369. new_content = json.dumps(content_json, ensure_ascii=False)
  370. PAIClient.update_experiment_content(experiment_id, new_content, version)
  371. def update_shuffle_flow(table):
  372. draft = PAIClient.get_work_flow_draft(experiment_id)
  373. print(json.dumps(draft, ensure_ascii=False))
  374. content = draft['Content']
  375. version = draft['Version']
  376. content_json = json.loads(content)
  377. nodes = content_json.get('nodes')
  378. for node in nodes:
  379. name = node['name']
  380. if name == '模型训练-样本shufle':
  381. properties = node['properties']
  382. for property in properties:
  383. if property['name'] == 'sql':
  384. value = property['value']
  385. new_value = update_train_table(value, table)
  386. if new_value is None:
  387. print("error")
  388. property['value'] = new_value
  389. new_content = json.dumps(content_json, ensure_ascii=False)
  390. PAIClient.update_experiment_content(experiment_id, new_content, version)
  391. def update_shuffle_flow_1():
  392. draft = PAIClient.get_work_flow_draft(experiment_id)
  393. print(json.dumps(draft, ensure_ascii=False))
  394. content = draft['Content']
  395. version = draft['Version']
  396. print(content)
  397. content_json = json.loads(content)
  398. nodes = content_json.get('nodes')
  399. for node in nodes:
  400. name = node['name']
  401. if name == '模型训练-样本shufle':
  402. properties = node['properties']
  403. for property in properties:
  404. if property['name'] == 'sql':
  405. value = property['value']
  406. new_value = update_train_tables(value)
  407. if new_value is None:
  408. print("error")
  409. property['value'] = new_value
  410. new_content = json.dumps(content_json, ensure_ascii=False)
  411. PAIClient.update_experiment_content(experiment_id, new_content, version)
  412. def wait_job_end(job_id: str):
  413. while True:
  414. job_detail = PAIClient.get_job_detail(job_id)
  415. print(job_detail)
  416. statue = job_detail['Status']
  417. # Initialized: 初始化完成 Starting:开始 WorkflowServiceStarting:准备提交 Running:运行中 ReadyToSchedule:准备运行(前序节点未完成导致)
  418. if (statue == 'Initialized' or statue == 'Starting' or statue == 'WorkflowServiceStarting'
  419. or statue == 'Running' or statue == 'ReadyToSchedule'):
  420. # 睡眠300s 等待下次获取
  421. time.sleep(300)
  422. continue
  423. # Failed:运行失败 Terminating:终止中 Terminated:已终止 Unknown:未知 Skipped:跳过(前序节点失败导致) Succeeded:运行成功
  424. if statue == 'Failed' or statue == 'Terminating' or statue == 'Unknown' or statue == 'Skipped' or statue == 'Succeeded':
  425. return job_detail
  426. def get_node_dict():
  427. draft = PAIClient.get_work_flow_draft(experiment_id)
  428. content = draft['Content']
  429. content_json = json.loads(content)
  430. nodes = content_json.get('nodes')
  431. node_dict = {}
  432. for node in nodes:
  433. name = node['name']
  434. # 检查名称是否在目标名称集合中
  435. if name in target_names:
  436. node_dict[name] = node['id']
  437. return node_dict
  438. def train_model():
  439. node_dict = get_node_dict()
  440. train_node_id = node_dict['样本shuffle']
  441. execute_type = 'EXECUTE_ONE'
  442. validate_res = PAIClient.create_job(experiment_id, train_node_id, execute_type)
  443. validate_job_id = validate_res['JobId']
  444. validate_job_detail = wait_job_end(validate_job_id)
  445. if validate_job_detail['Status'] == 'Succeeded':
  446. pipeline_run_id = validate_job_detail['RunId']
  447. node_id = validate_job_detail['PaiflowNodeId']
  448. flow_out_put_detail = PAIClient.get_flow_out_put(pipeline_run_id, node_id, 2)
  449. out_puts = flow_out_put_detail['Outputs']
  450. table = None
  451. for out_put in out_puts:
  452. if out_put["Producer"] == node_dict['样本shuffle'] and out_put["Name"] == "outputTable":
  453. value1 = json.loads(out_put["Info"]['value'])
  454. table = value1['location']['table']
  455. if table is not None:
  456. update_shuffle_flow(table)
  457. node_dict = get_node_dict()
  458. train_node_id = node_dict['模型训练-样本shufle']
  459. execute_type = 'EXECUTE_ONE'
  460. train_res = PAIClient.create_job(experiment_id, train_node_id, execute_type)
  461. train_job_id = train_res['JobId']
  462. train_job_detail = wait_job_end(train_job_id)
  463. if train_job_detail['Status'] == 'Succeeded':
  464. export_node_id = node_dict['模型导出-2']
  465. export_res = PAIClient.create_job(experiment_id, export_node_id, execute_type)
  466. export_job_id = export_res['JobId']
  467. export_job_detail = wait_job_end(export_job_id)
  468. if export_job_detail['Status'] == 'Succeeded':
  469. return True
  470. return False
  471. def update_online_model():
  472. node_dict = get_node_dict()
  473. train_node_id = node_dict['更新EAS服务(Beta)-1']
  474. execute_type = 'EXECUTE_ONE'
  475. train_res = PAIClient.create_job(experiment_id, train_node_id, execute_type)
  476. train_job_id = train_res['JobId']
  477. train_job_detail = wait_job_end(train_job_id)
  478. if train_job_detail['Status'] == 'Succeeded':
  479. return True
  480. return False
  481. def validate_model_data_accuracy(start_time):
  482. node_dict = get_node_dict()
  483. train_node_id = node_dict['虚拟起始节点']
  484. execute_type = 'EXECUTE_FROM_HERE'
  485. validate_res = PAIClient.create_job(experiment_id, train_node_id, execute_type)
  486. validate_job_id = validate_res['JobId']
  487. validate_job_detail = wait_job_end(validate_job_id)
  488. if validate_job_detail['Status'] == 'Succeeded':
  489. pipeline_run_id = validate_job_detail['RunId']
  490. node_id = validate_job_detail['PaiflowNodeId']
  491. flow_out_put_detail = PAIClient.get_flow_out_put(pipeline_run_id, node_id, 3)
  492. print(flow_out_put_detail)
  493. table_dict = {}
  494. out_puts = flow_out_put_detail['Outputs']
  495. for out_put in out_puts:
  496. if out_put["Producer"] == node_dict['二分类评估-1'] and out_put["Name"] == "outputMetricTable":
  497. value1 = json.loads(out_put["Info"]['value'])
  498. table_dict['二分类评估-1'] = value1['location']['table']
  499. if out_put["Producer"] == node_dict['二分类评估-2'] and out_put["Name"] == "outputMetricTable":
  500. value2 = json.loads(out_put["Info"]['value'])
  501. table_dict['二分类评估-2'] = value2['location']['table']
  502. if out_put["Producer"] == node_dict['预测结果对比'] and out_put["Name"] == "outputTable":
  503. value3 = json.loads(out_put["Info"]['value'])
  504. table_dict['预测结果对比'] = value3['location']['table']
  505. num = 10
  506. df = get_data_from_odps('pai_algo', table_dict['预测结果对比'], 10)
  507. # 对指定列取绝对值再求和
  508. old_abs_avg = df['old_error'].abs().sum() / num
  509. new_abs_avg = df['new_error'].abs().sum() / num
  510. new_auc = get_dict_from_odps('pai_algo', table_dict['二分类评估-1'])['AUC']
  511. old_auc = get_dict_from_odps('pai_algo', table_dict['二分类评估-2'])['AUC']
  512. bizdate = get_previous_days_date(1)
  513. score_diff = abs(old_abs_avg - new_abs_avg)
  514. msg = ""
  515. level = ""
  516. if new_abs_avg > 0.1:
  517. msg += f'线上模型评估{bizdate}的数据,绝对误差大于0.1,请检查'
  518. level = 'error'
  519. elif score_diff > 0.05:
  520. msg += f'两个模型评估${bizdate}的数据,两个模型分数差异为: ${score_diff}, 大于0.05, 请检查'
  521. level = 'error'
  522. else:
  523. # update_online_model()
  524. msg += 'DNN广告模型更新完成'
  525. level = 'info'
  526. step_end_time = int(time.time())
  527. elapsed = step_end_time - start_time
  528. # 初始化表格头部
  529. top10_msg = "| CID | 老模型相对真实CTCVR的变化 | 新模型相对真实CTCVR的变化 |"
  530. top10_msg += "\n| ---- | --------- | -------- |"
  531. for index, row in df.iterrows():
  532. # 获取指定列的元素
  533. cid = row['cid']
  534. old_error = row['old_error']
  535. new_error = row['new_error']
  536. top10_msg += f"\n| {int(cid)} | {old_error} | {new_error} | "
  537. print(top10_msg)
  538. msg += f"\n\t - 老模型AUC: {old_auc}"
  539. msg += f"\n\t - 新模型AUC: {new_auc}"
  540. msg += f"\n\t - 老模型Top10差异平均值: {old_abs_avg}"
  541. msg += f"\n\t - 新模型Top10差异平均值: {new_abs_avg}"
  542. _monitor(level, msg, start_time, elapsed, top10_msg)
  543. if __name__ == '__main__':
  544. start_time = int(time.time())
  545. # 1.更新工作流
  546. update_online_flow()
  547. # 2.训练模型
  548. train_res = train_model()
  549. if train_res:
  550. # 3. 验证模型数据 & 更新模型到线上
  551. validate_model_data_accuracy(start_time)
  552. else:
  553. print('train_model_error')