Bläddra i källkod

feat:修改监控

zhaohaipeng 6 dagar sedan
förälder
incheckning
5840e6653f
1 ändrade filer med 46 tillägg och 38 borttagningar
  1. 46 38
      monitor/supply_workflow_monitor.py

+ 46 - 38
monitor/supply_workflow_monitor.py

@@ -16,12 +16,12 @@ mysql_helper = MySQLHelper(
 )
 
 
-def build_card_join(content: str):
+def build_card_join(content: str, title: str):
     return {
         "schema": "2.0",
         "header": {
             "title": {
-                "content": "任务执行步骤统计"
+                "content": title
             },
             "template": "green",
         },
@@ -63,48 +63,56 @@ def df_to_markdown_table(df: pd.DataFrame) -> str:
     return '\n'.join(lines)
 
 
-def task_exe_step_stat(ts: int) -> List[Dict[str, Any]]:
-    sql = f'''
-select step_name AS "步骤名称",
-       status    as '执行状态',
-       error_msg as '错误原因',
-       count(1)  AS '个数'
-from (
-         select step_name,
-                case
-                    when status = 0 then '初始化'
-                    when status = 1 then '运行中'
-                    when status = 2 then '成功'
-                    when status = 3 then '失败'
-                    else '未知' end AS status,
-                case
-                    when error_msg like '%Data too long%' then '数据超过字段长度限制'
-                    when error_msg like '%Deadlock%' then '数据库死锁'
-                    when (error_msg = '' or error_msg is null) then ''
-                    else substring_index(error_msg, ',', 1)
-                    end             AS error_msg
-         from supply_workflow_task_exe_step
-         where create_timestamp >= {ts} 
-     ) as t
-group by step_name, status, error_msg
-'''
+def find_all_workflow() -> List[Dict[str, Any]]:
+    sql = 'select id, name from supply_workflow;'
     return mysql_helper.execute_query(sql)
 
 
+def task_exe_step_stat(ts: int):
+    workflows = find_all_workflow()
+    for workflow in workflows:
+        sql = f'''
+                    select step_name AS "步骤名称",
+                           status    as '执行状态',
+                           error_msg as '错误原因',
+                           count(1)  AS '个数'
+                    from (
+                             select swt.task_id,
+                                    step_name,
+                                    case
+                                        when status = 0 then '初始化'
+                                        when status = 1 then '运行中'
+                                        when status = 2 then '成功'
+                                        when status = 3 then '失败'
+                                        else '未知' end AS status,
+                                    case
+                                        when swtes.error_msg like '%Data too long%' then '数据超过字段长度限制'
+                                        when swtes.error_msg like '%Deadlock%' then '数据库死锁'
+                                        when (swtes.error_msg = '' or swtes.error_msg is null) then ''
+                                        else substring_index(swtes.error_msg, ',', 1)
+                                        end             AS error_msg
+                             from supply_workflow_task swt
+                                      left join supply_workflow_task_exe_step swtes on swt.task_id = swtes.task_id
+                             where swt.create_timestamp >= {ts}
+                               and swt.workflow_id = {workflow['id']}
+                         ) as t
+                    group by step_name, status, error_msg;
+                '''
+        stat = mysql_helper.execute_query(sql)
+        df = pd.DataFrame(stat)
+
+        msg = df_to_markdown_table(df)
+
+        feishu_inform_util.send_card_msg_to_feishu(
+            webhook=fei_shu_webhook,
+            card_json=build_card_join(msg, f"【workflow-{workflow['name']}】各步骤执行情况")
+        )
+
+
 def main():
     today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
     timestamp_ms = int(today_midnight.timestamp() * 1000)
-    stat = task_exe_step_stat(timestamp_ms)
-    df = pd.DataFrame(stat)
-
-    print("当日任务步骤执行统计")
-
-    msg = df_to_markdown_table(df)
-
-    feishu_inform_util.send_card_msg_to_feishu(
-        webhook=fei_shu_webhook,
-        card_json=build_card_join(msg)
-    )
+    task_exe_step_stat(timestamp_ms)
 
 
 if __name__ == '__main__':