"""Daily supply-workflow monitor that pushes statistics to Feishu as interactive cards.

Each run sends one dashboard card summarising every workflow, then one detail
card per workflow (task status, execution steps, crawler/produce-binding stats).
"""
from datetime import datetime
from typing import Dict, Any, List, Optional
import traceback

import pandas
import pandas as pd

from helper.MySQLHelper import MySQLHelper
from util import feishu_inform_util

# NOTE(review): webhook URLs and DB credentials are committed in source; consider
# moving them to environment variables or a secret store.
fei_shu_webhook = "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b"
# Group-chat webhook that additionally receives the overall dashboard card.
fei_shu_group_webhook = "https://open.feishu.cn/open-apis/bot/v2/hook/9f5c5cce-5eb2-4731-b368-33926f5549f9"

mysql_helper = MySQLHelper(
    host="rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com",
    username="readonly",
    password="HdkZ4TDmeK6SQ3BRtJBk",
    database="aigc-admin-prod"
)

# Fixed widths for known table columns; any other column falls back to "auto".
column_width_map = {
    "状态": "80px",
    "任务数": "80px",
    "记录数": "80px",
    "当日计划数": "90px",
    "昨日计划数": "90px",
    "当日任务数": "90px",
    "昨日任务数": "90px",
    "当日待处理": "90px",
    "当日运行中": "90px",
    "当日成功": "80px",
    "当日失败": "80px",
}

# Header colour templates, cycled per workflow card (main() advances the index).
header_template = ["wathet", "turquoise", "purple", "indigo", "green", "grey"]
header_template_index = 0


def build_config_json() -> Dict[str, Any]:
    """Return the card-level ``config`` object used by every card this script sends."""
    return {
        "width_mode": "default",
        "enable_forward": False,
    }


def build_markdown_element_json(content: str, element_id: str = "markdown") -> Dict[str, Any]:
    """Return a markdown element.

    Args:
        content: Markdown text to render.
        element_id: Element identifier; give each element in one card a distinct id.
    """
    return {
        "tag": "markdown",
        "element_id": element_id,
        "content": content
    }


def build_table_element_json(df: pandas.DataFrame,
                             element_id: str = "table_detail") -> Optional[Dict[str, Any]]:
    """Return a table element rendering *df*, or ``None`` when *df* is empty.

    Every column is rendered as centred text; widths come from ``column_width_map``.

    Args:
        df: Data to show; column labels become both column names and headers.
        element_id: Element identifier; give each element in one card a distinct id.
    """
    if df.empty:
        return None
    columns = [
        {
            "name": column,
            "display_name": column,
            "width": column_width_map.get(column, "auto"),
            "data_type": "text",
            "horizontal_align": "center",
            "vertical_align": "center",
        }
        for column in df.columns
    ]
    rows = df.to_dict(orient="records")
    return {
        "tag": "table",
        "element_id": element_id,
        "margin": "2px 0px",
        "page_size": 10,
        "header_style": {
            "text_align": "center",
            "text_size": "normal",
            "background_style": "none",
            "text_color": "grey",
            "bold": True,
            "lines": 1
        },
        "columns": columns,
        "rows": rows
    }


def _markdown_cell(value: Any) -> str:
    """Render *value* as one markdown table cell; embedded newlines would split the row."""
    return str(value).replace("\r\n", " ").replace("\n", " ")


def build_markdown_table_element_json(df: pandas.DataFrame) -> Dict[str, Any]:
    """Return a markdown element containing *df* as a pipe-delimited markdown table."""
    # Header row
    header = "| " + " | ".join(_markdown_cell(column) for column in df.columns) + " |\n"
    # Separator row
    separator = "| " + " | ".join(["---"] * len(df.columns)) + " |\n"
    # Data rows
    rows = "\n".join(
        "| " + " | ".join(_markdown_cell(val) for val in row) + " |"
        for row in df.to_numpy()
    )
    return build_markdown_element_json(header + separator + rows)


def build_collapsible_panel_element_json(elements: List[Dict[str, Any]], title: str) -> Dict[str, Any]:
    """Return a collapsed-by-default panel wrapping *elements* under a plain-text *title*."""
    return {
        "tag": "collapsible_panel",
        "expanded": False,
        "header": {
            "title": {
                "tag": "plain_text",
                "content": title
            },
            "vertical_align": "center",
        },
        "border": {
            "color": "grey",
            "corner_radius": "5px"
        },
        "elements": elements
    }


