# supply_workflow_monitor.py
# Posts daily supply-workflow task statistics to Feishu as interactive cards.
import os
from datetime import datetime
from typing import Dict, Any, List, Optional

import pandas
import pandas as pd

from helper.MySQLHelper import MySQLHelper
from util import feishu_inform_util
  7. fei_shu_webhook = "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b"
  8. mysql_helper = MySQLHelper(
  9. host="rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com",
  10. username="readonly",
  11. password="HdkZ4TDmeK6SQ3BRtJBk",
  12. database="aigc-admin-prod"
  13. )
  14. column_width_map = {
  15. "状态": "80px",
  16. "任务数": "80px",
  17. "记录数": "80px",
  18. "当日计划数": "90px",
  19. "昨日计划数": "90px",
  20. "当日任务数": "90px",
  21. "昨日任务数": "90px",
  22. "当日待处理": "90px",
  23. "当日运行中": "90px",
  24. "当日成功": "80px",
  25. "当日失败": "80px",
  26. }
  27. header_template = ["wathet", "turquoise", "purple", "indigo", "green", "grey"]
  28. header_template_index = 0
  29. def build_config_json() -> Dict[str, Any]:
  30. return {
  31. "width_mode": "default",
  32. "enable_forward": False,
  33. }
  34. def build_markdown_element_json(content: str) -> Dict[str, Any]:
  35. return {
  36. "tag": "markdown",
  37. "element_id": "markdown",
  38. "content": content
  39. }
  40. def build_table_element_json(df: pandas.DataFrame) -> Optional[Dict[str, Any]]:
  41. if df.empty:
  42. return None
  43. columns = []
  44. for column in df.columns:
  45. columns.append({
  46. "name": column,
  47. "display_name": column,
  48. "width": column_width_map.get(column, "auto"),
  49. "data_type": "text",
  50. "horizontal_align": "center",
  51. "vertical_align": "center",
  52. })
  53. rows = df.to_dict(orient="records")
  54. return {
  55. "tag": "table",
  56. "element_id": "table_detail",
  57. "margin": "2px 0px",
  58. "page_size": 10,
  59. "freeze_first_column": True,
  60. "header_style": {
  61. "text_align": "center",
  62. "text_size": "normal",
  63. "background_style": "none",
  64. "text_color": "grey",
  65. "bold": True,
  66. "lines": 1
  67. },
  68. "columns": columns,
  69. "rows": rows
  70. }
  71. def build_markdown_table_element_json(df: pandas.DataFrame) -> Dict[str, Any]:
  72. # 表头
  73. header = "| " + " | ".join(df.columns) + " |\n"
  74. # 分隔线
  75. separator = "| " + " | ".join(["---"] * len(df.columns)) + " |\n"
  76. # 数据行
  77. rows = "\n".join(
  78. "| " + " | ".join(str(val) for val in row) + " |"
  79. for row in df.to_numpy() # 或 df.values
  80. )
  81. content = header + separator + rows
  82. return build_markdown_element_json(content)
  83. def build_collapsible_panel_element_json(elements: List[Dict[str, Any]], title: str) -> Dict[str, Any]:
  84. return {
  85. "tag": "collapsible_panel",
  86. "expanded": False,
  87. "header": {
  88. "title": {
  89. "tag": "plain_text",
  90. "content": title
  91. },
  92. "vertical_align": "center",
  93. },
  94. "border": {
  95. "color": "grey",
  96. "corner_radius": "5px"
  97. },
  98. "elements": elements
  99. }
  100. def build_hr_element_json() -> Dict[str, Any]:
  101. return {
  102. "tag": "hr",
  103. "element_id": "custom_id",
  104. "margin": "10px 0px"
  105. }
  106. def build_text_tag_json(content: str, color: str = 'green') -> Dict[str, Any]:
  107. return {
  108. "tag": "text_tag",
  109. "element_id": "custom_id",
  110. "text": {
  111. "tag": "plain_text",
  112. "content": content
  113. },
  114. "color": color
  115. }
  116. def build_sub_title_json(content: str) -> Dict[str, Any]:
  117. return {
  118. "tag": "plain_text",
  119. "content": content
  120. }
  121. def build_header_json(content: str, template: str = 'green', sub_title: Dict[str, Any] = None, text_tag_list: List[Dict[str, Any]] = []) -> Dict[str, Any]:
  122. return {
  123. "title": {
  124. "content": content
  125. },
  126. "text_tag_list": text_tag_list,
  127. "template": template,
  128. "subtitle": sub_title
  129. }
  130. def build_card_json(body_elements: List[Dict[str, Any]], header: Dict[str, Any], config: Dict[str, Any] = None) -> Dict[str, Any]:
  131. return {
  132. "schema": "2.0",
  133. "config": config,
  134. "header": header,
  135. "body": {
  136. "elements": body_elements
  137. }
  138. }
  139. def supply_workflow_dashboard_stat(ts: int) -> List[Dict[str, Any]]:
  140. yesterday = ts - 86400000
  141. sql = f"""
  142. SELECT sw.name AS '策略名',
  143. CASE
  144. WHEN sw.status = 1 THEN '开启中'
  145. WHEN sw.status = 0 THEN '已关闭'
  146. ELSE '未知'
  147. END AS 状态,
  148. IFNULL(stat.crawler_plan_cnt, 0) AS '当日计划数',
  149. IFNULL(stat_yest.crawler_plan_cnt, 0) AS '昨日计划数',
  150. IFNULL(stat.total_cnt, 0) AS '当日任务数',
  151. IFNULL(stat_yest.total_cnt, 0) AS '昨日任务数',
  152. IFNULL(stat.success_cnt, 0) AS '当日成功',
  153. IFNULL(stat.fail_success, 0) AS '当日失败',
  154. IFNULL(stat.running_cnt, 0) AS '当日运行中',
  155. IFNULL(stat.init_cnt, 0) AS '当日待处理'
  156. FROM supply_workflow sw
  157. LEFT JOIN (
  158. SELECT swt.workflow_id,
  159. COUNT(DISTINCT swt.task_id) AS total_cnt,
  160. COUNT(DISTINCT IF(task_status = 0, swt.task_id, NULL)) AS init_cnt,
  161. COUNT(DISTINCT IF(task_status = 1, swt.task_id, NULL)) AS running_cnt,
  162. COUNT(DISTINCT IF(task_status = 2, swt.task_id, NULL)) AS success_cnt,
  163. COUNT(DISTINCT IF(task_status = 3, swt.task_id, NULL)) AS fail_success,
  164. COUNT(DISTINCT swcpr.crawler_plan_id) AS crawler_plan_cnt
  165. FROM supply_workflow_task swt
  166. LEFT JOIN supply_workflow_crawler_plan_record swcpr
  167. ON swt.task_id = swcpr.task_id
  168. WHERE swt.create_timestamp >= {ts}
  169. GROUP BY swt.workflow_id
  170. ) stat ON sw.id = stat.workflow_id
  171. LEFT JOIN (
  172. SELECT swt.workflow_id,
  173. COUNT(DISTINCT swt.task_id) AS total_cnt,
  174. COUNT(DISTINCT IF(task_status = 0, swt.task_id, NULL)) AS init_cnt,
  175. COUNT(DISTINCT IF(task_status = 1, swt.task_id, NULL)) AS running_cnt,
  176. COUNT(DISTINCT IF(task_status = 2, swt.task_id, NULL)) AS success_cnt,
  177. COUNT(DISTINCT IF(task_status = 3, swt.task_id, NULL)) AS fail_success,
  178. COUNT(DISTINCT swcpr.crawler_plan_id) AS crawler_plan_cnt
  179. FROM supply_workflow_task swt
  180. LEFT JOIN supply_workflow_crawler_plan_record swcpr
  181. ON swt.task_id = swcpr.task_id
  182. WHERE swt.create_timestamp >= {yesterday}
  183. AND swt.create_timestamp < {ts}
  184. GROUP BY swt.workflow_id
  185. ) stat_yest ON sw.id = stat_yest.workflow_id
  186. ORDER BY sw.status DESC, sw.create_timestamp DESC;
  187. """
  188. return mysql_helper.execute_query(sql)
  189. def workflow_task_status_stat(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
  190. sql = f"""
  191. select workflow_id as 'workflow_id',
  192. task_status as '任务状态',
  193. count(distinct task_id) as '任务数',
  194. count(task_input) as '需求数'
  195. from (
  196. select workflow_id as workflow_id,
  197. task_id as task_id,
  198. task_input as task_input,
  199. case
  200. when task_status = 0 then '初始化'
  201. when task_status = 1 then '运行中'
  202. when task_status = 2 then '成功'
  203. when task_status = 3 then '失败'
  204. else '未知' end AS task_status
  205. from supply_workflow sw
  206. left join supply_workflow_task swt
  207. on sw.id = swt.workflow_id
  208. where sw.id = '{workflow_id}'
  209. and swt.create_timestamp >= {ts}
  210. ) as t
  211. group by t.task_status, t.workflow_id;
  212. """
  213. result = mysql_helper.execute_query(sql)
  214. return result
  215. def task_exe_step_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
  216. sql = f'''
  217. select step_name AS '步骤名称',
  218. status as '执行状态',
  219. error_msg as '错误原因',
  220. count(1) AS '任务数'
  221. from (
  222. select swt.task_id,
  223. step_name,
  224. case
  225. when status = 0 then '初始化'
  226. when status = 1 then '运行中'
  227. when status = 2 then '成功'
  228. when status = 3 then '失败'
  229. else '未知' end AS status,
  230. case
  231. when swtes.error_msg like '%Data too long%' then '数据超过字段长度限制'
  232. when swtes.error_msg like '%Deadlock%' then '数据库死锁'
  233. when (swtes.error_msg <> '' and swtes.error_msg is not null) then substring_index(swtes.error_msg, ',', 1)
  234. else '' end AS error_msg
  235. from supply_workflow_task swt
  236. left join supply_workflow_task_exe_step swtes on swt.task_id = swtes.task_id
  237. where swt.create_timestamp >= {ts}
  238. and swt.workflow_id = {workflow_id}
  239. ) as t
  240. group by step_name, status, error_msg;
  241. '''
  242. return mysql_helper.execute_query(sql)
  243. def crawler_and_produce_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
  244. sql = f'''
  245. SELECT t.step AS '阶段',
  246. t.status AS '状态',
  247. t.error_msg AS '失败原因',
  248. t.task_cnt AS '任务数',
  249. t.cnt AS '记录数'
  250. FROM (
  251. SELECT '创建爬取计划' AS step,
  252. workflow_id AS workflow_id,
  253. status AS status,
  254. error_msg AS error_msg,
  255. COUNT(DISTINCT task_id) AS task_cnt,
  256. COUNT(DISTINCT crawler_plan_id) AS cnt
  257. FROM (
  258. SELECT swt.workflow_id AS workflow_id,
  259. swt.task_id AS task_id,
  260. swcpr.crawler_plan_id AS crawler_plan_id,
  261. CASE
  262. WHEN swcpr.status = 0 THEN '初始化'
  263. WHEN swcpr.status = 1 THEN '运行中'
  264. WHEN swcpr.status = 2 THEN '成功'
  265. WHEN swcpr.status = 3 THEN '失败'
  266. WHEN swt.task_status = 0 THEN '初始化'
  267. WHEN swt.task_status = 1 THEN '运行中'
  268. WHEN swt.task_status = 2 THEN '成功'
  269. WHEN swt.task_status = 3 THEN '失败'
  270. ELSE '未知' END AS status,
  271. CASE
  272. WHEN swcpr.status = 2 THEN ''
  273. WHEN swcpr.error_msg LIKE '%Data too long%' THEN '数据超过字段长度限制'
  274. WHEN swcpr.error_msg LIKE '%Deadlock%' THEN '数据库死锁'
  275. WHEN (swcpr.error_msg <> '' AND swcpr.error_msg IS NOT NULL)
  276. THEN SUBSTRING_INDEX(swcpr.error_msg, ',', 1)
  277. WHEN swt.error_msg LIKE '%Data too long%' THEN '数据超过字段长度限制'
  278. WHEN swt.error_msg LIKE '%Deadlock%' THEN '数据库死锁'
  279. WHEN (swt.error_msg <> '' AND swt.error_msg IS NOT NULL)
  280. THEN SUBSTRING_INDEX(swt.error_msg, ',', 1)
  281. ELSE ''
  282. END AS error_msg
  283. FROM supply_workflow_task swt
  284. LEFT JOIN supply_workflow_crawler_plan_record swcpr
  285. ON swt.task_id = swcpr.task_id
  286. WHERE swt.create_timestamp >= {ts}
  287. ) AS t
  288. GROUP BY workflow_id, status, error_msg
  289. UNION ALL
  290. SELECT '生成计划绑定' AS step,
  291. workflow_id AS workflow_id,
  292. status AS status,
  293. error_msg AS error_msg,
  294. COUNT(DISTINCT task_id) AS task_cnt,
  295. COUNT(1) AS cnt
  296. FROM (
  297. SELECT swt.workflow_id AS workflow_id,
  298. swt.task_id AS task_id,
  299. CASE
  300. WHEN swppr.status = 0 THEN '初始化'
  301. WHEN swppr.status = 1 THEN '运行中'
  302. WHEN swppr.status = 2 THEN '成功'
  303. WHEN swppr.status = 3 THEN '失败'
  304. WHEN swt.task_status = 0 THEN '初始化'
  305. WHEN swt.task_status = 1 THEN '运行中'
  306. WHEN swt.task_status = 2 THEN '成功'
  307. WHEN swt.task_status = 3 THEN '失败'
  308. ELSE '未知' END AS status,
  309. CASE
  310. WHEN swppr.status = 2 THEN ''
  311. WHEN swppr.error_msg LIKE '%Data too long%' THEN '数据超过字段长度限制'
  312. WHEN swppr.error_msg LIKE '%Deadlock%' THEN '数据库死锁'
  313. WHEN (swppr.error_msg <> '' AND swppr.error_msg IS NOT NULL)
  314. THEN SUBSTRING_INDEX(swppr.error_msg, ',', 1)
  315. WHEN swt.error_msg LIKE '%Data too long%' THEN '数据超过字段长度限制'
  316. WHEN swt.error_msg LIKE '%Deadlock%' THEN '数据库死锁'
  317. WHEN (swt.error_msg <> '' AND swt.error_msg IS NOT NULL)
  318. THEN SUBSTRING_INDEX(swt.error_msg, ',', 1)
  319. ELSE ''
  320. END AS error_msg
  321. FROM supply_workflow_task swt
  322. LEFT JOIN supply_workflow_produce_bind_record swppr
  323. ON swt.task_id = swppr.task_id
  324. WHERE swt.create_timestamp >= {ts}
  325. ) AS t
  326. GROUP BY workflow_id, status, error_msg
  327. ) AS t
  328. LEFT JOIN supply_workflow sw ON t.workflow_id = sw.id
  329. WHERE workflow_id = '{workflow_id}';
  330. '''
  331. return mysql_helper.execute_query(sql)
  332. def workflow_monitor(workflow_id: str, workflow_name: str, status: int):
  333. today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
  334. start_dt_str = today_midnight.strftime('%Y-%m-%d %H:%M:%S')
  335. end_dt_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  336. timestamp_ms = int(today_midnight.timestamp() * 1000)
  337. task_status_stat = workflow_task_status_stat(timestamp_ms, workflow_id)
  338. exe_step_stat = task_exe_step_stat_query(timestamp_ms, workflow_id)
  339. status_stat = crawler_and_produce_stat_query(timestamp_ms, workflow_id)
  340. # 构建标题
  341. text_tag_list = []
  342. if status == 0:
  343. text_tag_list.append(build_text_tag_json("已关闭", "red"))
  344. elif status == 1:
  345. text_tag_list.append(build_text_tag_json("开启中", "green"))
  346. else:
  347. text_tag_list.append(build_text_tag_json("未知", "orange"))
  348. template = header_template[header_template_index % len(header_template)]
  349. sub_title = build_sub_title_json(f"{start_dt_str} - {end_dt_str}")
  350. header = build_header_json(f"【workflow策略】{workflow_name}", template, sub_title, text_tag_list)
  351. task_status_df = pd.DataFrame(task_status_stat)
  352. if "任务数" not in task_status_df.columns:
  353. task_info = f"**总任务数**: 0"
  354. else:
  355. # 先构建总任务数部分
  356. task_info = f"**总任务数**: {task_status_df['任务数'].sum()}"
  357. # 再逐行添加各状态的任务数
  358. for _, row in task_status_df.iterrows():
  359. task_info += f"\n**{row['任务状态']}任务数**: {row['任务数']}"
  360. elements = [build_markdown_element_json(task_info)]
  361. exe_step_table = build_table_element_json(pd.DataFrame(exe_step_stat))
  362. if exe_step_table is not None:
  363. elements.append(build_hr_element_json())
  364. elements.append(exe_step_table)
  365. status_stat_table = build_table_element_json(pd.DataFrame(status_stat))
  366. if status_stat_table is not None:
  367. elements.append(build_hr_element_json())
  368. elements.append(status_stat_table)
  369. config = build_config_json()
  370. card_json = build_card_json(elements, header, config)
  371. feishu_inform_util.send_card_msg_to_feishu(
  372. webhook=fei_shu_webhook,
  373. card_json=card_json
  374. )
  375. def workflow_dashboard_monitor():
  376. today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
  377. result = supply_workflow_dashboard_stat(int(today_midnight.timestamp() * 1000))
  378. start_dt_str = today_midnight.strftime('%Y-%m-%d %H:%M')
  379. end_dt_str = datetime.now().strftime('%Y-%m-%d %H:%M')
  380. sub_title = build_sub_title_json(f"统计时间: {start_dt_str} - {end_dt_str}")
  381. header = build_header_json("【供给workflow】当日任务统计", "blue", sub_title, [])
  382. dashboard_stat = build_table_element_json(pd.DataFrame(result))
  383. elements = []
  384. if dashboard_stat is not None:
  385. dashboard_stat['header_style'] = {
  386. "text_size": "normal",
  387. "text_align": "center",
  388. }
  389. elements.append(dashboard_stat)
  390. else:
  391. elements.append(build_markdown_element_json("**今日workflow还未创建新的任务,请关注**"))
  392. config = build_config_json()
  393. card_json = build_card_json(elements, header, config)
  394. feishu_inform_util.send_card_msg_to_feishu(
  395. webhook=fei_shu_webhook,
  396. card_json=card_json
  397. )
  398. # 总体情况发送到大群
  399. feishu_inform_util.send_card_msg_to_feishu(
  400. webhook="https://open.feishu.cn/open-apis/bot/v2/hook/9f5c5cce-5eb2-4731-b368-33926f5549f9",
  401. card_json=card_json
  402. )
  403. def main():
  404. workflow_dashboard_monitor()
  405. global header_template_index
  406. workflows = mysql_helper.execute_query('select id, name, status from supply_workflow;')
  407. for workflow in workflows:
  408. try:
  409. workflow_monitor(workflow['id'], workflow['name'], workflow['status'])
  410. except Exception as e:
  411. print(f"【workflow策略】{workflow['name']} 监控异常: {e}")
  412. header_template_index += 1
  413. if __name__ == '__main__':
  414. main()