# supply_workflow_monitor.py (19 KB)

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
import os
import traceback
from datetime import datetime
from typing import Dict, Any, List, Optional

import pandas
import pandas as pd

from helper.MySQLHelper import MySQLHelper
from util import feishu_inform_util
  # ---- Module configuration ----------------------------------------------------
# Feishu bot webhook that receives the per-workflow report cards (and the
# dashboard card). Can be overridden from the environment so deployments do
# not have to rely on the URL committed here.
fei_shu_webhook = os.environ.get(
    "SUPPLY_MONITOR_FEISHU_WEBHOOK",
    "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b",
)

# Database helper constructed at import time; every query in this module
# goes through it. Each connection setting can be overridden from the
# environment; the defaults are the previously hard-coded values.
# NOTE(review): a database password is committed to source as the fallback
# default. Supply SUPPLY_MONITOR_DB_PASSWORD instead and rotate this credential.
mysql_helper = MySQLHelper(
    host=os.environ.get(
        "SUPPLY_MONITOR_DB_HOST",
        "rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com",
    ),
    username=os.environ.get("SUPPLY_MONITOR_DB_USER", "readonly"),
    password=os.environ.get("SUPPLY_MONITOR_DB_PASSWORD", "HdkZ4TDmeK6SQ3BRtJBk"),
    database=os.environ.get("SUPPLY_MONITOR_DB_NAME", "aigc-admin-prod"),
)

# Fixed widths for known table columns (keyed by the Chinese column alias);
# any column not listed here is rendered with width "auto".
column_width_map = {
    "状态": "80px",
    "任务数": "80px",
    "记录数": "80px",
    "当日计划数": "90px",
    "昨日计划数": "90px",
    "当日任务数": "90px",
    "昨日任务数": "90px",
    "当日待处理": "90px",
    "当日运行中": "90px",
    "当日成功": "80px",
    "当日失败": "80px",
}

# Header colour templates cycled through by the per-workflow cards; main()
# increments header_template_index after each workflow is reported.
header_template = ["wathet", "turquoise", "purple", "indigo", "green", "grey"]
header_template_index = 0
  # ---- Feishu card JSON builders -------------------------------------------------
def build_config_json() -> Dict[str, Any]:
    """Return the card-level ``config`` object: default width, forwarding disabled."""
    card_config: Dict[str, Any] = dict(width_mode="default")
    card_config["enable_forward"] = False
    return card_config
  # Body element: a single markdown block.
def build_markdown_element_json(content: str) -> Dict[str, Any]:
    """Wrap *content* in a card ``markdown`` element whose element_id is ``"markdown"``."""
    element = {"tag": "markdown", "element_id": "markdown"}
    element["content"] = content
    return element
  # Body element: a paginated native table rendered from a DataFrame.
def build_table_element_json(
    df: pandas.DataFrame,
    element_id: str = "table_detail",
    page_size: int = 10,
    column_widths: Optional[Dict[str, str]] = None,
) -> Optional[Dict[str, Any]]:
    """Convert *df* into a Feishu card ``table`` element.

    Every DataFrame column becomes a centred text column. Its label is used
    as both the row key and the header text. Every DataFrame row becomes one
    record keyed by column label.

    Args:
        df: Data to render.
        element_id: Card element id. Defaults to the previously hard-coded
            ``"table_detail"``. Pass a distinct id when one card holds several
            tables, because element ids are expected to be unique per card.
        page_size: Rows shown per page (previously fixed at 10).
        column_widths: Column label -> width overrides. Defaults to the
            module-level ``column_width_map``. Unlisted columns use ``"auto"``.

    Returns:
        The table element dict, or ``None`` when *df* is empty (no rows or no
        columns).
    """
    if df.empty:
        return None
    widths = column_width_map if column_widths is None else column_widths
    columns = [
        {
            "name": column,
            "display_name": column,
            "width": widths.get(column, "auto"),
            "data_type": "text",
            "horizontal_align": "center",
            "vertical_align": "center",
        }
        for column in df.columns
    ]
    return {
        "tag": "table",
        "element_id": element_id,
        "margin": "2px 0px",
        "page_size": page_size,
        "header_style": {
            "text_align": "center",
            "text_size": "normal",
            "background_style": "none",
            "text_color": "grey",
            "bold": True,
            "lines": 1
        },
        "columns": columns,
        "rows": df.to_dict(orient="records"),
    }
  # Body element: a markdown-syntax table (alternative to the native table).
