# decodeTask.py
  1. import json
  2. from loguru import logger
  3. import sys
  4. import time
  5. from utils.sync_mysql_help import mysql
  6. from typing import List
  7. from datetime import datetime
  8. from utils.params import DecodeParam
  9. from examples.run_batch import process_single_video
  10. from src.workflows.what_deconstruction_workflow import WhatDeconstructionWorkflow
  # Register an additional stderr sink that only reports ERROR and above, with
  # backtrace and diagnose turned on for exception reports.
  # NOTE(review): loguru installs a default stderr handler of its own, so
  # error records may be printed twice unless it is removed elsewhere - confirm.
  logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  def save_decode_result(results, timestamp, total, success_count, fail_count):
      """Bundle batch statistics and per-video results into a single dict.

      Despite the name, nothing is written to disk or to a database here; the
      function only assembles the summary payload and returns it.

      Args:
          results: Per-video results collected so far (stored by reference,
              not copied).
          timestamp: Label identifying when the summary was produced.
          total: Number of videos in the batch.
          success_count: Number of videos processed successfully.
          fail_count: Number of videos that failed.

      Returns:
          A dict with the keys ``timestamp``, ``total``, ``success_count``,
          ``fail_count`` and ``results``, in that order.
      """
      return {
          "timestamp": timestamp,
          "total": total,
          "success_count": success_count,
          "fail_count": fail_count,
          "results": results,
      }
  def invoke_decode_workflow(video_list: List[DecodeParam]):
      """Run the What-deconstruction workflow over every entry of *video_list*.

      A ``WhatDeconstructionWorkflow`` is created once, each video is passed to
      ``process_single_video`` together with its 1-based position and the batch
      size, and the collected results are summarised via
      ``save_decode_result``. Progress is reported with ``print``.

      Args:
          video_list: The videos to process.

      Returns:
          The summary dict produced by ``save_decode_result``, or ``None`` when
          the workflow cannot be initialised or no summary is produced.
      """
      banner = "=" * 80
      print(banner)
      print("批量处理视频 - What 解构工作流(视频分析版本)")
      print(banner)

      # Step 1: echo the incoming video list.
      print(f"\n[1] 读取视频列表...{video_list}")

      # Step 2: build the workflow; without it nothing can be processed.
      print("\n[2] 初始化工作流...")
      try:
          workflow = WhatDeconstructionWorkflow(
              model_provider="google_genai",
              max_depth=10,
          )
      except Exception as e:
          print(f"❌ 工作流初始化失败: {e}")
          import traceback
          traceback.print_exc()
          return None
      else:
          print("✅ 工作流初始化成功")

      # Steps 3 and 4 only announce progress; no result file is created here.
      print("\n[3] 准备结果文件...")
      print("\n[4] 开始批量处理视频(每处理完一个立即保存结果)...")

      results = []
      total = len(video_list)
      success_count = 0
      fail_count = 0

      # Plain enumerate(): subscripting the builtin (enumerate[DecodeParam]) is
      # only accepted from Python 3.9 on and adds nothing at runtime.
      for index, video_data in enumerate(video_list, 1):
          result = process_single_video(workflow, video_data, index, total)
          results.append(result)
          if result["success"]:
              success_count += 1
          else:
              fail_count += 1

      # NOTE(review): the indentation of the original was lost, so it is unclear
      # whether the summary/return sat inside the loop. Returning inside the
      # loop would stop after the first video, so the summary is built once the
      # whole batch has been processed - confirm this is the intended behaviour.
      timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
      output_data = save_decode_result(results, timestamp, total, success_count, fail_count)
      if not output_data:
          print("❌ 保存结果失败,但将继续处理")
          return None

      # The previous message interpolated `output_path`, a name that is not
      # defined anywhere in this module, and therefore raised NameError. No file
      # is written here, so the batch statistics are reported instead.
      print(f"✅ 结果已汇总: [{success_count}成功/{fail_count}失败/{total}总计]")
      return output_data
  def get_decode_result_by_id(task_id: str):
      """Fetch the stored result of a decode task whose status is 2.

      Only a ``decode_record`` row that matches *task_id* and has
      ``task_status = 2`` is considered.

      Args:
          task_id: Identifier of the task to look up.

      Returns:
          A ``(decode_result, task_status)`` tuple taken from the matching row,
          or ``None`` when no such row is found.
      """
      sql = "SELECT * FROM decode_record WHERE task_id = %s AND task_status = 2 "
      record = mysql.fetchone(sql, (task_id,))
      if record:
          return record['decode_result'], record['task_status']

      # No row with status 2 for this id.
      logger.info(f"task_id = {task_id} , 任务不存在")
      return None
  def decode_task_status_handler(task_id: str):
      """Run the decode workflow for every pending task and store the outcome.

      All ``decode_record`` rows with ``task_status = 1`` are fetched. For each
      row the JSON in ``task_params`` is decoded and handed to
      ``invoke_decode_workflow``; the row is then updated as follows:

      * status 2 plus the JSON-encoded result when the workflow returns a
        truthy value;
      * status 3 when the workflow returns nothing and the task is older than
        the timeout;
      * status 3 with the fixed text '任务异常' when anything raises while the
        task is being handled.

      Args:
          task_id: Not used by the body; the tasks to handle are selected by
              status only. Kept so existing callers keep working.

      Returns:
          None.
      """
      # NOTE(review): the original comment spoke of a 30 minute timeout, but the
      # comparison has always used 60 minutes; the coded value is kept - confirm
      # which one is intended.
      timeout_ms = 1000 * 60 * 60

      pending_sql = "SELECT * FROM decode_record WHERE task_status = 1 "
      tasks = mysql.fetchall(pending_sql)
      if not tasks:
          logger.info("任务列表为空")
          return

      for task in tasks:
          # Separate local name so the (unused) parameter is not shadowed.
          current_id = task['task_id']

          # Presumably epoch milliseconds, since it is compared against
          # time.time() * 1000 below - confirm against the table schema.
          created_ms = task['create_timestamp']
          now_ms = int(time.time() * 1000)

          try:
              # Decoding happens inside the try block so that one row with
              # malformed JSON is marked as failed instead of aborting the run
              # and staying pending forever.
              task_params = json.loads(task['task_params'])
              logger.info(f"task_id = {current_id} , task_params = {task_params}")

              decode_result = invoke_decode_workflow(task_params)
              if decode_result:
                  # Status 2: task finished, result stored as JSON.
                  done_sql = "UPDATE decode_record SET task_status = 2, decode_result = %s WHERE task_id = %s"
                  mysql.execute(done_sql, (json.dumps(decode_result, ensure_ascii=False), current_id))
                  logger.info(f"task_id = {current_id} , decode_result = {decode_result}")
              elif now_ms - created_ms > timeout_ms:
                  # No result and the task is past the timeout: status 3.
                  # Younger tasks stay pending and are retried on the next run.
                  stale_sql = "UPDATE decode_record SET task_status = 3, decode_result = %s WHERE task_id = %s"
                  mysql.execute(stale_sql, (json.dumps(decode_result, ensure_ascii=False), current_id))
                  logger.info(f"task_id = {current_id} ,任务状态异常")
          except Exception as e:
              # logger.exception logs at ERROR level and attaches the traceback.
              logger.exception(f"task_id = {current_id} , error = {e}")
              failed_sql = "UPDATE decode_record SET task_status = 3, decode_result = '任务异常' WHERE task_id = %s"
              # One-element tuple: `(current_id)` would be a bare string.
              mysql.execute(failed_sql, (current_id,))
              logger.info(f"task_id = {current_id} ,任务异常")