from datetime import datetime from typing import Dict, List, Any import pandas as pd from helper.MySQLHelper import MySQLHelper from util import feishu_inform_util fei_shu_webhook = "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b" mysql_helper = MySQLHelper( host="rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com", username="readonly", password="HdkZ4TDmeK6SQ3BRtJBk", database="aigc-admin-prod" ) def build_card_join(content: str): return { "schema": "2.0", "header": { "title": { "content": "任务执行步骤统计" }, "template": "green", }, "body": { "elements": [ { "tag": "markdown", "element_id": "detail", "margin": "0px 0px 0px 0px", "content": content, "text_size": "normal", "text_align": "left" } ] } } def df_to_markdown_table(df: pd.DataFrame) -> str: """将DataFrame转换为标准Markdown表格字符串""" headers = list(df.columns) str_rows = df.astype(str).values col_widths = [] for i, h in enumerate(headers): max_w = len(str(h)) for row in str_rows: max_w = max(max_w, len(str(row[i]))) col_widths.append(max_w) def _row(values): cells = [str(v).ljust(col_widths[i]) for i, v in enumerate(values)] return '| ' + ' | '.join(cells) + ' |' sep = '| ' + ' | '.join('-' * w for w in col_widths) + ' |' lines = [_row(headers), sep] for row in str_rows: lines.append(_row(row)) return '\n'.join(lines) def task_exe_step_stat(ts: int) -> List[Dict[str, Any]]: sql = f''' select step_name AS "步骤名称", status as '执行状态', error_msg as '错误原因', count(1) AS '个数' from ( select step_name, case when status = 0 then '初始化' when status = 1 then '运行中' when status = 2 then '成功' when status = 3 then '失败' else '未知' end AS status, case when error_msg like '%Data too long%' then '数据超过字段长度限制' when error_msg like '%Deadlock%' then '数据库死锁' when (error_msg = '' or error_msg is null) then '' else substring_index(error_msg, ',', 1) end AS error_msg from supply_workflow_task_exe_step where create_timestamp >= {ts} ) as t group by step_name, status, error_msg ''' return mysql_helper.execute_query(sql) def main(): today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) timestamp_ms = int(today_midnight.timestamp() * 1000) stat = task_exe_step_stat(timestamp_ms) df = pd.DataFrame(stat) print("当日任务步骤执行统计") msg = df_to_markdown_table(df) feishu_inform_util.send_card_msg_to_feishu( webhook=fei_shu_webhook, card_json=build_card_join(msg) ) if __name__ == '__main__': main()