decodeTask.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. import json
  2. from loguru import logger
  3. import sys
  4. import time
  5. from utils.sync_mysql_help import mysql
  6. from typing import List
  7. from datetime import datetime
  8. from utils.params import DecodeParam
  9. from examples.run_batch import process_single_video
  10. from src.workflows.what_deconstruction_workflow import WhatDeconstructionWorkflow
  11. logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  12. def save_decode_result(results, timestamp, total, success_count, fail_count):
  13. output_data = {
  14. "timestamp": timestamp,
  15. "total": total,
  16. "success_count": success_count,
  17. "fail_count": fail_count,
  18. "results": results
  19. }
  20. return output_data
  21. def invoke_decode_workflow(video_list: List[DecodeParam]):
  22. """主函数"""
  23. print("=" * 80)
  24. print("批量处理视频 - What 解构工作流(视频分析版本)")
  25. print("=" * 80)
  26. # 1. 读取视频列表
  27. print(f"\n[1] 读取视频列表...{video_list}")
  28. # 2. 初始化工作流
  29. print("\n[2] 初始化工作流...")
  30. try:
  31. workflow = WhatDeconstructionWorkflow(
  32. model_provider="google_genai",
  33. max_depth=10
  34. )
  35. print(f"✅ 工作流初始化成功")
  36. except Exception as e:
  37. print(f"❌ 工作流初始化失败: {e}")
  38. import traceback
  39. traceback.print_exc()
  40. return
  41. # 3. 准备结果文件路径和时间戳
  42. print("\n[3] 准备结果文件...")
  43. # 4. 批量处理视频(每处理完一个立即保存)
  44. print("\n[4] 开始批量处理视频(每处理完一个立即保存结果)...")
  45. results = []
  46. total = len(video_list)
  47. success_count = 0
  48. fail_count = 0
  49. for index, video_data in enumerate[DecodeParam](video_list, 1):
  50. # 处理单个视频
  51. result = process_single_video(workflow, video_data, index, total)
  52. results.append(result)
  53. # 更新统计
  54. if result["success"]:
  55. success_count += 1
  56. else:
  57. fail_count += 1
  58. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  59. # 立即保存结果到文件
  60. output_data = save_decode_result(results, timestamp, total, success_count, fail_count)
  61. # print(f" 保存结果到文件... [{success_count}成功/{fail_count}失败/{total}总计]")
  62. if output_data:
  63. return output_data
  64. else:
  65. print(f"❌ 保存结果失败,但将继续处理")
  66. return None
  67. def get_decode_result_by_id(task_id:str):
  68. sql = "SELECT * FROM decode_record WHERE task_id = %s AND task_status = 2 "
  69. tasks = mysql.fetchone(sql, (task_id,))
  70. if not tasks:
  71. logger.info(f"task_id = {task_id} , 任务不存在")
  72. return None
  73. return tasks['decode_result'], tasks['task_status']
  74. def decode_task_status_handler():
  75. # 从数据库中获取任务,每次获取一个
  76. sql = "SELECT * FROM decode_record WHERE task_status = 1 "
  77. """json"""
  78. tasks = mysql.fetchall(sql)
  79. """字典"""
  80. # tasks = mysql.fetchall(sql,())
  81. if not tasks:
  82. logger.info("任务列表为空")
  83. return
  84. for task in tasks:
  85. task_id = task['task_id']
  86. task_params = json.loads(task['task_params'])
  87. logger.info(f"task_id = {task_id} , task_params = {task_params}")
  88. #如何任务超过30分钟,则认为任务超时,更新任务状态为3
  89. task_create_timestamp = task['create_timestamp']
  90. current_timestamp = int(time.time() * 1000)
  91. # 获取任务结果
  92. try:
  93. decode_result = invoke_decode_workflow(task_params['video_list'])
  94. if decode_result:
  95. # 更新任务状态为2,任务完成
  96. sql = "UPDATE decode_record SET task_status = 2, decode_result = %s WHERE task_id = %s"
  97. mysql.execute(sql, (json.dumps(decode_result, ensure_ascii=False), task_id))
  98. logger.info(f"task_id = {task_id} , decode_result = {decode_result}")
  99. else:
  100. if current_timestamp - task_create_timestamp > 1000 * 60 * 60:
  101. sql = "UPDATE decode_record SET task_status = 5, decode_result = %s WHERE task_id = %s"
  102. mysql.execute(sql, (json.dumps(decode_result, ensure_ascii=False), task_id))
  103. logger.info(f"task_id = {task_id} ,任务状态异常")
  104. except Exception as e:
  105. logger.error(f"task_id = {task_id} , error = {e}")
  106. sql = "UPDATE decode_record SET task_status = 5, decode_result = '任务异常' WHERE task_id = %s"
  107. mysql.execute(sql, (task_id))
  108. logger.info(f"task_id = {task_id} ,任务异常")
  109. continue