decodeTask.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. from loguru import logger
  2. import sys
  3. import time
  4. from utils.sync_mysql_help import mysql
  5. from typing import Dict, Any
  6. from utils.params import DecodeWorkflowParam
  7. from src.workflows.decode_workflow import DecodeWorkflow
  # Register an additional stderr sink that only emits ERROR and above, with
  # extended tracebacks (backtrace) and variable values (diagnose) enabled.
  # NOTE(review): loguru comes with a default stderr handler; unless that one
  # is removed elsewhere, ERROR records will appear on stderr twice - confirm.
  logger.add(
      sink=sys.stderr,
      level="ERROR",
      backtrace=True,
      diagnose=True,
  )
  def invoke_decode_workflow(task_params: Dict[str, Any]):
      """Run the decode workflow for one task and hand back its result.

      Args:
          task_params: Parameters passed unchanged to ``DecodeWorkflow``.

      Returns:
          The value produced by ``DecodeWorkflow(task_params)`` when it is
          truthy; otherwise ``None`` after a failure notice has been printed.
      """
      outcome = DecodeWorkflow(task_params)
      if not outcome:
          # A falsy outcome is reported on stdout; nothing is raised.
          print("❌ 保存结果失败,但将继续处理")
          return None
      return outcome
  def get_decode_result_by_id(task_id: str):
      """Look up the stored result of a finished decode task.

      Queries ``decode_videos`` for a row matching ``task_id`` whose
      ``task_status`` is 2.

      Args:
          task_id: Identifier of the task to look up.

      Returns:
          A ``(result, status)`` tuple read from the row, or ``None`` when no
          matching row is found.
      """
      # NOTE(review): the task handler in this file works on ``decode_record``
      # and ``task_status``, while this query reads ``decode_videos`` and
      # returns a ``status`` column - confirm both are intended.
      query = "SELECT * FROM decode_videos WHERE task_id = %s AND task_status = 2 "
      row = mysql.fetchone(query, (task_id,))
      if row:
          return row['result'], row['status']
      logger.info(f"task_id = {task_id} , 任务不存在")
      return None
  def decode_task_status_handler():
      """Claim one pending decode task, run it, and record the outcome.

      Fetches a single ``decode_record`` row with ``task_status = 0``, marks it
      as in progress (1) and runs the decode workflow. A truthy workflow result
      sets the status to 2. A falsy result sets the status to 3 only when the
      task is more than one hour old. Any exception sets the status to 3.

      Returns:
          None. When no pending task exists the function logs and returns
          without touching the database.

      Raises:
          RuntimeError: When processing the claimed task raises. The original
              exception is chained as ``__cause__``; the first argument of the
              error is a dict with ``task_id`` and ``error`` keys.
      """
      # Fetch one pending task per call.
      sql = "SELECT * FROM decode_record WHERE task_status = 0 "
      task = mysql.fetchone(sql)

      # The emptiness check must come before any field access on the row.
      if not task:
          logger.info("任务列表为空")
          return

      task_id = task['task_id']
      video_url = task['video_url']

      # Mark the task as in progress before running the workflow.
      _set_decode_task_status(task_id, 1)

      # assumes create_timestamp is an epoch value in milliseconds - TODO confirm
      timeout_ms = 60 * 60 * 1000

      try:
          video_id = task['video_id']
          task_params = {'task_id': task_id, 'video_id': video_id, 'video_url': video_url}
          logger.info(f"task_id = {task_id} , video_id = {video_id}")
          task_create_timestamp = task['create_timestamp']
          current_timestamp = int(time.time() * 1000)
          decode_result = invoke_decode_workflow(task_params)
          if decode_result:
              # Status 2: task finished successfully.
              _set_decode_task_status(task_id, 2)
              logger.info(f"task_id = {task_id} , decode_result = {decode_result}")
          elif current_timestamp - task_create_timestamp > timeout_ms:
              # Status 3: no result and the task has exceeded the timeout.
              _set_decode_task_status(task_id, 3)
              logger.info(f"task_id = {task_id} ,任务状态异常")
          # NOTE(review): a failed task younger than the timeout stays at
          # status 1, and this function only selects status 0, so nothing
          # visible here retries it - confirm this is handled elsewhere.
      except Exception as e:
          logger.exception(f"task_id = {task_id} , error = {e}")
          _set_decode_task_status(task_id, 3)
          logger.info(f"task_id = {task_id} ,任务异常")
          # Raise a real exception (a dict cannot be raised) and keep the cause.
          raise RuntimeError({"task_id": task_id, "error": '任务异常'}) from e


  def _set_decode_task_status(task_id, status):
      """Set ``task_status`` on the ``decode_record`` row of ``task_id``."""
      sql = "UPDATE decode_record SET task_status = %s WHERE task_id = %s"
      mysql.execute(sql, (status, task_id))