import json from loguru import logger import sys import time from utils.sync_mysql_help import mysql from typing import Dict, Any from utils.params import DecodeWorkflowParam from src.workflows.decode_workflow import DecodeWorkflow logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True) def invoke_evaluate_workflow(evaluate_params: Dict[str, Any]): """主函数""" workflow = DecodeWorkflow() result = workflow.invoke(evaluate_params) if result: return result else: print(f"❌ 保存结果失败,但将继续处理") return None def get_evaluate_result_by_id(evaluate_id:str): sql = "SELECT * FROM evaluate_record WHERE evaluate_id = %s" tasks = mysql.fetchone(sql, (evaluate_id,)) if not tasks: logger.info(f"evaluate_id = {evaluate_id} , 任务不存在") return None return tasks['evaluate_result'], tasks['status'],tasks['error_reason'] def evaluate_task_status_handler(): # 从数据库中获取任务,每次获取一个 sql = "SELECT * FROM evaluate_record WHERE status = 0 ORDER BY create_timestamp ASC LIMIT 1" task = mysql.fetchone(sql) if not task: logger.info("任务列表为空") return else: task_id = task['evaluate_id'] sql = "UPDATE evaluate_record SET status = 1 WHERE evaluate_id = %s AND status = 0" mysql.execute(sql, (task_id,)) # 获取任务结果 try: evaluate_id = task['evaluate_id'] video_url = task['video_url'] video_id = task['video_id'] evaluate_params = {'evaluate_id':evaluate_id, 'video_id':video_id, 'video_url':video_url} logger.info(f"evaluate_id = {evaluate_id} , video_id = {video_id}") task_create_timestamp = task['created_at'] current_timestamp = int(time.time() * 1000) evaluate_result = invoke_evaluate_workflow(evaluate_params) if evaluate_result: # 更新任务状态为2,任务完成 sql = "UPDATE evaluate_record SET status = 2, evaluate_result = %s WHERE evaluate_id = %s" mysql.execute(sql, (json.dumps(evaluate_result),evaluate_id)) logger.info(f"evaluate_id = {evaluate_id} , evaluate_result = {evaluate_result}") else: if current_timestamp - task_create_timestamp > 1000 * 60 * 60: sql = "UPDATE evaluate_record SET status = 3, error_reason = %s WHERE evaluate_id = %s" mysql.execute(sql, (json.dumps(evaluate_result),evaluate_id)) logger.info(f"evaluate_id = {evaluate_id} ,任务状态异常") except Exception as e: logger.error(f"evaluate_id = {evaluate_id} , error = {e}") sql = "UPDATE evaluate_record SET status = 3, error_reason = %s WHERE evaluate_id = %s" mysql.execute(sql, (json.dumps(str(e)),evaluate_id)) logger.info(f"evaluate_id = {evaluate_id} ,任务异常") raise RuntimeError(f"evaluate_id={evaluate_id} 任务异常: {e}") # if __name__ == "__main__": # decode_task_status_handler()