# supply_workflow_monitor.py (4.1 KB)
import os
import unicodedata
from datetime import datetime
from typing import Dict, List, Any

import pandas as pd

from helper.MySQLHelper import MySQLHelper
from util import feishu_inform_util
  7. fei_shu_webhook = "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b"
  8. mysql_helper = MySQLHelper(
  9. host="rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com",
  10. username="readonly",
  11. password="HdkZ4TDmeK6SQ3BRtJBk",
  12. database="aigc-admin-prod"
  13. )
  14. def _display_width(s: str) -> int:
  15. """计算字符串在终端中的显示宽度,CJK字符占2宽度,其余占1"""
  16. w = 0
  17. for ch in s:
  18. w += 2 if unicodedata.east_asian_width(ch) in ('F', 'W', 'A') else 1
  19. return w
  20. def _pad_to_width(s: str, target_width: int) -> str:
  21. """将字符串右侧填充空格至指定的显示宽度"""
  22. return s + ' ' * (target_width - _display_width(s))
  23. def print_df_table(df: pd.DataFrame, fmt: str = "grid") -> str:
  24. """将DataFrame转换为对齐的二维表格字符串,自适应中英文混排宽度
  25. Args:
  26. df: pandas DataFrame
  27. fmt: 输出格式,'grid' 为终端对齐表格,'markdown' 为 Markdown 表格(适合飞书渲染)
  28. """
  29. headers = list(df.columns)
  30. str_rows = df.astype(str).values
  31. if fmt == "markdown":
  32. char_widths = []
  33. for i, h in enumerate(headers):
  34. max_w = len(str(h))
  35. for row in str_rows:
  36. max_w = max(max_w, len(str(row[i])))
  37. char_widths.append(max_w)
  38. def _md_cell(v, w):
  39. return str(v).ljust(w)
  40. header_cells = [_md_cell(str(h), char_widths[i]) for i, h in enumerate(headers)]
  41. sep_cells = ['-' * w for w in char_widths]
  42. lines = ['| ' + ' | '.join(header_cells) + ' |',
  43. '| ' + ' | '.join(sep_cells) + ' |']
  44. for row in str_rows:
  45. cells = [_md_cell(str(row[i]), char_widths[i]) for i in range(len(headers))]
  46. lines.append('| ' + ' | '.join(cells) + ' |')
  47. return '\n'.join(lines)
  48. col_widths = []
  49. for i, h in enumerate(headers):
  50. max_w = _display_width(str(h))
  51. for row in str_rows:
  52. max_w = max(max_w, _display_width(str(row[i])))
  53. col_widths.append(max_w)
  54. sep = '+' + '+'.join('-' * (w + 2) for w in col_widths) + '+'
  55. def _row(values):
  56. cells = [_pad_to_width(str(v), col_widths[i]) for i, v in enumerate(values)]
  57. return '| ' + ' | '.join(cells) + ' |'
  58. lines = [sep, _row(headers), sep]
  59. for row in str_rows:
  60. lines.append(_row(row))
  61. lines.append(sep)
  62. return '\n'.join(lines)
  63. def task_exe_step_stat(ts: int) -> List[Dict[str, Any]]:
  64. sql = f'''
  65. select step_name AS "步骤名称",
  66. case
  67. when status = 0 then '初始化'
  68. when status = 1 then '运行中'
  69. when status = 2 then '成功'
  70. when status = 3 then '失败'
  71. else '未知'
  72. end AS '执行状态',
  73. case
  74. when error_msg like '%Data too long%' then '数据超过字段长度限制'
  75. when error_msg like '%Deadlock%' then '数据库死锁'
  76. when error_msg = '' then ''
  77. else '其他错误'
  78. end AS '错误原因',
  79. cnt AS '个数'
  80. from (
  81. select step_name, status, error_msg, count(1) as cnt
  82. from supply_workflow_task_exe_step
  83. where create_timestamp >= {ts}
  84. group by step_name, status, error_msg
  85. ) as t
  86. '''
  87. return mysql_helper.execute_query(sql)
  88. def main():
  89. today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
  90. timestamp_ms = int(today_midnight.timestamp() * 1000)
  91. stat = task_exe_step_stat(timestamp_ms)
  92. df = pd.DataFrame(stat)
  93. print("当日任务步骤执行统计")
  94. msg = print_df_table(df, fmt="grid")
  95. feishu_inform_util.send_card_msg_to_feishu(
  96. webhook=fei_shu_webhook,
  97. card_json=feishu_inform_util.build_card_json(msg, "当日任务步骤执行统计")
  98. )
  99. if __name__ == '__main__':
  100. main()