| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- from loguru import logger
- import sys
- import time
- from utils.sync_mysql_help import mysql
- from typing import Dict, Any
- from utils.params import DecodeWorkflowParam
- from src.workflows.decode_workflow import DecodeWorkflow
# Register an extra loguru sink: records at ERROR level and above are written
# to stderr with extended tracebacks (backtrace) and variable values in the
# trace (diagnose).
# NOTE(review): loguru's documentation advises turning `diagnose` off in
# production because it can print sensitive values — confirm this is intended.
logger.add(
    sink=sys.stderr,
    level="ERROR",
    backtrace=True,
    diagnose=True,
)
-
def invoke_decode_workflow(task_params: Dict[str, Any]):
    """Run the decode workflow for one task and hand back its outcome.

    Args:
        task_params: Task description forwarded unchanged to
            ``DecodeWorkflow``.

    Returns:
        The value of ``DecodeWorkflow(task_params)`` when it is truthy,
        otherwise ``None``.
    """
    outcome = DecodeWorkflow(task_params)
    if not outcome:
        # A falsy outcome is only reported on stdout; nothing is raised so
        # the caller can keep processing.
        print("❌ 保存结果失败,但将继续处理")
        return None
    return outcome
def get_decode_result_by_id(task_id: str):
    """Fetch the stored result of a task whose ``task_status`` is 2.

    Args:
        task_id: Value matched against the ``task_id`` column.

    Returns:
        A ``(result, status)`` tuple read from the matching row, or ``None``
        when no row with ``task_status = 2`` exists for ``task_id``.
    """
    # NOTE(review): this queries ``decode_videos`` while the status handler in
    # this file works on ``decode_record`` — confirm the table is intended.
    query = "SELECT * FROM decode_videos WHERE task_id = %s AND task_status = 2 "
    row = mysql.fetchone(query, (task_id,))
    if row:
        return row['result'], row['status']

    logger.info(f"task_id = {task_id} , 任务不存在")
    return None
def _set_decode_task_status(task_id, status: int) -> None:
    """Write ``status`` into ``decode_record.task_status`` for ``task_id``."""
    mysql.execute(
        "UPDATE decode_record SET task_status = %s WHERE task_id = %s",
        (status, task_id),
    )


def decode_task_status_handler():
    """Claim one pending decode task, run it, and record the outcome.

    One row with ``task_status = 0`` is fetched from ``decode_record`` and
    immediately updated to status 1. The decode workflow is then invoked:

    * truthy result -> status 2;
    * falsy result and the task's ``create_timestamp`` is more than one hour
      older than the time sampled just before the workflow ran -> status 3;
    * falsy result within that hour -> status is left at 1;
    * any exception -> status 3, then the error is re-raised.

    Returns:
        ``None`` in every non-raising path (including when no task is
        pending).

    Raises:
        RuntimeError: When processing the claimed task fails; the original
            exception is attached as ``__cause__``.
    """
    # Timestamps are compared in milliseconds (time.time() * 1000 below).
    timeout_ms = 60 * 60 * 1000

    # Fetch a single pending task per call.
    task = mysql.fetchone("SELECT * FROM decode_record WHERE task_status = 0 ")
    if not task:
        logger.info("任务列表为空")
        return

    task_id = task['task_id']
    # Mark the task as claimed before doing any work on it.
    _set_decode_task_status(task_id, 1)

    try:
        video_url = task['video_url']
        video_id = task['video_id']
        task_params = {'task_id': task_id, 'video_id': video_id, 'video_url': video_url}
        logger.info(f"task_id = {task_id} , video_id = {video_id}")
        task_create_timestamp = task['create_timestamp']
        current_timestamp = int(time.time() * 1000)

        decode_result = invoke_decode_workflow(task_params)
        if decode_result:
            # Status 2: task finished successfully.
            _set_decode_task_status(task_id, 2)
            logger.info(f"task_id = {task_id} , decode_result = {decode_result}")
        elif current_timestamp - task_create_timestamp > timeout_ms:
            # No result and the task is older than the timeout: status 3.
            _set_decode_task_status(task_id, 3)
            logger.info(f"task_id = {task_id} ,任务状态异常")
    except Exception as e:
        logger.error(f"task_id = {task_id} , error = {e}")
        _set_decode_task_status(task_id, 3)
        logger.info(f"task_id = {task_id} ,任务异常")
        # Only BaseException instances can be raised; chain the cause so the
        # original traceback is preserved for the caller.
        raise RuntimeError(f"task_id = {task_id} , 任务异常") from e
|