zhaohaipeng пре 6 дана
родитељ
комит
309a6d9d07
1 измењених фајлова са 148 додато и 77 уклоњено
  1. 148 77
      monitor/supply_workflow_monitor.py

+ 148 - 77
monitor/supply_workflow_monitor.py

@@ -1,5 +1,5 @@
 from datetime import datetime
-from typing import Dict, Any
+from typing import Dict, Any, List
 
 import pandas
 import pandas as pd
@@ -26,57 +26,22 @@ header_template = ["blue", "wathet", "turquoise", "green", "yellow", "orange"]
 header_template_index = 0
 
 
-def build_collapsible_panel_table_json(df_map: Dict[str, pandas.DataFrame], title: str) -> Dict[str, Any]:
-    elements = []
-    for key, df in df_map.items():
-        # 表头
-        header = "| " + " | ".join(df.columns) + " |\n"
-        # 分隔线
-        separator = "| " + " | ".join(["---"] * len(df.columns)) + " |\n"
-        # 数据行
-        rows = "\n".join(
-            "| " + " | ".join(str(val) for val in row) + " |"
-            for row in df.to_numpy()  # 或 df.values
-        )
-        content = header + separator + rows
-
-        elements.append({
-            "tag": "collapsible_panel",
-            "expanded": False,
-            "header": {
-                "title": {
-                    "tag": "plain_text",
-                    "content": key
-                },
-                "vertical_align": "center",
-                "template": header_template[header_template_index % len(header_template)],
-            },
-            "border": {
-                "color": "grey",
-                "corner_radius": "5px"
-            },
-            "elements": [
-                {
-                    "tag": "markdown",
-                    "content": content
-                }
-            ]
-        })
+def build_config_json() -> Dict[str, Any]:
     return {
-        "schema": "2.0",
-        "header": {
-            "title": {
-                "content": title
-            },
-            "template": header_template[header_template_index % len(header_template)],
-        },
-        "body": {
-            "elements": elements
-        }
+        "width_mode": "default",
+        "enable_forward": False,
+    }
+
+
+def build_markdown_element_json(content: str) -> Dict[str, Any]:
+    return {
+        "tag": "markdown",
+        "element_id": "markdown",
+        "content": content
     }
 
 
-def build_table_json(df: pandas.DataFrame, title: str) -> Dict[str, Any]:
+def build_table_element_json(df: pandas.DataFrame) -> Dict[str, Any]:
     columns = []
     for column in df.columns:
         columns.append({
@@ -89,39 +54,120 @@ def build_table_json(df: pandas.DataFrame, title: str) -> Dict[str, Any]:
         })
 
     rows = df.to_dict(orient="records")
+    return {
+        "tag": "table",
+        "element_id": "table_detail",
+        "margin": "4px 0px",
+        "page_size": 10,
+        "header_style": {
+            "text_align": "center",
+            "text_size": "normal",
+            "background_style": "none",
+            "text_color": "grey",
+            "bold": True,
+            "lines": 1
+        },
+        "columns": columns,
+        "rows": rows
+    }
+
+
+def build_markdown_table_element_json(df: pandas.DataFrame) -> Dict[str, Any]:
+    # 表头
+    header = "| " + " | ".join(df.columns) + " |\n"
+    # 分隔线
+    separator = "| " + " | ".join(["---"] * len(df.columns)) + " |\n"
+    # 数据行
+    rows = "\n".join(
+        "| " + " | ".join(str(val) for val in row) + " |"
+        for row in df.to_numpy()  # 或 df.values
+    )
+    content = header + separator + rows
+    return build_markdown_element_json(content)
 
+
+def build_collapsible_panel_element_json(elements: List[Dict[str, Any]], title: str) -> Dict[str, Any]:
     return {
-        "schema": "2.0",
+        "tag": "collapsible_panel",
+        "expanded": False,
         "header": {
             "title": {
+                "tag": "plain_text",
                 "content": title
             },
-            "template": header_template[header_template_index % len(header_template)],
+            "vertical_align": "center",
+        },
+        "border": {
+            "color": "grey",
+            "corner_radius": "5px"
         },
+        "elements": elements
+    }
+
+
+def build_hr_element_json() -> Dict[str, Any]:
+    return {
+        "tag": "hr",
+        "element_id": "custom_id",
+        "margin": "10px 0px"
+    }
+
+
+def build_sub_title_json(content: str) -> Dict[str, Any]:
+    return {
+        "tag": "plain_text",
+        "content": content
+    }
+
+
+def build_header_json(content: str, template: str = 'green', sub_title: Dict[str, Any] = None) -> Dict[str, Any]:
+    return {
+        "title": {
+            "content": content
+        },
+        "template": template,
+        "subtitle": sub_title
+    }
+
+
+def build_card_json(body_elements: List[Dict[str, Any]], header: Dict[str, Any], config: Dict[str, Any] = None) -> Dict[str, Any]:
+    return {
+        "schema": "2.0",
+        "config": config,
+        "header": header,
         "body": {
-            "elements": [
-                {
-                    "tag": "table",
-                    "element_id": "table_detail",
-                    "margin": "0px 0px 0px 0px",
-                    "page_size": 10,
-                    "header_style": {
-                        "text_align": "center",
-                        "text_size": "normal",
-                        "background_style": "none",
-                        "text_color": "grey",
-                        "bold": True,
-                        "lines": 1
-                    },
-                    "columns": columns,
-                    "rows": rows
-                }
-            ]
+            "elements": body_elements
         }
     }
 
 