def build_markdown_table_element_json(df: pandas.DataFrame) -> Dict[str, Any]:
    """Render *df* as a pipe-delimited markdown table inside a markdown element.

    Column labels and cell values go through ``str()``, so non-string labels
    (such as the default integer column names) no longer make ``str.join``
    raise. ``|`` is escaped as ``\\|`` and line breaks are flattened to
    spaces, so a single value cannot add columns or split a row. Rows come
    from ``itertuples``, so each column keeps its own dtype; ``to_numpy()``
    would upcast int columns to float when they are mixed with float columns.

    Args:
        df: Data to render. The index is not included.

    Returns:
        A markdown element whose content is the header row, the ``---``
        separator row and one row per DataFrame record.
    """
    def to_cell(value: Any) -> str:
        text = str(value).replace("|", "\\|")
        for line_break in ("\r\n", "\r", "\n"):
            text = text.replace(line_break, " ")
        return text

    def to_row(values) -> str:
        return "| " + " | ".join(to_cell(v) for v in values) + " |"

    header = to_row(df.columns) + "\n"
    separator = "| " + " | ".join(["---"] * len(df.columns)) + " |\n"
    rows = "\n".join(to_row(row) for row in df.itertuples(index=False, name=None))
    return build_markdown_element_json(header + separator + rows)
  # Body element: a collapsed-by-default panel wrapping other elements.
def build_collapsible_panel_element_json(elements: List[Dict[str, Any]], title: str) -> Dict[str, Any]:
    """Return a ``collapsible_panel`` titled *title* that contains *elements*.

    The panel starts collapsed and has a grey border with 5px rounded corners.
    The *elements* list is embedded as-is (not copied).
    """
    panel_header = {
        "title": {"tag": "plain_text", "content": title},
        "vertical_align": "center",
    }
    panel_border = {"color": "grey", "corner_radius": "5px"}
    return {
        "tag": "collapsible_panel",
        "expanded": False,
        "header": panel_header,
        "border": panel_border,
        "elements": elements,
    }
  # Body element: horizontal divider.
def build_hr_element_json(element_id: str = "custom_id") -> Dict[str, Any]:
    """Return a horizontal divider (``hr``) element with a ``10px 0px`` margin.

    Args:
        element_id: Card element id. Defaults to the previously hard-coded
            ``"custom_id"``. Pass a distinct value when a card holds more
            than one divider, because element ids are expected to be unique
            within a card.
    """
    return {
        "tag": "hr",
        "element_id": element_id,
        "margin": "10px 0px"
    }
  # Header decoration: a coloured text tag.
def build_text_tag_json(content: str, color: str = 'green') -> Dict[str, Any]:
    """Return a ``text_tag`` labelled *content* in the given *color*."""
    label = {"tag": "plain_text", "content": content}
    return dict(tag="text_tag", element_id="custom_id", text=label, color=color)
  # Header subtitle.
def build_sub_title_json(content: str) -> Dict[str, Any]:
    """Return a ``plain_text`` object holding *content*, used as a header subtitle."""
    return dict(tag="plain_text", content=content)
  # Card header.
