瀏覽代碼

feat:修改监控

zhaohaipeng 6 天之前
父節點
當前提交
443c326f82
共有 1 個文件被更改,包括 121 次插入和 40 次删除
  1. 121 40
      monitor/supply_workflow_monitor.py

+ 121 - 40
monitor/supply_workflow_monitor.py

@@ -68,51 +68,132 @@ def find_all_workflow() -> List[Dict[str, Any]]:
     return mysql_helper.execute_query(sql)
 
 
-def task_exe_step_stat(ts: int):
-    workflows = find_all_workflow()
-    for workflow in workflows:
-        sql = f'''
-                    select step_name AS "步骤名称",
-                           status    as '执行状态',
-                           error_msg as '错误原因',
-                           count(1)  AS '个数'
-                    from (
-                             select swt.task_id,
-                                    step_name,
-                                    case
-                                        when status = 0 then '初始化'
-                                        when status = 1 then '运行中'
-                                        when status = 2 then '成功'
-                                        when status = 3 then '失败'
-                                        else '未知' end AS status,
-                                    case
-                                        when swtes.error_msg like '%Data too long%' then '数据超过字段长度限制'
-                                        when swtes.error_msg like '%Deadlock%' then '数据库死锁'
-                                        when (swtes.error_msg = '' or swtes.error_msg is null) then ''
-                                        else substring_index(swtes.error_msg, ',', 1)
-                                        end             AS error_msg
-                             from supply_workflow_task swt
-                                      left join supply_workflow_task_exe_step swtes on swt.task_id = swtes.task_id
-                             where swt.create_timestamp >= {ts}
-                               and swt.workflow_id = {workflow['id']}
-                         ) as t
-                    group by step_name, status, error_msg;
-                '''
-        stat = mysql_helper.execute_query(sql)
-        df = pd.DataFrame(stat)
-
-        msg = df_to_markdown_table(df)
-
-        feishu_inform_util.send_card_msg_to_feishu(
-            webhook=fei_shu_webhook,
-            card_json=build_card_join(msg, f"【workflow-{workflow['name']}】各步骤执行情况")
-        )
+def task_exe_step_stat(ts: int, workflow_id: str, workflow_name: str):
+    sql = f'''
+            select step_name AS '步骤名称',
+                   status    as '执行状态',
+                   error_msg as '错误原因',
+                   count(1)  AS '个数'
+            from (
+                     select swt.task_id,
+                            step_name,
+                            case
+                                when status = 0 then '初始化'
+                                when status = 1 then '运行中'
+                                when status = 2 then '成功'
+                                when status = 3 then '失败'
+                                else '未知' end AS status,
+                            case
+                                when swtes.error_msg like '%Data too long%' then '数据超过字段长度限制'
+                                when swtes.error_msg like '%Deadlock%' then '数据库死锁'
+                                when (swtes.error_msg = '' or swtes.error_msg is null) then ''
+                                else substring_index(swtes.error_msg, ',', 1)
+                                end             AS error_msg
+                     from supply_workflow_task swt
+                              left join supply_workflow_task_exe_step swtes on swt.task_id = swtes.task_id
+                     where swt.create_timestamp >= {ts}
+                       and swt.workflow_id = {workflow_id}
+                 ) as t
+            group by step_name, status, error_msg;
+        '''
+    stat = mysql_helper.execute_query(sql)
+    df = pd.DataFrame(stat)
+
+    msg = df_to_markdown_table(df)
+
+    feishu_inform_util.send_card_msg_to_feishu(
+        webhook=fei_shu_webhook,
+        card_json=build_card_join(msg, f"【workflow-{workflow_name}】各步骤执行情况")
+    )
+
+
+def workflow_status_stat(ts: int, workflow_id: str, workflow_name: str):
+    sql = f'''
+            select sw.name     as '策略名',
+                   t.step      as '阶段',
+                   t.status    as '状态',
+                   t.error_msg as '失败原因',
+                   t.task_cnt  as '任务数',
+                   t.cnt       as '记录数'
+            from (
+                     select '创建爬取计划'                  as step,
+                            workflow_id                     as workflow_id,
+                            status                          as status,
+                            error_msg                       as error_msg,
+                            count(distinct task_id)         as task_cnt,
+                            count(distinct crawler_plan_id) as cnt
+                     from (
+                              select swt.workflow_id       as workflow_id,
+                                     swt.task_id           as task_id,
+                                     swcpr.crawler_plan_id as crawler_plan_id,
+                                     case
+                                         when swcpr.status = 0 then '初始化'
+                                         when swcpr.status = 1 then '运行中'
+                                         when swcpr.status = 2 then '成功'
+                                         when swcpr.status = 3 then '失败'
+                                         else '未知' end   AS status,
+                                     case
+                                         when swcpr.error_msg like '%Data too long%' then '数据超过字段长度限制'
+                                         when swcpr.error_msg like '%Deadlock%' then '数据库死锁'
+                                         when (swcpr.error_msg = '' or swcpr.error_msg is null) then ''
+                                         else substring_index(swcpr.error_msg, ',', 1)
+                                         end               AS error_msg
+                              from supply_workflow_task swt
+                                       left join supply_workflow_crawler_plan_record swcpr
+                                                 on swt.task_id = swcpr.task_id
+                              where swt.create_timestamp >= {ts}
+                          ) as t
+                     group by workflow_id, status, error_msg
+                     union all
+                     select '生成计划绑定'          as step,
+                            workflow_id             as workflow_id,
+                            status                  as status,
+                            error_msg               as error_msg,
+                            count(distinct task_id) as task_cnt,
+                            count(1)                as cnt
+                     from (
+                              select swt.workflow_id     as workflow_id,
+                                     swt.task_id         as task_id,
+                                     case
+                                         when swppr.status = 0 then '初始化'
+                                         when swppr.status = 1 then '运行中'
+                                         when swppr.status = 2 then '成功'
+                                         when swppr.status = 3 then '失败'
+                                         else '未知' end AS status,
+                                     case
+                                         when swppr.error_msg like '%Data too long%' then '数据超过字段长度限制'
+                                         when swppr.error_msg like '%Deadlock%' then '数据库死锁'
+                                         when (swppr.error_msg = '' or swppr.error_msg is null) then ''
+                                         else substring_index(swppr.error_msg, ',', 1)
+                                         end             AS error_msg
+                              from supply_workflow_task swt
+                                       left join supply_workflow_produce_bind_record swppr
+                                                 on swt.task_id = swppr.task_id
+                              where swt.create_timestamp >= {ts}
+                          ) as t
+                     group by workflow_id, status, error_msg
+                 ) as t
+                     left join supply_workflow sw on t.workflow_id = sw.id
+            where workflow_id = '{workflow_id}';
+            '''
+    stat = mysql_helper.execute_query(sql)
+    df = pd.DataFrame(stat)
+
+    msg = df_to_markdown_table(df)
+
+    feishu_inform_util.send_card_msg_to_feishu(
+        webhook=fei_shu_webhook,
+        card_json=build_card_join(msg, f"【workflow-{workflow_name}】爬取和生成详情")
+    )
 
 
 def main():
     today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
     timestamp_ms = int(today_midnight.timestamp() * 1000)
-    task_exe_step_stat(timestamp_ms)
+    workflows = find_all_workflow()
+    for workflow in workflows:
+        task_exe_step_stat(timestamp_ms, workflow['id'], workflow['name'])
+        workflow_status_stat(timestamp_ms, workflow['id'], workflow['name'])
 
 
 if __name__ == '__main__':