-def task_exe_step_stat_query(ts: int, workflow_id: str):
+def workflow_task_status_stat(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
+    sql = f"""
+            select workflow_id             as 'workflow_id',
+                   task_status             as '任务状态',
+                   count(distinct task_id) as '任务数',
+                   count(task_input)       as '需求数'
+            from (
+                     select workflow_id         as workflow_id,
+                            task_id             as task_id,
+                            task_input          as task_input,
+                            case
+                                when task_status = 0 then '初始化'
+                                when task_status = 1 then '运行中'
+                                when task_status = 2 then '成功'
+                                when task_status = 3 then '失败'
+                                else '未知' end AS task_status
+                     from supply_workflow_task
+                     where workflow_id = '{workflow_id}'
+                       and create_timestamp >= {ts}
+                 ) as t
+            group by t.task_status, t.workflow_id;
+            """
+    result = mysql_helper.execute_query(sql)
+    return result
+
+
+def task_exe_step_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
     sql = f'''
             select step_name AS '步骤名称',
                    status    as '执行状态',
@@ -158,7 +204,7 @@ def task_exe_step_stat_query(ts: int, workflow_id: str):
     return mysql_helper.execute_query(sql)
 
 
-def workflow_status_stat_query(ts: int, workflow_id: str):
+def crawler_and_produce_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
     sql = f'''
             select t.step      as '阶段',
                    t.status    as '状态',
@@ -246,24 +292,49 @@ def workflow_status_stat_query(ts: int, workflow_id: str):
 def main():
     global header_template_index
     today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
+
+    start_dt_str = today_midnight.strftime('%Y-%m-%d %H:%M:%S')
+    end_dt_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
     timestamp_ms = int(today_midnight.timestamp() * 1000)
     workflows = mysql_helper.execute_query('select id, name from supply_workflow;')
     for workflow in workflows:
+        task_status_stat = workflow_task_status_stat(timestamp_ms, workflow['id'])
+
         exe_step_stat = task_exe_step_stat_query(timestamp_ms, workflow['id'])
-        status_stat = workflow_status_stat_query(timestamp_ms, workflow['id'])
+        status_stat = crawler_and_produce_stat_query(timestamp_ms, workflow['id'])
 
-        df = pd.DataFrame(exe_step_stat)
+        # 构建标题
+        template = header_template[header_template_index % len(header_template)]
+        sub_title = build_sub_title_json(f"{start_dt_str} - {end_dt_str}")
+        header = build_header_json(f"【workflow-{workflow['name']}】", template, sub_title)
 
-        df_map = {
-            "各步骤执行情况统计": pd.DataFrame(exe_step_stat),
-            "爬取和生成情况统计": pd.DataFrame(status_stat),
-        }
+        # 构建下文内容
+        task_status_df = pd.DataFrame(task_status_stat)
+        task_info = f"**总任务数**: {task_status_df['任务数'].sum()}" + \
+                    "".join([
+                        f"\n**{item['任务状态']}任务数**: {item['任务数']}"
+                        for item in task_status_df.to_dict(orient='records')
+                    ])
+
+        elements = [
+            build_markdown_element_json(task_info),
+            build_hr_element_json(),
+            build_table_element_json(pd.DataFrame(exe_step_stat)),
+            build_hr_element_json(),
+            build_table_element_json(pd.DataFrame(status_stat)),
+        ]
+
+        config = build_config_json()
+
+        card_json = build_card_json(elements, header, config)
 
         feishu_inform_util.send_card_msg_to_feishu(
             webhook=fei_shu_webhook,
-            card_json=build_collapsible_panel_table_json(df_map, f"【workflow-{workflow['name']}】数据统计")
+            card_json=card_json
         )
 
+        break
+
 
 if __name__ == '__main__':
     main()