decodeTask.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. from ast import Dict
  2. import json
  3. import re
  4. from loguru import logger
  5. import sys
  6. import time
  7. from utils.sync_mysql_help import mysql
  8. from typing import List
  9. from datetime import datetime
  10. from utils.params import DecodeWorkflowParam
  11. from src.workflows.decode_workflow import DecodeWorkflow
  12. from src.workflows.what_deconstruction_workflow import WhatDeconstructionWorkflow
  13. logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  14. def invoke_decode_workflow(task_params: Dict[DecodeWorkflowParam]):
  15. """主函数"""
  16. result = DecodeWorkflow(task_params)
  17. if result:
  18. return result
  19. else:
  20. print(f"❌ 保存结果失败,但将继续处理")
  21. return None
  22. def get_decode_result_by_id(task_id:str):
  23. sql = "SELECT * FROM decode_record WHERE task_id = %s AND task_status = 2 "
  24. tasks = mysql.fetchone(sql, (task_id,))
  25. if not tasks:
  26. logger.info(f"task_id = {task_id} , 任务不存在")
  27. return None
  28. return tasks['decode_result'], tasks['task_status']
  29. def decode_task_status_handler():
  30. # 从数据库中获取任务,每次获取一个
  31. sql = "SELECT * FROM decode_record WHERE task_status = 0 "
  32. """json"""
  33. task = mysql.fetchone(sql)
  34. task_id = task['task_id']
  35. video_url = task['video_url']
  36. if not task:
  37. logger.info("任务列表为空")
  38. return
  39. else:
  40. sql = "UPDATE decode_record SET task_status = 1 WHERE task_id = %s"
  41. mysql.execute(sql, (task_id,))
  42. # 获取任务结果
  43. try:
  44. video_id = task['video_id']
  45. task_params = {'task_id':task_id, 'video_id':video_id, 'video_url':video_url}
  46. logger.info(f"task_id = {task_id} , video_id = {video_id}")
  47. task_create_timestamp = task['create_timestamp']
  48. current_timestamp = int(time.time() * 1000)
  49. decode_result = invoke_decode_workflow(task_params)
  50. if decode_result:
  51. # 更新任务状态为2,任务完成
  52. sql = "UPDATE decode_record SET task_status = 2, WHERE task_id = %s"
  53. mysql.execute(sql, (task_id))
  54. logger.info(f"task_id = {task_id} , decode_result = {decode_result}")
  55. else:
  56. if current_timestamp - task_create_timestamp > 1000 * 60 * 60:
  57. sql = "UPDATE decode_record SET task_status = 3, WHERE task_id = %s"
  58. mysql.execute(sql, (task_id))
  59. logger.info(f"task_id = {task_id} ,任务状态异常")
  60. except Exception as e:
  61. logger.error(f"task_id = {task_id} , error = {e}")
  62. sql = "UPDATE decode_record SET task_status = 3, WHERE task_id = %s"
  63. mysql.execute(sql, (task_id))
  64. logger.info(f"task_id = {task_id} ,任务异常")
  65. raise {"task_id": task_id, "error": '任务异常'}