decodeTask.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. import json
  2. import os
  3. from loguru import logger
  4. import sys
  5. import time
  6. from utils.sync_mysql_help import mysql
  7. from typing import Dict, Any
  8. from utils.params import DecodeWorkflowParam
  9. from src.workflows.decode_workflow import DecodeWorkflow
  10. logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  11. def invoke_decode_workflow(task_params: Dict[str, Any]):
  12. """主函数"""
  13. workflow = DecodeWorkflow()
  14. result = workflow.invoke(task_params)
  15. if result:
  16. return result
  17. else:
  18. print(f"❌ 保存结果失败,但将继续处理")
  19. return None
  20. def get_decode_result_by_id(task_id:str):
  21. sql = "SELECT * FROM decode_videos WHERE task_id = %s"
  22. tasks = mysql.fetchone(sql, (task_id,))
  23. if not tasks:
  24. logger.info(f"task_id = {task_id} , 任务不存在")
  25. return None
  26. return tasks['result'], tasks['status'],tasks['error_reason'],tasks['search_keywords']
  27. def decode_task_status_handler():
  28. # 从数据库中获取任务,每次获取一个
  29. api_key = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")
  30. logger.error(f"❌ GEMINI_API_KEY = {api_key}")
  31. sql = "SELECT * FROM decode_record WHERE task_status = 0 ORDER BY create_timestamp ASC LIMIT 1"
  32. task = mysql.fetchone(sql)
  33. if not task:
  34. logger.info("没有status为0的任务了")
  35. return
  36. else:
  37. task_id = task['task_id']
  38. sql = "UPDATE decode_record SET task_status = 1 WHERE task_id = %s AND task_status = 0"
  39. mysql.execute(sql, (task_id,))
  40. # 获取任务结果
  41. try:
  42. logger.info(f"开始执行任务id = {task['task_id']}")
  43. task_id = task['task_id']
  44. video_url = task['video_url']
  45. video_id = task['video_id']
  46. task_params = {'task_id':task_id, 'video_id':video_id, 'video_url':video_url}
  47. logger.info(f"task_id = {task_id} , video_id = {video_id}")
  48. task_create_timestamp = task['create_timestamp']
  49. current_timestamp = int(time.time() * 1000)
  50. decode_result = invoke_decode_workflow(task_params)
  51. logger.info(f" 🐔 结构结果: decode_result = {decode_result}")
  52. # 根据返回结果判断任务状态:仅当 status==2 时置为完成,否则置为失败
  53. status_code = None
  54. if isinstance(decode_result, dict):
  55. raw_status = decode_result.get("status")
  56. if raw_status is not None:
  57. try:
  58. status_code = int(raw_status)
  59. except Exception:
  60. status_code = 3
  61. else:
  62. status_code = 2
  63. else:
  64. status_code = 2 if decode_result else 3
  65. elapsed = current_timestamp - task_create_timestamp
  66. if status_code == 2:
  67. sql = "UPDATE decode_record SET task_status = 2 WHERE task_id = %s"
  68. mysql.execute(sql, (task_id,))
  69. logger.info(f"task_id = {task_id} , 任务完成,更新为2")
  70. elif elapsed > 1000 * 60 * 60:
  71. sql = "UPDATE decode_record SET task_status = 3 WHERE task_id = %s"
  72. mysql.execute(sql, (task_id,))
  73. logger.info(f"task_id = {task_id} ,任务超时,更新为3")
  74. else:
  75. logger.info(f"task_id = {task_id} ,未成功且未超时,保持状态1")
  76. except Exception as e:
  77. logger.error(f"task_id = {task_id} , error = {e}")
  78. sql = "UPDATE decode_record SET task_status = 3 WHERE task_id = %s"
  79. mysql.execute(sql, (task_id,))
  80. logger.info(f"task_id = {task_id} ,任务异常")
  81. raise RuntimeError(f"task_id={task_id} 任务异常: {e}")
  82. # if __name__ == "__main__":
  83. # decode_task_status_handler()