Browse Source

feat:添加supply workflow监控

zhaohaipeng 1 week ago
parent
commit
64cc4c0998
1 changed files with 35 additions and 51 deletions
  1. 35 51
      monitor/supply_workflow_monitor.py

+ 35 - 51
monitor/supply_workflow_monitor.py

@@ -1,4 +1,3 @@
-import unicodedata
 from datetime import datetime
 from typing import Dict, List, Any
 
@@ -17,66 +16,50 @@ mysql_helper = MySQLHelper(
 )
 
 
-def _display_width(s: str) -> int:
-    """计算字符串在终端中的显示宽度,CJK字符占2宽度,其余占1"""
-    w = 0
-    for ch in s:
-        w += 2 if unicodedata.east_asian_width(ch) in ('F', 'W', 'A') else 1
-    return w
-
-
-def _pad_to_width(s: str, target_width: int) -> str:
-    """将字符串右侧填充空格至指定的显示宽度"""
-    return s + ' ' * (target_width - _display_width(s))
-
-
-def print_df_table(df: pd.DataFrame, fmt: str = "grid") -> str:
-    """将DataFrame转换为对齐的二维表格字符串,自适应中英文混排宽度
-
-    Args:
-        df: pandas DataFrame
-        fmt: 输出格式,'grid' 为终端对齐表格,'markdown' 为 Markdown 表格(适合飞书渲染)
-    """
+def build_card_join(content: str):
+    return {
+        "schema": "2.0",
+        "header": {
+            "title": {
+                "content": "任务执行步骤统计"
+            },
+            "template": "green",
+        },
+        "body": {
+            "elements": [
+                {
+                    "tag": "markdown",
+                    "element_id": "detail",
+                    "margin": "0px 0px 0px 0px",
+                    "content": content,
+                    "text_size": "normal",
+                    "text_align": "left"
+                }
+            ]
+        }
+    }
+
+
+def df_to_markdown_table(df: pd.DataFrame) -> str:
+    """将DataFrame转换为标准Markdown表格字符串"""
     headers = list(df.columns)
     str_rows = df.astype(str).values
 
-    if fmt == "markdown":
-        char_widths = []
-        for i, h in enumerate(headers):
-            max_w = len(str(h))
-            for row in str_rows:
-                max_w = max(max_w, len(str(row[i])))
-            char_widths.append(max_w)
-
-        def _md_cell(v, w):
-            return str(v).ljust(w)
-
-        header_cells = [_md_cell(str(h), char_widths[i]) for i, h in enumerate(headers)]
-        sep_cells = ['-' * w for w in char_widths]
-        lines = ['| ' + ' | '.join(header_cells) + ' |',
-                 '| ' + ' | '.join(sep_cells) + ' |']
-        for row in str_rows:
-            cells = [_md_cell(str(row[i]), char_widths[i]) for i in range(len(headers))]
-            lines.append('| ' + ' | '.join(cells) + ' |')
-        return '\n'.join(lines)
-
     col_widths = []
     for i, h in enumerate(headers):
-        max_w = _display_width(str(h))
+        max_w = len(str(h))
         for row in str_rows:
-            max_w = max(max_w, _display_width(str(row[i])))
+            max_w = max(max_w, len(str(row[i])))
         col_widths.append(max_w)
 
-    sep = '+' + '+'.join('-' * (w + 2) for w in col_widths) + '+'
-
     def _row(values):
-        cells = [_pad_to_width(str(v), col_widths[i]) for i, v in enumerate(values)]
+        cells = [str(v).ljust(col_widths[i]) for i, v in enumerate(values)]
         return '| ' + ' | '.join(cells) + ' |'
 
-    lines = [sep, _row(headers), sep]
+    sep = '| ' + ' | '.join('-' * w for w in col_widths) + ' |'
+    lines = [_row(headers), sep]
     for row in str_rows:
         lines.append(_row(row))
-    lines.append(sep)
     return '\n'.join(lines)
 
 
@@ -94,7 +77,8 @@ def task_exe_step_stat(ts: int) -> List[Dict[str, Any]]:
                when error_msg like '%Data too long%' then '数据超过字段长度限制'
                when error_msg like '%Deadlock%' then '数据库死锁'
                when error_msg = '' then ''
-               else '其他错误'
+               when error_msg is null then ''
+               else error_msg
                end AS '错误原因',
            cnt AS '个数'
     from (
@@ -115,11 +99,11 @@ def main():
 
     print("当日任务步骤执行统计")
 
-    msg = print_df_table(df, fmt="grid")
+    msg = df_to_markdown_table(df)
 
     feishu_inform_util.send_card_msg_to_feishu(
         webhook=fei_shu_webhook,
-        card_json=feishu_inform_util.build_card_json(msg, "当日任务步骤执行统计")
+        card_json=build_card_join(msg)
     )