"""Post per-workflow supply-pipeline statistics to a Feishu group chat.

For every row of ``supply_workflow`` the script runs two aggregate queries
over tasks created since local midnight, renders each result as a Markdown
table inside a collapsible panel, and sends the resulting card to the
configured Feishu webhook.
"""
from datetime import datetime
from typing import Any, Dict

import pandas
import pandas as pd

from helper.MySQLHelper import MySQLHelper
from util import feishu_inform_util

# NOTE(security): the webhook token and the database credentials below are
# hard-coded in source. They are left untouched so the script keeps working,
# but they should be moved to environment variables or a secret store.
fei_shu_webhook = "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b"

mysql_helper = MySQLHelper(
    host="rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com",
    username="readonly",
    password="HdkZ4TDmeK6SQ3BRtJBk",
    database="aigc-admin-prod"
)

# Fixed widths for narrow columns of the Feishu table component; any column
# not listed here falls back to "auto" (see build_table_json).
column_width_map = {
    "状态": "80px",
    "任务数": "80px",
    "记录数": "80px",
}

# Header colour templates, selected by header_template_index modulo the length.
header_template = ["blue", "wathet", "turquoise", "green", "yellow", "orange"]
header_template_index = 0

# Text shown inside a panel when a query produced no columns at all.
_EMPTY_TABLE_PLACEHOLDER = "暂无数据"


def _markdown_cell(value: Any) -> str:
    """Return ``value`` as text that is safe inside one Markdown table cell.

    Line breaks are collapsed to spaces and ``|`` is replaced by its HTML
    entity so that free-form text (e.g. database error messages) cannot
    split a row or add columns.
    """
    text = " ".join(str(value).splitlines())
    return text.replace("|", "&#124;")


def _dataframe_to_markdown(df: pandas.DataFrame) -> str:
    """Render ``df`` as a Markdown table (header, separator, data rows)."""
    if len(df.columns) == 0:
        # An empty result set yields a frame without columns; a table made
        # from it would be the malformed "|  |", so show a placeholder.
        return _EMPTY_TABLE_PLACEHOLDER

    # Header row
    header = "| " + " | ".join(_markdown_cell(col) for col in df.columns) + " |\n"
    # Separator row
    separator = "| " + " | ".join(["---"] * len(df.columns)) + " |\n"
    # Data rows
    rows = "\n".join(
        "| " + " | ".join(_markdown_cell(val) for val in row) + " |"
        for row in df.to_numpy()
    )
    return header + separator + rows


def build_collapsible_panel_table_json(df_map: Dict[str, pandas.DataFrame], title: str) -> Dict[str, Any]:
    """Build a schema-2.0 card with one collapsible Markdown table per frame.

    Args:
        df_map: Maps a panel title to the DataFrame shown inside that panel.
            Panels are emitted in the mapping's iteration order.
        title: Text of the card header.

    Returns:
        The card as a JSON-serialisable dict. The card header and every panel
        header use the colour selected by the module-level
        ``header_template_index``.
    """
    template = header_template[header_template_index % len(header_template)]

    elements = []
    for key, df in df_map.items():
        elements.append({
            "tag": "collapsible_panel",
            "expanded": False,
            "header": {
                "title": {
                    "tag": "plain_text",
                    "content": key
                },
                "vertical_align": "center",
                "template": template,
            },
            "border": {
                "color": "grey",
                "corner_radius": "5px"
            },
            "elements": [
                {
                    "tag": "markdown",
                    "content": _dataframe_to_markdown(df)
                }
            ]
        })

    return {
        "schema": "2.0",
        "header": {
            "title": {
                "content": title
            },
            "template": template,
        },
        "body": {
            "elements": elements
        }
    }


