| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
import os
from datetime import datetime
from typing import Dict, Any

import pandas
import pandas as pd

from helper.MySQLHelper import MySQLHelper
from util import feishu_inform_util
# SECURITY NOTE(review): the webhook URL and the database credentials below are
# committed in plain text. The literals are kept only as fallbacks so existing
# deployments keep working unchanged; supply the values through the environment
# variables read here, then rotate the secrets and delete the literals.

# Feishu bot webhook that receives every report card sent by this script.
fei_shu_webhook = os.environ.get(
    "WORKFLOW_STAT_FEISHU_WEBHOOK",
    "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b",
)

# Shared database helper used by every query in this module.
mysql_helper = MySQLHelper(
    host=os.environ.get(
        "AIGC_ADMIN_DB_HOST",
        "rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com",
    ),
    username=os.environ.get("AIGC_ADMIN_DB_USERNAME", "readonly"),
    password=os.environ.get("AIGC_ADMIN_DB_PASSWORD", "HdkZ4TDmeK6SQ3BRtJBk"),
    database=os.environ.get("AIGC_ADMIN_DB_NAME", "aigc-admin-prod"),
)

# Fixed widths for the narrow columns; build_table_json uses "auto" for any
# column name that is not listed here.
column_width_map = {
    "状态": "80px",
    "任务数": "80px",
    "记录数": "80px",
}
def build_table_json(df: pandas.DataFrame, title: str) -> Dict[str, Any]:
    """Build a Feishu card payload (schema 2.0) showing ``df`` as one table.

    Args:
        df: Data to display. Each DataFrame column becomes a table column and
            each DataFrame row becomes a table row.
        title: Text placed in the card header.

    Returns:
        The card as a plain dictionary with a green header and a single
        table element in its body.
    """
    # One descriptor per DataFrame column. The width comes from
    # column_width_map when the column is listed there, otherwise "auto".
    column_specs = [
        {
            "name": col_name,
            "display_name": col_name,
            "width": column_width_map.get(col_name, "auto"),
            "data_type": "text",
            "horizontal_align": "center",
            "vertical_align": "center",
        }
        for col_name in df.columns
    ]

    table_element = {
        "tag": "table",
        "element_id": "table_detail",
        "margin": "0px 0px 0px 0px",
        "page_size": 10,
        "header_style": {
            "text_align": "center",
            "text_size": "normal",
            "background_style": "none",
            "text_color": "grey",
            "bold": True,
            "lines": 1,
        },
        "columns": column_specs,
        # Rows are emitted as a list of {column name: value} dictionaries.
        "rows": df.to_dict(orient="records"),
    }

    return {
        "schema": "2.0",
        "header": {
            "title": {"content": title},
            "template": "green",
        },
        "body": {"elements": [table_element]},
    }
def task_exe_step_stat(ts: int, workflow_id: str, workflow_name: str):
    """Send a Feishu card with per-step execution counts for one workflow.

    Tasks of ``workflow_id`` created at or after ``ts`` are left-joined to
    their execution-step rows and counted per (step name, status, error
    reason). Error messages are normalised in SQL: "Data too long" and
    "Deadlock" errors get fixed labels, empty/NULL messages become an empty
    string, and any other message is cut at its first comma.

    Args:
        ts: Lower bound (inclusive) for ``supply_workflow_task.create_timestamp``.
            main() passes epoch milliseconds.
        workflow_id: Id of the workflow whose tasks are reported.
        workflow_name: Used only in the card title.
    """
    # Coerce to int so only a numeric literal can reach the SQL text.
    created_since = int(ts)

    # workflow_id is quoted, matching workflow_status_stat: unquoted, a
    # non-numeric id is a syntax error and a string column would be compared
    # numerically instead of exactly.
    # NOTE(review): the values are still interpolated into the statement; if
    # MySQLHelper.execute_query supports bound parameters, prefer those.
    sql = f'''
        select step_name AS '步骤名称',
               status as '执行状态',
               error_msg as '错误原因',
               count(1) AS '个数'
        from (
            select swt.task_id,
                   step_name,
                   case
                       when status = 0 then '初始化'
                       when status = 1 then '运行中'
                       when status = 2 then '成功'
                       when status = 3 then '失败'
                       else '未知' end AS status,
                   case
                       when swtes.error_msg like '%Data too long%' then '数据超过字段长度限制'
                       when swtes.error_msg like '%Deadlock%' then '数据库死锁'
                       when (swtes.error_msg = '' or swtes.error_msg is null) then ''
                       else substring_index(swtes.error_msg, ',', 1)
                       end AS error_msg
            from supply_workflow_task swt
            left join supply_workflow_task_exe_step swtes on swt.task_id = swtes.task_id
            where swt.create_timestamp >= {created_since}
              and swt.workflow_id = '{workflow_id}'
        ) as t
        group by step_name, status, error_msg;
    '''
    stat = mysql_helper.execute_query(sql)
    df = pd.DataFrame(stat)
    feishu_inform_util.send_card_msg_to_feishu(
        webhook=fei_shu_webhook,
        card_json=build_table_json(df, f"【workflow-{workflow_name}】各步骤执行情况")
    )
