|
|
@@ -302,58 +302,66 @@ def crawler_and_produce_stat_query(ts: int, workflow_id: str) -> List[Dict[str,
|
|
|
return mysql_helper.execute_query(sql)
|
|
|
|
|
|
|
|
|
-def main():
|
|
|
- global header_template_index
|
|
|
+def workflow_monitor(workflow_id: str, workflow_name: str, status: int):
|
|
|
today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
|
|
|
|
|
|
start_dt_str = today_midnight.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
end_dt_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
|
timestamp_ms = int(today_midnight.timestamp() * 1000)
|
|
|
+
|
|
|
+ task_status_stat = workflow_task_status_stat(timestamp_ms, workflow_id)
|
|
|
+
|
|
|
+ exe_step_stat = task_exe_step_stat_query(timestamp_ms, workflow_id)
|
|
|
+ status_stat = crawler_and_produce_stat_query(timestamp_ms, workflow_id)
|
|
|
+
|
|
|
+ # 构建标题
|
|
|
+
|
|
|
+ text_tag_list = []
|
|
|
+ if status == 0:
|
|
|
+ text_tag_list.append(build_text_tag_json("已关闭", "red"))
|
|
|
+ elif status == 1:
|
|
|
+ text_tag_list.append(build_text_tag_json("开启中", "green"))
|
|
|
+ else:
|
|
|
+ text_tag_list.append(build_text_tag_json("未知", "orange"))
|
|
|
+
|
|
|
+ template = header_template[header_template_index % len(header_template)]
|
|
|
+ sub_title = build_sub_title_json(f"{start_dt_str} - {end_dt_str}")
|
|
|
+ header = build_header_json(f"【workflow策略】{workflow_name}", template, sub_title, text_tag_list)
|
|
|
+
|
|
|
+ # 构建下文内容
|
|
|
+ task_status_df = pd.DataFrame(task_status_stat)
|
|
|
+ task_info = f"**总任务数**: {task_status_df['任务数'].sum()}" + \
|
|
|
+ "".join([
|
|
|
+ f"\n**{item['任务状态']}任务数**: {item['任务数']}"
|
|
|
+ for item in task_status_df.to_dict(orient='records')
|
|
|
+ ])
|
|
|
+
|
|
|
+ elements = [
|
|
|
+ build_markdown_element_json(task_info),
|
|
|
+ build_hr_element_json(),
|
|
|
+ build_table_element_json(pd.DataFrame(exe_step_stat)),
|
|
|
+ build_hr_element_json(),
|
|
|
+ build_table_element_json(pd.DataFrame(status_stat)),
|
|
|
+ ]
|
|
|
+
|
|
|
+ config = build_config_json()
|
|
|
+
|
|
|
+ card_json = build_card_json(elements, header, config)
|
|
|
+
|
|
|
+ feishu_inform_util.send_card_msg_to_feishu(
|
|
|
+ webhook=fei_shu_webhook,
|
|
|
+ card_json=card_json
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+def main():
|
|
|
+ global header_template_index
|
|
|
workflows = mysql_helper.execute_query('select id, name, status from supply_workflow;')
|
|
|
for workflow in workflows:
|
|
|
- task_status_stat = workflow_task_status_stat(timestamp_ms, workflow['id'])
|
|
|
-
|
|
|
- exe_step_stat = task_exe_step_stat_query(timestamp_ms, workflow['id'])
|
|
|
- status_stat = crawler_and_produce_stat_query(timestamp_ms, workflow['id'])
|
|
|
-
|
|
|
- # 构建标题
|
|
|
-
|
|
|
- text_tag_list = []
|
|
|
- if workflow['status'] == 0:
|
|
|
- text_tag_list.append(build_text_tag_json("已关闭", "red"))
|
|
|
- elif workflow['status'] == 1:
|
|
|
- text_tag_list.append(build_text_tag_json("开启中", "green"))
|
|
|
- else:
|
|
|
- text_tag_list.append(build_text_tag_json("未知", "orange"))
|
|
|
-
|
|
|
- template = header_template[header_template_index % len(header_template)]
|
|
|
- sub_title = build_sub_title_json(f"{start_dt_str} - {end_dt_str}")
|
|
|
- header = build_header_json(f"【workflow策略】{workflow['name']}", template, sub_title, text_tag_list)
|
|
|
-
|
|
|
- # 构建下文内容
|
|
|
- task_status_df = pd.DataFrame(task_status_stat)
|
|
|
- task_info = f"**总任务数**: {task_status_df['任务数'].sum()}" + \
|
|
|
- "".join([
|
|
|
- f"\n**{item['任务状态']}任务数**: {item['任务数']}"
|
|
|
- for item in task_status_df.to_dict(orient='records')
|
|
|
- ])
|
|
|
-
|
|
|
- elements = [
|
|
|
- build_markdown_element_json(task_info),
|
|
|
- build_hr_element_json(),
|
|
|
- build_table_element_json(pd.DataFrame(exe_step_stat)),
|
|
|
- build_hr_element_json(),
|
|
|
- build_table_element_json(pd.DataFrame(status_stat)),
|
|
|
- ]
|
|
|
-
|
|
|
- config = build_config_json()
|
|
|
-
|
|
|
- card_json = build_card_json(elements, header, config)
|
|
|
-
|
|
|
- feishu_inform_util.send_card_msg_to_feishu(
|
|
|
- webhook=fei_shu_webhook,
|
|
|
- card_json=card_json
|
|
|
- )
|
|
|
+ try:
|
|
|
+ workflow_monitor(workflow['id'], workflow['name'], workflow['status'])
|
|
|
+ except Exception as e:
|
|
|
+ print(f"【workflow策略】{workflow['name']} 监控异常: {e}")
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|