Преглед на файлове

feat: 添加评估结果查询接口并重构评估任务处理逻辑

添加新的API端点/evaluate/result用于查询评估结果
重构evaluateTask.py中的评估任务处理逻辑,改进状态管理和错误处理
新增EvaluateStatusParam模型用于评估结果查询参数
在.env中添加APP_ENV配置项
max_liu преди 2 дни
родител
ревизия
ddc86a0c7d
променени са 5 файла, в които са добавени 56 реда и са изтрити 20 реда
  1. 1 0
      .env
  2. 30 19
      decode_task/evaluateTask.py
  3. 22 1
      main.py
  4. 2 0
      utils/params.py
  5. 1 0
      utils/sync_mysql_help.py

+ 1 - 0
.env

@@ -16,3 +16,4 @@ LANGSMITH_ENDPOINT=https://api.smith.langchain.com
 LANGSMITH_API_KEY=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
 LANGSMITH_PROJECT=aigc_what_decode
 
+APP_ENV=prod

+ 30 - 19
decode_task/evalueteTask.py → decode_task/evaluateTask.py

@@ -14,10 +14,6 @@ from src.workflows.decode_workflow import DecodeWorkflow
 logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
 
 
-
-   
-
-
 def invoke_evaluate_workflow(evaluate_params: Dict[str, Any]):
     """主函数"""
     workflow = DecodeWorkflow()
@@ -31,11 +27,11 @@ def invoke_evaluate_workflow(evaluate_params: Dict[str, Any]):
 
 def get_evaluate_result_by_id(evaluate_id:str):
     sql = "SELECT * FROM evaluate_record WHERE evaluate_id = %s"
-    tasks = mysql.fetchone(sql, (evaluate_id,))
-    if not tasks:
+    evaluate_record = mysql.fetchone(sql, (evaluate_id,))
+    if not evaluate_record:
         logger.info(f"evaluate_id = {evaluate_id} , 任务不存在")    
         return None
-    return tasks['evaluate_result'], tasks['status'],tasks['error_reason']
+    return evaluate_record['evaluate_result'], evaluate_record['status'],evaluate_record['error_reason']
 
 
 def  evaluate_task_status_handler():
@@ -63,19 +59,34 @@ def  evaluate_task_status_handler():
         current_timestamp = int(time.time() * 1000)
 
         evaluate_result = invoke_evaluate_workflow(evaluate_params)
-
-        if evaluate_result:
-            # 更新任务状态为2,任务完成
-            sql = "UPDATE evaluate_record SET status = 2, evaluate_result = %s WHERE evaluate_id = %s"
-            mysql.execute(sql, (json.dumps(evaluate_result),evaluate_id))
-            logger.info(f"evaluate_id = {evaluate_id} , evaluate_result = {evaluate_result}")
-
-           
+        
+        logger.info(f" 🐔 评估结果: evaluate_result = {evaluate_result}")
+
+        # 根据返回结果判断任务状态:仅当 status==2 时置为完成,否则置为失败
+        status_code = None
+        if isinstance(evaluate_result, dict):
+            raw_status = evaluate_result.get("status")
+            if raw_status is not None:
+                try:
+                    status_code = int(raw_status)
+                except Exception:
+                    status_code = 3
+            else:
+                status_code = 2
+        else:
+            status_code = 2 if evaluate_result else 3
+
+        elapsed = current_timestamp - task_create_timestamp
+        if status_code == 2:
+            sql = "UPDATE evaluate_record SET status = 2 WHERE evaluate_id = %s"
+            mysql.execute(sql, (evaluate_id,))
+            logger.info(f"evaluate_id = {evaluate_id} , 任务完成,更新为2")
+        elif elapsed > 1000 * 60 * 60:
+            sql = "UPDATE evaluate_record SET status = 3 WHERE evaluate_id = %s"
+            mysql.execute(sql, (evaluate_id,))
+            logger.info(f"evaluate_id = {evaluate_id} ,任务超时,更新为3")
         else:
-            if current_timestamp - task_create_timestamp > 1000 * 60 * 60:
-                sql = "UPDATE evaluate_record SET status = 3, error_reason = %s WHERE evaluate_id = %s"
-                mysql.execute(sql, (json.dumps(evaluate_result),evaluate_id))
-                logger.info(f"evaluate_id = {evaluate_id} ,任务状态异常")   
+            logger.info(f"evaluate_id = {evaluate_id} ,未成功且未超时,保持状态1")      
     except Exception as e:
         logger.error(f"evaluate_id = {evaluate_id} , error = {e}")
         sql = "UPDATE evaluate_record SET status = 3, error_reason = %s WHERE evaluate_id = %s"

+ 22 - 1
main.py

@@ -4,11 +4,12 @@ import uuid
 from fastapi import FastAPI, HTTPException, Request
 from fastapi.responses import JSONResponse
 from pydantic import BaseModel
-from utils.params import TaskStatusParam, DecodeListParam,EvaluateListParam
+from utils.params import TaskStatusParam, DecodeListParam,EvaluateListParam,EvaluateStatusParam
 from dotenv import load_dotenv
 
 
 from decode_task.decodeTask import get_decode_result_by_id as get_decode_result_by_id_db
+from decode_task.evaluateTask import get_evaluate_result_by_id as get_evaluate_result_by_id_db
 
 
 
@@ -130,6 +131,26 @@ def evaluate_video(param:EvaluateListParam):
         "data": data_list
     }
 
+@app.post("/evaluate/result")
+def get_evaluate_result(param: EvaluateStatusParam):
+    db_res = get_evaluate_result_by_id_db(param.evaluate_id)
+    logger.info(f"\n查询评估结果的evaluate_id = {param.evaluate_id}")
 
+    if not db_res:
+        return {
+            "code": -1,
+            "message": '评估不存在',
+            "data": None
+        }
+    result, status,error_reason = db_res
+    return {
+        "code": 0,
+        "message": status == 2 and "success" or error_reason,
+        "data": {
+            "result": result,
+            "status": status,
+            "error":error_reason,
+        } 
+    }
 
 

+ 2 - 0
utils/params.py

@@ -6,6 +6,8 @@ from typing import List
 class TaskStatusParam(BaseModel):
     task_id: str
 
+class EvaluateStatusParam(BaseModel):
+    evaluate_id: str
 class DecodeParam(BaseModel):
     channel_content_id: str
     video: str

+ 1 - 0
utils/sync_mysql_help.py

@@ -24,6 +24,7 @@ class SyncMySQLHelper(object):
             # 加载环境变量,允许通过 .env 配置本机调试数据库
             load_dotenv()
             env = (os.getenv('APP_ENV') or os.getenv('ENV') or 'local').lower()
+            logger.info(f"✅ env = {env}")
             host = os.getenv('DB_HOST', 'rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com')
             port = int(os.getenv('DB_PORT', '3306'))
             user = os.getenv('DB_USER', 'content_rw')