| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438 |
import os
from datetime import datetime
from typing import Any, Dict, List, Optional

import pandas
import pandas as pd

from helper.MySQLHelper import MySQLHelper
from util import feishu_inform_util
# Connection settings. Each value can be overridden through an environment
# variable so credentials do not have to live in source control.
# NOTE(review): the fallback values below are live credentials that have been
# committed to the repository; move them to a secret store, rotate them, and
# then drop the fallbacks.

# Feishu bot webhook that receives the dashboard card and every per-workflow card.
fei_shu_webhook = os.environ.get(
    "FEISHU_WEBHOOK",
    "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b",
)

# Database helper used by every query in this script (the account name suggests
# read-only access — confirm before relying on it).
mysql_helper = MySQLHelper(
    host=os.environ.get(
        "AIGC_MYSQL_HOST",
        "rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com",
    ),
    username=os.environ.get("AIGC_MYSQL_USERNAME", "readonly"),
    password=os.environ.get("AIGC_MYSQL_PASSWORD", "HdkZ4TDmeK6SQ3BRtJBk"),
    database=os.environ.get("AIGC_MYSQL_DATABASE", "aigc-admin-prod"),
)

# Fixed widths for narrow status/count columns in Feishu table elements, keyed
# by column header; any column not listed here is rendered with width "auto".
column_width_map = {
    "状态": "80px",
    "任务数": "80px",
    "记录数": "80px",
    "待处理": "80px",
    "运行中": "80px",
    "成功": "80px",
    "失败": "80px",
}

# Header colour templates cycled across per-workflow cards; main() advances
# header_template_index once per workflow so consecutive cards differ in colour.
header_template = ["wathet", "turquoise", "purple", "indigo", "green"]
header_template_index = 0
def build_config_json() -> Dict[str, Any]:
    """Return the card-level ``config`` object used by every card sent here.

    The card uses the default width mode and has forwarding disabled.
    """
    card_config: Dict[str, Any] = {}
    card_config["width_mode"] = "default"
    card_config["enable_forward"] = False
    return card_config
def build_markdown_element_json(content: str) -> Dict[str, Any]:
    """Wrap *content* in a Feishu card ``markdown`` element.

    Args:
        content: Markdown source to render.

    Returns:
        A markdown element dict whose ``element_id`` is always ``"markdown"``.
    """
    return dict(tag="markdown", element_id="markdown", content=content)
def build_table_element_json(df: pandas.DataFrame) -> Optional[Dict[str, Any]]:
    """Convert *df* into a Feishu card ``table`` element.

    Each DataFrame column becomes a centred text column. Its label is used as
    both the column name and the header, and its width is looked up in
    ``column_width_map`` (falling back to ``"auto"``). Each DataFrame row
    becomes one table row keyed by column label.

    Args:
        df: Data to display.

    Returns:
        The table element dict, or ``None`` when *df* is empty (no rows or no
        columns) so callers can skip the element entirely.
    """
    if df.empty:
        return None
    columns = [
        {
            "name": column,
            "display_name": column,
            "width": column_width_map.get(column, "auto"),
            "data_type": "text",
            "horizontal_align": "center",
            "vertical_align": "center",
        }
        for column in df.columns
    ]
    # SQL NULLs in numeric columns arrive as float NaN, which is not valid JSON.
    # Send them as null, the same way NULLs in object columns are already sent.
    rows = df.astype(object).where(df.notna(), None).to_dict(orient="records")
    return {
        "tag": "table",
        "element_id": "table_detail",
        "margin": "2px 0px",
        "page_size": 10,
        "header_style": {
            "text_align": "center",
            "text_size": "normal",
            "background_style": "none",
            "text_color": "grey",
            "bold": True,
            "lines": 1,
        },
        "columns": columns,
        "rows": rows,
    }
