# supply_workflow_monitor.py
import os
from datetime import datetime
from typing import Dict, Any

import pandas
import pandas as pd

from helper.MySQLHelper import MySQLHelper
from util import feishu_inform_util
  7. fei_shu_webhook = "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b"
  8. mysql_helper = MySQLHelper(
  9. host="rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com",
  10. username="readonly",
  11. password="HdkZ4TDmeK6SQ3BRtJBk",
  12. database="aigc-admin-prod"
  13. )
  14. column_width_map = {
  15. "状态": "80px",
  16. "任务数": "80px",
  17. "记录数": "80px",
  18. }
  19. header_template = ["blue", "wathet", "turquoise", "green", "yellow", "orange"]
  20. header_template_index = 0
  21. def build_table_json(df: pandas.DataFrame, title: str) -> Dict[str, Any]:
  22. columns = []
  23. for column in df.columns:
  24. columns.append({
  25. "name": column,
  26. "display_name": column,
  27. "width": column_width_map.get(column, "auto"),
  28. "data_type": "text",
  29. "horizontal_align": "center",
  30. "vertical_align": "center",
  31. })
  32. rows = df.to_dict(orient="records")
  33. return {
  34. "schema": "2.0",
  35. "header": {
  36. "title": {
  37. "content": title
  38. },
  39. "template": header_template[header_template_index % len(header_template)],
  40. },
  41. "body": {
  42. "elements": [
  43. {
  44. "tag": "table",
  45. "element_id": "table_detail",
  46. "margin": "0px 0px 0px 0px",
  47. "page_size": 10,
  48. "header_style": {
  49. "text_align": "center",
  50. "text_size": "normal",
  51. "background_style": "none",
  52. "text_color": "grey",
  53. "bold": True,
  54. "lines": 1
  55. },
  56. "columns": columns,
  57. "rows": rows
  58. }
  59. ]
  60. }
  61. }
  62. def task_exe_step_stat(ts: int, workflow_id: str, workflow_name: str):
  63. sql = f'''
  64. select step_name AS '步骤名称',
  65. status as '执行状态',
  66. error_msg as '错误原因',
  67. count(1) AS '个数'
  68. from (
  69. select swt.task_id,
  70. step_name,
  71. case
  72. when status = 0 then '初始化'
  73. when status = 1 then '运行中'
  74. when status = 2 then '成功'
  75. when status = 3 then '失败'
  76. when swt.task_status = 0 then '初始化'
  77. when swt.task_status = 1 then '运行中'
  78. when swt.task_status = 2 then '成功'
  79. when swt.task_status = 3 then '失败'
  80. else '未知' end AS status,
  81. case
  82. when swtes.error_msg like '%Data too long%' then '数据超过字段长度限制'
  83. when swtes.error_msg like '%Deadlock%' then '数据库死锁'
  84. when (swtes.error_msg <> '' and swtes.error_msg is not null) then substring_index(swtes.error_msg, ',', 1)
  85. when swt.error_msg like '%Data too long%' then '数据超过字段长度限制'
  86. when swt.error_msg like '%Deadlock%' then '数据库死锁'
  87. when (swt.error_msg <> '' AND swt.error_msg is not null) then substring_index(swt.error_msg, ',', 1)
  88. else '' end AS error_msg
  89. from supply_workflow_task swt
  90. left join supply_workflow_task_exe_step swtes on swt.task_id = swtes.task_id
  91. where swt.create_timestamp >= {ts}
  92. and swt.workflow_id = {workflow_id}
  93. ) as t
  94. group by step_name, status, error_msg;
  95. '''
  96. stat = mysql_helper.execute_query(sql)
  97. df = pd.DataFrame(stat)
  98. feishu_inform_util.send_card_msg_to_feishu(
  99. webhook=fei_shu_webhook,
  100. card_json=build_table_json(df, f"【workflow-{workflow_name}】各步骤执行情况")
  101. )
  102. def workflow_status_stat(ts: int, workflow_id: str, workflow_name: str):
  103. sql = f'''
  104. select t.step as '阶段',
  105. t.status as '状态',
  106. t.error_msg as '失败原因',
  107. t.task_cnt as '任务数',
  108. t.cnt as '记录数'
  109. from (
  110. select '创建爬取计划' as step,
  111. workflow_id as workflow_id,
  112. status as status,
  113. error_msg as error_msg,
  114. count(distinct task_id) as task_cnt,
  115. count(distinct crawler_plan_id) as cnt
  116. from (
  117. select swt.workflow_id as workflow_id,
  118. swt.task_id as task_id,
  119. swcpr.crawler_plan_id as crawler_plan_id,
  120. case
  121. when swcpr.status = 0 then '初始化'
  122. when swcpr.status = 1 then '运行中'
  123. when swcpr.status = 2 then '成功'
  124. when swcpr.status = 3 then '失败'
  125. when swt.task_status = 0 then '初始化'
  126. when swt.task_status = 1 then '运行中'
  127. when swt.task_status = 2 then '成功'
  128. when swt.task_status = 3 then '失败'
  129. else '未知' end AS status,
  130. case
  131. when swcpr.error_msg like '%Data too long%' then '数据超过字段长度限制'
  132. when swcpr.error_msg like '%Deadlock%' then '数据库死锁'
  133. when (swcpr.error_msg <> '' AND swcpr.error_msg is not null) then substring_index(swcpr.error_msg, ',', 1)
  134. when swt.error_msg like '%Data too long%' then '数据超过字段长度限制'
  135. when swt.error_msg like '%Deadlock%' then '数据库死锁'
  136. when (swt.error_msg <> '' AND swt.error_msg is not null) then substring_index(swt.error_msg, ',', 1)
  137. else ''
  138. end AS error_msg
  139. from supply_workflow_task swt
  140. left join supply_workflow_crawler_plan_record swcpr
  141. on swt.task_id = swcpr.task_id
  142. where swt.create_timestamp >= {ts}
  143. ) as t
  144. group by workflow_id, status, error_msg
  145. union all
  146. select '生成计划绑定' as step,
  147. workflow_id as workflow_id,
  148. status as status,
  149. error_msg as error_msg,
  150. count(distinct task_id) as task_cnt,
  151. count(1) as cnt
  152. from (
  153. select swt.workflow_id as workflow_id,
  154. swt.task_id as task_id,
  155. case
  156. when swppr.status = 0 then '初始化'
  157. when swppr.status = 1 then '运行中'
  158. when swppr.status = 2 then '成功'
  159. when swppr.status = 3 then '失败'
  160. when swt.task_status = 0 then '初始化'
  161. when swt.task_status = 1 then '运行中'
  162. when swt.task_status = 2 then '成功'
  163. when swt.task_status = 3 then '失败'
  164. else '未知' end AS status,
  165. case
  166. when swppr.error_msg like '%Data too long%' then '数据超过字段长度限制'
  167. when swppr.error_msg like '%Deadlock%' then '数据库死锁'
  168. when (swppr.error_msg <> '' and swppr.error_msg is not null) then substring_index(swppr.error_msg, ',', 1)
  169. when swt.error_msg like '%Data too long%' then '数据超过字段长度限制'
  170. when swt.error_msg like '%Deadlock%' then '数据库死锁'
  171. when (swt.error_msg <> '' AND swt.error_msg is not null) then substring_index(swt.error_msg, ',', 1)
  172. else ''
  173. end AS error_msg
  174. from supply_workflow_task swt
  175. left join supply_workflow_produce_bind_record swppr
  176. on swt.task_id = swppr.task_id
  177. where swt.create_timestamp >= {ts}
  178. ) as t
  179. group by workflow_id, status, error_msg
  180. ) as t
  181. left join supply_workflow sw on t.workflow_id = sw.id
  182. where workflow_id = '{workflow_id}';
  183. '''
  184. stat = mysql_helper.execute_query(sql)
  185. df = pd.DataFrame(stat)
  186. feishu_inform_util.send_card_msg_to_feishu(
  187. webhook=fei_shu_webhook,
  188. card_json=build_table_json(df, f"【workflow-{workflow_name}】爬取和生成详情")
  189. )
  190. def main():
  191. global header_template_index
  192. today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
  193. timestamp_ms = int(today_midnight.timestamp() * 1000)
  194. workflows = mysql_helper.execute_query('select id, name from supply_workflow;')
  195. for workflow in workflows:
  196. task_exe_step_stat(timestamp_ms, workflow['id'], workflow['name'])
  197. workflow_status_stat(timestamp_ms, workflow['id'], workflow['name'])
  198. header_template_index = header_template_index + 1
  199. if __name__ == '__main__':
  200. main()