Просмотр исходного кода

refactor(decode): enhance task creation and error handling in workflow

- Removed deprecated model configurations from the environment file.
- Updated the WorkflowTask model to include content type as a parameter.
- Improved error handling in the HTTP exception handler to provide more detailed responses.
- Enhanced the task creation and quota checking functions to incorporate content type, ensuring better management of task parameters.
jihuaqiang 1 месяц назад
Родитель
Сommit
234bfa23b5
5 измененных файлов с 47 добавлено и 36 удалено
  1. 0 9
      .env
  2. 11 8
      main.py
  3. 5 4
      models/task.py
  4. 16 13
      tasks/decode.py
  5. 15 2
      tasks/pattern.py

+ 0 - 9
.env

@@ -6,15 +6,6 @@ RETRY_LIMIT=3
 DEBUG_MODE=False
 LOG_LEVEL=INFO
 
-# 模型配置
-OPENAI_MODEL=gpt-4o
-GEMINI_MODEL=gemini-2.5-flash
-GEMINI_API_KEY=AIzaSyAkt1l9Kw1CQgHFzTpla0vgt0OE53fr-BI
-
-LANGSMITH_TRACING=true
-LANGSMITH_ENDPOINT=https://api.smith.langchain.com
-LANGSMITH_API_KEY=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
-LANGSMITH_PROJECT=aigc_what_decode
 
 # local本地  prod线上
 APP_ENV=prod  

+ 11 - 8
main.py

@@ -1,10 +1,6 @@
-from ast import main
-import json
-import uuid
 from fastapi import FastAPI, HTTPException, Request
 from fastapi.responses import JSONResponse
 from fastapi.middleware.cors import CORSMiddleware
-from pydantic import BaseModel
 from utils.params import DecodeContentParam, PatternContentParam
 from dotenv import load_dotenv, find_dotenv
 from typing import List, Dict, Any, Optional