def build_markdown_table_element_json(df: pandas.DataFrame) -> Dict[str, Any]:
    """Render *df* as a markdown table wrapped in a Feishu markdown element.

    Column labels form the header row. Every cell is rendered with ``str()``.

    Args:
        df: Data to render.

    Returns:
        A markdown element whose content is the table source.
    """
    def _cell(value: Any) -> str:
        # A raw "|" would open a new column and a line break would end the
        # row, so escape pipes and collapse line breaks into spaces.
        return " ".join(str(value).replace("|", "\\|").splitlines())

    header = "| " + " | ".join(_cell(column) for column in df.columns) + " |\n"
    separator = "| " + " | ".join(["---"] * len(df.columns)) + " |\n"
    rows = "\n".join(
        "| " + " | ".join(_cell(value) for value in row) + " |"
        for row in df.to_numpy()
    )
    return build_markdown_element_json(header + separator + rows)
def build_collapsible_panel_element_json(elements: List[Dict[str, Any]], title: str) -> Dict[str, Any]:
    """Wrap *elements* in a collapsible panel that starts collapsed.

    Args:
        elements: Child card elements; the list itself is embedded, not copied.
        title: Plain-text title shown in the panel header.

    Returns:
        A ``collapsible_panel`` element with a grey, 5px-rounded border.
    """
    panel_header = {
        "title": {"tag": "plain_text", "content": title},
        "vertical_align": "center",
    }
    panel_border = {"color": "grey", "corner_radius": "5px"}
    return {
        "tag": "collapsible_panel",
        "expanded": False,
        "header": panel_header,
        "border": panel_border,
        "elements": elements,
    }
def build_hr_element_json() -> Dict[str, Any]:
    """Return a horizontal-rule divider element with a 10px vertical margin."""
    divider: Dict[str, Any] = {"tag": "hr"}
    divider.update(element_id="custom_id", margin="10px 0px")
    return divider
def build_text_tag_json(content: str, color: str = 'green') -> Dict[str, Any]:
    """Build a coloured ``text_tag`` label, e.g. for a card header.

    Args:
        content: Plain-text label.
        color: Tag colour name (``"green"`` unless given).

    Returns:
        A ``text_tag`` element dict.
    """
    label_text = {"tag": "plain_text", "content": content}
    return dict(tag="text_tag", element_id="custom_id", text=label_text, color=color)
def build_sub_title_json(content: str) -> Dict[str, Any]:
    """Return a ``plain_text`` object, used in this file as a card-header subtitle."""
    return dict(tag="plain_text", content=content)
