evalueteTask.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import json
  2. from loguru import logger
  3. import sys
  4. import time
  5. from utils.sync_mysql_help import mysql
  6. from typing import Dict, Any
  7. from utils.params import DecodeWorkflowParam
  8. from src.workflows.decode_workflow import DecodeWorkflow
  9. logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  10. def invoke_evaluate_workflow(evaluate_params: Dict[str, Any]):
  11. """主函数"""
  12. workflow = DecodeWorkflow()
  13. result = workflow.invoke(evaluate_params)
  14. if result:
  15. return result
  16. else:
  17. print(f"❌ 保存结果失败,但将继续处理")
  18. return None
  19. def get_evaluate_result_by_id(evaluate_id:str):
  20. sql = "SELECT * FROM evaluate_record WHERE evaluate_id = %s"
  21. tasks = mysql.fetchone(sql, (evaluate_id,))
  22. if not tasks:
  23. logger.info(f"evaluate_id = {evaluate_id} , 任务不存在")
  24. return None
  25. return tasks['evaluate_result'], tasks['status'],tasks['error_reason']
  26. def evaluate_task_status_handler():
  27. # 从数据库中获取任务,每次获取一个
  28. sql = "SELECT * FROM evaluate_record WHERE status = 0 ORDER BY create_timestamp ASC LIMIT 1"
  29. task = mysql.fetchone(sql)
  30. if not task:
  31. logger.info("任务列表为空")
  32. return
  33. else:
  34. task_id = task['evaluate_id']
  35. sql = "UPDATE evaluate_record SET status = 1 WHERE evaluate_id = %s AND status = 0"
  36. mysql.execute(sql, (task_id,))
  37. # 获取任务结果
  38. try:
  39. evaluate_id = task['evaluate_id']
  40. video_url = task['video_url']
  41. video_id = task['video_id']
  42. evaluate_params = {'evaluate_id':evaluate_id, 'video_id':video_id, 'video_url':video_url}
  43. logger.info(f"evaluate_id = {evaluate_id} , video_id = {video_id}")
  44. task_create_timestamp = task['created_at']
  45. current_timestamp = int(time.time() * 1000)
  46. evaluate_result = invoke_evaluate_workflow(evaluate_params)
  47. if evaluate_result:
  48. # 更新任务状态为2,任务完成
  49. sql = "UPDATE evaluate_record SET status = 2, evaluate_result = %s WHERE evaluate_id = %s"
  50. mysql.execute(sql, (json.dumps(evaluate_result),evaluate_id))
  51. logger.info(f"evaluate_id = {evaluate_id} , evaluate_result = {evaluate_result}")
  52. else:
  53. if current_timestamp - task_create_timestamp > 1000 * 60 * 60:
  54. sql = "UPDATE evaluate_record SET status = 3, error_reason = %s WHERE evaluate_id = %s"
  55. mysql.execute(sql, (json.dumps(evaluate_result),evaluate_id))
  56. logger.info(f"evaluate_id = {evaluate_id} ,任务状态异常")
  57. except Exception as e:
  58. logger.error(f"evaluate_id = {evaluate_id} , error = {e}")
  59. sql = "UPDATE evaluate_record SET status = 3, error_reason = %s WHERE evaluate_id = %s"
  60. mysql.execute(sql, (json.dumps(str(e)),evaluate_id))
  61. logger.info(f"evaluate_id = {evaluate_id} ,任务异常")
  62. raise RuntimeError(f"evaluate_id={evaluate_id} 任务异常: {e}")
# if __name__ == "__main__":
#     evaluate_task_status_handler()