|
|
@@ -1,6 +1,7 @@
|
|
|
from datetime import datetime
|
|
|
-from typing import Dict, List, Any
|
|
|
+from typing import Dict, Any
|
|
|
|
|
|
+import pandas
|
|
|
import pandas as pd
|
|
|
|
|
|
from helper.MySQLHelper import MySQLHelper
|
|
|
@@ -15,8 +16,27 @@ mysql_helper = MySQLHelper(
|
|
|
database="aigc-admin-prod"
|
|
|
)
|
|
|
|
|
|
+column_width_map = {
|
|
|
+ "状态": "80px",
|
|
|
+ "任务数": "80px",
|
|
|
+ "记录数": "80px",
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+def build_table_json(df: pandas.DataFrame, title: str) -> Dict[str, Any]:
|
|
|
+ columns = []
|
|
|
+ for column in df.columns:
|
|
|
+ columns.append({
|
|
|
+ "name": column,
|
|
|
+ "display_name": column,
|
|
|
+ "width": column_width_map.get(column, "auto"),
|
|
|
+ "data_type": "text",
|
|
|
+ "horizontal_align": "center",
|
|
|
+ "vertical_align": "center",
|
|
|
+ })
|
|
|
+
|
|
|
+ rows = df.to_dict(orient="records")
|
|
|
|
|
|
-def build_card_join(content: str, title: str):
|
|
|
return {
|
|
|
"schema": "2.0",
|
|
|
"header": {
|
|
|
@@ -28,46 +48,26 @@ def build_card_join(content: str, title: str):
|
|
|
"body": {
|
|
|
"elements": [
|
|
|
{
|
|
|
- "tag": "markdown",
|
|
|
- "element_id": "detail",
|
|
|
+ "tag": "table",
|
|
|
+ "element_id": "table_detail",
|
|
|
"margin": "0px 0px 0px 0px",
|
|
|
- "content": content,
|
|
|
- "text_size": "normal",
|
|
|
- "text_align": "left"
|
|
|
+ "page_size": 10,
|
|
|
+ "header_style": {
|
|
|
+ "text_align": "center",
|
|
|
+ "text_size": "normal",
|
|
|
+ "background_style": "none",
|
|
|
+ "text_color": "grey",
|
|
|
+ "bold": True,
|
|
|
+ "lines": 1
|
|
|
+ },
|
|
|
+ "columns": columns,
|
|
|
+ "rows": rows
|
|
|
}
|
|
|
]
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
-def df_to_markdown_table(df: pd.DataFrame) -> str:
|
|
|
- """将DataFrame转换为标准Markdown表格字符串"""
|
|
|
- headers = list(df.columns)
|
|
|
- str_rows = df.astype(str).values
|
|
|
-
|
|
|
- col_widths = []
|
|
|
- for i, h in enumerate(headers):
|
|
|
- max_w = len(str(h))
|
|
|
- for row in str_rows:
|
|
|
- max_w = max(max_w, len(str(row[i])))
|
|
|
- col_widths.append(max_w)
|
|
|
-
|
|
|
- def _row(values):
|
|
|
- cells = [str(v).ljust(col_widths[i]) for i, v in enumerate(values)]
|
|
|
- return '| ' + ' | '.join(cells) + ' |'
|
|
|
-
|
|
|
- sep = '| ' + ' | '.join('-' * w for w in col_widths) + ' |'
|
|
|
- lines = [_row(headers), sep]
|
|
|
- for row in str_rows:
|
|
|
- lines.append(_row(row))
|
|
|
- return '\n'.join(lines)
|
|
|
-
|
|
|
-
|
|
|
-def find_all_workflow() -> List[Dict[str, Any]]:
|
|
|
- sql = 'select id, name from supply_workflow;'
|
|
|
- return mysql_helper.execute_query(sql)
|
|
|
-
|
|
|
-
|
|
|
def task_exe_step_stat(ts: int, workflow_id: str, workflow_name: str):
|
|
|
sql = f'''
|
|
|
select step_name AS '步骤名称',
|
|
|
@@ -99,18 +99,15 @@ def task_exe_step_stat(ts: int, workflow_id: str, workflow_name: str):
|
|
|
stat = mysql_helper.execute_query(sql)
|
|
|
df = pd.DataFrame(stat)
|
|
|
|
|
|
- msg = df_to_markdown_table(df)
|
|
|
-
|
|
|
feishu_inform_util.send_card_msg_to_feishu(
|
|
|
webhook=fei_shu_webhook,
|
|
|
- card_json=build_card_join(msg, f"【workflow-{workflow_name}】各步骤执行情况")
|
|
|
+ card_json=build_table_json(df, f"【workflow-{workflow_name}】各步骤执行情况")
|
|
|
)
|
|
|
|
|
|
|
|
|
def workflow_status_stat(ts: int, workflow_id: str, workflow_name: str):
|
|
|
sql = f'''
|
|
|
- select sw.name as '策略名',
|
|
|
- t.step as '阶段',
|
|
|
+ select t.step as '阶段',
|
|
|
t.status as '状态',
|
|
|
t.error_msg as '失败原因',
|
|
|
t.task_cnt as '任务数',
|
|
|
@@ -179,18 +176,16 @@ def workflow_status_stat(ts: int, workflow_id: str, workflow_name: str):
|
|
|
stat = mysql_helper.execute_query(sql)
|
|
|
df = pd.DataFrame(stat)
|
|
|
|
|
|
- msg = df_to_markdown_table(df)
|
|
|
-
|
|
|
feishu_inform_util.send_card_msg_to_feishu(
|
|
|
webhook=fei_shu_webhook,
|
|
|
- card_json=build_card_join(msg, f"【workflow-{workflow_name}】爬取和生成详情")
|
|
|
+ card_json=build_table_json(df, f"【workflow-{workflow_name}】爬取和生成详情")
|
|
|
)
|
|
|
|
|
|
|
|
|
def main():
|
|
|
today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
|
|
|
timestamp_ms = int(today_midnight.timestamp() * 1000)
|
|
|
- workflows = find_all_workflow()
|
|
|
+ workflows = mysql_helper.execute_query('select id, name from supply_workflow;')
|
|
|
for workflow in workflows:
|
|
|
task_exe_step_stat(timestamp_ms, workflow['id'], workflow['name'])
|
|
|
workflow_status_stat(timestamp_ms, workflow['id'], workflow['name'])
|