def build_header_json(
    content: str,
    template: str = 'green',
    sub_title: Optional[Dict[str, Any]] = None,
    text_tag_list: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Build a card header.

    Args:
        content: Title text.
        template: Header colour template name.
        sub_title: Optional subtitle object (for example from
            ``build_sub_title_json``). Emitted as ``None`` when omitted.
        text_tag_list: Tags shown next to the title. A provided list is
            embedded as-is. When omitted, a fresh empty list is used. The old
            ``[]`` default was a single shared object placed inside every
            returned header, so mutating one header's tags leaked into all
            later defaults.

    Returns:
        The header dict with ``title``, ``text_tag_list``, ``template`` and
        ``subtitle`` keys.
    """
    return {
        "title": {
            "content": content
        },
        "text_tag_list": [] if text_tag_list is None else text_tag_list,
        "template": template,
        "subtitle": sub_title
    }
  # Whole card.
def build_card_json(body_elements: List[Dict[str, Any]], header: Dict[str, Any], config: Dict[str, Any] = None) -> Dict[str, Any]:
    """Assemble a schema-2.0 card from its config, header and body elements.

    *config* may be ``None``, in which case it is emitted as ``None``.
    """
    card: Dict[str, Any] = {"schema": "2.0", "config": config}
    card["header"] = header
    card["body"] = dict(elements=body_elements)
    return card
  # ---- Data queries --------------------------------------------------------------
def supply_workflow_dashboard_stat(ts: int) -> List[Dict[str, Any]]:
    """Query the all-workflow overview shown on the dashboard card.

    Returns one row per ``supply_workflow``. Workflows with no tasks are kept,
    with zero counts, because of the LEFT JOINs plus IFNULL. Rows are ordered
    by ``status`` descending, then by newest ``create_timestamp`` first.

    Columns: 策略名 (workflow name); 状态 (1 -> 开启中, 0 -> 已关闭,
    otherwise 未知); and counts over two windows of
    ``supply_workflow_task.create_timestamp``:

    * "当日": ``>= ts``, with no upper bound.
    * "昨日": ``[ts - 86400000, ts)``.

    For each window the query counts distinct crawler plans (joined through
    ``supply_workflow_crawler_plan_record``) and distinct tasks. For the
    current window only, it also counts distinct tasks per task_status
    0/1/2/3, reported as 待处理/运行中/成功/失败.

    Args:
        ts: Start of the current window. Subtracting 86400000 for the
            previous day implies epoch milliseconds. The caller in this module
            passes local midnight in ms.

    Returns:
        The rows returned by ``mysql_helper.execute_query`` for the query.
    """
    # Start of the previous-day window (86400000 ms = 24 h).
    yesterday = ts - 86400000
    # NOTE: despite its name, the alias ``fail_success`` counts task_status = 3
    # (failed) tasks; it feeds the 当日失败 column.
    sql = f"""
SELECT sw.name AS '策略名',
       CASE
           WHEN sw.status = 1 THEN '开启中'
           WHEN sw.status = 0 THEN '已关闭'
           ELSE '未知'
       END AS 状态,
       IFNULL(stat.crawler_plan_cnt, 0) AS '当日计划数',
       IFNULL(stat_yest.crawler_plan_cnt, 0) AS '昨日计划数',
       IFNULL(stat.total_cnt, 0) AS '当日任务数',
       IFNULL(stat_yest.total_cnt, 0) AS '昨日任务数',
       IFNULL(stat.init_cnt, 0) AS '当日待处理',
       IFNULL(stat.running_cnt, 0) AS '当日运行中',
       IFNULL(stat.success_cnt, 0) AS '当日成功',
       IFNULL(stat.fail_success, 0) AS '当日失败'
FROM supply_workflow sw
LEFT JOIN (
    SELECT swt.workflow_id,
           COUNT(DISTINCT swt.task_id) AS total_cnt,
           COUNT(DISTINCT IF(task_status = 0, swt.task_id, NULL)) AS init_cnt,
           COUNT(DISTINCT IF(task_status = 1, swt.task_id, NULL)) AS running_cnt,
           COUNT(DISTINCT IF(task_status = 2, swt.task_id, NULL)) AS success_cnt,
           COUNT(DISTINCT IF(task_status = 3, swt.task_id, NULL)) AS fail_success,
           COUNT(DISTINCT swcpr.crawler_plan_id) AS crawler_plan_cnt
    FROM supply_workflow_task swt
    LEFT JOIN supply_workflow_crawler_plan_record swcpr
        ON swt.task_id = swcpr.task_id
    WHERE swt.create_timestamp >= {ts}
    GROUP BY swt.workflow_id
) stat ON sw.id = stat.workflow_id
LEFT JOIN (
    SELECT swt.workflow_id,
           COUNT(DISTINCT swt.task_id) AS total_cnt,
           COUNT(DISTINCT IF(task_status = 0, swt.task_id, NULL)) AS init_cnt,
           COUNT(DISTINCT IF(task_status = 1, swt.task_id, NULL)) AS running_cnt,
           COUNT(DISTINCT IF(task_status = 2, swt.task_id, NULL)) AS success_cnt,
           COUNT(DISTINCT IF(task_status = 3, swt.task_id, NULL)) AS fail_success,
           COUNT(DISTINCT swcpr.crawler_plan_id) AS crawler_plan_cnt
    FROM supply_workflow_task swt
    LEFT JOIN supply_workflow_crawler_plan_record swcpr
        ON swt.task_id = swcpr.task_id
    WHERE swt.create_timestamp >= {yesterday}
      AND swt.create_timestamp < {ts}
    GROUP BY swt.workflow_id
) stat_yest ON sw.id = stat_yest.workflow_id
ORDER BY sw.status DESC, sw.create_timestamp DESC;
"""
    return mysql_helper.execute_query(sql)
  # Per-workflow task counts grouped by task status.
def workflow_task_status_stat(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
    """Count one workflow's tasks created at or after *ts*, grouped by status.

    Each returned row carries:

    * ``workflow_id``.
    * ``任务状态``: task_status mapped 0/1/2/3 -> 初始化/运行中/成功/失败,
      anything else -> 未知.
    * ``任务数``: distinct task_id count.
    * ``需求数``: number of non-NULL task_input values.

    The ``swt.create_timestamp`` filter sits in the WHERE clause, so the LEFT
    JOIN effectively behaves like an inner join. A workflow with no matching
    tasks therefore yields no rows at all rather than a zero row.

    Args:
        ts: Lower bound for ``supply_workflow_task.create_timestamp``. The
            caller in this module passes local midnight in epoch ms.
        workflow_id: Workflow id, interpolated into the SQL as a quoted
            literal.
            NOTE(review): the SQL is built by string formatting rather than
            bound parameters; in this module the ids come from
            ``supply_workflow`` itself.
    """
    sql = f"""
select workflow_id as 'workflow_id',
       task_status as '任务状态',
       count(distinct task_id) as '任务数',
       count(task_input) as '需求数'
from (
    select workflow_id as workflow_id,
           task_id as task_id,
           task_input as task_input,
           case
               when task_status = 0 then '初始化'
               when task_status = 1 then '运行中'
               when task_status = 2 then '成功'
               when task_status = 3 then '失败'
               else '未知' end AS task_status
    from supply_workflow sw
    left join supply_workflow_task swt
        on sw.id = swt.workflow_id
    where sw.id = '{workflow_id}'
      and swt.create_timestamp >= {ts}
) as t
group by t.task_status, t.workflow_id;
"""
    result = mysql_helper.execute_query(sql)
    return result
  # Per-workflow execution-step outcomes.
def task_exe_step_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
    """Summarise the execution steps of one workflow's tasks created since *ts*.

    Rows are grouped by (步骤名称, 执行状态, 错误原因):

    * Step status 0/1/2/3 is mapped to 初始化/运行中/成功/失败, else 未知.
    * Errors containing "Data too long" become 数据超过字段长度限制.
    * Errors containing "Deadlock" become 数据库死锁.
    * Other non-empty errors are cut at their first comma.

    ``任务数`` is ``count(1)`` over the joined rows, so it counts step records
    in the group. Because of the LEFT JOIN, a task with no step record still
    contributes one row, with a NULL step name and status 未知.

    Args:
        ts: Lower bound for ``supply_workflow_task.create_timestamp``. The
            caller in this module passes local midnight in epoch ms.
        workflow_id: Workflow id. It is now quoted in the SQL, matching the
            other per-workflow queries. Unquoted, a non-numeric id produced
            invalid SQL or was read as an identifier; quoted, numeric ids
            still match because MySQL converts the string for comparison.
    """
    sql = f'''
select step_name AS '步骤名称',
       status as '执行状态',
       error_msg as '错误原因',
       count(1) AS '任务数'
from (
    select swt.task_id,
           step_name,
           case
               when status = 0 then '初始化'
               when status = 1 then '运行中'
               when status = 2 then '成功'
               when status = 3 then '失败'
               else '未知' end AS status,
           case
               when swtes.error_msg like '%Data too long%' then '数据超过字段长度限制'
               when swtes.error_msg like '%Deadlock%' then '数据库死锁'
               when (swtes.error_msg <> '' and swtes.error_msg is not null) then substring_index(swtes.error_msg, ',', 1)
               else '' end AS error_msg
    from supply_workflow_task swt
    left join supply_workflow_task_exe_step swtes on swt.task_id = swtes.task_id
    where swt.create_timestamp >= {ts}
      and swt.workflow_id = '{workflow_id}'
) as t
group by step_name, status, error_msg;
'''
    return mysql_helper.execute_query(sql)
  # Per-workflow crawler-plan creation and produce-bind outcomes.
def crawler_and_produce_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
    """Summarise crawler-plan and produce-bind records for one workflow.

    Two stages are stacked with UNION ALL, each grouped by (状态, 失败原因):

    * 创建爬取计划 (``supply_workflow_crawler_plan_record``): 记录数 is the
      number of distinct crawler_plan_id values.
    * 生成计划绑定 (``supply_workflow_produce_bind_record``): 记录数 is the
      number of matched bind records. It counts the joined
      ``swppr.task_id``, which is non-NULL exactly when a record matched.
      The old ``count(1)`` also counted the NULL row that the LEFT JOIN
      yields for a task with no bind record, so those tasks showed a record
      count equal to their task count instead of 0.

    Status 0/1/2/3 is mapped to 初始化/运行中/成功/失败, else 未知. Tasks
    without a record therefore appear under 未知. Error messages are
    normalised the same way as in ``task_exe_step_stat_query``.

    The workflow filter now sits inside each branch, so only this workflow's
    tasks are aggregated. Before, every workflow was aggregated and the
    result filtered afterwards. The unused outer join to ``supply_workflow``
    (no column of it was selected) is dropped. The result is unchanged as
    long as ``supply_workflow.id`` is unique, which is presumed to be the
    primary key (confirm).

    Args:
        ts: Lower bound for ``supply_workflow_task.create_timestamp``. The
            caller in this module passes local midnight in epoch ms.
        workflow_id: Workflow id, interpolated as a quoted literal.
    """
    sql = f'''
select t.step as '阶段',
       t.status as '状态',
       t.error_msg as '失败原因',
       t.task_cnt as '任务数',
       t.cnt as '记录数'
from (
    select '创建爬取计划' as step,
           workflow_id as workflow_id,
           status as status,
           error_msg as error_msg,
           count(distinct task_id) as task_cnt,
           count(distinct crawler_plan_id) as cnt
    from (
        select swt.workflow_id as workflow_id,
               swt.task_id as task_id,
               swcpr.crawler_plan_id as crawler_plan_id,
               case
                   when swcpr.status = 0 then '初始化'
                   when swcpr.status = 1 then '运行中'
                   when swcpr.status = 2 then '成功'
                   when swcpr.status = 3 then '失败'
                   else '未知' end AS status,
               case
                   when swcpr.error_msg like '%Data too long%' then '数据超过字段长度限制'
                   when swcpr.error_msg like '%Deadlock%' then '数据库死锁'
                   when (swcpr.error_msg <> '' AND swcpr.error_msg is not null) then substring_index(swcpr.error_msg, ',', 1)
                   else ''
               end AS error_msg
        from supply_workflow_task swt
        left join supply_workflow_crawler_plan_record swcpr
            on swt.task_id = swcpr.task_id
        where swt.create_timestamp >= {ts}
          and swt.workflow_id = '{workflow_id}'
    ) as t
    group by workflow_id, status, error_msg
    union all
    select '生成计划绑定' as step,
           workflow_id as workflow_id,
           status as status,
           error_msg as error_msg,
           count(distinct task_id) as task_cnt,
           count(bind_task_id) as cnt
    from (
        select swt.workflow_id as workflow_id,
               swt.task_id as task_id,
               swppr.task_id as bind_task_id,
               case
                   when swppr.status = 0 then '初始化'
                   when swppr.status = 1 then '运行中'
                   when swppr.status = 2 then '成功'
                   when swppr.status = 3 then '失败'
                   else '未知' end AS status,
               case
                   when swppr.error_msg like '%Data too long%' then '数据超过字段长度限制'
                   when swppr.error_msg like '%Deadlock%' then '数据库死锁'
                   when (swppr.error_msg <> '' and swppr.error_msg is not null) then substring_index(swppr.error_msg, ',', 1)
                   else ''
               end AS error_msg
        from supply_workflow_task swt
        left join supply_workflow_produce_bind_record swppr
            on swt.task_id = swppr.task_id
        where swt.create_timestamp >= {ts}
          and swt.workflow_id = '{workflow_id}'
    ) as t
    group by workflow_id, status, error_msg
) as t;
'''
    return mysql_helper.execute_query(sql)
  # ---- Report senders --------------------------------------------------------------
def workflow_monitor(workflow_id: str, workflow_name: str, status: int):
    """Build and send the daily report card for a single workflow.

    The card covers tasks created since local midnight today. It contains a
    markdown summary of task counts per status, followed by two tables, each
    shown only when non-empty and each preceded by a divider: execution-step
    outcomes, then crawler-plan / produce-bind outcomes. The card is posted
    to ``fei_shu_webhook``. The header colour is picked from
    ``header_template`` using the module-level ``header_template_index``.

    Args:
        workflow_id: ``supply_workflow.id`` of the workflow to report on.
        workflow_name: Display name used in the card title.
        status: ``supply_workflow.status``. 0 gives a red 已关闭 tag, 1 a
            green 开启中 tag, anything else an orange 未知 tag.
    """
    # Take "now" once so the midnight boundary and the end label agree.
    now = datetime.now()
    today_midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    start_dt_str = today_midnight.strftime('%Y-%m-%d %H:%M:%S')
    end_dt_str = now.strftime('%Y-%m-%d %H:%M:%S')
    timestamp_ms = int(today_midnight.timestamp() * 1000)

    task_status_stat = workflow_task_status_stat(timestamp_ms, workflow_id)
    exe_step_stat = task_exe_step_stat_query(timestamp_ms, workflow_id)
    status_stat = crawler_and_produce_stat_query(timestamp_ms, workflow_id)

    # Header: title, time-window subtitle, status tag, rotating colour template.
    tag_text, tag_color = {0: ("已关闭", "red"), 1: ("开启中", "green")}.get(status, ("未知", "orange"))
    text_tag_list = [build_text_tag_json(tag_text, tag_color)]
    template = header_template[header_template_index % len(header_template)]
    sub_title = build_sub_title_json(f"{start_dt_str} - {end_dt_str}")
    header = build_header_json(f"【workflow策略】{workflow_name}", template, sub_title, text_tag_list)

    # Summary text: the overall total, then one line per status row returned.
    task_status_df = pd.DataFrame(task_status_stat)
    if "任务数" not in task_status_df.columns:
        task_info = "**总任务数**: 0"
    else:
        summary_lines = [f"**总任务数**: {task_status_df['任务数'].sum()}"]
        summary_lines.extend(
            f"**{row['任务状态']}任务数**: {row['任务数']}"
            for _, row in task_status_df.iterrows()
        )
        task_info = "\n".join(summary_lines)
    elements = [build_markdown_element_json(task_info)]

    # Card JSON 2.0 expects element_id to be unique within a card. The table
    # and divider helpers default to fixed ids ("table_detail", "custom_id"),
    # so the two sections would collide; give each section its own ids.
    sections = (
        (exe_step_stat, "hr_exe_step", "table_exe_step"),
        (status_stat, "hr_plan_status", "table_plan_status"),
    )
    for section_rows, hr_id, table_id in sections:
        table = build_table_element_json(pd.DataFrame(section_rows))
        if table is None:
            continue
        table["element_id"] = table_id
        divider = build_hr_element_json()
        divider["element_id"] = hr_id
        elements.extend((divider, table))

    card_json = build_card_json(elements, header, build_config_json())
    feishu_inform_util.send_card_msg_to_feishu(
        webhook=fei_shu_webhook,
        card_json=card_json
    )
  # Dashboard card: one overview table covering every workflow.
def workflow_dashboard_monitor():
    """Send today's all-workflow overview card to two Feishu webhooks.

    Queries ``supply_workflow_dashboard_stat`` for the window starting at
    local midnight and renders the rows as a table whose header style keeps
    only text size and alignment. If the query returns nothing, a reminder
    markdown message is shown instead. The same card is posted to
    ``fei_shu_webhook`` and then to the group-chat webhook.
    """
    midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    overview_rows = supply_workflow_dashboard_stat(int(midnight.timestamp() * 1000))
    # Minute precision; the end time is read after the query has run.
    start_dt_str = midnight.strftime('%Y-%m-%d %H:%M')
    end_dt_str = datetime.now().strftime('%Y-%m-%d %H:%M')
    header = build_header_json(
        "【供给workflow】当日任务统计",
        "blue",
        build_sub_title_json(f"统计时间: {start_dt_str} - {end_dt_str}"),
        [],
    )

    overview_table = build_table_element_json(pd.DataFrame(overview_rows))
    if overview_table is None:
        body = [build_markdown_element_json("**今日workflow还未创建新的任务,请关注**")]
    else:
        # Replace the helper's default header style with a plainer one.
        overview_table['header_style'] = {
            "text_size": "normal",
            "text_align": "center",
        }
        body = [overview_table]

    card_json = build_card_json(body, header, build_config_json())
    # fei_shu_webhook first, then the overall summary to the large group chat.
    for webhook in (
        fei_shu_webhook,
        "https://open.feishu.cn/open-apis/bot/v2/hook/9f5c5cce-5eb2-4731-b368-33926f5549f9",
    ):
        feishu_inform_util.send_card_msg_to_feishu(
            webhook=webhook,
            card_json=card_json
        )
  # ---- Entry point -------------------------------------------------------------------
def main():
    """Send the dashboard card, then one report card per supply workflow.

    A failure while reporting one workflow is printed together with its
    traceback and does not stop the remaining workflows. The previous
    handler printed only ``str(e)``, which hid where the failure happened.
    ``header_template_index`` is advanced after every workflow, including
    failed ones, so consecutive cards cycle through the header colours.
    """
    global header_template_index
    workflow_dashboard_monitor()
    workflows = mysql_helper.execute_query('select id, name, status from supply_workflow;')
    for workflow in workflows:
        try:
            workflow_monitor(workflow['id'], workflow['name'], workflow['status'])
        except Exception as e:
            print(f"【workflow策略】{workflow['name']} 监控异常: {e}")
            traceback.print_exc()
        header_template_index += 1


if __name__ == '__main__':
    main()