def build_hr_element_json(element_id: str = "custom_id") -> Dict[str, Any]:
    """Return a horizontal-rule element; give each element in one card a distinct id."""
    return {
        "tag": "hr",
        "element_id": element_id,
        "margin": "10px 0px"
    }


def build_text_tag_json(content: str, color: str = 'green',
                        element_id: str = "custom_id") -> Dict[str, Any]:
    """Return a coloured text tag, e.g. for a card header's ``text_tag_list``."""
    return {
        "tag": "text_tag",
        "element_id": element_id,
        "text": {
            "tag": "plain_text",
            "content": content
        },
        "color": color
    }


def build_sub_title_json(content: str) -> Dict[str, Any]:
    """Return a plain-text object usable as a card header subtitle."""
    return {
        "tag": "plain_text",
        "content": content
    }


def build_header_json(content: str, template: str = 'green', sub_title: Optional[Dict[str, Any]] = None,
                      text_tag_list: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
    """Return a card header.

    Args:
        content: Header title text.
        template: Header colour template name.
        sub_title: Optional subtitle object (see ``build_sub_title_json``).
        text_tag_list: Optional tags shown next to the title; defaults to no tags.
    """
    return {
        "title": {
            "content": content
        },
        # A fresh list per call instead of a shared mutable default.
        "text_tag_list": text_tag_list if text_tag_list is not None else [],
        "template": template,
        "subtitle": sub_title
    }


def build_card_json(body_elements: List[Dict[str, Any]], header: Dict[str, Any],
                    config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Assemble a schema-2.0 card from its header, body elements and config."""
    return {
        "schema": "2.0",
        "config": config,
        "header": header,
        "body": {
            "elements": body_elements
        }
    }


def supply_workflow_dashboard_stat(ts: int) -> List[Dict[str, Any]]:
    """Return per-workflow task and crawler-plan counts for today and yesterday.

    Args:
        ts: Millisecond timestamp of today's start. "Today" is
            ``create_timestamp >= ts``; "yesterday" is the 86,400,000 ms before it.

    Returns:
        One row per ``supply_workflow`` (missing counts become 0), ordered by
        ``status`` then ``create_timestamp``, both descending.
    """
    yesterday = ts - 86400000
    # Only the total task count and crawler-plan count are displayed for yesterday.
    sql = f"""
        SELECT
            sw.name AS '策略名',
            CASE WHEN sw.status = 1 THEN '开启中'
                 WHEN sw.status = 0 THEN '已关闭'
                 ELSE '未知' END AS 状态,
            IFNULL(stat.crawler_plan_cnt, 0) AS '当日计划数',
            IFNULL(stat_yest.crawler_plan_cnt, 0) AS '昨日计划数',
            IFNULL(stat.total_cnt, 0) AS '当日任务数',
            IFNULL(stat_yest.total_cnt, 0) AS '昨日任务数',
            IFNULL(stat.init_cnt, 0) AS '当日待处理',
            IFNULL(stat.running_cnt, 0) AS '当日运行中',
            IFNULL(stat.success_cnt, 0) AS '当日成功',
            IFNULL(stat.fail_cnt, 0) AS '当日失败'
        FROM supply_workflow sw
        LEFT JOIN (
            SELECT
                swt.workflow_id,
                COUNT(DISTINCT swt.task_id) AS total_cnt,
                COUNT(DISTINCT IF(task_status = 0, swt.task_id, NULL)) AS init_cnt,
                COUNT(DISTINCT IF(task_status = 1, swt.task_id, NULL)) AS running_cnt,
                COUNT(DISTINCT IF(task_status = 2, swt.task_id, NULL)) AS success_cnt,
                COUNT(DISTINCT IF(task_status = 3, swt.task_id, NULL)) AS fail_cnt,
                COUNT(DISTINCT swcpr.crawler_plan_id) AS crawler_plan_cnt
            FROM supply_workflow_task swt
            LEFT JOIN supply_workflow_crawler_plan_record swcpr ON swt.task_id = swcpr.task_id
            WHERE swt.create_timestamp >= {ts}
            GROUP BY swt.workflow_id
        ) stat ON sw.id = stat.workflow_id
        LEFT JOIN (
            SELECT
                swt.workflow_id,
                COUNT(DISTINCT swt.task_id) AS total_cnt,
                COUNT(DISTINCT swcpr.crawler_plan_id) AS crawler_plan_cnt
            FROM supply_workflow_task swt
            LEFT JOIN supply_workflow_crawler_plan_record swcpr ON swt.task_id = swcpr.task_id
            WHERE swt.create_timestamp >= {yesterday}
              AND swt.create_timestamp < {ts}
            GROUP BY swt.workflow_id
        ) stat_yest ON sw.id = stat_yest.workflow_id
        ORDER BY sw.status DESC, sw.create_timestamp DESC;
    """
    return mysql_helper.execute_query(sql)


def workflow_task_status_stat(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
    """Return task and input counts per task status for one workflow since *ts* (ms).

    Each row has keys ``workflow_id``, ``任务状态`` (status label), ``任务数``
    (distinct tasks) and ``需求数`` (non-null task inputs).
    """
    sql = f"""
        SELECT
            workflow_id AS 'workflow_id',
            task_status AS '任务状态',
            COUNT(DISTINCT task_id) AS '任务数',
            COUNT(task_input) AS '需求数'
        FROM (
            SELECT
                workflow_id AS workflow_id,
                task_id AS task_id,
                task_input AS task_input,
                CASE WHEN task_status = 0 THEN '初始化'
                     WHEN task_status = 1 THEN '运行中'
                     WHEN task_status = 2 THEN '成功'
                     WHEN task_status = 3 THEN '失败'
                     ELSE '未知' END AS task_status
            FROM supply_workflow sw
            LEFT JOIN supply_workflow_task swt ON sw.id = swt.workflow_id
            WHERE sw.id = '{workflow_id}'
              AND swt.create_timestamp >= {ts}
        ) AS t
        GROUP BY t.task_status, t.workflow_id;
    """
    return mysql_helper.execute_query(sql)


def task_exe_step_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
    """Return row counts per (step, step status, normalised error) for one workflow since *ts*.

    Known error messages are mapped to readable labels; other non-empty messages
    are truncated at the first comma.
    """
    # workflow_id is quoted, consistent with the sibling queries; unquoted it breaks
    # for non-numeric ids.
    sql = f'''
        SELECT
            step_name AS '步骤名称',
            status AS '执行状态',
            error_msg AS '错误原因',
            COUNT(1) AS '任务数'
        FROM (
            SELECT
                swt.task_id,
                step_name,
                CASE WHEN status = 0 THEN '初始化'
                     WHEN status = 1 THEN '运行中'
                     WHEN status = 2 THEN '成功'
                     WHEN status = 3 THEN '失败'
                     ELSE '未知' END AS status,
                CASE WHEN swtes.error_msg LIKE '%Data too long%' THEN '数据超过字段长度限制'
                     WHEN swtes.error_msg LIKE '%Deadlock%' THEN '数据库死锁'
                     WHEN (swtes.error_msg <> '' AND swtes.error_msg IS NOT NULL)
                         THEN SUBSTRING_INDEX(swtes.error_msg, ',', 1)
                     ELSE '' END AS error_msg
            FROM supply_workflow_task swt
            LEFT JOIN supply_workflow_task_exe_step swtes ON swt.task_id = swtes.task_id
            WHERE swt.create_timestamp >= {ts}
              AND swt.workflow_id = '{workflow_id}'
        ) AS t
        GROUP BY step_name, status, error_msg;
    '''
    return mysql_helper.execute_query(sql)


def crawler_and_produce_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
    """Return crawler-plan creation and produce-binding stats for one workflow since *ts*.

    Rows carry the stage (``阶段``), status, normalised failure reason, distinct task
    count and record count. A record's own status/error takes precedence; the
    task's status/error is used when the record has none.
    """
    # The workflow filter is applied inside each subquery so only this workflow's
    # tasks are aggregated (previously every workflow was grouped, then filtered).
    sql = f'''
        SELECT
            t.step AS '阶段',
            t.status AS '状态',
            t.error_msg AS '失败原因',
            t.task_cnt AS '任务数',
            t.cnt AS '记录数'
        FROM (
            SELECT
                '创建爬取计划' AS step,
                workflow_id AS workflow_id,
                status AS status,
                error_msg AS error_msg,
                COUNT(DISTINCT task_id) AS task_cnt,
                COUNT(DISTINCT crawler_plan_id) AS cnt
            FROM (
                SELECT
                    swt.workflow_id AS workflow_id,
                    swt.task_id AS task_id,
                    swcpr.crawler_plan_id AS crawler_plan_id,
                    CASE WHEN swcpr.status = 0 THEN '初始化'
                         WHEN swcpr.status = 1 THEN '运行中'
                         WHEN swcpr.status = 2 THEN '成功'
                         WHEN swcpr.status = 3 THEN '失败'
                         WHEN swt.task_status = 0 THEN '初始化'
                         WHEN swt.task_status = 1 THEN '运行中'
                         WHEN swt.task_status = 2 THEN '成功'
                         WHEN swt.task_status = 3 THEN '失败'
                         ELSE '未知' END AS status,
                    CASE WHEN swcpr.status = 2 THEN ''
                         WHEN swcpr.error_msg LIKE '%Data too long%' THEN '数据超过字段长度限制'
                         WHEN swcpr.error_msg LIKE '%Deadlock%' THEN '数据库死锁'
                         WHEN (swcpr.error_msg <> '' AND swcpr.error_msg IS NOT NULL)
                             THEN SUBSTRING_INDEX(swcpr.error_msg, ',', 1)
                         WHEN swt.error_msg LIKE '%Data too long%' THEN '数据超过字段长度限制'
                         WHEN swt.error_msg LIKE '%Deadlock%' THEN '数据库死锁'
                         WHEN (swt.error_msg <> '' AND swt.error_msg IS NOT NULL)
                             THEN SUBSTRING_INDEX(swt.error_msg, ',', 1)
                         ELSE '' END AS error_msg
                FROM supply_workflow_task swt
                LEFT JOIN supply_workflow_crawler_plan_record swcpr ON swt.task_id = swcpr.task_id
                WHERE swt.create_timestamp >= {ts}
                  AND swt.workflow_id = '{workflow_id}'
            ) AS t
            GROUP BY workflow_id, status, error_msg
            UNION ALL
            SELECT
                '生成计划绑定' AS step,
                workflow_id AS workflow_id,
                status AS status,
                error_msg AS error_msg,
                COUNT(DISTINCT task_id) AS task_cnt,
                COUNT(1) AS cnt
            FROM (
                SELECT
                    swt.workflow_id AS workflow_id,
                    swt.task_id AS task_id,
                    CASE WHEN swppr.status = 0 THEN '初始化'
                         WHEN swppr.status = 1 THEN '运行中'
                         WHEN swppr.status = 2 THEN '成功'
                         WHEN swppr.status = 3 THEN '失败'
                         WHEN swt.task_status = 0 THEN '初始化'
                         WHEN swt.task_status = 1 THEN '运行中'
                         WHEN swt.task_status = 2 THEN '成功'
                         WHEN swt.task_status = 3 THEN '失败'
                         ELSE '未知' END AS status,
                    CASE WHEN swppr.status = 2 THEN ''
                         WHEN swppr.error_msg LIKE '%Data too long%' THEN '数据超过字段长度限制'
                         WHEN swppr.error_msg LIKE '%Deadlock%' THEN '数据库死锁'
                         WHEN (swppr.error_msg <> '' AND swppr.error_msg IS NOT NULL)
                             THEN SUBSTRING_INDEX(swppr.error_msg, ',', 1)
                         WHEN swt.error_msg LIKE '%Data too long%' THEN '数据超过字段长度限制'
                         WHEN swt.error_msg LIKE '%Deadlock%' THEN '数据库死锁'
                         WHEN (swt.error_msg <> '' AND swt.error_msg IS NOT NULL)
                             THEN SUBSTRING_INDEX(swt.error_msg, ',', 1)
                         ELSE '' END AS error_msg
                FROM supply_workflow_task swt
                LEFT JOIN supply_workflow_produce_bind_record swppr ON swt.task_id = swppr.task_id
                WHERE swt.create_timestamp >= {ts}
                  AND swt.workflow_id = '{workflow_id}'
            ) AS t
            GROUP BY workflow_id, status, error_msg
        ) AS t;
    '''
    return mysql_helper.execute_query(sql)


def _build_status_text_tag(status: int) -> Dict[str, Any]:
    """Return the header tag describing a workflow's on/off status (0 closed, 1 open)."""
    if status == 0:
        return build_text_tag_json("已关闭", "red", element_id="tag_status")
    if status == 1:
        return build_text_tag_json("开启中", "green", element_id="tag_status")
    return build_text_tag_json("未知", "orange", element_id="tag_status")


def _build_task_summary_markdown(task_status_stat: List[Dict[str, Any]]) -> str:
    """Return markdown listing the total task count, then one line per task status."""
    task_status_df = pd.DataFrame(task_status_stat)
    if "任务数" not in task_status_df.columns:
        # No rows (or no count column): nothing was created today.
        return "**总任务数**: 0"
    lines = [f"**总任务数**: {task_status_df['任务数'].sum()}"]
    lines.extend(
        f"**{row['任务状态']}任务数**: {row['任务数']}"
        for _, row in task_status_df.iterrows()
    )
    return "\n".join(lines)


def _append_table_section(elements: List[Dict[str, Any]], rows: List[Dict[str, Any]],
                          id_suffix: str) -> None:
    """Append a divider plus a table of *rows* to *elements*; skip when *rows* is empty.

    *id_suffix* makes the element ids unique within the card.
    """
    table = build_table_element_json(pd.DataFrame(rows), element_id=f"table_{id_suffix}")
    if table is not None:
        elements.append(build_hr_element_json(element_id=f"hr_{id_suffix}"))
        elements.append(table)


def workflow_monitor(workflow_id: str, workflow_name: str, status: int) -> None:
    """Query today's statistics for one workflow and send them to Feishu as a detail card.

    "Today" starts at local midnight of ``datetime.now()``. The header colour is
    chosen from ``header_template`` using the global ``header_template_index``.
    """
    now = datetime.now()
    today_midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    start_dt_str = today_midnight.strftime('%Y-%m-%d %H:%M:%S')
    end_dt_str = now.strftime('%Y-%m-%d %H:%M:%S')
    timestamp_ms = int(today_midnight.timestamp() * 1000)

    task_status_stat = workflow_task_status_stat(timestamp_ms, workflow_id)
    exe_step_stat = task_exe_step_stat_query(timestamp_ms, workflow_id)
    status_stat = crawler_and_produce_stat_query(timestamp_ms, workflow_id)

    # Header: workflow name, on/off tag and the covered time window.
    template = header_template[header_template_index % len(header_template)]
    sub_title = build_sub_title_json(f"{start_dt_str} - {end_dt_str}")
    header = build_header_json(f"【workflow策略】{workflow_name}", template, sub_title,
                               [_build_status_text_tag(status)])

    # Body: task summary, then the step and crawler/produce tables when non-empty.
    # Element ids are distinct because Feishu requires them to be unique per card.
    elements = [build_markdown_element_json(_build_task_summary_markdown(task_status_stat),
                                            element_id="md_task_info")]
    _append_table_section(elements, exe_step_stat, "exe_step")
    _append_table_section(elements, status_stat, "status_stat")

    card_json = build_card_json(elements, header, build_config_json())
    feishu_inform_util.send_card_msg_to_feishu(
        webhook=fei_shu_webhook,
        card_json=card_json
    )


def workflow_dashboard_monitor() -> None:
    """Send today's cross-workflow dashboard card to the monitor and group-chat webhooks."""
    now = datetime.now()
    today_midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    result = supply_workflow_dashboard_stat(int(today_midnight.timestamp() * 1000))
    start_dt_str = today_midnight.strftime('%Y-%m-%d %H:%M')
    end_dt_str = now.strftime('%Y-%m-%d %H:%M')
    sub_title = build_sub_title_json(f"统计时间: {start_dt_str} - {end_dt_str}")
    header = build_header_json("【供给workflow】当日任务统计", "blue", sub_title, [])

    dashboard_stat = build_table_element_json(pd.DataFrame(result))
    elements = []
    if dashboard_stat is not None:
        # The dashboard uses a lighter header style than the detail tables.
        dashboard_stat['header_style'] = {
            "text_size": "normal",
            "text_align": "center",
        }
        elements.append(dashboard_stat)
    else:
        elements.append(build_markdown_element_json("**今日workflow还未创建新的任务,请关注**"))

    card_json = build_card_json(elements, header, build_config_json())
    feishu_inform_util.send_card_msg_to_feishu(
        webhook=fei_shu_webhook,
        card_json=card_json
    )
    # The overall summary is also sent to the group chat.
    feishu_inform_util.send_card_msg_to_feishu(
        webhook=fei_shu_group_webhook,
        card_json=card_json
    )


def main() -> None:
    """Send the dashboard card, then one detail card per workflow (best effort per workflow)."""
    global header_template_index
    workflow_dashboard_monitor()
    workflows = mysql_helper.execute_query('select id, name, status from supply_workflow;')
    for workflow in workflows:
        try:
            workflow_monitor(workflow['id'], workflow['name'], workflow['status'])
        except Exception as e:
            # One failing workflow must not stop the rest; keep the traceback for diagnosis.
            print(f"【workflow策略】{workflow['name']} 监控异常: {e}")
            traceback.print_exc()
        # Rotate header colours so consecutive cards are visually distinct.
        header_template_index += 1


if __name__ == '__main__':
    main()