Bladeren bron

fix: update task description and improve error handling in decode workflow

- Changed task description from "创建模式任务" to "创建聚类任务".
- Added requests library import for HTTP calls.
- Refactored the decode workflow triggering function to improve error handling and logging.
- Removed unused quota update logic to streamline the process.
- Updated database name handling in SyncMySQLHelper for clarity.
jihuaqiang 1 dag geleden
bovenliggende
commit
2f8eef1976
4 gewijzigde bestanden met toevoegingen van 68 en 40 verwijderingen
  1. 1 1
      main.py
  2. 3 0
      requirements.txt
  3. 63 38
      tasks/decode.py
  4. 1 1
      utils/sync_mysql_help.py

+ 1 - 1
main.py

@@ -93,7 +93,7 @@ def get_task_detail(taskId: str):
 
 @app.post("/api/v1/content/tasks/pattern")
 def pattern_content(param: PatternContentParam):
-    """创建模式任务"""
+    """创建聚类任务"""
     res = begin_pattern_task(param)
     code = res.get("code", -1)
     task_id = res.get("task_id")

+ 3 - 0
requirements.txt

@@ -16,3 +16,6 @@ loguru>=0.7.0
 PyMySQL>=1.1.0
 DBUtils>=3.0.3
 cryptography>=41.0.0
+
+# HTTP 客户端
+requests>=2.31.0

+ 63 - 38
tasks/decode.py

@@ -4,9 +4,10 @@ from models.decode_task_result import WorkflowDecodeTaskResult
 from utils.sync_mysql_help import mysql
 from pymysql.cursors import DictCursor
 from loguru import logger
-import sys
 from datetime import datetime
 from typing import Dict, Any, Optional
+import sys
+import requests
 
 logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
 
