|
|
@@ -0,0 +1,89 @@
|
|
|
+import json
|
|
|
+from loguru import logger
|
|
|
+import sys
|
|
|
+import time
|
|
|
+from utils.sync_mysql_help import mysql
|
|
|
+from typing import Dict, Any
|
|
|
+
|
|
|
+from utils.params import DecodeWorkflowParam
|
|
|
+from src.workflows.decode_workflow import DecodeWorkflow
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+def invoke_evaluate_workflow(evaluate_params: Dict[str, Any]):
|
|
|
+ """主函数"""
|
|
|
+ workflow = DecodeWorkflow()
|
|
|
+ result = workflow.invoke(evaluate_params)
|
|
|
+ if result:
|
|
|
+ return result
|
|
|
+ else:
|
|
|
+ print(f"❌ 保存结果失败,但将继续处理")
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+def get_evaluate_result_by_id(evaluate_id:str):
|
|
|
+ sql = "SELECT * FROM evaluate_record WHERE evaluate_id = %s"
|
|
|
+ tasks = mysql.fetchone(sql, (evaluate_id,))
|
|
|
+ if not tasks:
|
|
|
+ logger.info(f"evaluate_id = {evaluate_id} , 任务不存在")
|
|
|
+ return None
|
|
|
+ return tasks['evaluate_result'], tasks['status'],tasks['error_reason']
|
|
|
+
|
|
|
+
|
|
|
+def evaluate_task_status_handler():
|
|
|
+ # 从数据库中获取任务,每次获取一个
|
|
|
+ sql = "SELECT * FROM evaluate_record WHERE status = 0 ORDER BY create_timestamp ASC LIMIT 1"
|
|
|
+ task = mysql.fetchone(sql)
|
|
|
+
|
|
|
+
|
|
|
+ if not task:
|
|
|
+ logger.info("任务列表为空")
|
|
|
+ return
|
|
|
+ else:
|
|
|
+ task_id = task['evaluate_id']
|
|
|
+ sql = "UPDATE evaluate_record SET status = 1 WHERE evaluate_id = %s AND status = 0"
|
|
|
+ mysql.execute(sql, (task_id,))
|
|
|
+
|
|
|
+ # 获取任务结果
|
|
|
+ try:
|
|
|
+ evaluate_id = task['evaluate_id']
|
|
|
+ video_url = task['video_url']
|
|
|
+ video_id = task['video_id']
|
|
|
+ evaluate_params = {'evaluate_id':evaluate_id, 'video_id':video_id, 'video_url':video_url}
|
|
|
+ logger.info(f"evaluate_id = {evaluate_id} , video_id = {video_id}")
|
|
|
+ task_create_timestamp = task['created_at']
|
|
|
+ current_timestamp = int(time.time() * 1000)
|
|
|
+
|
|
|
+ evaluate_result = invoke_evaluate_workflow(evaluate_params)
|
|
|
+
|
|
|
+ if evaluate_result:
|
|
|
+ # 更新任务状态为2,任务完成
|
|
|
+ sql = "UPDATE evaluate_record SET status = 2, evaluate_result = %s WHERE evaluate_id = %s"
|
|
|
+ mysql.execute(sql, (json.dumps(evaluate_result),evaluate_id))
|
|
|
+ logger.info(f"evaluate_id = {evaluate_id} , evaluate_result = {evaluate_result}")
|
|
|
+
|
|
|
+
|
|
|
+ else:
|
|
|
+ if current_timestamp - task_create_timestamp > 1000 * 60 * 60:
|
|
|
+ sql = "UPDATE evaluate_record SET status = 3, error_reason = %s WHERE evaluate_id = %s"
|
|
|
+ mysql.execute(sql, (json.dumps(evaluate_result),evaluate_id))
|
|
|
+ logger.info(f"evaluate_id = {evaluate_id} ,任务状态异常")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"evaluate_id = {evaluate_id} , error = {e}")
|
|
|
+ sql = "UPDATE evaluate_record SET status = 3, error_reason = %s WHERE evaluate_id = %s"
|
|
|
+ mysql.execute(sql, (json.dumps(str(e)),evaluate_id))
|
|
|
+ logger.info(f"evaluate_id = {evaluate_id} ,任务异常")
|
|
|
+ raise RuntimeError(f"evaluate_id={evaluate_id} 任务异常: {e}")
|
|
|
+
|
|
|
+
|
|
|
+# if __name__ == "__main__":
|
|
|
+# decode_task_status_handler()
|
|
|
+
|