def workflow_status_stat(ts: int, workflow_id: str, workflow_name: str):
    """Send a Feishu card with crawl-plan and produce-bind stats for a workflow.

    Two aggregates are unioned, both over tasks of ``workflow_id`` created at
    or after ``ts``:

    * "创建爬取计划": tasks left-joined to ``supply_workflow_crawler_plan_record``,
      counting distinct tasks and distinct crawler plan ids.
    * "生成计划绑定": tasks left-joined to ``supply_workflow_produce_bind_record``,
      counting distinct tasks and joined rows.

    In both, status and error reason come from the joined record when it has
    one and otherwise fall back to the task's own ``task_status`` /
    ``error_msg``. Error messages are normalised the same way in each branch
    ("Data too long" / "Deadlock" labels, otherwise the text before the first
    comma, otherwise an empty string).

    Args:
        ts: Lower bound (inclusive) for ``supply_workflow_task.create_timestamp``.
            main() passes epoch milliseconds.
        workflow_id: Id of the workflow whose tasks are reported.
        workflow_name: Used only in the card title.
    """
    # Coerce to int so only a numeric literal can reach the SQL text.
    created_since = int(ts)

    # The workflow filter is applied inside both derived tables as well as on
    # the outer query. Previously only the outer query filtered, so every call
    # aggregated the tasks of all workflows before discarding most groups. The
    # result set is unchanged; the outer predicate is kept as a safety net and
    # now names its table explicitly.
    # NOTE(review): the second branch uses count(1), so a task without any
    # bind record still contributes 1 to '记录数' — confirm this is intended.
    # NOTE(review): the values are still interpolated into the statement; if
    # MySQLHelper.execute_query supports bound parameters, prefer those.
    sql = f'''
        select t.step as '阶段',
               t.status as '状态',
               t.error_msg as '失败原因',
               t.task_cnt as '任务数',
               t.cnt as '记录数'
        from (
            select '创建爬取计划' as step,
                   workflow_id as workflow_id,
                   status as status,
                   error_msg as error_msg,
                   count(distinct task_id) as task_cnt,
                   count(distinct crawler_plan_id) as cnt
            from (
                select swt.workflow_id as workflow_id,
                       swt.task_id as task_id,
                       swcpr.crawler_plan_id as crawler_plan_id,
                       case
                           when swcpr.status = 0 then '初始化'
                           when swcpr.status = 1 then '运行中'
                           when swcpr.status = 2 then '成功'
                           when swcpr.status = 3 then '失败'
                           when swt.task_status = 0 then '初始化'
                           when swt.task_status = 1 then '运行中'
                           when swt.task_status = 2 then '成功'
                           when swt.task_status = 3 then '失败'
                           else '未知' end AS status,
                       case
                           when swcpr.error_msg like '%Data too long%' then '数据超过字段长度限制'
                           when swcpr.error_msg like '%Deadlock%' then '数据库死锁'
                           when (swcpr.error_msg <> '' AND swcpr.error_msg is not null) then substring_index(swcpr.error_msg, ',', 1)
                           when swt.error_msg like '%Data too long%' then '数据超过字段长度限制'
                           when swt.error_msg like '%Deadlock%' then '数据库死锁'
                           when (swt.error_msg <> '' AND swt.error_msg is not null) then substring_index(swt.error_msg, ',', 1)
                           else ''
                           end AS error_msg
                from supply_workflow_task swt
                left join supply_workflow_crawler_plan_record swcpr
                    on swt.task_id = swcpr.task_id
                where swt.create_timestamp >= {created_since}
                  and swt.workflow_id = '{workflow_id}'
            ) as t
            group by workflow_id, status, error_msg
            union all
            select '生成计划绑定' as step,
                   workflow_id as workflow_id,
                   status as status,
                   error_msg as error_msg,
                   count(distinct task_id) as task_cnt,
                   count(1) as cnt
            from (
                select swt.workflow_id as workflow_id,
                       swt.task_id as task_id,
                       case
                           when swppr.status = 0 then '初始化'
                           when swppr.status = 1 then '运行中'
                           when swppr.status = 2 then '成功'
                           when swppr.status = 3 then '失败'
                           when swt.task_status = 0 then '初始化'
                           when swt.task_status = 1 then '运行中'
                           when swt.task_status = 2 then '成功'
                           when swt.task_status = 3 then '失败'
                           else '未知' end AS status,
                       case
                           when swppr.error_msg like '%Data too long%' then '数据超过字段长度限制'
                           when swppr.error_msg like '%Deadlock%' then '数据库死锁'
                           when (swppr.error_msg <> '' and swppr.error_msg is not null) then substring_index(swppr.error_msg, ',', 1)
                           when swt.error_msg like '%Data too long%' then '数据超过字段长度限制'
                           when swt.error_msg like '%Deadlock%' then '数据库死锁'
                           when (swt.error_msg <> '' AND swt.error_msg is not null) then substring_index(swt.error_msg, ',', 1)
                           else ''
                           end AS error_msg
                from supply_workflow_task swt
                left join supply_workflow_produce_bind_record swppr
                    on swt.task_id = swppr.task_id
                where swt.create_timestamp >= {created_since}
                  and swt.workflow_id = '{workflow_id}'
            ) as t
            group by workflow_id, status, error_msg
        ) as t
        left join supply_workflow sw on t.workflow_id = sw.id
        where t.workflow_id = '{workflow_id}';
    '''
    stat = mysql_helper.execute_query(sql)
    df = pd.DataFrame(stat)
    feishu_inform_util.send_card_msg_to_feishu(
        webhook=fei_shu_webhook,
        card_json=build_table_json(df, f"【workflow-{workflow_name}】爬取和生成详情")
    )
def main():
    """Send both reports for every workflow, covering tasks since local midnight.

    The cut-off is today's 00:00 in the machine's local time, converted to
    epoch milliseconds. Every row of ``supply_workflow`` then gets the step
    execution report followed by the crawl/produce status report.
    """
    midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    since_ms = int(midnight.timestamp() * 1000)

    for row in mysql_helper.execute_query('select id, name from supply_workflow;'):
        wf_id, wf_name = row['id'], row['name']
        task_exe_step_stat(since_ms, wf_id, wf_name)
        workflow_status_stat(since_ms, wf_id, wf_name)


# Run the reports only when executed as a script, not on import.
if __name__ == '__main__':
    main()
|