@@ -40,10 +36,17 @@ app.add_middleware(
 
 @app.exception_handler(HTTPException)
 async def http_exception_handler(request: Request, exc: HTTPException):
-    return JSONResponse(
-        status_code=200,
-        content={"code": exc.status_code, "message": exc.message, "data": None}
-    )
+    """统一处理 HTTPException,保证返回结构与其它接口一致"""
+    msg = RESPONSE_MSG_MAP.get(exc.status_code, "failed")
+    content = {
+        "code": exc.status_code,
+        "msg": msg,
+        "data": None,
+    }
+    # 将异常 detail 写入 reason,便于排查问题
+    if exc.detail:
+        content["reason"] = str(exc.detail)
+    return JSONResponse(status_code=200, content=content)
 
 def _build_api_response(
     code: int,

+ 5 - 4
models/task.py

@@ -3,7 +3,7 @@ from pydantic import BaseModel, Field
 from typing_extensions import Annotated
 
 from utils.sync_mysql_help import mysql
-from utils.params import SceneEnum, CapabilityEnum
+from utils.params import SceneEnum, CapabilityEnum, ContentTypeEnum
 from utils.general import generate_task_id
 
 
@@ -15,7 +15,7 @@ class WorkflowTask(BaseModel):
     status: Annotated[int, Field(description='任务状态', default=0)]  # 0:待处理 1:执行中 2:成功 3:失败
     scene: Annotated[int, Field(description='业务场景')]  # 0:选题 1:创作 2:制作
     capability: Annotated[int, Field(description='能力类型')]  # 0:解构 1:聚类
-
+    content_type: Annotated[int, Field(description='内容类型')]  # 1:文本 2:图片 3:视频
     def save(self):
         """保存任务到数据库"""
         record = self.model_dump(exclude={'table_name'})
@@ -24,14 +24,15 @@ class WorkflowTask(BaseModel):
         mysql.execute(sql, tuple([record[key] for key in keys]))
 
     @staticmethod
-    def create_task(scene: SceneEnum, capability: CapabilityEnum, root_task_id: str = '') -> 'WorkflowTask':
+    def create_task(scene: SceneEnum, capability: CapabilityEnum, content_type: ContentTypeEnum, root_task_id: str = '') -> 'WorkflowTask':
         """创建新任务"""
         task = WorkflowTask(
             task_id=generate_task_id(),
             root_task_id=root_task_id,
             status=0,  # 待处理
             scene=scene.value,
-            capability=capability.value
+            capability=capability.value,
+            content_type=content_type.value
         )
         task.save()
         return task

+ 16 - 13
tasks/decode.py

@@ -35,12 +35,13 @@ def _build_success_response(task_id: str) -> Dict[str, Any]:
     }
 
 
-def _create_workflow_task(scene: SceneEnum) -> Optional[WorkflowTask]:
+def _create_workflow_task(scene: SceneEnum, content_type: ContentTypeEnum) -> Optional[WorkflowTask]:
     """创建工作流任务"""
     try:
         task = WorkflowTask.create_task(
             scene=scene,
             capability=CapabilityEnum.DECODE,
+            content_type=content_type,
             root_task_id=''
         )
         logger.info(f"创建解构任务成功,task_id: {task.task_id}")
@@ -64,14 +65,14 @@ def _initialize_task_result(task_id: str, content) -> Optional[WorkflowDecodeTas
         return None
 
 
-def _check_quota(scene: SceneEnum, capability: CapabilityEnum = CapabilityEnum.DECODE) -> bool:
+def _check_quota(scene: SceneEnum, capability: CapabilityEnum = CapabilityEnum.DECODE, content_type: ContentTypeEnum = ContentTypeEnum.TEXT) -> bool:
     """检查配额是否充足(并发安全版本)"""
     try:
         # 获取今天的日期,格式为 YYYYMMDD
         quota_date = datetime.now().strftime('%Y%m%d')
         scene_value = scene.value
         capability_value = capability.value
-        
+        content_type_value = content_type.value
         # 使用事务和 SELECT FOR UPDATE 确保并发安全
         pool = mysql.get_pool()
         with pool.connection() as conn:
@@ -84,19 +85,19 @@ def _check_quota(scene: SceneEnum, capability: CapabilityEnum = CapabilityEnum.D
                     select_sql = """
                         SELECT quota_limit, used_count, locked 
                         FROM workflow_daily_quota 
-                        WHERE quota_date = %s AND scene = %s AND capability = %s
+                        WHERE quota_date = %s AND scene = %s AND capability = %s AND content_type = %s
                         FOR UPDATE
                     """
-                    cursor.execute(select_sql, (quota_date, scene_value, capability_value))
+                    cursor.execute(select_sql, (quota_date, scene_value, capability_value, content_type_value))
                     quota_record = cursor.fetchone()
                     
                     # 2. 如果没找到,创建一条新记录,quota_limit 默认为 10
                     if not quota_record:
                         insert_sql = """
-                            INSERT INTO workflow_daily_quota (scene, capability, quota_date, quota_limit, used_count, locked)
-                            VALUES (%s, %s, %s, %s, %s, %s)
+                            INSERT INTO workflow_daily_quota (scene, capability, content_type, quota_date, quota_limit, used_count, locked)
+                            VALUES (%s, %s, %s, %s, %s, %s, %s)
                         """
-                        cursor.execute(insert_sql, (scene_value, capability_value, quota_date, 10, 0, 0))
+                        cursor.execute(insert_sql, (scene_value, capability_value, content_type_value, quota_date, 10, 0, 0))
                         quota_limit = 10
                         current_used_count = 0
                         is_locked = 0
@@ -117,9 +118,10 @@ def _check_quota(scene: SceneEnum, capability: CapabilityEnum = CapabilityEnum.D
                         FROM workflow_task 
                         WHERE scene = %s 
                         AND capability = %s 
+                        AND content_type = %s
                         AND DATE(created_time) = CURDATE()
                     """
-                    cursor.execute(count_sql, (scene_value, capability_value))
+                    cursor.execute(count_sql, (scene_value, capability_value, content_type_value))
                     count_result = cursor.fetchone()
                     actual_used_count = count_result.get('task_count', 0) if count_result else 0
                     
@@ -148,14 +150,14 @@ def decode_topic(param: DecodeContentParam) -> Dict[str, Any]:
     """选题解构方法"""
     try:
         # 前置配额检查,用于超出每天解构次数时,直接返回错误
-        if not _check_quota(param.scene, CapabilityEnum.DECODE):
+        if not _check_quota(param.scene, CapabilityEnum.DECODE, param.content_type):
             return _build_error_response(
                 ERROR_CODE_FAILED,
                 "配额不足"
             )
 
         # 步骤1: 创建工作流task任务
-        task = _create_workflow_task(param.scene)
+        task = _create_workflow_task(param.scene, param.content_type)
         if not task or not task.task_id:
             return _build_error_response(
                 ERROR_CODE_TASK_CREATE_FAILED,
@@ -171,10 +173,11 @@ def decode_topic(param: DecodeContentParam) -> Dict[str, Any]:
             )
         
         # 步骤3: 触发解构工作流
-        if not _trigger_topic_decode_workflow(task.task_id):
+        trigger_result = _trigger_topic_decode_workflow(task.task_id)
+        if trigger_result.get("code") != ERROR_CODE_SUCCESS:
             return _build_error_response(
                 ERROR_CODE_FAILED,
-                "发起解构任务失败"
+                trigger_result.get("reason") or "发起解构任务失败"
             )
         
         # 所有步骤成功

+ 15 - 2
tasks/pattern.py

@@ -1,2 +1,15 @@
-def begin_pattern_task():
-    pass
+from typing import Dict, Any
+
+from utils.params import PatternContentParam
+
+
+ERROR_CODE_FAILED = -1
+
+
+def begin_pattern_task(param: PatternContentParam) -> Dict[str, Any]:
+    """聚类任务暂未实现的占位实现"""
+    return {
+        "code": ERROR_CODE_FAILED,
+        "task_id": None,
+        "reason": "聚类任务暂未实现"
+    }