|
|
@@ -26,6 +26,56 @@ header_template = ["blue", "wathet", "turquoise", "green", "yellow", "orange"]
|
|
|
header_template_index = 0
|
|
|
|
|
|
|
|
|
+def build_collapsible_panel_table_json(df_map: Dict[str, pandas.DataFrame], title: str) -> Dict[str, Any]:
|
|
|
+ elements = []
|
|
|
+ for key, df in df_map.items():
|
|
|
+ # 表头
|
|
|
+ header = "| " + " | ".join(df.columns) + " |\n"
|
|
|
+ # 分隔线
|
|
|
+ separator = "| " + " | ".join(["---"] * len(df.columns)) + " |\n"
|
|
|
+ # 数据行
|
|
|
+ rows = "\n".join(
|
|
|
+ "| " + " | ".join(str(val) for val in row) + " |"
|
|
|
+ for row in df.to_numpy() # 或 df.values
|
|
|
+ )
|
|
|
+ content = header + separator + rows
|
|
|
+
|
|
|
+ elements.append({
|
|
|
+ "tag": "collapsible_panel",
|
|
|
+ "expanded": False,
|
|
|
+ "header": {
|
|
|
+ "title": {
|
|
|
+ "tag": "plain_text",
|
|
|
+ "content": key
|
|
|
+ },
|
|
|
+ "vertical_align": "center",
|
|
|
+ "template": header_template[header_template_index % len(header_template)],
|
|
|
+ },
|
|
|
+ "border": {
|
|
|
+ "color": "grey",
|
|
|
+ "corner_radius": "5px"
|
|
|
+ },
|
|
|
+ "elements": [
|
|
|
+ {
|
|
|
+ "tag": "markdown",
|
|
|
+ "content": content
|
|
|
+ }
|
|
|
+ ]
|
|
|
+ })
|
|
|
+ return {
|
|
|
+ "schema": "2.0",
|
|
|
+ "header": {
|
|
|
+ "title": {
|
|
|
+ "content": title
|
|
|
+ },
|
|
|
+ "template": header_template[header_template_index % len(header_template)],
|
|
|
+ },
|
|
|
+ "body": {
|
|
|
+ "elements": elements
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
def build_table_json(df: pandas.DataFrame, title: str) -> Dict[str, Any]:
|
|
|
columns = []
|
|
|
for column in df.columns:
|
|
|
@@ -71,7 +121,7 @@ def build_table_json(df: pandas.DataFrame, title: str) -> Dict[str, Any]:
|
|
|
}
|
|
|
|
|
|
|
|
|
-def task_exe_step_stat(ts: int, workflow_id: str, workflow_name: str):
|
|
|
+def task_exe_step_stat_query(ts: int, workflow_id: str):
|
|
|
sql = f'''
|
|
|
select step_name AS '步骤名称',
|
|
|
status as '执行状态',
|
|
|
@@ -105,16 +155,10 @@ def task_exe_step_stat(ts: int, workflow_id: str, workflow_name: str):
|
|
|
) as t
|
|
|
group by step_name, status, error_msg;
|
|
|
'''
|
|
|
- stat = mysql_helper.execute_query(sql)
|
|
|
- df = pd.DataFrame(stat)
|
|
|
+ return mysql_helper.execute_query(sql)
|
|
|
|
|
|
- feishu_inform_util.send_card_msg_to_feishu(
|
|
|
- webhook=fei_shu_webhook,
|
|
|
- card_json=build_table_json(df, f"【workflow-{workflow_name}】各步骤执行情况")
|
|
|
- )
|
|
|
|
|
|
-
|
|
|
-def workflow_status_stat(ts: int, workflow_id: str, workflow_name: str):
|
|
|
+def workflow_status_stat_query(ts: int, workflow_id: str):
|
|
|
sql = f'''
|
|
|
select t.step as '阶段',
|
|
|
t.status as '状态',
|
|
|
@@ -196,13 +240,7 @@ def workflow_status_stat(ts: int, workflow_id: str, workflow_name: str):
|
|
|
left join supply_workflow sw on t.workflow_id = sw.id
|
|
|
where workflow_id = '{workflow_id}';
|
|
|
'''
|
|
|
- stat = mysql_helper.execute_query(sql)
|
|
|
- df = pd.DataFrame(stat)
|
|
|
-
|
|
|
- feishu_inform_util.send_card_msg_to_feishu(
|
|
|
- webhook=fei_shu_webhook,
|
|
|
- card_json=build_table_json(df, f"【workflow-{workflow_name}】爬取和生成详情")
|
|
|
- )
|
|
|
+ return mysql_helper.execute_query(sql)
|
|
|
|
|
|
|
|
|
def main():
|
|
|
@@ -211,9 +249,20 @@ def main():
|
|
|
timestamp_ms = int(today_midnight.timestamp() * 1000)
|
|
|
workflows = mysql_helper.execute_query('select id, name from supply_workflow;')
|
|
|
for workflow in workflows:
|
|
|
- task_exe_step_stat(timestamp_ms, workflow['id'], workflow['name'])
|
|
|
- workflow_status_stat(timestamp_ms, workflow['id'], workflow['name'])
|
|
|
- header_template_index = header_template_index + 1
|
|
|
+ exe_step_stat = task_exe_step_stat_query(timestamp_ms, workflow['id'])
|
|
|
+ status_stat = workflow_status_stat_query(timestamp_ms, workflow['id'])
|
|
|
+
|
|
|
+ df = pd.DataFrame(exe_step_stat)
|
|
|
+
|
|
|
+ df_map = {
|
|
|
+ "各步骤执行情况统计": pd.DataFrame(exe_step_stat),
|
|
|
+ "爬取和生成情况统计": pd.DataFrame(status_stat),
|
|
|
+ }
|
|
|
+
|
|
|
+ feishu_inform_util.send_card_msg_to_feishu(
|
|
|
+ webhook=fei_shu_webhook,
|
|
|
+ card_json=build_collapsible_panel_table_json(df_map, f"【workflow-{workflow['name']}】数据统计")
|
|
|
+ )
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|