supply_workflow_monitor.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
import os
from datetime import datetime
from typing import Dict, Any

import pandas
import pandas as pd

from helper.MySQLHelper import MySQLHelper
from util import feishu_inform_util
  7. fei_shu_webhook = "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b"
  8. mysql_helper = MySQLHelper(
  9. host="rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com",
  10. username="readonly",
  11. password="HdkZ4TDmeK6SQ3BRtJBk",
  12. database="aigc-admin-prod"
  13. )
  14. column_width_map = {
  15. "状态": "80px",
  16. "任务数": "80px",
  17. "记录数": "80px",
  18. }
  19. header_template = ["blue", "wathet", "turquoise", "green", "yellow", "orange"]
  20. header_template_index = 0
  21. def build_collapsible_panel_table_json(df_map: Dict[str, pandas.DataFrame], title: str) -> Dict[str, Any]:
  22. elements = []
  23. for key, df in df_map.items():
  24. # 表头
  25. header = "| " + " | ".join(df.columns) + " |\n"
  26. # 分隔线
  27. separator = "| " + " | ".join(["---"] * len(df.columns)) + " |\n"
  28. # 数据行
  29. rows = "\n".join(
  30. "| " + " | ".join(str(val) for val in row) + " |"
  31. for row in df.to_numpy() # 或 df.values
  32. )
  33. content = header + separator + rows
  34. elements.append({
  35. "tag": "collapsible_panel",
  36. "expanded": False,
  37. "header": {
  38. "title": {
  39. "tag": "plain_text",
  40. "content": key
  41. },
  42. "vertical_align": "center",
  43. "template": header_template[header_template_index % len(header_template)],
  44. },
  45. "border": {
  46. "color": "grey",
  47. "corner_radius": "5px"
  48. },
  49. "elements": [
  50. {
  51. "tag": "markdown",
  52. "content": content
  53. }
  54. ]
  55. })
  56. return {
  57. "schema": "2.0",
  58. "header": {
  59. "title": {
  60. "content": title
  61. },
  62. "template": header_template[header_template_index % len(header_template)],
  63. },
  64. "body": {
  65. "elements": elements
  66. }
  67. }
  68. def build_table_json(df: pandas.DataFrame, title: str) -> Dict[str, Any]:
  69. columns = []
  70. for column in df.columns:
  71. columns.append({
  72. "name": column,
  73. "display_name": column,
  74. "width": column_width_map.get(column, "auto"),
  75. "data_type": "text",
  76. "horizontal_align": "center",
  77. "vertical_align": "center",
  78. })
  79. rows = df.to_dict(orient="records")
  80. return {
  81. "schema": "2.0",
  82. "header": {
  83. "title": {
  84. "content": title
  85. },
  86. "template": header_template[header_template_index % len(header_template)],
  87. },
  88. "body": {
  89. "elements": [
  90. {
  91. "tag": "table",
  92. "element_id": "table_detail",
  93. "margin": "0px 0px 0px 0px",
  94. "page_size": 10,
  95. "header_style": {
  96. "text_align": "center",
  97. "text_size": "normal",
  98. "background_style": "none",
  99. "text_color": "grey",
  100. "bold": True,
  101. "lines": 1
  102. },
  103. "columns": columns,
  104. "rows": rows
  105. }
  106. ]
  107. }
  108. }
  109. def task_exe_step_stat_query(ts: int, workflow_id: str):
  110. sql = f'''
  111. select step_name AS '步骤名称',
  112. status as '执行状态',
  113. error_msg as '错误原因',
  114. count(1) AS '个数'
  115. from (
  116. select swt.task_id,
  117. step_name,
  118. case
  119. when status = 0 then '初始化'
  120. when status = 1 then '运行中'
  121. when status = 2 then '成功'
  122. when status = 3 then '失败'
  123. when swt.task_status = 0 then '初始化'
  124. when swt.task_status = 1 then '运行中'
  125. when swt.task_status = 2 then '成功'
  126. when swt.task_status = 3 then '失败'
  127. else '未知' end AS status,
  128. case
  129. when swtes.error_msg like '%Data too long%' then '数据超过字段长度限制'
  130. when swtes.error_msg like '%Deadlock%' then '数据库死锁'
  131. when (swtes.error_msg <> '' and swtes.error_msg is not null) then substring_index(swtes.error_msg, ',', 1)
  132. when swt.error_msg like '%Data too long%' then '数据超过字段长度限制'
  133. when swt.error_msg like '%Deadlock%' then '数据库死锁'
  134. when (swt.error_msg <> '' AND swt.error_msg is not null) then substring_index(swt.error_msg, ',', 1)
  135. else '' end AS error_msg
  136. from supply_workflow_task swt
  137. left join supply_workflow_task_exe_step swtes on swt.task_id = swtes.task_id
  138. where swt.create_timestamp >= {ts}
  139. and swt.workflow_id = {workflow_id}
  140. ) as t
  141. group by step_name, status, error_msg;
  142. '''
  143. return mysql_helper.execute_query(sql)
  144. def workflow_status_stat_query(ts: int, workflow_id: str):
  145. sql = f'''
  146. select t.step as '阶段',
  147. t.status as '状态',
  148. t.error_msg as '失败原因',
  149. t.task_cnt as '任务数',
  150. t.cnt as '记录数'
  151. from (
  152. select '创建爬取计划' as step,
  153. workflow_id as workflow_id,
  154. status as status,
  155. error_msg as error_msg,
  156. count(distinct task_id) as task_cnt,
  157. count(distinct crawler_plan_id) as cnt
  158. from (
  159. select swt.workflow_id as workflow_id,
  160. swt.task_id as task_id,
  161. swcpr.crawler_plan_id as crawler_plan_id,
  162. case
  163. when swcpr.status = 0 then '初始化'
  164. when swcpr.status = 1 then '运行中'
  165. when swcpr.status = 2 then '成功'
  166. when swcpr.status = 3 then '失败'
  167. when swt.task_status = 0 then '初始化'
  168. when swt.task_status = 1 then '运行中'
  169. when swt.task_status = 2 then '成功'
  170. when swt.task_status = 3 then '失败'
  171. else '未知' end AS status,
  172. case
  173. when swcpr.error_msg like '%Data too long%' then '数据超过字段长度限制'
  174. when swcpr.error_msg like '%Deadlock%' then '数据库死锁'
  175. when (swcpr.error_msg <> '' AND swcpr.error_msg is not null) then substring_index(swcpr.error_msg, ',', 1)
  176. when swt.error_msg like '%Data too long%' then '数据超过字段长度限制'
  177. when swt.error_msg like '%Deadlock%' then '数据库死锁'
  178. when (swt.error_msg <> '' AND swt.error_msg is not null) then substring_index(swt.error_msg, ',', 1)
  179. else ''
  180. end AS error_msg
  181. from supply_workflow_task swt
  182. left join supply_workflow_crawler_plan_record swcpr
  183. on swt.task_id = swcpr.task_id
  184. where swt.create_timestamp >= {ts}
  185. ) as t
  186. group by workflow_id, status, error_msg
  187. union all
  188. select '生成计划绑定' as step,
  189. workflow_id as workflow_id,
  190. status as status,
  191. error_msg as error_msg,
  192. count(distinct task_id) as task_cnt,
  193. count(1) as cnt
  194. from (
  195. select swt.workflow_id as workflow_id,
  196. swt.task_id as task_id,
  197. case
  198. when swppr.status = 0 then '初始化'
  199. when swppr.status = 1 then '运行中'
  200. when swppr.status = 2 then '成功'
  201. when swppr.status = 3 then '失败'
  202. when swt.task_status = 0 then '初始化'
  203. when swt.task_status = 1 then '运行中'
  204. when swt.task_status = 2 then '成功'
  205. when swt.task_status = 3 then '失败'
  206. else '未知' end AS status,
  207. case
  208. when swppr.error_msg like '%Data too long%' then '数据超过字段长度限制'
  209. when swppr.error_msg like '%Deadlock%' then '数据库死锁'
  210. when (swppr.error_msg <> '' and swppr.error_msg is not null) then substring_index(swppr.error_msg, ',', 1)
  211. when swt.error_msg like '%Data too long%' then '数据超过字段长度限制'
  212. when swt.error_msg like '%Deadlock%' then '数据库死锁'
  213. when (swt.error_msg <> '' AND swt.error_msg is not null) then substring_index(swt.error_msg, ',', 1)
  214. else ''
  215. end AS error_msg
  216. from supply_workflow_task swt
  217. left join supply_workflow_produce_bind_record swppr
  218. on swt.task_id = swppr.task_id
  219. where swt.create_timestamp >= {ts}
  220. ) as t
  221. group by workflow_id, status, error_msg
  222. ) as t
  223. left join supply_workflow sw on t.workflow_id = sw.id
  224. where workflow_id = '{workflow_id}';
  225. '''
  226. return mysql_helper.execute_query(sql)
  227. def main():
  228. global header_template_index
  229. today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
  230. timestamp_ms = int(today_midnight.timestamp() * 1000)
  231. workflows = mysql_helper.execute_query('select id, name from supply_workflow;')
  232. for workflow in workflows:
  233. exe_step_stat = task_exe_step_stat_query(timestamp_ms, workflow['id'])
  234. status_stat = workflow_status_stat_query(timestamp_ms, workflow['id'])
  235. df = pd.DataFrame(exe_step_stat)
  236. df_map = {
  237. "各步骤执行情况统计": pd.DataFrame(exe_step_stat),
  238. "爬取和生成情况统计": pd.DataFrame(status_stat),
  239. }
  240. feishu_inform_util.send_card_msg_to_feishu(
  241. webhook=fei_shu_webhook,
  242. card_json=build_collapsible_panel_table_json(df_map, f"【workflow-{workflow['name']}】数据统计")
  243. )
  244. if __name__ == '__main__':
  245. main()