# supply_workflow_monitor.py
import os
from datetime import datetime
from typing import Dict, List, Any

import pandas as pd

from helper.MySQLHelper import MySQLHelper
from util import feishu_inform_util
  6. fei_shu_webhook = "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b"
  7. mysql_helper = MySQLHelper(
  8. host="rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com",
  9. username="readonly",
  10. password="HdkZ4TDmeK6SQ3BRtJBk",
  11. database="aigc-admin-prod"
  12. )
  13. def build_card_join(content: str, title: str):
  14. return {
  15. "schema": "2.0",
  16. "header": {
  17. "title": {
  18. "content": title
  19. },
  20. "template": "green",
  21. },
  22. "body": {
  23. "elements": [
  24. {
  25. "tag": "markdown",
  26. "element_id": "detail",
  27. "margin": "0px 0px 0px 0px",
  28. "content": content,
  29. "text_size": "normal",
  30. "text_align": "left"
  31. }
  32. ]
  33. }
  34. }
  35. def df_to_markdown_table(df: pd.DataFrame) -> str:
  36. """将DataFrame转换为标准Markdown表格字符串"""
  37. headers = list(df.columns)
  38. str_rows = df.astype(str).values
  39. col_widths = []
  40. for i, h in enumerate(headers):
  41. max_w = len(str(h))
  42. for row in str_rows:
  43. max_w = max(max_w, len(str(row[i])))
  44. col_widths.append(max_w)
  45. def _row(values):
  46. cells = [str(v).ljust(col_widths[i]) for i, v in enumerate(values)]
  47. return '| ' + ' | '.join(cells) + ' |'
  48. sep = '| ' + ' | '.join('-' * w for w in col_widths) + ' |'
  49. lines = [_row(headers), sep]
  50. for row in str_rows:
  51. lines.append(_row(row))
  52. return '\n'.join(lines)
  53. def find_all_workflow() -> List[Dict[str, Any]]:
  54. sql = 'select id, name from supply_workflow;'
  55. return mysql_helper.execute_query(sql)
  56. def task_exe_step_stat(ts: int, workflow_id: str, workflow_name: str):
  57. sql = f'''
  58. select step_name AS '步骤名称',
  59. status as '执行状态',
  60. error_msg as '错误原因',
  61. count(1) AS '个数'
  62. from (
  63. select swt.task_id,
  64. step_name,
  65. case
  66. when status = 0 then '初始化'
  67. when status = 1 then '运行中'
  68. when status = 2 then '成功'
  69. when status = 3 then '失败'
  70. else '未知' end AS status,
  71. case
  72. when swtes.error_msg like '%Data too long%' then '数据超过字段长度限制'
  73. when swtes.error_msg like '%Deadlock%' then '数据库死锁'
  74. when (swtes.error_msg = '' or swtes.error_msg is null) then ''
  75. else substring_index(swtes.error_msg, ',', 1)
  76. end AS error_msg
  77. from supply_workflow_task swt
  78. left join supply_workflow_task_exe_step swtes on swt.task_id = swtes.task_id
  79. where swt.create_timestamp >= {ts}
  80. and swt.workflow_id = {workflow_id}
  81. ) as t
  82. group by step_name, status, error_msg;
  83. '''
  84. stat = mysql_helper.execute_query(sql)
  85. df = pd.DataFrame(stat)
  86. msg = df_to_markdown_table(df)
  87. feishu_inform_util.send_card_msg_to_feishu(
  88. webhook=fei_shu_webhook,
  89. card_json=build_card_join(msg, f"【workflow-{workflow_name}】各步骤执行情况")
  90. )
  91. def workflow_status_stat(ts: int, workflow_id: str, workflow_name: str):
  92. sql = f'''
  93. select sw.name as '策略名',
  94. t.step as '阶段',
  95. t.status as '状态',
  96. t.error_msg as '失败原因',
  97. t.task_cnt as '任务数',
  98. t.cnt as '记录数'
  99. from (
  100. select '创建爬取计划' as step,
  101. workflow_id as workflow_id,
  102. status as status,
  103. error_msg as error_msg,
  104. count(distinct task_id) as task_cnt,
  105. count(distinct crawler_plan_id) as cnt
  106. from (
  107. select swt.workflow_id as workflow_id,
  108. swt.task_id as task_id,
  109. swcpr.crawler_plan_id as crawler_plan_id,
  110. case
  111. when swcpr.status = 0 then '初始化'
  112. when swcpr.status = 1 then '运行中'
  113. when swcpr.status = 2 then '成功'
  114. when swcpr.status = 3 then '失败'
  115. else '未知' end AS status,
  116. case
  117. when swcpr.error_msg like '%Data too long%' then '数据超过字段长度限制'
  118. when swcpr.error_msg like '%Deadlock%' then '数据库死锁'
  119. when (swcpr.error_msg = '' or swcpr.error_msg is null) then ''
  120. else substring_index(swcpr.error_msg, ',', 1)
  121. end AS error_msg
  122. from supply_workflow_task swt
  123. left join supply_workflow_crawler_plan_record swcpr
  124. on swt.task_id = swcpr.task_id
  125. where swt.create_timestamp >= {ts}
  126. ) as t
  127. group by workflow_id, status, error_msg
  128. union all
  129. select '生成计划绑定' as step,
  130. workflow_id as workflow_id,
  131. status as status,
  132. error_msg as error_msg,
  133. count(distinct task_id) as task_cnt,
  134. count(1) as cnt
  135. from (
  136. select swt.workflow_id as workflow_id,
  137. swt.task_id as task_id,
  138. case
  139. when swppr.status = 0 then '初始化'
  140. when swppr.status = 1 then '运行中'
  141. when swppr.status = 2 then '成功'
  142. when swppr.status = 3 then '失败'
  143. else '未知' end AS status,
  144. case
  145. when swppr.error_msg like '%Data too long%' then '数据超过字段长度限制'
  146. when swppr.error_msg like '%Deadlock%' then '数据库死锁'
  147. when (swppr.error_msg = '' or swppr.error_msg is null) then ''
  148. else substring_index(swppr.error_msg, ',', 1)
  149. end AS error_msg
  150. from supply_workflow_task swt
  151. left join supply_workflow_produce_bind_record swppr
  152. on swt.task_id = swppr.task_id
  153. where swt.create_timestamp >= {ts}
  154. ) as t
  155. group by workflow_id, status, error_msg
  156. ) as t
  157. left join supply_workflow sw on t.workflow_id = sw.id
  158. where workflow_id = '{workflow_id}';
  159. '''
  160. stat = mysql_helper.execute_query(sql)
  161. df = pd.DataFrame(stat)
  162. msg = df_to_markdown_table(df)
  163. feishu_inform_util.send_card_msg_to_feishu(
  164. webhook=fei_shu_webhook,
  165. card_json=build_card_join(msg, f"【workflow-{workflow_name}】爬取和生成详情")
  166. )
  167. def main():
  168. today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
  169. timestamp_ms = int(today_midnight.timestamp() * 1000)
  170. workflows = find_all_workflow()
  171. for workflow in workflows:
  172. task_exe_step_stat(timestamp_ms, workflow['id'], workflow['name'])
  173. workflow_status_stat(timestamp_ms, workflow['id'], workflow['name'])
  174. if __name__ == '__main__':
  175. main()