@@ -63,17 +64,6 @@ def _initialize_task_result(task_id: str, content) -> Optional[WorkflowDecodeTas
         return None
 
 
-def _trigger_decode_workflow(task_id: str) -> bool:
-    """触发解构工作流"""
-    try:
-        invoke_decode_workflow(task_id)
-        logger.info(f"发起解构任务成功,task_id: {task_id}")
-        return True
-    except Exception as e:
-        logger.error(f"发起解构任务失败,task_id: {task_id}, error: {str(e)}")
-        return False
-
-
 def _check_quota(scene: SceneEnum, capability: CapabilityEnum = CapabilityEnum.DECODE) -> bool:
     """检查配额是否充足(并发安全版本)"""
     try:
@@ -139,21 +129,6 @@ def _check_quota(scene: SceneEnum, capability: CapabilityEnum = CapabilityEnum.D
                         logger.warning(f"配额不足,scene={scene_value}, capability={capability_value}, used={actual_used_count}, limit={quota_limit}")
                         return False
                     
-                    # 6. 配额充足,更新 used_count(原子操作)
-                    update_sql = """
-                        UPDATE workflow_daily_quota 
-                        SET used_count = used_count + 1 
-                        WHERE quota_date = %s AND scene = %s AND capability = %s
-                        AND used_count < quota_limit AND locked = 0
-                    """
-                    cursor.execute(update_sql, (quota_date, scene_value, capability_value))
-                    
-                    # 检查是否更新成功(受影响的行数)
-                    if cursor.rowcount == 0:
-                        conn.rollback()
-                        logger.warning(f"配额更新失败,可能已被其他请求占用,scene={scene_value}, capability={capability_value}")
-                        return False
-                    
                     # 提交事务
                     conn.commit()
                     logger.info(f"配额检查通过,scene={scene_value}, capability={capability_value}, used={actual_used_count + 1}, limit={quota_limit}")
@@ -165,8 +140,8 @@ def _check_quota(scene: SceneEnum, capability: CapabilityEnum = CapabilityEnum.D
                 
     except Exception as e:
         logger.error(f"配额检查失败: {str(e)}")
-        # 发生异常时,为了不影响业务,返回 True 允许继续执行
-        return True
+        # 发生异常时,返回False,不允许继续执行
+        return False
 
 
 def decode_topic(param: DecodeContentParam) -> Dict[str, Any]:
@@ -196,7 +171,7 @@ def decode_topic(param: DecodeContentParam) -> Dict[str, Any]:
             )
         
         # 步骤3: 触发解构工作流
-        if not _trigger_decode_workflow(task.task_id):
+        if not _trigger_topic_decode_workflow(task.task_id):
             return _build_error_response(
                 ERROR_CODE_FAILED,
                 "发起解构任务失败"
@@ -227,19 +202,69 @@ def begin_decode_task(param: DecodeContentParam) -> Dict[str, Any]:
         return _build_error_response(ERROR_CODE_FAILED, f"未知场景: {param.scene}")
 
 
-def invoke_decode_workflow(task_id: str) -> Dict[str, Any]:
-    """发起解构任务"""
+def _trigger_topic_decode_workflow(task_id: str) -> Dict[str, Any]:
+    """发起解构任务(调用上游工作流服务)"""
     try:
-        # TODO: 调用实际的工作流接口
+        # url = "http://192.168.81.96:8000/workflow/topic/decode"
+        url = "http://supply-content-deconstruction-workflow.piaoquantv.com/workflow/topic/decode"
+        params = {"taskId": task_id}
+
+        resp = requests.get(url, params=params, timeout=10)
+
+        # HTTP 层异常直接视为失败
+        if resp.status_code != 200:
+            logger.error(
+                f"发起解构任务失败,HTTP 状态码异常,status={resp.status_code}, task_id={task_id}"
+            )
+            return {
+                "code": ERROR_CODE_FAILED,
+                "task_id": None,
+                "reason": f"错误: {resp.status_code}",
+            }
+
+        try:
+            data = resp.json()
+        except Exception as e:
+            logger.error(f"发起解构任务失败,返回非JSON,task_id={task_id}, error={str(e)}")
+            return {
+                "code": ERROR_CODE_FAILED,
+                "task_id": None,
+                "reason": "工作流接口返回非JSON格式",
+            }
+
+        code = data.get("code", ERROR_CODE_FAILED)
+        msg = data.get("msg", "")
+
+        # 按上游协议:0 成功,其他(1001/1002/-1)为失败
+        if code == 0:
+            return {
+                "code": ERROR_CODE_SUCCESS,
+                "task_id": task_id,
+                "reason": "",
+            }
+
+        # 将上游错误信息透传到 reason 中
+        logger.error(
+            f"发起解构任务失败,上游返回错误,task_id={task_id}, code={code}, msg={msg}"
+        )
         return {
-            "code": ERROR_CODE_SUCCESS,
-            "task_id": task_id,
-            "reason": ""
+            "code": ERROR_CODE_FAILED,
+            "task_id": None,
+            "reason": f"工作流接口失败: code={code}, msg={msg}",
+        }
+
+    except requests.RequestException as e:
+        # 网络类异常(超时、连接失败等)
+        logger.error(f"发起解构任务失败,请求异常,task_id={task_id}, error={str(e)}")
+        return {
+            "code": ERROR_CODE_FAILED,
+            "task_id": None,
+            "reason": f"工作流接口请求异常: {str(e)}",
         }
     except Exception as e:
-        logger.error(f"发起解构任务失败: {str(e)}")
+        logger.error(f"发起解构任务失败,task_id={task_id}, error={str(e)}")
         return {
             "code": ERROR_CODE_FAILED,
             "task_id": None,
-            "reason": f"解构任务执行失败: {str(e)}"
+            "reason": f"解构任务执行失败: {str(e)}",
         }

+ 1 - 1
utils/sync_mysql_help.py

@@ -31,7 +31,7 @@ class SyncMySQLHelper(object):
             port = int(os.getenv('DB_PORT', '3306'))
             user = os.getenv('DB_USER', 'content_rw')
             password = os.getenv('DB_PASSWORD', 'bC1aH4bA1lB0')
-            database = os.getenv('DB_NAME', 'content-deconstruction-supply-test' if env in ('local','dev','development') else 'content-deconstruction')
+            database = os.getenv('DB_NAME', 'content-deconstruction-supply-test')
             logger.info(f"✅ 当前使用数据库 : {database}")