| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- import json
- from loguru import logger
- import sys
- import time
- from utils.sync_mysql_help import mysql
- from typing import Dict, Any
- from utils.params import DecodeWorkflowParam
- from src.workflows.decode_workflow import DecodeWorkflow
- logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
- def invoke_evaluate_workflow(evaluate_params: Dict[str, Any]):
- """主函数"""
- workflow = DecodeWorkflow()
- result = workflow.invoke(evaluate_params)
- if result:
- return result
- else:
- print(f"❌ 保存结果失败,但将继续处理")
- return None
- def get_evaluate_result_by_id(evaluate_id:str):
- sql = "SELECT * FROM evaluate_record WHERE evaluate_id = %s"
- evaluate_record = mysql.fetchone(sql, (evaluate_id,))
- if not evaluate_record:
- logger.info(f"evaluate_id = {evaluate_id} , 任务不存在")
- return None
- return evaluate_record['evaluate_result'], evaluate_record['status'],evaluate_record['error_reason']
- def evaluate_task_status_handler():
- # 从数据库中获取任务,每次获取一个
- sql = "SELECT * FROM evaluate_record WHERE status = 0 ORDER BY create_timestamp ASC LIMIT 1"
- task = mysql.fetchone(sql)
-
- if not task:
- logger.info("任务列表为空")
- return
- else:
- task_id = task['evaluate_id']
- sql = "UPDATE evaluate_record SET status = 1 WHERE evaluate_id = %s AND status = 0"
- mysql.execute(sql, (task_id,))
-
- # 获取任务结果
- try:
- evaluate_id = task['evaluate_id']
- video_url = task['video_url']
- video_id = task['video_id']
- evaluate_params = {'evaluate_id':evaluate_id, 'video_id':video_id, 'video_url':video_url}
- logger.info(f"evaluate_id = {evaluate_id} , video_id = {video_id}")
- task_create_timestamp = task['created_at']
- current_timestamp = int(time.time() * 1000)
- evaluate_result = invoke_evaluate_workflow(evaluate_params)
-
- logger.info(f" 🐔 评估结果: evaluate_result = {evaluate_result}")
- # 根据返回结果判断任务状态:仅当 status==2 时置为完成,否则置为失败
- status_code = None
- if isinstance(evaluate_result, dict):
- raw_status = evaluate_result.get("status")
- if raw_status is not None:
- try:
- status_code = int(raw_status)
- except Exception:
- status_code = 3
- else:
- status_code = 2
- else:
- status_code = 2 if evaluate_result else 3
- elapsed = current_timestamp - task_create_timestamp
- if status_code == 2:
- sql = "UPDATE evaluate_record SET status = 2 WHERE evaluate_id = %s"
- mysql.execute(sql, (evaluate_id,))
- logger.info(f"evaluate_id = {evaluate_id} , 任务完成,更新为2")
- elif elapsed > 1000 * 60 * 60:
- sql = "UPDATE evaluate_record SET status = 3 WHERE evaluate_id = %s"
- mysql.execute(sql, (evaluate_id,))
- logger.info(f"evaluate_id = {evaluate_id} ,任务超时,更新为3")
- else:
- logger.info(f"evaluate_id = {evaluate_id} ,未成功且未超时,保持状态1")
- except Exception as e:
- logger.error(f"evaluate_id = {evaluate_id} , error = {e}")
- sql = "UPDATE evaluate_record SET status = 3, error_reason = %s WHERE evaluate_id = %s"
- mysql.execute(sql, (json.dumps(str(e)),evaluate_id))
- logger.info(f"evaluate_id = {evaluate_id} ,任务异常")
- raise RuntimeError(f"evaluate_id={evaluate_id} 任务异常: {e}")
- # if __name__ == "__main__":
- # decode_task_status_handler()
-
|