supply_workflow_monitor.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
import os
from datetime import datetime
from typing import Dict, Any, List

import pandas
import pandas as pd

from helper.MySQLHelper import MySQLHelper
from util import feishu_inform_util
  7. fei_shu_webhook = "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b"
  8. mysql_helper = MySQLHelper(
  9. host="rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com",
  10. username="readonly",
  11. password="HdkZ4TDmeK6SQ3BRtJBk",
  12. database="aigc-admin-prod"
  13. )
  14. column_width_map = {
  15. "状态": "80px",
  16. "任务数": "80px",
  17. "记录数": "80px",
  18. }
  19. header_template = ["blue", "wathet", "turquoise", "green", "yellow", "orange"]
  20. header_template_index = 0
  21. def build_config_json() -> Dict[str, Any]:
  22. return {
  23. "width_mode": "default",
  24. "enable_forward": False,
  25. }
  26. def build_markdown_element_json(content: str) -> Dict[str, Any]:
  27. return {
  28. "tag": "markdown",
  29. "element_id": "markdown",
  30. "content": content
  31. }
  32. def build_table_element_json(df: pandas.DataFrame) -> Dict[str, Any]:
  33. columns = []
  34. for column in df.columns:
  35. columns.append({
  36. "name": column,
  37. "display_name": column,
  38. "width": column_width_map.get(column, "auto"),
  39. "data_type": "text",
  40. "horizontal_align": "center",
  41. "vertical_align": "center",
  42. })
  43. rows = df.to_dict(orient="records")
  44. return {
  45. "tag": "table",
  46. "element_id": "table_detail",
  47. "margin": "2px 0px",
  48. "page_size": 10,
  49. "header_style": {
  50. "text_align": "center",
  51. "text_size": "normal",
  52. "background_style": "none",
  53. "text_color": "grey",
  54. "bold": True,
  55. "lines": 1
  56. },
  57. "columns": columns,
  58. "rows": rows
  59. }
  60. def build_markdown_table_element_json(df: pandas.DataFrame) -> Dict[str, Any]:
  61. # 表头
  62. header = "| " + " | ".join(df.columns) + " |\n"
  63. # 分隔线
  64. separator = "| " + " | ".join(["---"] * len(df.columns)) + " |\n"
  65. # 数据行
  66. rows = "\n".join(
  67. "| " + " | ".join(str(val) for val in row) + " |"
  68. for row in df.to_numpy() # 或 df.values
  69. )
  70. content = header + separator + rows
  71. return build_markdown_element_json(content)
  72. def build_collapsible_panel_element_json(elements: List[Dict[str, Any]], title: str) -> Dict[str, Any]:
  73. return {
  74. "tag": "collapsible_panel",
  75. "expanded": False,
  76. "header": {
  77. "title": {
  78. "tag": "plain_text",
  79. "content": title
  80. },
  81. "vertical_align": "center",
  82. },
  83. "border": {
  84. "color": "grey",
  85. "corner_radius": "5px"
  86. },
  87. "elements": elements
  88. }
  89. def build_hr_element_json() -> Dict[str, Any]:
  90. return {
  91. "tag": "hr",
  92. "element_id": "custom_id",
  93. "margin": "10px 0px"
  94. }
  95. def build_text_tag_json(content: str, color: str = 'green') -> Dict[str, Any]:
  96. return {
  97. "tag": "text_tag",
  98. "element_id": "custom_id",
  99. "text": {
  100. "tag": "plain_text",
  101. "content": content
  102. },
  103. "color": color
  104. }
  105. def build_sub_title_json(content: str) -> Dict[str, Any]:
  106. return {
  107. "tag": "plain_text",
  108. "content": content
  109. }
  110. def build_header_json(content: str, template: str = 'green', sub_title: Dict[str, Any] = None, text_tag_list: List[Dict[str, Any]] = []) -> Dict[str, Any]:
  111. return {
  112. "title": {
  113. "content": content
  114. },
  115. "text_tag_list": text_tag_list,
  116. "template": template,
  117. "subtitle": sub_title
  118. }
  119. def build_card_json(body_elements: List[Dict[str, Any]], header: Dict[str, Any], config: Dict[str, Any] = None) -> Dict[str, Any]:
  120. return {
  121. "schema": "2.0",
  122. "config": config,
  123. "header": header,
  124. "body": {
  125. "elements": body_elements
  126. }
  127. }
  128. def workflow_task_status_stat(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
  129. sql = f"""
  130. select workflow_id as 'workflow_id',
  131. task_status as '任务状态',
  132. count(distinct task_id) as '任务数',
  133. count(task_input) as '需求数'
  134. from (
  135. select workflow_id as workflow_id,
  136. task_id as task_id,
  137. task_input as task_input,
  138. case
  139. when task_status = 0 then '初始化'
  140. when task_status = 1 then '运行中'
  141. when task_status = 2 then '成功'
  142. when task_status = 3 then '失败'
  143. else '未知' end AS task_status
  144. from supply_workflow_task
  145. where workflow_id = '{workflow_id}'
  146. and create_timestamp >= {ts}
  147. ) as t
  148. group by t.task_status, t.workflow_id;
  149. """
  150. result = mysql_helper.execute_query(sql)
  151. return result
  152. def task_exe_step_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
  153. sql = f'''
  154. select step_name AS '步骤名称',
  155. status as '执行状态',
  156. error_msg as '错误原因',
  157. count(1) AS '任务数'
  158. from (
  159. select swt.task_id,
  160. step_name,
  161. case
  162. when status = 0 then '初始化'
  163. when status = 1 then '运行中'
  164. when status = 2 then '成功'
  165. when status = 3 then '失败'
  166. when swt.task_status = 0 then '初始化'
  167. when swt.task_status = 1 then '运行中'
  168. when swt.task_status = 2 then '成功'
  169. when swt.task_status = 3 then '失败'
  170. else '未知' end AS status,
  171. case
  172. when swtes.error_msg like '%Data too long%' then '数据超过字段长度限制'
  173. when swtes.error_msg like '%Deadlock%' then '数据库死锁'
  174. when (swtes.error_msg <> '' and swtes.error_msg is not null) then substring_index(swtes.error_msg, ',', 1)
  175. when swt.error_msg like '%Data too long%' then '数据超过字段长度限制'
  176. when swt.error_msg like '%Deadlock%' then '数据库死锁'
  177. when (swt.error_msg <> '' AND swt.error_msg is not null) then substring_index(swt.error_msg, ',', 1)
  178. else '' end AS error_msg
  179. from supply_workflow_task swt
  180. left join supply_workflow_task_exe_step swtes on swt.task_id = swtes.task_id
  181. where swt.create_timestamp >= {ts}
  182. and swt.workflow_id = {workflow_id}
  183. ) as t
  184. group by step_name, status, error_msg;
  185. '''
  186. return mysql_helper.execute_query(sql)
def crawler_and_produce_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
    """Summarise crawler-plan and produce-bind records for one workflow.

    The query unions two aggregates, each built from ``supply_workflow_task``
    rows with ``create_timestamp >= ts``:

    * ``'创建爬取计划'``: tasks left-joined to
      ``supply_workflow_crawler_plan_record``; the record count is the number
      of distinct ``crawler_plan_id`` values.
    * ``'生成计划绑定'``: tasks left-joined to
      ``supply_workflow_produce_bind_record``; the record count is the number
      of joined rows.

    In both halves the status label comes from the record status when it is
    0-3 and otherwise from the task status, and the error reason comes from
    the record's message when present, otherwise from the task's message
    ("Data too long" and "Deadlock" are mapped to fixed labels; other
    messages are cut at the first comma).

    The inner aggregates cover every workflow; the ``workflow_id`` filter is
    applied only on the outer select. The outer left join to
    ``supply_workflow`` contributes no selected column.
    NOTE(review): the outer ``where workflow_id = ...`` is unqualified;
    confirm ``supply_workflow`` has no ``workflow_id`` column that would make
    it ambiguous.

    Args:
        ts: Lower bound compared against ``swt.create_timestamp`` (inclusive).
        workflow_id: Workflow to report on; interpolated as a quoted literal.

    Returns:
        Whatever ``mysql_helper.execute_query`` returns for the query.
    """
    sql = f'''
    select t.step as '阶段',
           t.status as '状态',
           t.error_msg as '失败原因',
           t.task_cnt as '任务数',
           t.cnt as '记录数'
    from (
        select '创建爬取计划' as step,
               workflow_id as workflow_id,
               status as status,
               error_msg as error_msg,
               count(distinct task_id) as task_cnt,
               count(distinct crawler_plan_id) as cnt
        from (
            select swt.workflow_id as workflow_id,
                   swt.task_id as task_id,
                   swcpr.crawler_plan_id as crawler_plan_id,
                   case
                       when swcpr.status = 0 then '初始化'
                       when swcpr.status = 1 then '运行中'
                       when swcpr.status = 2 then '成功'
                       when swcpr.status = 3 then '失败'
                       when swt.task_status = 0 then '初始化'
                       when swt.task_status = 1 then '运行中'
                       when swt.task_status = 2 then '成功'
                       when swt.task_status = 3 then '失败'
                       else '未知' end AS status,
                   case
                       when swcpr.error_msg like '%Data too long%' then '数据超过字段长度限制'
                       when swcpr.error_msg like '%Deadlock%' then '数据库死锁'
                       when (swcpr.error_msg <> '' AND swcpr.error_msg is not null) then substring_index(swcpr.error_msg, ',', 1)
                       when swt.error_msg like '%Data too long%' then '数据超过字段长度限制'
                       when swt.error_msg like '%Deadlock%' then '数据库死锁'
                       when (swt.error_msg <> '' AND swt.error_msg is not null) then substring_index(swt.error_msg, ',', 1)
                       else ''
                       end AS error_msg
            from supply_workflow_task swt
            left join supply_workflow_crawler_plan_record swcpr
            on swt.task_id = swcpr.task_id
            where swt.create_timestamp >= {ts}
        ) as t
        group by workflow_id, status, error_msg
        union all
        select '生成计划绑定' as step,
               workflow_id as workflow_id,
               status as status,
               error_msg as error_msg,
               count(distinct task_id) as task_cnt,
               count(1) as cnt
        from (
            select swt.workflow_id as workflow_id,
                   swt.task_id as task_id,
                   case
                       when swppr.status = 0 then '初始化'
                       when swppr.status = 1 then '运行中'
                       when swppr.status = 2 then '成功'
                       when swppr.status = 3 then '失败'
                       when swt.task_status = 0 then '初始化'
                       when swt.task_status = 1 then '运行中'
                       when swt.task_status = 2 then '成功'
                       when swt.task_status = 3 then '失败'
                       else '未知' end AS status,
                   case
                       when swppr.error_msg like '%Data too long%' then '数据超过字段长度限制'
                       when swppr.error_msg like '%Deadlock%' then '数据库死锁'
                       when (swppr.error_msg <> '' and swppr.error_msg is not null) then substring_index(swppr.error_msg, ',', 1)
                       when swt.error_msg like '%Data too long%' then '数据超过字段长度限制'
                       when swt.error_msg like '%Deadlock%' then '数据库死锁'
                       when (swt.error_msg <> '' AND swt.error_msg is not null) then substring_index(swt.error_msg, ',', 1)
                       else ''
                       end AS error_msg
            from supply_workflow_task swt
            left join supply_workflow_produce_bind_record swppr
            on swt.task_id = swppr.task_id
            where swt.create_timestamp >= {ts}
        ) as t
        group by workflow_id, status, error_msg
    ) as t
    left join supply_workflow sw on t.workflow_id = sw.id
    where workflow_id = '{workflow_id}';
    '''
    return mysql_helper.execute_query(sql)
  270. def main():
  271. global header_template_index
  272. today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
  273. start_dt_str = today_midnight.strftime('%Y-%m-%d %H:%M:%S')
  274. end_dt_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  275. timestamp_ms = int(today_midnight.timestamp() * 1000)
  276. workflows = mysql_helper.execute_query('select id, name, status from supply_workflow;')
  277. for workflow in workflows:
  278. task_status_stat = workflow_task_status_stat(timestamp_ms, workflow['id'])
  279. exe_step_stat = task_exe_step_stat_query(timestamp_ms, workflow['id'])
  280. status_stat = crawler_and_produce_stat_query(timestamp_ms, workflow['id'])
  281. # 构建标题
  282. text_tag_list = []
  283. if workflow['status'] == 0:
  284. text_tag_list.append(build_text_tag_json("已关闭", "red"))
  285. elif workflow['status'] == 1:
  286. text_tag_list.append(build_text_tag_json("开启中", "green"))
  287. else:
  288. text_tag_list.append(build_text_tag_json("未知", "orange"))
  289. template = header_template[header_template_index % len(header_template)]
  290. sub_title = build_sub_title_json(f"{start_dt_str} - {end_dt_str}")
  291. header = build_header_json(f"【workflow策略】{workflow['name']}", template, sub_title, text_tag_list)
  292. # 构建下文内容
  293. task_status_df = pd.DataFrame(task_status_stat)
  294. task_info = f"**总任务数**: {task_status_df['任务数'].sum()}" + \
  295. "".join([
  296. f"\n**{item['任务状态']}任务数**: {item['任务数']}"
  297. for item in task_status_df.to_dict(orient='records')
  298. ])
  299. elements = [
  300. build_markdown_element_json(task_info),
  301. build_hr_element_json(),
  302. build_table_element_json(pd.DataFrame(exe_step_stat)),
  303. build_hr_element_json(),
  304. build_table_element_json(pd.DataFrame(status_stat)),
  305. ]
  306. config = build_config_json()
  307. card_json = build_card_json(elements, header, config)
  308. feishu_inform_util.send_card_msg_to_feishu(
  309. webhook=fei_shu_webhook,
  310. card_json=card_json
  311. )
  312. if __name__ == '__main__':
  313. main()