jihuaqiang 1 week ago
parent
commit
46190af41f
2 changed files with 25 additions and 2 deletions
  1. 24 1
      agent.py
  2. 1 1
      tools/agent_tools.py

+ 24 - 1
agent.py

@@ -105,6 +105,10 @@ app = FastAPI(
     lifespan=lifespan
 )
 
+# 并发控制:跟踪正在处理的 requestId,防止重复并发提交
+RUNNING_REQUESTS: set = set()
+RUNNING_LOCK = asyncio.Lock()
+
 # =========================
 # LangGraph 工作流定义
 # =========================
@@ -435,12 +439,31 @@ async def parse_processing_async(request: TriggerRequest, background_tasks: Back
     - **requestId**: 请求ID,用于标识处理任务
     
     行为:立即返回 200,并在后台继续处理任务。
+    若同一个 requestId 已有任务进行中,则立即返回失败(status=3)。
     """
     try:
         logger.info(f"收到异步解析请求: requestId={request.requestId}")
         
+        # 并发防抖:同一 requestId 只允许一个在运行
+        async with RUNNING_LOCK:
+            if request.requestId in RUNNING_REQUESTS:
+                return {
+                    "requestId": request.requestId,
+                    "status": 3,
+                    "message": "已有任务进行中,稍后再试",
+                    "langgraph_enabled": HAS_LANGGRAPH
+                }
+            RUNNING_REQUESTS.add(request.requestId)
+        
+        async def _background_wrapper(rid: str):
+            try:
+                await process_request_background(rid)
+            finally:
+                async with RUNNING_LOCK:
+                    RUNNING_REQUESTS.discard(rid)
+        
         # 直接使用 asyncio 创建后台任务(不阻塞当前请求返回)
-        asyncio.create_task(process_request_background(request.requestId))
+        asyncio.create_task(_background_wrapper(request.requestId))
         
         # 立即返回(不阻塞)
         return {

+ 1 - 1
tools/agent_tools.py

@@ -293,7 +293,7 @@ class UpdateDataTool:
             # 状态:5 表示结构化处理完成
             status = 5
             params = (
-                json.dumps(parsing_payload, ensure_ascii=False),
+                parsing_payload,
                 status,
                 content_id
             )