supply_workflow_monitor.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
import os
from datetime import datetime
from typing import Dict, Any, List, Optional

import pandas
import pandas as pd

from helper.MySQLHelper import MySQLHelper
from util import feishu_inform_util
  7. fei_shu_webhook = "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b"
  8. mysql_helper = MySQLHelper(
  9. host="rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com",
  10. username="readonly",
  11. password="HdkZ4TDmeK6SQ3BRtJBk",
  12. database="aigc-admin-prod"
  13. )
  14. column_width_map = {
  15. "状态": "80px",
  16. "任务数": "80px",
  17. "记录数": "80px",
  18. "计划数": "80px",
  19. "待处理": "80px",
  20. "运行中": "80px",
  21. "成功": "80px",
  22. "失败": "80px",
  23. }
  24. header_template = ["wathet", "turquoise", "purple", "indigo", "green", "grey"]
  25. header_template_index = 0
  26. def build_config_json() -> Dict[str, Any]:
  27. return {
  28. "width_mode": "default",
  29. "enable_forward": False,
  30. }
  31. def build_markdown_element_json(content: str) -> Dict[str, Any]:
  32. return {
  33. "tag": "markdown",
  34. "element_id": "markdown",
  35. "content": content
  36. }
  37. def build_table_element_json(df: pandas.DataFrame) -> Optional[Dict[str, Any]]:
  38. if df.empty:
  39. return None
  40. columns = []
  41. for column in df.columns:
  42. columns.append({
  43. "name": column,
  44. "display_name": column,
  45. "width": column_width_map.get(column, "auto"),
  46. "data_type": "text",
  47. "horizontal_align": "center",
  48. "vertical_align": "center",
  49. })
  50. rows = df.to_dict(orient="records")
  51. return {
  52. "tag": "table",
  53. "element_id": "table_detail",
  54. "margin": "2px 0px",
  55. "page_size": 10,
  56. "header_style": {
  57. "text_align": "center",
  58. "text_size": "normal",
  59. "background_style": "none",
  60. "text_color": "grey",
  61. "bold": True,
  62. "lines": 1
  63. },
  64. "columns": columns,
  65. "rows": rows
  66. }
  67. def build_markdown_table_element_json(df: pandas.DataFrame) -> Dict[str, Any]:
  68. # 表头
  69. header = "| " + " | ".join(df.columns) + " |\n"
  70. # 分隔线
  71. separator = "| " + " | ".join(["---"] * len(df.columns)) + " |\n"
  72. # 数据行
  73. rows = "\n".join(
  74. "| " + " | ".join(str(val) for val in row) + " |"
  75. for row in df.to_numpy() # 或 df.values
  76. )
  77. content = header + separator + rows
  78. return build_markdown_element_json(content)
  79. def build_collapsible_panel_element_json(elements: List[Dict[str, Any]], title: str) -> Dict[str, Any]:
  80. return {
  81. "tag": "collapsible_panel",
  82. "expanded": False,
  83. "header": {
  84. "title": {
  85. "tag": "plain_text",
  86. "content": title
  87. },
  88. "vertical_align": "center",
  89. },
  90. "border": {
  91. "color": "grey",
  92. "corner_radius": "5px"
  93. },
  94. "elements": elements
  95. }
  96. def build_hr_element_json() -> Dict[str, Any]:
  97. return {
  98. "tag": "hr",
  99. "element_id": "custom_id",
  100. "margin": "10px 0px"
  101. }
  102. def build_text_tag_json(content: str, color: str = 'green') -> Dict[str, Any]:
  103. return {
  104. "tag": "text_tag",
  105. "element_id": "custom_id",
  106. "text": {
  107. "tag": "plain_text",
  108. "content": content
  109. },
  110. "color": color
  111. }
  112. def build_sub_title_json(content: str) -> Dict[str, Any]:
  113. return {
  114. "tag": "plain_text",
  115. "content": content
  116. }
  117. def build_header_json(content: str, template: str = 'green', sub_title: Dict[str, Any] = None, text_tag_list: List[Dict[str, Any]] = []) -> Dict[str, Any]:
  118. return {
  119. "title": {
  120. "content": content
  121. },
  122. "text_tag_list": text_tag_list,
  123. "template": template,
  124. "subtitle": sub_title
  125. }
  126. def build_card_json(body_elements: List[Dict[str, Any]], header: Dict[str, Any], config: Dict[str, Any] = None) -> Dict[str, Any]:
  127. return {
  128. "schema": "2.0",
  129. "config": config,
  130. "header": header,
  131. "body": {
  132. "elements": body_elements
  133. }
  134. }
  135. def supply_workflow_dashboard_stat(ts: int) -> List[Dict[str, Any]]:
  136. sql = f"""
  137. select sw.name AS '策略名',
  138. case
  139. when sw.status = 1 then '开启中'
  140. when sw.status = 0 then '已关闭'
  141. else '未知' end AS 状态,
  142. IFNULL(crawler_plan_cnt, 0) as '计划数',
  143. IFNULL(stat.total_cnt, 0) AS '任务数',
  144. IFNULL(stat.init_cnt, 0) AS '待处理',
  145. IFNULL(stat.running_cnt, 0) AS '运行中',
  146. IFNULL(stat.success_cnt, 0) AS '成功',
  147. IFNULL(stat.fail_success, 0) AS '失败'
  148. from supply_workflow sw
  149. left join (
  150. select swt.workflow_id,
  151. COUNT(distinct swt.task_id) AS total_cnt,
  152. COUNT(DISTINCT IF(task_status = 0, swt.task_id, NULL)) AS init_cnt,
  153. COUNT(DISTINCT IF(task_status = 1, swt.task_id, NULL)) AS running_cnt,
  154. COUNT(DISTINCT IF(task_status = 2, swt.task_id, NULL)) AS success_cnt,
  155. COUNT(DISTINCT IF(task_status = 3, swt.task_id, NULL)) AS fail_success,
  156. count(distinct swcpr.crawler_plan_id) AS crawler_plan_cnt
  157. from supply_workflow_task swt
  158. left join supply_workflow_crawler_plan_record swcpr
  159. on swt.task_id = swcpr.task_id
  160. where swt.create_timestamp >= {ts}
  161. group by swt.workflow_id
  162. ) stat on sw.id = stat.workflow_id
  163. ORDER BY sw.status DESC, sw.create_timestamp DESC;
  164. """
  165. return mysql_helper.execute_query(sql)
  166. def workflow_task_status_stat(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
  167. sql = f"""
  168. select workflow_id as 'workflow_id',
  169. task_status as '任务状态',
  170. count(distinct task_id) as '任务数',
  171. count(task_input) as '需求数'
  172. from (
  173. select workflow_id as workflow_id,
  174. task_id as task_id,
  175. task_input as task_input,
  176. case
  177. when task_status = 0 then '初始化'
  178. when task_status = 1 then '运行中'
  179. when task_status = 2 then '成功'
  180. when task_status = 3 then '失败'
  181. else '未知' end AS task_status
  182. from supply_workflow sw
  183. left join supply_workflow_task swt
  184. on sw.id = swt.workflow_id
  185. where sw.id = '{workflow_id}'
  186. and swt.create_timestamp >= {ts}
  187. ) as t
  188. group by t.task_status, t.workflow_id;
  189. """
  190. result = mysql_helper.execute_query(sql)
  191. return result
  192. def task_exe_step_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
  193. sql = f'''
  194. select step_name AS '步骤名称',
  195. status as '执行状态',
  196. error_msg as '错误原因',
  197. count(1) AS '任务数'
  198. from (
  199. select swt.task_id,
  200. step_name,
  201. case
  202. when status = 0 then '初始化'
  203. when status = 1 then '运行中'
  204. when status = 2 then '成功'
  205. when status = 3 then '失败'
  206. else '未知' end AS status,
  207. case
  208. when swtes.error_msg like '%Data too long%' then '数据超过字段长度限制'
  209. when swtes.error_msg like '%Deadlock%' then '数据库死锁'
  210. when (swtes.error_msg <> '' and swtes.error_msg is not null) then substring_index(swtes.error_msg, ',', 1)
  211. else '' end AS error_msg
  212. from supply_workflow_task swt
  213. left join supply_workflow_task_exe_step swtes on swt.task_id = swtes.task_id
  214. where swt.create_timestamp >= {ts}
  215. and swt.workflow_id = {workflow_id}
  216. ) as t
  217. group by step_name, status, error_msg;
  218. '''
  219. return mysql_helper.execute_query(sql)
  220. def crawler_and_produce_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
  221. sql = f'''
  222. select t.step as '阶段',
  223. t.status as '状态',
  224. t.error_msg as '失败原因',
  225. t.task_cnt as '任务数',
  226. t.cnt as '记录数'
  227. from (
  228. select '创建爬取计划' as step,
  229. workflow_id as workflow_id,
  230. status as status,
  231. error_msg as error_msg,
  232. count(distinct task_id) as task_cnt,
  233. count(distinct crawler_plan_id) as cnt
  234. from (
  235. select swt.workflow_id as workflow_id,
  236. swt.task_id as task_id,
  237. swcpr.crawler_plan_id as crawler_plan_id,
  238. case
  239. when swcpr.status = 0 then '初始化'
  240. when swcpr.status = 1 then '运行中'
  241. when swcpr.status = 2 then '成功'
  242. when swcpr.status = 3 then '失败'
  243. else '未知' end AS status,
  244. case
  245. when swcpr.error_msg like '%Data too long%' then '数据超过字段长度限制'
  246. when swcpr.error_msg like '%Deadlock%' then '数据库死锁'
  247. when (swcpr.error_msg <> '' AND swcpr.error_msg is not null) then substring_index(swcpr.error_msg, ',', 1)
  248. else ''
  249. end AS error_msg
  250. from supply_workflow_task swt
  251. left join supply_workflow_crawler_plan_record swcpr
  252. on swt.task_id = swcpr.task_id
  253. where swt.create_timestamp >= {ts}
  254. ) as t
  255. group by workflow_id, status, error_msg
  256. union all
  257. select '生成计划绑定' as step,
  258. workflow_id as workflow_id,
  259. status as status,
  260. error_msg as error_msg,
  261. count(distinct task_id) as task_cnt,
  262. count(1) as cnt
  263. from (
  264. select swt.workflow_id as workflow_id,
  265. swt.task_id as task_id,
  266. case
  267. when swppr.status = 0 then '初始化'
  268. when swppr.status = 1 then '运行中'
  269. when swppr.status = 2 then '成功'
  270. when swppr.status = 3 then '失败'
  271. else '未知' end AS status,
  272. case
  273. when swppr.error_msg like '%Data too long%' then '数据超过字段长度限制'
  274. when swppr.error_msg like '%Deadlock%' then '数据库死锁'
  275. when (swppr.error_msg <> '' and swppr.error_msg is not null) then substring_index(swppr.error_msg, ',', 1)
  276. else ''
  277. end AS error_msg
  278. from supply_workflow_task swt
  279. left join supply_workflow_produce_bind_record swppr
  280. on swt.task_id = swppr.task_id
  281. where swt.create_timestamp >= {ts}
  282. ) as t
  283. group by workflow_id, status, error_msg
  284. ) as t
  285. left join supply_workflow sw on t.workflow_id = sw.id
  286. where workflow_id = '{workflow_id}';
  287. '''
  288. return mysql_helper.execute_query(sql)
  289. def workflow_monitor(workflow_id: str, workflow_name: str, status: int):
  290. today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
  291. start_dt_str = today_midnight.strftime('%Y-%m-%d %H:%M:%S')
  292. end_dt_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  293. timestamp_ms = int(today_midnight.timestamp() * 1000)
  294. task_status_stat = workflow_task_status_stat(timestamp_ms, workflow_id)
  295. exe_step_stat = task_exe_step_stat_query(timestamp_ms, workflow_id)
  296. status_stat = crawler_and_produce_stat_query(timestamp_ms, workflow_id)
  297. # 构建标题
  298. text_tag_list = []
  299. if status == 0:
  300. text_tag_list.append(build_text_tag_json("已关闭", "red"))
  301. elif status == 1:
  302. text_tag_list.append(build_text_tag_json("开启中", "green"))
  303. else:
  304. text_tag_list.append(build_text_tag_json("未知", "orange"))
  305. template = header_template[header_template_index % len(header_template)]
  306. sub_title = build_sub_title_json(f"{start_dt_str} - {end_dt_str}")
  307. header = build_header_json(f"【workflow策略】{workflow_name}", template, sub_title, text_tag_list)
  308. task_status_df = pd.DataFrame(task_status_stat)
  309. if "任务数" not in task_status_df.columns:
  310. task_info = f"**总任务数**: 0"
  311. else:
  312. # 先构建总任务数部分
  313. task_info = f"**总任务数**: {task_status_df['任务数'].sum()}"
  314. # 再逐行添加各状态的任务数
  315. for _, row in task_status_df.iterrows():
  316. task_info += f"\n**{row['任务状态']}任务数**: {row['任务数']}"
  317. elements = [build_markdown_element_json(task_info)]
  318. exe_step_table = build_table_element_json(pd.DataFrame(exe_step_stat))
  319. if exe_step_table is not None:
  320. elements.append(build_hr_element_json())
  321. elements.append(exe_step_table)
  322. status_stat_table = build_table_element_json(pd.DataFrame(status_stat))
  323. if status_stat_table is not None:
  324. elements.append(build_hr_element_json())
  325. elements.append(status_stat_table)
  326. config = build_config_json()
  327. card_json = build_card_json(elements, header, config)
  328. feishu_inform_util.send_card_msg_to_feishu(
  329. webhook=fei_shu_webhook,
  330. card_json=card_json
  331. )
  332. def workflow_dashboard_monitor():
  333. today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
  334. result = supply_workflow_dashboard_stat(int(today_midnight.timestamp() * 1000))
  335. start_dt_str = today_midnight.strftime('%Y-%m-%d %H:%M')
  336. end_dt_str = datetime.now().strftime('%Y-%m-%d %H:%M')
  337. sub_title = build_sub_title_json(f"统计时间: {start_dt_str} - {end_dt_str}")
  338. header = build_header_json("【供给workflow】当日任务统计", "blue", sub_title, [])
  339. dashboard_stat = build_table_element_json(pd.DataFrame(result))
  340. elements = []
  341. if dashboard_stat is not None:
  342. elements.append(dashboard_stat)
  343. else:
  344. elements.append(build_markdown_element_json("**今日workflow还未创建新的任务,请关注**"))
  345. config = build_config_json()
  346. card_json = build_card_json(elements, header, config)
  347. feishu_inform_util.send_card_msg_to_feishu(
  348. webhook=fei_shu_webhook,
  349. card_json=card_json
  350. )
  351. # 总体情况发送到大群
  352. feishu_inform_util.send_card_msg_to_feishu(
  353. webhook="https://open.feishu.cn/open-apis/bot/v2/hook/9f5c5cce-5eb2-4731-b368-33926f5549f9",
  354. card_json=card_json
  355. )
  356. def main():
  357. workflow_dashboard_monitor()
  358. global header_template_index
  359. workflows = mysql_helper.execute_query('select id, name, status from supply_workflow;')
  360. for workflow in workflows:
  361. try:
  362. workflow_monitor(workflow['id'], workflow['name'], workflow['status'])
  363. except Exception as e:
  364. print(f"【workflow策略】{workflow['name']} 监控异常: {e}")
  365. header_template_index += 1
  366. if __name__ == '__main__':
  367. main()