import json
import os
import sys
import time
from typing import Any, Dict, Optional, Tuple

from loguru import logger

from utils.sync_mysql_help import mysql
from utils.params import DecodeWorkflowParam
from src.workflows.decode_workflow import DecodeWorkflow

logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)

# Values written to / compared against decode_record.task_status below.
_STATUS_DONE = 2
_STATUS_FAILED = 3

# A task that has not succeeded is marked failed once it is older than this.
_TASK_TIMEOUT_MS = 1000 * 60 * 60


def invoke_decode_workflow(task_params: Dict[str, Any]) -> Any:
    """Run a fresh ``DecodeWorkflow`` on *task_params*.

    Args:
        task_params: Parameters handed unchanged to ``DecodeWorkflow.invoke``.

    Returns:
        The workflow result when it is truthy; otherwise ``None`` (after
        printing a notice to stdout).
    """
    workflow = DecodeWorkflow()
    result = workflow.invoke(task_params)
    if result:
        return result
    print("❌ 保存结果失败,但将继续处理")
    return None


def get_decode_result_by_id(task_id: str) -> Optional[Tuple[Any, Any, Any, Any]]:
    """Look up one row of ``decode_videos`` by task id.

    Args:
        task_id: Value matched against the ``task_id`` column.

    Returns:
        ``(result, status, error_reason, search_keywords)`` taken from the
        row, or ``None`` when no row matches.
    """
    sql = "SELECT * FROM decode_videos WHERE task_id = %s"
    row = mysql.fetchone(sql, (task_id,))
    if not row:
        logger.info(f"task_id = {task_id} , 任务不存在")
        return None
    return row['result'], row['status'], row['error_reason'], row['search_keywords']


def _resolve_status_code(decode_result: Any) -> int:
    """Translate a workflow result into a task status code.

    * dict with a ``"status"`` entry -> that entry converted with ``int``
      (``_STATUS_FAILED`` if it cannot be converted);
    * dict without ``"status"`` (or with ``None``) -> ``_STATUS_DONE``;
    * anything else -> ``_STATUS_DONE`` if truthy, else ``_STATUS_FAILED``.
    """
    if not isinstance(decode_result, dict):
        return _STATUS_DONE if decode_result else _STATUS_FAILED

    raw_status = decode_result.get("status")
    if raw_status is None:
        return _STATUS_DONE
    try:
        return int(raw_status)
    except (TypeError, ValueError, OverflowError):
        return _STATUS_FAILED


def decode_task_status_handler() -> None:
    """Pick the oldest pending decode task, run it and record the outcome.

    Steps:
        1. Fetch the oldest ``decode_record`` row with ``task_status = 0``;
           return immediately if there is none.
        2. Flip that row to ``task_status = 1``.
        3. Run the decode workflow for the row.
        4. Status code 2 -> set ``task_status = 2``. Any other code -> set
           ``task_status = 3`` only if the task is older than one hour,
           otherwise leave it at 1.

    Raises:
        RuntimeError: If anything in steps 3-4 raises. The row is set to
            ``task_status = 3`` first and the original exception is chained
            as ``__cause__``.
    """
    # Fetch a single pending task, oldest first.
    sql = "SELECT * FROM decode_record WHERE task_status = 0 ORDER BY create_timestamp ASC LIMIT 1"
    task = mysql.fetchone(sql)
    if not task:
        logger.info("没有status为0的任务了")
        return

    task_id = task['task_id']
    # NOTE(review): the outcome of this conditional UPDATE is not inspected,
    # so this function cannot tell whether another worker claimed the row
    # first -- confirm whether concurrent workers are possible.
    sql = "UPDATE decode_record SET task_status = 1 WHERE task_id = %s AND task_status = 0"
    mysql.execute(sql, (task_id,))

    try:
        logger.info(f"开始执行任务id = {task['task_id']}")
        video_url = task['video_url']
        video_id = task['video_id']
        task_params = {'task_id': task_id, 'video_id': video_id, 'video_url': video_url}
        logger.info(f"task_id = {task_id} , video_id = {video_id}")

        task_create_timestamp = task['create_timestamp']
        # NOTE(review): sampled before the workflow runs, so `elapsed` below
        # excludes the workflow's own run time -- confirm this is intended.
        # Assumes create_timestamp is epoch milliseconds -- TODO confirm.
        current_timestamp = int(time.time() * 1000)

        decode_result = invoke_decode_workflow(task_params)
        logger.info(f" 🐔 结构结果: decode_result = {decode_result}")

        status_code = _resolve_status_code(decode_result)
        elapsed = current_timestamp - task_create_timestamp

        if status_code == _STATUS_DONE:
            sql = "UPDATE decode_record SET task_status = 2 WHERE task_id = %s"
            mysql.execute(sql, (task_id,))
            logger.info(f"task_id = {task_id} , 任务完成,更新为2")
        elif elapsed > _TASK_TIMEOUT_MS:
            sql = "UPDATE decode_record SET task_status = 3 WHERE task_id = %s"
            mysql.execute(sql, (task_id,))
            logger.info(f"task_id = {task_id} ,任务超时,更新为3")
        else:
            # Not successful, not yet timed out: row stays at task_status = 1.
            logger.info(f"task_id = {task_id} ,未成功且未超时,保持状态1")
    except Exception as e:
        # logger.exception attaches the active traceback, which the
        # backtrace/diagnose sink configured above can then render.
        logger.exception(f"task_id = {task_id} , error = {e}")
        sql = "UPDATE decode_record SET task_status = 3 WHERE task_id = %s"
        mysql.execute(sql, (task_id,))
        logger.info(f"task_id = {task_id} ,任务异常")
        raise RuntimeError(f"task_id={task_id} 任务异常: {e}") from e


# if __name__ == "__main__":
#     decode_task_status_handler()