def build_table_json(df: pandas.DataFrame, title: str) -> Dict[str, Any]:
    """Build a schema-2.0 card containing ``df`` as a single table element.

    Args:
        df: Data to display; each column becomes a centred text column and
            each row becomes one record of the table.
        title: Text of the card header.

    Returns:
        The card as a JSON-serialisable dict.
    """
    columns = [
        {
            "name": column,
            "display_name": column,
            "width": column_width_map.get(column, "auto"),
            "data_type": "text",
            "horizontal_align": "center",
            "vertical_align": "center",
        }
        for column in df.columns
    ]
    rows = df.to_dict(orient="records")

    return {
        "schema": "2.0",
        "header": {
            "title": {
                "content": title
            },
            "template": header_template[header_template_index % len(header_template)],
        },
        "body": {
            "elements": [
                {
                    "tag": "table",
                    "element_id": "table_detail",
                    "margin": "0px 0px 0px 0px",
                    "page_size": 10,
                    "header_style": {
                        "text_align": "center",
                        "text_size": "normal",
                        "background_style": "none",
                        "text_color": "grey",
                        "bold": True,
                        "lines": 1
                    },
                    "columns": columns,
                    "rows": rows
                }
            ]
        }
    }


def task_exe_step_stat_query(ts: int, workflow_id: str):
    """Count task execution steps of one workflow by step, status and error.

    Args:
        ts: Lower bound (inclusive) compared against
            ``supply_workflow_task.create_timestamp``.
        workflow_id: Value matched against ``supply_workflow_task.workflow_id``.

    Returns:
        Whatever ``mysql_helper.execute_query`` returns for the statement.
    """
    # NOTE(review): the SQL is assembled by string interpolation. `ts` is
    # forced to an integer and `workflow_id` is quoted like in
    # workflow_status_stat_query; switch to bound parameters if
    # MySQLHelper.execute_query supports them.
    sql = f'''
        select step_name AS '步骤名称',
               status    as '执行状态',
               error_msg as '错误原因',
               count(1)  AS '个数'
        from (
            select swt.task_id,
                   step_name,
                   case
                       when status = 0 then '初始化'
                       when status = 1 then '运行中'
                       when status = 2 then '成功'
                       when status = 3 then '失败'
                       when swt.task_status = 0 then '初始化'
                       when swt.task_status = 1 then '运行中'
                       when swt.task_status = 2 then '成功'
                       when swt.task_status = 3 then '失败'
                       else '未知'
                   end AS status,
                   case
                       when swtes.error_msg like '%Data too long%' then '数据超过字段长度限制'
                       when swtes.error_msg like '%Deadlock%' then '数据库死锁'
                       when (swtes.error_msg <> '' and swtes.error_msg is not null) then substring_index(swtes.error_msg, ',', 1)
                       when swt.error_msg like '%Data too long%' then '数据超过字段长度限制'
                       when swt.error_msg like '%Deadlock%' then '数据库死锁'
                       when (swt.error_msg <> '' AND swt.error_msg is not null) then substring_index(swt.error_msg, ',', 1)
                       else ''
                   end AS error_msg
            from supply_workflow_task swt
            left join supply_workflow_task_exe_step swtes on swt.task_id = swtes.task_id
            where swt.create_timestamp >= {int(ts)}
              and swt.workflow_id = '{workflow_id}'
        ) as t
        group by step_name, status, error_msg;
    '''
    return mysql_helper.execute_query(sql)


