import json from loguru import logger import sys import time from utils.sync_mysql_help import mysql from typing import List from datetime import datetime from utils.params import DecodeParam from examples.run_batch import process_single_video from src.workflows.what_deconstruction_workflow import WhatDeconstructionWorkflow logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True) def save_decode_result(results, timestamp, total, success_count, fail_count): output_data = { "timestamp": timestamp, "total": total, "success_count": success_count, "fail_count": fail_count, "results": results } return output_data def invoke_decode_workflow(video_list: List[DecodeParam]): """主函数""" print("=" * 80) print("批量处理视频 - What 解构工作流(视频分析版本)") print("=" * 80) # 1. 读取视频列表 print(f"\n[1] 读取视频列表...{video_list}") # 2. 初始化工作流 print("\n[2] 初始化工作流...") try: workflow = WhatDeconstructionWorkflow( model_provider="google_genai", max_depth=10 ) print(f"✅ 工作流初始化成功") except Exception as e: print(f"❌ 工作流初始化失败: {e}") import traceback traceback.print_exc() return # 3. 准备结果文件路径和时间戳 print("\n[3] 准备结果文件...") # 4. 批量处理视频(每处理完一个立即保存) print("\n[4] 开始批量处理视频(每处理完一个立即保存结果)...") results = [] total = len(video_list) success_count = 0 fail_count = 0 for index, video_data in enumerate[DecodeParam](video_list, 1): # 处理单个视频 result = process_single_video(workflow, video_data, index, total) results.append(result) # 更新统计 if result["success"]: success_count += 1 else: fail_count += 1 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # 立即保存结果到文件 output_data = save_decode_result(results, timestamp, total, success_count, fail_count) # print(f" 保存结果到文件... [{success_count}成功/{fail_count}失败/{total}总计]") if output_data: print(f"✅ 结果已实时保存到: {output_path}") return output_data else: print(f"❌ 保存结果失败,但将继续处理") return None def get_decode_result_by_id(task_id:str): sql = "SELECT * FROM decode_record WHERE task_id = %s AND task_status = 2 " tasks = mysql.fetchone(sql, (task_id,)) if not tasks: logger.info(f"task_id = {task_id} , 任务不存在") return None return tasks['decode_result'], tasks['task_status'] def decode_task_status_handler(task_id:str): # 从数据库中获取任务,每次获取一个 sql = "SELECT * FROM decode_record WHERE task_status = 1 " """json""" tasks = mysql.fetchall(sql) """字典""" # tasks = mysql.fetchall(sql,()) if not tasks: logger.info("任务列表为空") return for task in tasks: task_id = task['task_id'] task_params = json.loads(task['task_params']) logger.info(f"task_id = {task_id} , task_params = {task_params}") #如何任务超过30分钟,则认为任务超时,更新任务状态为3 task_create_timestamp = task['create_timestamp'] current_timestamp = int(time.time() * 1000) # 获取任务结果 try: decode_result = invoke_decode_workflow(task_params) if decode_result: # 更新任务状态为2,任务完成 sql = "UPDATE decode_record SET task_status = 2, decode_result = %s WHERE task_id = %s" mysql.execute(sql, (json.dumps(decode_result, ensure_ascii=False), task_id)) logger.info(f"task_id = {task_id} , decode_result = {decode_result}") else: if current_timestamp - task_create_timestamp > 1000 * 60 * 60: sql = "UPDATE decode_record SET task_status = 3, decode_result = %s WHERE task_id = %s" mysql.execute(sql, (json.dumps(decode_result, ensure_ascii=False), task_id)) logger.info(f"task_id = {task_id} ,任务状态异常") except Exception as e: logger.error(f"task_id = {task_id} , error = {e}") sql = "UPDATE decode_record SET task_status = 3, decode_result = '任务异常' WHERE task_id = %s" mysql.execute(sql, (task_id)) logger.info(f"task_id = {task_id} ,任务异常") continue