| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- from datetime import datetime
- from typing import Dict, List, Any
- import pandas as pd
- from helper.MySQLHelper import MySQLHelper
- from util import feishu_inform_util
- fei_shu_webhook = "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b"
- mysql_helper = MySQLHelper(
- host="rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com",
- username="readonly",
- password="HdkZ4TDmeK6SQ3BRtJBk",
- database="aigc-admin-prod"
- )
- def build_card_join(content: str):
- return {
- "schema": "2.0",
- "header": {
- "title": {
- "content": "任务执行步骤统计"
- },
- "template": "green",
- },
- "body": {
- "elements": [
- {
- "tag": "markdown",
- "element_id": "detail",
- "margin": "0px 0px 0px 0px",
- "content": content,
- "text_size": "normal",
- "text_align": "left"
- }
- ]
- }
- }
- def df_to_markdown_table(df: pd.DataFrame) -> str:
- """将DataFrame转换为标准Markdown表格字符串"""
- headers = list(df.columns)
- str_rows = df.astype(str).values
- col_widths = []
- for i, h in enumerate(headers):
- max_w = len(str(h))
- for row in str_rows:
- max_w = max(max_w, len(str(row[i])))
- col_widths.append(max_w)
- def _row(values):
- cells = [str(v).ljust(col_widths[i]) for i, v in enumerate(values)]
- return '| ' + ' | '.join(cells) + ' |'
- sep = '| ' + ' | '.join('-' * w for w in col_widths) + ' |'
- lines = [_row(headers), sep]
- for row in str_rows:
- lines.append(_row(row))
- return '\n'.join(lines)
- def task_exe_step_stat(ts: int) -> List[Dict[str, Any]]:
- sql = f'''
- select step_name AS "步骤名称",
- case
- when status = 0 then '初始化'
- when status = 1 then '运行中'
- when status = 2 then '成功'
- when status = 3 then '失败'
- else '未知'
- end AS '执行状态',
- case
- when error_msg like '%Data too long%' then '数据超过字段长度限制'
- when error_msg like '%Deadlock%' then '数据库死锁'
- when error_msg = '' then ''
- when error_msg is null then ''
- else error_msg
- end AS '错误原因',
- cnt AS '个数'
- from (
- select step_name, status, error_msg, count(1) as cnt
- from supply_workflow_task_exe_step
- where create_timestamp >= {ts}
- group by step_name, status, error_msg
- ) as t
- '''
- return mysql_helper.execute_query(sql)
- def main():
- today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
- timestamp_ms = int(today_midnight.timestamp() * 1000)
- stat = task_exe_step_stat(timestamp_ms)
- df = pd.DataFrame(stat)
- print("当日任务步骤执行统计")
- msg = df_to_markdown_table(df)
- feishu_inform_util.send_card_msg_to_feishu(
- webhook=fei_shu_webhook,
- card_json=build_card_join(msg)
- )
- if __name__ == '__main__':
- main()
|