def build_header_json(
    content: str,
    template: str = 'green',
    sub_title: Optional[Dict[str, Any]] = None,
    text_tag_list: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Build a card ``header`` object.

    Args:
        content: Header title text.
        template: Header colour template name.
        sub_title: Optional subtitle object; emitted as ``None`` when omitted.
        text_tag_list: Tags shown next to the title. A list that is passed in
            is embedded as-is. When omitted, a fresh empty list is created on
            every call, so later edits to one header never leak into another.

    Returns:
        The header dict.
    """
    if text_tag_list is None:
        text_tag_list = []
    return {
        "title": {
            "content": content
        },
        "text_tag_list": text_tag_list,
        "template": template,
        "subtitle": sub_title,
    }
def build_card_json(body_elements: List[Dict[str, Any]], header: Dict[str, Any], config: Dict[str, Any] = None) -> Dict[str, Any]:
    """Assemble a schema-2.0 card from its config, header and body elements.

    Args:
        body_elements: Elements placed in the card body (embedded, not copied).
        header: Card header object.
        config: Card config object; emitted as ``None`` when omitted.

    Returns:
        The complete card dict.
    """
    card: Dict[str, Any] = {"schema": "2.0", "config": config, "header": header}
    card["body"] = {"elements": body_elements}
    return card
def supply_workflow_dashboard_stat(ts: int) -> List[Dict[str, Any]]:
    """Return one row per workflow with per-status counts of tasks created since *ts*.

    Every row of ``supply_workflow`` is returned because of the LEFT JOIN. A
    workflow with no tasks in the window reports 0 in every count column
    rather than NULL. Rows are ordered by workflow status, then by workflow
    creation time, both descending.

    Args:
        ts: Lower bound on ``supply_workflow_task.create_timestamp`` (callers in
            this file pass epoch milliseconds).

    Returns:
        Rows with keys 策略名, 状态, 待处理, 运行中, 成功, 失败.
    """
    # int() keeps the interpolated bound strictly numeric.
    sql = f"""
        SELECT sw.name AS '策略名',
               CASE
                   WHEN sw.status = 1 THEN '开启中'
                   WHEN sw.status = 0 THEN '已关闭'
                   ELSE '未知' END AS 状态,
               COALESCE(swt.init_cnt, 0) AS '待处理',
               COALESCE(swt.running_cnt, 0) AS '运行中',
               COALESCE(swt.success_cnt, 0) AS '成功',
               COALESCE(swt.fail_success, 0) AS '失败'
        FROM supply_workflow sw
        LEFT JOIN (
            SELECT workflow_id,
                   COUNT(DISTINCT IF(task_status = 0, task_id, NULL)) AS init_cnt,
                   COUNT(DISTINCT IF(task_status = 1, task_id, NULL)) AS running_cnt,
                   COUNT(DISTINCT IF(task_status = 2, task_id, NULL)) AS success_cnt,
                   COUNT(DISTINCT IF(task_status = 3, task_id, NULL)) AS fail_success
            FROM supply_workflow_task
            WHERE create_timestamp >= {int(ts)}
            GROUP BY workflow_id
        ) swt ON sw.id = swt.workflow_id
        ORDER BY sw.status DESC, sw.create_timestamp DESC;
    """
    return mysql_helper.execute_query(sql)
def workflow_task_status_stat(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
    """Count one workflow's tasks created since *ts*, grouped by task status.

    Args:
        ts: Lower bound on ``create_timestamp`` (callers in this file pass
            epoch milliseconds).
        workflow_id: Workflow to report on. It is interpolated into the SQL,
            so it must come from a trusted source (here: ``supply_workflow``).

    Returns:
        Rows with keys ``workflow_id``, ``任务状态`` (status label), ``任务数``
        (distinct tasks) and ``需求数`` (non-NULL ``task_input`` values).
    """
    # The previous LEFT JOIN to supply_workflow was filtered on the task table,
    # so it behaved as an inner join that contributed nothing; the task table
    # is queried directly instead.
    sql = f"""
        select workflow_id as 'workflow_id',
               task_status as '任务状态',
               count(distinct task_id) as '任务数',
               count(task_input) as '需求数'
        from (
            select workflow_id as workflow_id,
                   task_id as task_id,
                   task_input as task_input,
                   case
                       when task_status = 0 then '初始化'
                       when task_status = 1 then '运行中'
                       when task_status = 2 then '成功'
                       when task_status = 3 then '失败'
                       else '未知' end AS task_status
            from supply_workflow_task
            where workflow_id = '{workflow_id}'
              and create_timestamp >= {int(ts)}
        ) as t
        group by t.task_status, t.workflow_id;
    """
    return mysql_helper.execute_query(sql)
def task_exe_step_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
    """Count one workflow's task execution steps by step name, status and error.

    Tasks created since *ts* are LEFT JOINed to their execution steps. A task
    with no step rows therefore yields one row with a NULL step name and
    status ``未知``. Error messages are bucketed: "Data too long" and
    "Deadlock" get fixed labels, and any other non-empty message is cut at
    its first comma.

    Args:
        ts: Lower bound on the task's ``create_timestamp`` (callers in this
            file pass epoch milliseconds).
        workflow_id: Workflow to report on. It is interpolated into the SQL,
            so it must come from a trusted source (here: ``supply_workflow``).

    Returns:
        Rows with keys 步骤名称, 执行状态, 错误原因, 任务数.
    """
    # workflow_id is quoted like in the other queries, so non-numeric ids
    # cannot produce invalid SQL.
    sql = f'''
        select step_name AS '步骤名称',
               status as '执行状态',
               error_msg as '错误原因',
               count(1) AS '任务数'
        from (
            select swt.task_id,
                   step_name,
                   case
                       when status = 0 then '初始化'
                       when status = 1 then '运行中'
                       when status = 2 then '成功'
                       when status = 3 then '失败'
                       else '未知' end AS status,
                   case
                       when swtes.error_msg like '%Data too long%' then '数据超过字段长度限制'
                       when swtes.error_msg like '%Deadlock%' then '数据库死锁'
                       when (swtes.error_msg <> '' and swtes.error_msg is not null) then substring_index(swtes.error_msg, ',', 1)
                       else '' end AS error_msg
            from supply_workflow_task swt
            left join supply_workflow_task_exe_step swtes on swt.task_id = swtes.task_id
            where swt.create_timestamp >= {int(ts)}
              and swt.workflow_id = '{workflow_id}'
        ) as t
        group by step_name, status, error_msg;
    '''
    return mysql_helper.execute_query(sql)
def crawler_and_produce_stat_query(ts: int, workflow_id: str) -> List[Dict[str, Any]]:
    """Return per-stage status/error statistics for one workflow's tasks since *ts*.

    Two stages are combined with ``UNION ALL``:

    * ``创建爬取计划``: tasks LEFT JOINed to
      ``supply_workflow_crawler_plan_record``. ``记录数`` is the number of
      distinct crawler plans.
    * ``生成计划绑定``: tasks LEFT JOINed to
      ``supply_workflow_produce_bind_record``. ``记录数`` is the number of
      bind records.

    A task without a record in a stage falls back to the task's own status and
    error message. It counts toward ``任务数`` but not toward ``记录数``.
    Error messages are bucketed: "Data too long" and "Deadlock" get fixed
    labels, and any other non-empty message is cut at its first comma.

    Args:
        ts: Lower bound on the task's ``create_timestamp`` (callers in this
            file pass epoch milliseconds).
        workflow_id: Workflow to report on. It is interpolated into the SQL,
            so it must come from a trusted source (here: ``supply_workflow``).

    Returns:
        Rows with keys 阶段, 状态, 失败原因, 任务数, 记录数.
    """
    # The workflow filter sits inside each branch, so only this workflow's
    # tasks are joined and aggregated rather than every workflow's.
    # In the bind branch, count(bind_task_id) counts real bind records;
    # count(1) also counted a task with no bind record as one record.
    sql = f'''
        select t.step as '阶段',
               t.status as '状态',
               t.error_msg as '失败原因',
               t.task_cnt as '任务数',
               t.cnt as '记录数'
        from (
            select '创建爬取计划' as step,
                   cp.status as status,
                   cp.error_msg as error_msg,
                   count(distinct cp.task_id) as task_cnt,
                   count(distinct cp.crawler_plan_id) as cnt
            from (
                select swt.task_id as task_id,
                       swcpr.crawler_plan_id as crawler_plan_id,
                       case
                           when swcpr.status = 0 then '初始化'
                           when swcpr.status = 1 then '运行中'
                           when swcpr.status = 2 then '成功'
                           when swcpr.status = 3 then '失败'
                           when swt.task_status = 0 then '初始化'
                           when swt.task_status = 1 then '运行中'
                           when swt.task_status = 2 then '成功'
                           when swt.task_status = 3 then '失败'
                           else '未知' end AS status,
                       case
                           when swcpr.error_msg like '%Data too long%' then '数据超过字段长度限制'
                           when swcpr.error_msg like '%Deadlock%' then '数据库死锁'
                           when (swcpr.error_msg <> '' AND swcpr.error_msg is not null) then substring_index(swcpr.error_msg, ',', 1)
                           when swt.error_msg like '%Data too long%' then '数据超过字段长度限制'
                           when swt.error_msg like '%Deadlock%' then '数据库死锁'
                           when (swt.error_msg <> '' AND swt.error_msg is not null) then substring_index(swt.error_msg, ',', 1)
                           else ''
                       end AS error_msg
                from supply_workflow_task swt
                left join supply_workflow_crawler_plan_record swcpr
                       on swt.task_id = swcpr.task_id
                where swt.create_timestamp >= {int(ts)}
                  and swt.workflow_id = '{workflow_id}'
            ) as cp
            group by cp.status, cp.error_msg
            union all
            select '生成计划绑定' as step,
                   pb.status as status,
                   pb.error_msg as error_msg,
                   count(distinct pb.task_id) as task_cnt,
                   count(pb.bind_task_id) as cnt
            from (
                select swt.task_id as task_id,
                       swppr.task_id as bind_task_id,
                       case
                           when swppr.status = 0 then '初始化'
                           when swppr.status = 1 then '运行中'
                           when swppr.status = 2 then '成功'
                           when swppr.status = 3 then '失败'
                           when swt.task_status = 0 then '初始化'
                           when swt.task_status = 1 then '运行中'
                           when swt.task_status = 2 then '成功'
                           when swt.task_status = 3 then '失败'
                           else '未知' end AS status,
                       case
                           when swppr.error_msg like '%Data too long%' then '数据超过字段长度限制'
                           when swppr.error_msg like '%Deadlock%' then '数据库死锁'
                           when (swppr.error_msg <> '' and swppr.error_msg is not null) then substring_index(swppr.error_msg, ',', 1)
                           when swt.error_msg like '%Data too long%' then '数据超过字段长度限制'
                           when swt.error_msg like '%Deadlock%' then '数据库死锁'
                           when (swt.error_msg <> '' AND swt.error_msg is not null) then substring_index(swt.error_msg, ',', 1)
                           else ''
                       end AS error_msg
                from supply_workflow_task swt
                left join supply_workflow_produce_bind_record swppr
                       on swt.task_id = swppr.task_id
                where swt.create_timestamp >= {int(ts)}
                  and swt.workflow_id = '{workflow_id}'
            ) as pb
            group by pb.status, pb.error_msg
        ) as t;
    '''
    return mysql_helper.execute_query(sql)
def _workflow_status_tag(status: int) -> Dict[str, Any]:
    """Map a workflow status code (0 = closed, 1 = open) to a coloured header tag."""
    label, color = {0: ("已关闭", "red"), 1: ("开启中", "green")}.get(status, ("未知", "orange"))
    return build_text_tag_json(label, color)


def _task_summary_markdown(task_status_stat: List[Dict[str, Any]]) -> str:
    """Render the total task count followed by one line per task status."""
    df = pd.DataFrame(task_status_stat)
    if "任务数" not in df.columns:
        return "**总任务数**: 0"
    lines = [f"**总任务数**: {df['任务数'].sum()}"]
    lines.extend(f"**{row['任务状态']}任务数**: {row['任务数']}" for _, row in df.iterrows())
    return "\n".join(lines)


def _dedupe_element_ids(components: List[Dict[str, Any]]) -> None:
    """Rename repeated ``element_id`` values in place so each one is unique.

    The first occurrence keeps its id; later ones get a ``_2``, ``_3``, ...
    suffix (skipping any id that is already in use).
    """
    used = set()
    for component in components:
        element_id = component.get("element_id")
        if element_id is None:
            continue
        candidate, suffix = element_id, 1
        while candidate in used:
            suffix += 1
            candidate = f"{element_id}_{suffix}"
        component["element_id"] = candidate
        used.add(candidate)


def workflow_monitor(workflow_id: str, workflow_name: str, status: int):
    """Send a Feishu card summarising today's tasks for one workflow.

    The card covers tasks created between local midnight and now. It contains:

    * a markdown summary of task counts per status;
    * the execution-step table, when there is any data;
    * the crawler-plan / produce-bind stage table, when there is any data.

    The header shows the workflow name and a status tag. Its colour comes from
    ``header_template`` at ``header_template_index``.

    Args:
        workflow_id: Id of the workflow in ``supply_workflow``.
        workflow_name: Display name used in the card title.
        status: Workflow status code (0 closed, 1 open, anything else unknown).
    """
    now = datetime.now()
    today_midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    start_dt_str = today_midnight.strftime('%Y-%m-%d %H:%M:%S')
    end_dt_str = now.strftime('%Y-%m-%d %H:%M:%S')
    timestamp_ms = int(today_midnight.timestamp() * 1000)

    task_status_stat = workflow_task_status_stat(timestamp_ms, workflow_id)
    exe_step_stat = task_exe_step_stat_query(timestamp_ms, workflow_id)
    status_stat = crawler_and_produce_stat_query(timestamp_ms, workflow_id)

    template = header_template[header_template_index % len(header_template)]
    sub_title = build_sub_title_json(f"{start_dt_str} - {end_dt_str}")
    header = build_header_json(
        f"【workflow策略】{workflow_name}", template, sub_title, [_workflow_status_tag(status)]
    )

    elements = [build_markdown_element_json(_task_summary_markdown(task_status_stat))]
    for stat in (exe_step_stat, status_stat):
        table = build_table_element_json(pd.DataFrame(stat))
        if table is not None:
            elements.extend([build_hr_element_json(), table])

    # Card JSON 2.0 requires element_id to be unique within a card. The
    # builders reuse fixed ids ("custom_id", "table_detail"), so repeated ids
    # are suffixed here.
    _dedupe_element_ids(list(header.get("text_tag_list") or []) + elements)

    card_json = build_card_json(elements, header, build_config_json())
    feishu_inform_util.send_card_msg_to_feishu(
        webhook=fei_shu_webhook,
        card_json=card_json
    )
def _tasks_created_today(dashboard_df: pd.DataFrame) -> int:
    """Sum the per-status task counts of the dashboard rows (NULL/NaN counts as 0)."""
    count_columns = [c for c in ("待处理", "运行中", "成功", "失败") if c in dashboard_df.columns]
    if not count_columns:
        return 0
    counts = dashboard_df[count_columns].apply(pd.to_numeric, errors="coerce").fillna(0)
    return int(counts.to_numpy().sum())


def workflow_dashboard_monitor():
    """Send the daily per-workflow task dashboard to the Feishu groups.

    The card covers tasks created between local midnight and now, with one
    table row per workflow. If there are no workflow rows, or the workflows
    have no tasks today, a warning line is added. The same card goes to
    ``fei_shu_webhook`` and to a second, broadcast webhook; that one can be
    overridden with ``FEISHU_BROADCAST_WEBHOOK``.
    """
    now = datetime.now()
    today_midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    result = supply_workflow_dashboard_stat(int(today_midnight.timestamp() * 1000))
    start_dt_str = today_midnight.strftime('%Y-%m-%d %H:%M')
    end_dt_str = now.strftime('%Y-%m-%d %H:%M')
    sub_title = build_sub_title_json(f"统计时间: {start_dt_str} - {end_dt_str}")
    header = build_header_json("【供给workflow】当日任务统计", "blue", sub_title, [])

    dashboard_df = pd.DataFrame(result)
    dashboard_stat = build_table_element_json(dashboard_df)
    elements = []
    if dashboard_stat is not None:
        elements.append(dashboard_stat)
    # The dashboard query returns every workflow (LEFT JOIN), so an empty
    # result alone would almost never trigger this warning; also warn when
    # the workflows exist but have no tasks today.
    if dashboard_stat is None or _tasks_created_today(dashboard_df) == 0:
        elements.append(build_markdown_element_json("**今日workflow还未创建新的任务,请关注**"))

    card_json = build_card_json(elements, header, build_config_json())
    feishu_inform_util.send_card_msg_to_feishu(
        webhook=fei_shu_webhook,
        card_json=card_json
    )
    # The overall summary is also sent to the larger group chat.
    broadcast_webhook = os.environ.get(
        "FEISHU_BROADCAST_WEBHOOK",
        "https://open.feishu.cn/open-apis/bot/v2/hook/9f5c5cce-5eb2-4731-b368-33926f5549f9",
    )
    feishu_inform_util.send_card_msg_to_feishu(
        webhook=broadcast_webhook,
        card_json=card_json
    )
def main():
    """Send the daily dashboard card, then one card per workflow.

    If reporting one workflow fails, the error is printed and the remaining
    workflows are still reported. ``header_template_index`` advances once per
    workflow, whether or not it failed, so consecutive cards cycle header colours.
    """
    global header_template_index
    workflow_dashboard_monitor()
    all_workflows = mysql_helper.execute_query('select id, name, status from supply_workflow;')
    for wf in all_workflows:
        try:
            workflow_monitor(wf['id'], wf['name'], wf['status'])
        except Exception as exc:  # best-effort: keep reporting the other workflows
            print(f"【workflow策略】{wf['name']} 监控异常: {exc}")
        header_template_index += 1


if __name__ == '__main__':
    main()
|