|
|
@@ -1,5 +1,5 @@
|
|
|
from datetime import datetime
|
|
|
-from typing import Dict, Any, List
|
|
|
+from typing import Dict, Any, List, Optional
|
|
|
|
|
|
import pandas
|
|
|
import pandas as pd
|
|
|
@@ -41,7 +41,9 @@ def build_markdown_element_json(content: str) -> Dict[str, Any]:
|
|
|
}
|
|
|
|
|
|
|
|
|
-def build_table_element_json(df: pandas.DataFrame) -> Dict[str, Any]:
|
|
|
+def build_table_element_json(df: pandas.DataFrame) -> Optional[Dict[str, Any]]:
|
|
|
+ if df.empty:
|
|
|
+ return None
|
|
|
columns = []
|
|
|
for column in df.columns:
|
|
|
columns.append({
|
|
|
@@ -170,9 +172,11 @@ def workflow_task_status_stat(ts: int, workflow_id: str) -> List[Dict[str, Any]]
|
|
|
when task_status = 2 then '成功'
|
|
|
when task_status = 3 then '失败'
|
|
|
else '未知' end AS task_status
|
|
|
- from supply_workflow_task
|
|
|
- where workflow_id = '{workflow_id}'
|
|
|
- and create_timestamp >= {ts}
|
|
|
+ from supply_workflow sw
|
|
|
+ left join supply_workflow_task swt
|
|
|
+ on sw.id = swt.workflow_id
|
|
|
+ where sw.id = '{workflow_id}'
|
|
|
+ and swt.create_timestamp >= {ts}
|
|
|
) as t
|
|
|
group by t.task_status, t.workflow_id;
|
|
|
"""
|
|
|
@@ -321,21 +325,27 @@ def workflow_monitor(workflow_id: str, workflow_name: str, status: int):
|
|
|
sub_title = build_sub_title_json(f"{start_dt_str} - {end_dt_str}")
|
|
|
header = build_header_json(f"【workflow策略】{workflow_name}", template, sub_title, text_tag_list)
|
|
|
|
|
|
- # 构建下文内容
|
|
|
task_status_df = pd.DataFrame(task_status_stat)
|
|
|
- task_info = f"**总任务数**: {task_status_df['任务数'].sum()}" + \
|
|
|
- "".join([
|
|
|
- f"\n**{item['任务状态']}任务数**: {item['任务数']}"
|
|
|
- for item in task_status_df.to_dict(orient='records')
|
|
|
- ])
|
|
|
-
|
|
|
- elements = [
|
|
|
- build_markdown_element_json(task_info),
|
|
|
- build_hr_element_json(),
|
|
|
- build_table_element_json(pd.DataFrame(exe_step_stat)),
|
|
|
- build_hr_element_json(),
|
|
|
- build_table_element_json(pd.DataFrame(status_stat)),
|
|
|
- ]
|
|
|
+ if "任务数" not in task_status_df.columns:
|
|
|
+ task_info = f"**总任务数**: 0"
|
|
|
+ else:
|
|
|
+ # 先构建总任务数部分
|
|
|
+ task_info = f"**总任务数**: {task_status_df['任务数'].sum()}"
|
|
|
+ # 再逐行添加各状态的任务数
|
|
|
+ for _, row in task_status_df.iterrows():
|
|
|
+ task_info += f"\n**{row['任务状态']}任务数**: {row['任务数']}"
|
|
|
+
|
|
|
+ elements = [build_markdown_element_json(task_info)]
|
|
|
+
|
|
|
+ exe_step_table = build_table_element_json(pd.DataFrame(exe_step_stat))
|
|
|
+ if exe_step_table is not None:
|
|
|
+ elements.append(build_hr_element_json())
|
|
|
+ elements.append(exe_step_table)
|
|
|
+
|
|
|
+ status_stat_table = build_table_element_json(pd.DataFrame(status_stat))
|
|
|
+ if status_stat_table is not None:
|
|
|
+ elements.append(build_hr_element_json())
|
|
|
+ elements.append(status_stat_table)
|
|
|
|
|
|
config = build_config_json()
|
|
|
|