Browse Source

feat:修改监控

zhaohaipeng 7 giờ trước cách đây
mục cha
commit
9374a784bc
1 tập tin đã thay đổi với 86 bổ sung66 xóa
  1. 86 66
      monitor/supply_workflow_monitor.py

+ 86 - 66
monitor/supply_workflow_monitor.py

@@ -277,71 +277,91 @@ def task_exe_step_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
 
 
 def crawler_and_produce_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
 def crawler_and_produce_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
     sql = f'''
     sql = f'''
-            select t.step      as '阶段',
-                   t.status    as '状态',
-                   t.error_msg as '失败原因',
-                   t.task_cnt  as '任务数',
-                   t.cnt       as '记录数'
-            from (
-                     select '创建爬取计划'                  as step,
-                            workflow_id                     as workflow_id,
-                            status                          as status,
-                            error_msg                       as error_msg,
-                            count(distinct task_id)         as task_cnt,
-                            count(distinct crawler_plan_id) as cnt
-                     from (
-                              select swt.workflow_id       as workflow_id,
-                                     swt.task_id           as task_id,
-                                     swcpr.crawler_plan_id as crawler_plan_id,
-                                     case
-                                         when swcpr.status = 0 then '初始化'
-                                         when swcpr.status = 1 then '运行中'
-                                         when swcpr.status = 2 then '成功'
-                                         when swcpr.status = 3 then '失败' 
-                                         else '未知' end   AS status,
-                                     case
-                                         when swcpr.error_msg like '%Data too long%' then '数据超过字段长度限制'
-                                         when swcpr.error_msg like '%Deadlock%' then '数据库死锁'
-                                         when (swcpr.error_msg <> '' AND swcpr.error_msg is not null) then substring_index(swcpr.error_msg, ',', 1) 
-                                         else '' 
-                                         end               AS error_msg
-                              from supply_workflow_task swt
-                                       left join supply_workflow_crawler_plan_record swcpr
-                                                 on swt.task_id = swcpr.task_id
-                              where swt.create_timestamp >= {ts}
-                          ) as t
-                     group by workflow_id, status, error_msg
-                     union all
-                     select '生成计划绑定'          as step,
-                            workflow_id             as workflow_id,
-                            status                  as status,
-                            error_msg               as error_msg,
-                            count(distinct task_id) as task_cnt,
-                            count(1)                as cnt
-                     from (
-                              select swt.workflow_id     as workflow_id,
-                                     swt.task_id         as task_id,
-                                     case
-                                         when swppr.status = 0 then '初始化'
-                                         when swppr.status = 1 then '运行中'
-                                         when swppr.status = 2 then '成功'
-                                         when swppr.status = 3 then '失败'
-                                         else '未知' end AS status,
-                                     case
-                                         when swppr.error_msg like '%Data too long%' then '数据超过字段长度限制'
-                                         when swppr.error_msg like '%Deadlock%' then '数据库死锁'
-                                         when (swppr.error_msg <> '' and swppr.error_msg is not null) then substring_index(swppr.error_msg, ',', 1)
-                                         else ''
-                                         end             AS error_msg
-                              from supply_workflow_task swt
-                                       left join supply_workflow_produce_bind_record swppr
-                                                 on swt.task_id = swppr.task_id
-                              where swt.create_timestamp >= {ts}
-                          ) as t
-                     group by workflow_id, status, error_msg
-                 ) as t
-                     left join supply_workflow sw on t.workflow_id = sw.id
-            where workflow_id = '{workflow_id}';
+            SELECT t.step      AS '阶段',
+                   t.status    AS '状态',
+                   t.error_msg AS '失败原因',
+                   t.task_cnt  AS '任务数',
+                   t.cnt       AS '记录数'
+            FROM (
+                     SELECT '创建爬取计划'                  AS step,
+                            workflow_id                     AS workflow_id,
+                            status                          AS status,
+                            error_msg                       AS error_msg,
+                            COUNT(DISTINCT task_id)         AS task_cnt,
+                            COUNT(DISTINCT crawler_plan_id) AS cnt
+                     FROM (
+                              SELECT swt.workflow_id       AS workflow_id,
+                                     swt.task_id           AS task_id,
+                                     swcpr.crawler_plan_id AS crawler_plan_id,
+                                     CASE
+                                         WHEN swcpr.status = 0 THEN '初始化'
+                                         WHEN swcpr.status = 1 THEN '运行中'
+                                         WHEN swcpr.status = 2 THEN '成功'
+                                         WHEN swcpr.status = 3 THEN '失败'
+                                         WHEN swt.task_status = 0 THEN '初始化'
+                                         WHEN swt.task_status = 1 THEN '运行中'
+                                         WHEN swt.task_status = 2 THEN '成功'
+                                         WHEN swt.task_status = 3 THEN '失败'
+                                         ELSE '未知' END   AS status,
+                                     CASE
+                                         WHEN swcpr.status = 2 THEN ''
+                                         WHEN swcpr.error_msg LIKE '%Data too long%' THEN '数据超过字段长度限制'
+                                         WHEN swcpr.error_msg LIKE '%Deadlock%' THEN '数据库死锁'
+                                         WHEN (swcpr.error_msg <> '' AND swcpr.error_msg IS NOT NULL)
+                                             THEN SUBSTRING_INDEX(swcpr.error_msg, ',', 1)
+                                         WHEN swt.error_msg LIKE '%Data too long%' THEN '数据超过字段长度限制'
+                                         WHEN swt.error_msg LIKE '%Deadlock%' THEN '数据库死锁'
+                                         WHEN (swt.error_msg <> '' AND swt.error_msg IS NOT NULL)
+                                             THEN SUBSTRING_INDEX(swt.error_msg, ',', 1)
+                                         ELSE ''
+                                         END               AS error_msg
+                              FROM supply_workflow_task swt
+                                       LEFT JOIN supply_workflow_crawler_plan_record swcpr
+                                                 ON swt.task_id = swcpr.task_id
+                              WHERE swt.create_timestamp >= {ts}
+                          ) AS t
+                     GROUP BY workflow_id, status, error_msg
+                     UNION ALL
+                     SELECT '生成计划绑定'          AS step,
+                            workflow_id             AS workflow_id,
+                            status                  AS status,
+                            error_msg               AS error_msg,
+                            COUNT(DISTINCT task_id) AS task_cnt,
+                            COUNT(1)                AS cnt
+                     FROM (
+                              SELECT swt.workflow_id     AS workflow_id,
+                                     swt.task_id         AS task_id,
+                                     CASE
+                                         WHEN swppr.status = 0 THEN '初始化'
+                                         WHEN swppr.status = 1 THEN '运行中'
+                                         WHEN swppr.status = 2 THEN '成功'
+                                         WHEN swppr.status = 3 THEN '失败'
+                                         WHEN swt.task_status = 0 THEN '初始化'
+                                         WHEN swt.task_status = 1 THEN '运行中'
+                                         WHEN swt.task_status = 2 THEN '成功'
+                                         WHEN swt.task_status = 3 THEN '失败'
+                                         ELSE '未知' END AS status,
+                                     CASE
+                                         WHEN swppr.status = 2 THEN ''
+                                         WHEN swppr.error_msg LIKE '%Data too long%' THEN '数据超过字段长度限制'
+                                         WHEN swppr.error_msg LIKE '%Deadlock%' THEN '数据库死锁'
+                                         WHEN (swppr.error_msg <> '' AND swppr.error_msg IS NOT NULL)
+                                             THEN SUBSTRING_INDEX(swppr.error_msg, ',', 1)
+                                         WHEN swt.error_msg LIKE '%Data too long%' THEN '数据超过字段长度限制'
+                                         WHEN swt.error_msg LIKE '%Deadlock%' THEN '数据库死锁'
+                                         WHEN (swt.error_msg <> '' AND swt.error_msg IS NOT NULL)
+                                             THEN SUBSTRING_INDEX(swt.error_msg, ',', 1)
+                                         ELSE ''
+                                         END             AS error_msg
+                              FROM supply_workflow_task swt
+                                       LEFT JOIN supply_workflow_produce_bind_record swppr
+                                                 ON swt.task_id = swppr.task_id
+                              WHERE swt.create_timestamp >= {ts}
+                          ) AS t
+                     GROUP BY workflow_id, status, error_msg
+                 ) AS t
+                     LEFT JOIN supply_workflow sw ON t.workflow_id = sw.id
+            WHERE workflow_id = '{workflow_id}';
             '''
             '''
     return mysql_helper.execute_query(sql)
     return mysql_helper.execute_query(sql)
 
 
@@ -418,7 +438,7 @@ def workflow_dashboard_monitor():
     if dashboard_stat is not None:
     if dashboard_stat is not None:
         dashboard_stat['header_style'] = {
         dashboard_stat['header_style'] = {
             "text_size": "normal",
             "text_size": "normal",
-            "text_align":"center",
+            "text_align": "center",
         }
         }
         elements.append(dashboard_stat)
         elements.append(dashboard_stat)
     else:
     else: