| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- import json
- from loguru import logger
- import sys
- import time
- from utils.sync_mysql_help import mysql
- from typing import List
- from datetime import datetime
- from utils.params import DecodeParam
- from examples.run_batch import process_single_video
- from src.workflows.what_deconstruction_workflow import WhatDeconstructionWorkflow
- logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
def save_decode_result(results, timestamp, total, success_count, fail_count):
    """Bundle batch statistics and per-video results into a single payload.

    Despite the name, nothing is persisted here — the caller decides what
    to do with the returned dict.

    Args:
        results: list of per-video result dicts.
        timestamp: run timestamp string.
        total: number of videos attempted.
        success_count: how many succeeded.
        fail_count: how many failed.

    Returns:
        dict with keys timestamp/total/success_count/fail_count/results.
    """
    summary = dict(
        timestamp=timestamp,
        total=total,
        success_count=success_count,
        fail_count=fail_count,
        results=results,
    )
    return summary
-
def invoke_decode_workflow(video_list: List[DecodeParam]):
    """Run the What-deconstruction workflow over a batch of videos.

    Args:
        video_list: decode parameters, one entry per video.

    Returns:
        The aggregate result dict (timestamp/total/success_count/fail_count/
        results) on completion, or None if the workflow failed to initialize.
    """
    print("=" * 80)
    print("批量处理视频 - What 解构工作流(视频分析版本)")
    print("=" * 80)

    # 1. Show the incoming video list.
    print(f"\n[1] 读取视频列表...{video_list}")

    # 2. Initialize the workflow; abort the whole batch if this fails.
    print("\n[2] 初始化工作流...")
    try:
        workflow = WhatDeconstructionWorkflow(
            model_provider="google_genai",
            max_depth=10,
        )
        print(f"✅ 工作流初始化成功")
    except Exception as e:
        print(f"❌ 工作流初始化失败: {e}")
        import traceback
        traceback.print_exc()
        return None  # explicit None, consistent with the failure branch below

    print("\n[3] 准备结果文件...")

    # 3. Process every video, tallying successes and failures.
    print("\n[4] 开始批量处理视频(每处理完一个立即保存结果)...")
    results = []
    total = len(video_list)
    success_count = 0
    fail_count = 0
    # BUG FIX: was `enumerate[DecodeParam](video_list, 1)` — subscripting a
    # builtin at call time; plain enumerate() is the correct call.
    for index, video_data in enumerate(video_list, 1):
        result = process_single_video(workflow, video_data, index, total)
        results.append(result)
        if result["success"]:
            success_count += 1
        else:
            fail_count += 1

    # 4. Assemble the aggregate payload once all videos are processed.
    # BUG FIX: the original returned from inside the loop (stopping after the
    # first video) and printed an undefined `output_path` (NameError).
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_data = save_decode_result(results, timestamp, total, success_count, fail_count)
    if output_data:
        print(f"✅ 处理完成 [{success_count}成功/{fail_count}失败/{total}总计]")
        return output_data
    print(f"❌ 保存结果失败,但将继续处理")
    return None
def get_decode_result_by_id(task_id: str):
    """Fetch a completed decode task (task_status = 2) by its id.

    Args:
        task_id: the task identifier to look up.

    Returns:
        Tuple of (decode_result, task_status) for the matching row, or
        None when no completed task with that id exists.
    """
    query = "SELECT * FROM decode_record WHERE task_id = %s AND task_status = 2 "
    row = mysql.fetchone(query, (task_id,))
    if row:
        return row['decode_result'], row['task_status']
    logger.info(f"task_id = {task_id} , 任务不存在")
    return None
def decode_task_status_handler(task_id: str):
    """Poll pending decode tasks (task_status = 1) and drive each to a final state.

    For every pending row: run the decode workflow; on success persist the
    JSON result and mark the row done (status 2); if the workflow returns
    nothing and the row is older than 60 minutes, mark it failed (status 3);
    on an exception mark it failed immediately and move on to the next row.

    NOTE(review): the `task_id` parameter is immediately shadowed by each
    row's own task_id — it appears unused; confirm the intended signature.
    """
    # Fetch all pending tasks (status 1). fetchall returns dict-like rows.
    sql = "SELECT * FROM decode_record WHERE task_status = 1 "
    tasks = mysql.fetchall(sql)
    if not tasks:
        logger.info("任务列表为空")
        return
    for task in tasks:
        task_id = task['task_id']
        # task_params is stored as a JSON string in the row.
        task_params = json.loads(task['task_params'])
        logger.info(f"task_id = {task_id} , task_params = {task_params}")
        # Timeout bookkeeping: row creation time vs now, both in milliseconds.
        # NOTE(review): an earlier comment said 30 minutes, but the code
        # checks 1000*60*60 ms = 60 minutes — confirm the intended timeout.
        task_create_timestamp = task['create_timestamp']
        current_timestamp = int(time.time() * 1000)
        try:
            decode_result = invoke_decode_workflow(task_params)
            if decode_result:
                # Workflow finished: persist the result, mark done (status 2).
                sql = "UPDATE decode_record SET task_status = 2, decode_result = %s WHERE task_id = %s"
                mysql.execute(sql, (json.dumps(decode_result, ensure_ascii=False), task_id))
                logger.info(f"task_id = {task_id} , decode_result = {decode_result}")
            elif current_timestamp - task_create_timestamp > 1000 * 60 * 60:
                # No result and older than the timeout: mark failed (status 3).
                sql = "UPDATE decode_record SET task_status = 3, decode_result = %s WHERE task_id = %s"
                mysql.execute(sql, (json.dumps(decode_result, ensure_ascii=False), task_id))
                logger.info(f"task_id = {task_id} ,任务状态异常")
        except Exception as e:
            logger.error(f"task_id = {task_id} , error = {e}")
            sql = "UPDATE decode_record SET task_status = 3, decode_result = '任务异常' WHERE task_id = %s"
            # BUG FIX: was `mysql.execute(sql, (task_id))` — `(task_id)` is a
            # bare string, not a 1-tuple; the trailing comma is required so
            # the driver receives a parameter sequence.
            mysql.execute(sql, (task_id,))
            logger.info(f"task_id = {task_id} ,任务异常")
|