supply_workflow_monitor.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
import os
import traceback
from datetime import datetime
from typing import Dict, Any, List, Optional

import pandas
import pandas as pd

from helper.MySQLHelper import MySQLHelper
from util import feishu_inform_util
  7. fei_shu_webhook = "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b"
  8. mysql_helper = MySQLHelper(
  9. host="rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com",
  10. username="readonly",
  11. password="HdkZ4TDmeK6SQ3BRtJBk",
  12. database="aigc-admin-prod"
  13. )
  14. column_width_map = {
  15. "状态": "80px",
  16. "任务数": "80px",
  17. "记录数": "80px",
  18. "待处理": "80px",
  19. "运行中": "80px",
  20. "成功": "80px",
  21. "失败": "80px",
  22. }
  23. header_template = ["wathet", "turquoise", "yellow", "orange", "carmine", "violet", "purple", "indigo", "green"]
  24. header_template_index = 0
  25. def build_config_json() -> Dict[str, Any]:
  26. return {
  27. "width_mode": "default",
  28. "enable_forward": False,
  29. }
  30. def build_markdown_element_json(content: str) -> Dict[str, Any]:
  31. return {
  32. "tag": "markdown",
  33. "element_id": "markdown",
  34. "content": content
  35. }
  36. def build_table_element_json(df: pandas.DataFrame) -> Optional[Dict[str, Any]]:
  37. if df.empty:
  38. return None
  39. columns = []
  40. for column in df.columns:
  41. columns.append({
  42. "name": column,
  43. "display_name": column,
  44. "width": column_width_map.get(column, "auto"),
  45. "data_type": "text",
  46. "horizontal_align": "center",
  47. "vertical_align": "center",
  48. })
  49. rows = df.to_dict(orient="records")
  50. return {
  51. "tag": "table",
  52. "element_id": "table_detail",
  53. "margin": "2px 0px",
  54. "page_size": 10,
  55. "header_style": {
  56. "text_align": "center",
  57. "text_size": "normal",
  58. "background_style": "none",
  59. "text_color": "grey",
  60. "bold": True,
  61. "lines": 1
  62. },
  63. "columns": columns,
  64. "rows": rows
  65. }
  66. def build_markdown_table_element_json(df: pandas.DataFrame) -> Dict[str, Any]:
  67. # 表头
  68. header = "| " + " | ".join(df.columns) + " |\n"
  69. # 分隔线
  70. separator = "| " + " | ".join(["---"] * len(df.columns)) + " |\n"
  71. # 数据行
  72. rows = "\n".join(
  73. "| " + " | ".join(str(val) for val in row) + " |"
  74. for row in df.to_numpy() # 或 df.values
  75. )
  76. content = header + separator + rows
  77. return build_markdown_element_json(content)
  78. def build_collapsible_panel_element_json(elements: List[Dict[str, Any]], title: str) -> Dict[str, Any]:
  79. return {
  80. "tag": "collapsible_panel",
  81. "expanded": False,
  82. "header": {
  83. "title": {
  84. "tag": "plain_text",
  85. "content": title
  86. },
  87. "vertical_align": "center",
  88. },
  89. "border": {
  90. "color": "grey",
  91. "corner_radius": "5px"
  92. },
  93. "elements": elements
  94. }
  95. def build_hr_element_json() -> Dict[str, Any]:
  96. return {
  97. "tag": "hr",
  98. "element_id": "custom_id",
  99. "margin": "10px 0px"
  100. }
  101. def build_text_tag_json(content: str, color: str = 'green') -> Dict[str, Any]:
  102. return {
  103. "tag": "text_tag",
  104. "element_id": "custom_id",
  105. "text": {
  106. "tag": "plain_text",
  107. "content": content
  108. },
  109. "color": color
  110. }
  111. def build_sub_title_json(content: str) -> Dict[str, Any]:
  112. return {
  113. "tag": "plain_text",
  114. "content": content
  115. }
  116. def build_header_json(content: str, template: str = 'green', sub_title: Dict[str, Any] = None, text_tag_list: List[Dict[str, Any]] = []) -> Dict[str, Any]:
  117. return {
  118. "title": {
  119. "content": content
  120. },
  121. "text_tag_list": text_tag_list,
  122. "template": template,
  123. "subtitle": sub_title
  124. }
  125. def build_card_json(body_elements: List[Dict[str, Any]], header: Dict[str, Any], config: Dict[str, Any] = None) -> Dict[str, Any]:
  126. return {
  127. "schema": "2.0",
  128. "config": config,
  129. "header": header,
  130. "body": {
  131. "elements": body_elements
  132. }
  133. }
  134. def supply_workflow_dashboard_stat(ts: int) -> List[Dict[str, Any]]:
  135. sql = f"""
  136. SELECT sw.name AS '策略名',
  137. case
  138. when sw.status = 1 then '开启中'
  139. when sw.status = 0 then '已关闭'
  140. else '未知' end AS 状态,
  141. swt.init_cnt AS '待处理',
  142. swt.running_cnt AS '运行中',
  143. swt.success_cnt AS '成功',
  144. swt.fail_success AS '失败'
  145. FROM supply_workflow sw
  146. LEFT JOIN (
  147. SELECT workflow_id,
  148. COUNT(DISTINCT IF(task_status = 0, task_id, NULL)) AS init_cnt,
  149. COUNT(DISTINCT IF(task_status = 1, task_id, NULL)) AS running_cnt,
  150. COUNT(DISTINCT IF(task_status = 2, task_id, NULL)) AS success_cnt,
  151. COUNT(DISTINCT IF(task_status = 3, task_id, NULL)) AS fail_success
  152. FROM supply_workflow_task
  153. WHERE create_timestamp >= {ts}
  154. GROUP BY workflow_id
  155. ) swt ON sw.id = swt.workflow_id
  156. ORDER BY sw.status DESC, sw.create_timestamp DESC;
  157. """
  158. return mysql_helper.execute_query(sql)
  159. def workflow_task_status_stat(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
  160. sql = f"""
  161. select workflow_id as 'workflow_id',
  162. task_status as '任务状态',
  163. count(distinct task_id) as '任务数',
  164. count(task_input) as '需求数'
  165. from (
  166. select workflow_id as workflow_id,
  167. task_id as task_id,
  168. task_input as task_input,
  169. case
  170. when task_status = 0 then '初始化'
  171. when task_status = 1 then '运行中'
  172. when task_status = 2 then '成功'
  173. when task_status = 3 then '失败'
  174. else '未知' end AS task_status
  175. from supply_workflow sw
  176. left join supply_workflow_task swt
  177. on sw.id = swt.workflow_id
  178. where sw.id = '{workflow_id}'
  179. and swt.create_timestamp >= {ts}
  180. ) as t
  181. group by t.task_status, t.workflow_id;
  182. """
  183. result = mysql_helper.execute_query(sql)
  184. return result
  185. def task_exe_step_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
  186. sql = f'''
  187. select step_name AS '步骤名称',
  188. status as '执行状态',
  189. error_msg as '错误原因',
  190. count(1) AS '任务数'
  191. from (
  192. select swt.task_id,
  193. step_name,
  194. case
  195. when status = 0 then '初始化'
  196. when status = 1 then '运行中'
  197. when status = 2 then '成功'
  198. when status = 3 then '失败'
  199. else '未知' end AS status,
  200. case
  201. when swtes.error_msg like '%Data too long%' then '数据超过字段长度限制'
  202. when swtes.error_msg like '%Deadlock%' then '数据库死锁'
  203. when (swtes.error_msg <> '' and swtes.error_msg is not null) then substring_index(swtes.error_msg, ',', 1)
  204. else '' end AS error_msg
  205. from supply_workflow_task swt
  206. left join supply_workflow_task_exe_step swtes on swt.task_id = swtes.task_id
  207. where swt.create_timestamp >= {ts}
  208. and swt.workflow_id = {workflow_id}
  209. ) as t
  210. group by step_name, status, error_msg;
  211. '''
  212. return mysql_helper.execute_query(sql)
  213. def crawler_and_produce_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
  214. sql = f'''
  215. select t.step as '阶段',
  216. t.status as '状态',
  217. t.error_msg as '失败原因',
  218. t.task_cnt as '任务数',
  219. t.cnt as '记录数'
  220. from (
  221. select '创建爬取计划' as step,
  222. workflow_id as workflow_id,
  223. status as status,
  224. error_msg as error_msg,
  225. count(distinct task_id) as task_cnt,
  226. count(distinct crawler_plan_id) as cnt
  227. from (
  228. select swt.workflow_id as workflow_id,
  229. swt.task_id as task_id,
  230. swcpr.crawler_plan_id as crawler_plan_id,
  231. case
  232. when swcpr.status = 0 then '初始化'
  233. when swcpr.status = 1 then '运行中'
  234. when swcpr.status = 2 then '成功'
  235. when swcpr.status = 3 then '失败'
  236. when swt.task_status = 0 then '初始化'
  237. when swt.task_status = 1 then '运行中'
  238. when swt.task_status = 2 then '成功'
  239. when swt.task_status = 3 then '失败'
  240. else '未知' end AS status,
  241. case
  242. when swcpr.error_msg like '%Data too long%' then '数据超过字段长度限制'
  243. when swcpr.error_msg like '%Deadlock%' then '数据库死锁'
  244. when (swcpr.error_msg <> '' AND swcpr.error_msg is not null) then substring_index(swcpr.error_msg, ',', 1)
  245. when swt.error_msg like '%Data too long%' then '数据超过字段长度限制'
  246. when swt.error_msg like '%Deadlock%' then '数据库死锁'
  247. when (swt.error_msg <> '' AND swt.error_msg is not null) then substring_index(swt.error_msg, ',', 1)
  248. else ''
  249. end AS error_msg
  250. from supply_workflow_task swt
  251. left join supply_workflow_crawler_plan_record swcpr
  252. on swt.task_id = swcpr.task_id
  253. where swt.create_timestamp >= {ts}
  254. ) as t
  255. group by workflow_id, status, error_msg
  256. union all
  257. select '生成计划绑定' as step,
  258. workflow_id as workflow_id,
  259. status as status,
  260. error_msg as error_msg,
  261. count(distinct task_id) as task_cnt,
  262. count(1) as cnt
  263. from (
  264. select swt.workflow_id as workflow_id,
  265. swt.task_id as task_id,
  266. case
  267. when swppr.status = 0 then '初始化'
  268. when swppr.status = 1 then '运行中'
  269. when swppr.status = 2 then '成功'
  270. when swppr.status = 3 then '失败'
  271. when swt.task_status = 0 then '初始化'
  272. when swt.task_status = 1 then '运行中'
  273. when swt.task_status = 2 then '成功'
  274. when swt.task_status = 3 then '失败'
  275. else '未知' end AS status,
  276. case
  277. when swppr.error_msg like '%Data too long%' then '数据超过字段长度限制'
  278. when swppr.error_msg like '%Deadlock%' then '数据库死锁'
  279. when (swppr.error_msg <> '' and swppr.error_msg is not null) then substring_index(swppr.error_msg, ',', 1)
  280. when swt.error_msg like '%Data too long%' then '数据超过字段长度限制'
  281. when swt.error_msg like '%Deadlock%' then '数据库死锁'
  282. when (swt.error_msg <> '' AND swt.error_msg is not null) then substring_index(swt.error_msg, ',', 1)
  283. else ''
  284. end AS error_msg
  285. from supply_workflow_task swt
  286. left join supply_workflow_produce_bind_record swppr
  287. on swt.task_id = swppr.task_id
  288. where swt.create_timestamp >= {ts}
  289. ) as t
  290. group by workflow_id, status, error_msg
  291. ) as t
  292. left join supply_workflow sw on t.workflow_id = sw.id
  293. where workflow_id = '{workflow_id}';
  294. '''
  295. return mysql_helper.execute_query(sql)
  296. def workflow_monitor(workflow_id: str, workflow_name: str, status: int):
  297. today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
  298. start_dt_str = today_midnight.strftime('%Y-%m-%d %H:%M:%S')
  299. end_dt_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  300. timestamp_ms = int(today_midnight.timestamp() * 1000)
  301. task_status_stat = workflow_task_status_stat(timestamp_ms, workflow_id)
  302. exe_step_stat = task_exe_step_stat_query(timestamp_ms, workflow_id)
  303. status_stat = crawler_and_produce_stat_query(timestamp_ms, workflow_id)
  304. # 构建标题
  305. text_tag_list = []
  306. if status == 0:
  307. text_tag_list.append(build_text_tag_json("已关闭", "red"))
  308. elif status == 1:
  309. text_tag_list.append(build_text_tag_json("开启中", "green"))
  310. else:
  311. text_tag_list.append(build_text_tag_json("未知", "orange"))
  312. template = header_template[header_template_index % len(header_template)]
  313. sub_title = build_sub_title_json(f"{start_dt_str} - {end_dt_str}")
  314. header = build_header_json(f"【workflow策略】{workflow_name}", template, sub_title, text_tag_list)
  315. task_status_df = pd.DataFrame(task_status_stat)
  316. if "任务数" not in task_status_df.columns:
  317. task_info = f"**总任务数**: 0"
  318. else:
  319. # 先构建总任务数部分
  320. task_info = f"**总任务数**: {task_status_df['任务数'].sum()}"
  321. # 再逐行添加各状态的任务数
  322. for _, row in task_status_df.iterrows():
  323. task_info += f"\n**{row['任务状态']}任务数**: {row['任务数']}"
  324. elements = [build_markdown_element_json(task_info)]
  325. exe_step_table = build_table_element_json(pd.DataFrame(exe_step_stat))
  326. if exe_step_table is not None:
  327. elements.append(build_hr_element_json())
  328. elements.append(exe_step_table)
  329. status_stat_table = build_table_element_json(pd.DataFrame(status_stat))
  330. if status_stat_table is not None:
  331. elements.append(build_hr_element_json())
  332. elements.append(status_stat_table)
  333. config = build_config_json()
  334. card_json = build_card_json(elements, header, config)
  335. feishu_inform_util.send_card_msg_to_feishu(
  336. webhook=fei_shu_webhook,
  337. card_json=card_json
  338. )
  339. def workflow_dashboard_monitor():
  340. today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
  341. result = supply_workflow_dashboard_stat(int(today_midnight.timestamp() * 1000))
  342. start_dt_str = today_midnight.strftime('%Y-%m-%d %H:%M')
  343. end_dt_str = datetime.now().strftime('%Y-%m-%d %H:%M')
  344. sub_title = build_sub_title_json(f"统计时间: {start_dt_str} - {end_dt_str}")
  345. header = build_header_json("【供给workflow】当日任务统计", "blue", sub_title, [])
  346. dashboard_stat = build_table_element_json(pd.DataFrame(result))
  347. elements = []
  348. if dashboard_stat is not None:
  349. elements.append(dashboard_stat)
  350. else:
  351. elements.append(build_markdown_element_json("**今日workflow还未创建新的任务,请关注**"))
  352. config = build_config_json()
  353. card_json = build_card_json(elements, header, config)
  354. feishu_inform_util.send_card_msg_to_feishu(
  355. webhook=fei_shu_webhook,
  356. card_json=card_json
  357. )
  358. # 总体情况发送到大群
  359. feishu_inform_util.send_card_msg_to_feishu(
  360. webhook="https://open.feishu.cn/open-apis/bot/v2/hook/9f5c5cce-5eb2-4731-b368-33926f5549f9",
  361. card_json=card_json
  362. )
  363. def main():
  364. workflow_dashboard_monitor()
  365. global header_template_index
  366. workflows = mysql_helper.execute_query('select id, name, status from supply_workflow;')
  367. for workflow in workflows:
  368. try:
  369. workflow_monitor(workflow['id'], workflow['name'], workflow['status'])
  370. except Exception as e:
  371. print(f"【workflow策略】{workflow['name']} 监控异常: {e}")
  372. if __name__ == '__main__':
  373. main()