def workflow_status_stat_query(ts: int, workflow_id: str):
    """Count crawler-plan and produce-bind records of one workflow.

    The statement unions two aggregates ("创建爬取计划" and "生成计划绑定"),
    each grouped by workflow, status and error message, and then keeps only
    the rows of ``workflow_id``.

    Args:
        ts: Lower bound (inclusive) compared against
            ``supply_workflow_task.create_timestamp``.
        workflow_id: Workflow whose rows are kept from the aggregated result.

    Returns:
        Whatever ``mysql_helper.execute_query`` returns for the statement.
    """
    # NOTE(review): string-built SQL, see task_exe_step_stat_query.
    sql = f'''
        select t.step      as '阶段',
               t.status    as '状态',
               t.error_msg as '失败原因',
               t.task_cnt  as '任务数',
               t.cnt       as '记录数'
        from (
            select '创建爬取计划' as step,
                   workflow_id as workflow_id,
                   status as status,
                   error_msg as error_msg,
                   count(distinct task_id) as task_cnt,
                   count(distinct crawler_plan_id) as cnt
            from (
                select swt.workflow_id as workflow_id,
                       swt.task_id as task_id,
                       swcpr.crawler_plan_id as crawler_plan_id,
                       case
                           when swcpr.status = 0 then '初始化'
                           when swcpr.status = 1 then '运行中'
                           when swcpr.status = 2 then '成功'
                           when swcpr.status = 3 then '失败'
                           when swt.task_status = 0 then '初始化'
                           when swt.task_status = 1 then '运行中'
                           when swt.task_status = 2 then '成功'
                           when swt.task_status = 3 then '失败'
                           else '未知'
                       end AS status,
                       case
                           when swcpr.error_msg like '%Data too long%' then '数据超过字段长度限制'
                           when swcpr.error_msg like '%Deadlock%' then '数据库死锁'
                           when (swcpr.error_msg <> '' AND swcpr.error_msg is not null) then substring_index(swcpr.error_msg, ',', 1)
                           when swt.error_msg like '%Data too long%' then '数据超过字段长度限制'
                           when swt.error_msg like '%Deadlock%' then '数据库死锁'
                           when (swt.error_msg <> '' AND swt.error_msg is not null) then substring_index(swt.error_msg, ',', 1)
                           else ''
                       end AS error_msg
                from supply_workflow_task swt
                left join supply_workflow_crawler_plan_record swcpr on swt.task_id = swcpr.task_id
                where swt.create_timestamp >= {int(ts)}
            ) as t
            group by workflow_id, status, error_msg
            union all
            select '生成计划绑定' as step,
                   workflow_id as workflow_id,
                   status as status,
                   error_msg as error_msg,
                   count(distinct task_id) as task_cnt,
                   count(1) as cnt
            from (
                select swt.workflow_id as workflow_id,
                       swt.task_id as task_id,
                       case
                           when swppr.status = 0 then '初始化'
                           when swppr.status = 1 then '运行中'
                           when swppr.status = 2 then '成功'
                           when swppr.status = 3 then '失败'
                           when swt.task_status = 0 then '初始化'
                           when swt.task_status = 1 then '运行中'
                           when swt.task_status = 2 then '成功'
                           when swt.task_status = 3 then '失败'
                           else '未知'
                       end AS status,
                       case
                           when swppr.error_msg like '%Data too long%' then '数据超过字段长度限制'
                           when swppr.error_msg like '%Deadlock%' then '数据库死锁'
                           when (swppr.error_msg <> '' and swppr.error_msg is not null) then substring_index(swppr.error_msg, ',', 1)
                           when swt.error_msg like '%Data too long%' then '数据超过字段长度限制'
                           when swt.error_msg like '%Deadlock%' then '数据库死锁'
                           when (swt.error_msg <> '' AND swt.error_msg is not null) then substring_index(swt.error_msg, ',', 1)
                           else ''
                       end AS error_msg
                from supply_workflow_task swt
                left join supply_workflow_produce_bind_record swppr on swt.task_id = swppr.task_id
                where swt.create_timestamp >= {int(ts)}
            ) as t
            group by workflow_id, status, error_msg
        ) as t
        left join supply_workflow sw on t.workflow_id = sw.id
        where workflow_id = '{workflow_id}';
    '''
    return mysql_helper.execute_query(sql)


def main():
    """Send one statistics card per workflow for tasks created since midnight."""
    global header_template_index

    # Midnight of the current day in the machine's local time zone, expressed
    # in milliseconds since the epoch.
    today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    timestamp_ms = int(today_midnight.timestamp() * 1000)

    workflows = mysql_helper.execute_query('select id, name from supply_workflow;')
    for workflow in workflows or []:
        exe_step_stat = task_exe_step_stat_query(timestamp_ms, workflow['id'])
        status_stat = workflow_status_stat_query(timestamp_ms, workflow['id'])

        df_map = {
            "各步骤执行情况统计": pd.DataFrame(exe_step_stat),
            "爬取和生成情况统计": pd.DataFrame(status_stat),
        }

        feishu_inform_util.send_card_msg_to_feishu(
            webhook=fei_shu_webhook,
            card_json=build_collapsible_panel_table_json(df_map, f"【workflow-{workflow['name']}】数据统计")
        )

        # Advance the colour so consecutive workflow cards are visually
        # distinguishable; the builders apply the modulo.
        header_template_index += 1


if __name__ == '__main__':
    main()