Browse Source

clean_agent

丁云鹏 2 days ago
parent
commit
fad447e113
1 changed file with 118 additions and 24 deletions
  1. 118 24
      agent.py

+ 118 - 24
agent.py

@@ -676,55 +676,149 @@ extraction_requests: set = set()
 
 @app.post("/extract")
 async def extract(request: ExtractRequest):
+    """
+    Run the extraction process (asynchronous mode).
+    
+    Args:
+        request: Request body carrying the request ID and the query term.
+        
+    Returns:
+        dict: Dictionary describing the execution status.
+    """
     try:
         requestId = request.requestId
         query = request.query
-        # Check whether the request is already being processed
+        logger.info(f"收到提取请求: requestId={requestId}, query={query}")
+        
+        # Concurrency guard: only one run per requestId at a time
         async with RUNNING_LOCK:
             if requestId in extraction_requests:
-                return {"status": 1, "request_id": requestId, "message": "请求已在处理中"}
+                return {"status": 1, "requestId": requestId, "message": "请求已在处理中"}
             extraction_requests.add(requestId)
         
-        try:
-            # Mark the request as in progress (status 1)
-            update_extract_status(requestId, 1)
-            # Run the agent
-            result = execute_agent_with_api(json.dumps({"query_word": query, "request_id": requestId}))
-            update_extract_status(requestId, 2)
-        finally:
-            # Always remove the request from the running set, on success or failure
-            async with RUNNING_LOCK:
-                extraction_requests.discard(requestId)
-        return {"status": "success", "result": result}
+        # Mark the request as in progress (status 1)
+        update_extract_status(requestId, 1)
+        
+        # Coroutine that runs the agent in the background
+        async def _execute_extract_async():
+            try:
+                # Run the synchronous function in a thread pool
+                import concurrent.futures
+                loop = asyncio.get_event_loop()
+                with concurrent.futures.ThreadPoolExecutor() as executor:
+                    result = await loop.run_in_executor(
+                        executor, 
+                        execute_agent_with_api, 
+                        json.dumps({"query_word": query, "request_id": requestId})
+                    )
+                # Mark the request as completed (status 2)
+                update_extract_status(requestId, 2)
+                logger.info(f"异步提取任务完成: requestId={requestId}")
+                return result
+            except Exception as e:
+                logger.error(f"异步提取任务失败: requestId={requestId}, error={e}")
+                # Mark the request as failed (status 3), then re-raise
+                update_extract_status(requestId, 3)
+                raise
+            finally:
+                # Always remove the request from the running set, on success or failure
+                async with RUNNING_LOCK:
+                    extraction_requests.discard(requestId)
+        
+        # Schedule the task without awaiting it. NOTE(review): the Task returned by create_task is not stored anywhere
+        asyncio.create_task(_execute_extract_async())
+        
+        # Return the status immediately
+        return {"status": 1, "requestId": requestId, "message": "提取任务已启动并在后台处理"}
     except Exception as e:
+        logger.error(f"启动提取任务失败: requestId={requestId}, error={e}")
         # An exception occurred: mark the request as failed (status 3)
         update_extract_status(requestId, 3)
-        raise HTTPException(status_code=500, detail=f"执行Agent时出错: {str(e)}")
+        # Remove the request from the running set
+        async with RUNNING_LOCK:
+            extraction_requests.discard(requestId)
+        raise HTTPException(status_code=500, detail=f"启动提取任务失败: {str(e)}")
 
 @app.post("/expand")
-async def expand(request: ExpandRequest, background_tasks: BackgroundTasks):
+async def expand(request: ExpandRequest):
     """
-    Run the expansion query process
+    Run the expansion query process (asynchronous mode)
     
     Args:
-        request: Request body carrying the request ID
-        background_tasks: FastAPI background tasks
+        request: Request body carrying the request ID and the query term
         
     Returns:
         dict: Dictionary describing the execution status
     """
     try:
+        requestId = request.requestId
+        query = request.query
+        logger.info(f"收到扩展查询请求: requestId={requestId}, query={query}")
+        
+        # Concurrency guard: one run per requestId. NOTE(review): the set below is fetched before RUNNING_LOCK is taken
+        expansion_requests = getattr(app.state, 'expansion_requests', set())
+        async with RUNNING_LOCK:
+            if requestId in expansion_requests:
+                return {"status": 1, "requestId": requestId, "message": "扩展查询已在处理中"}
+            # Create the set if it does not exist yet
+            if not hasattr(app.state, 'expansion_requests'):
+                app.state.expansion_requests = set()
+            app.state.expansion_requests.add(requestId)
+        
         # Immediately mark the request as in progress (status 1)
-        _update_expansion_status(request.requestId, 1)
+        _update_expansion_status(requestId, 1)
+        
+        # Coroutine that runs the expansion agent in the background
+        async def _execute_expand_async():
+            try:
+                # Run the synchronous function in a thread pool
+                import concurrent.futures
+                loop = asyncio.get_event_loop()
+                with concurrent.futures.ThreadPoolExecutor() as executor:
+                    result = await loop.run_in_executor(
+                        executor, 
+                        execute_expand_agent_with_api, 
+                        requestId, 
+                        query
+                    )
+                # Mark the request as completed (status 2)
+                _update_expansion_status(requestId, 2)
+                logger.info(f"异步扩展查询任务完成: requestId={requestId}")
+                return result
+            except Exception as e:
+                logger.error(f"异步扩展查询任务失败: requestId={requestId}, error={e}")
+                # Mark the request as failed (status 3), then re-raise
+                _update_expansion_status(requestId, 3)
+                raise
+            finally:
+                # Always remove the request from the running set, on success or failure
+                async with RUNNING_LOCK:
+                    if hasattr(app.state, 'expansion_requests'):
+                        app.state.expansion_requests.discard(requestId)
         
-        # Add the background task
-        background_tasks.add_task(execute_expand_agent_with_api, request.requestId, request.query)
+        # Schedule the task without awaiting it. NOTE(review): the Task returned by create_task is not stored anywhere
+        asyncio.create_task(_execute_expand_async())
         
         # Return the status immediately
-        return {"status": 1, "requestId": request.requestId, "message": "扩展查询处理已启动"}
-        
+        return {"status": 1, "requestId": requestId, "message": "扩展查询任务已启动并在后台处理"}
+    except Exception as e:
+        logger.error(f"启动扩展查询任务失败: requestId={requestId}, error={e}")
+        # An exception occurred: mark the request as failed (status 3)
+        _update_expansion_status(requestId, 3)
+        # Remove the request from the running set
+        async with RUNNING_LOCK:
+            if hasattr(app.state, 'expansion_requests'):
+                app.state.expansion_requests.discard(requestId)
+        raise HTTPException(status_code=500, detail=f"启动扩展查询任务失败: {str(e)}")
+
     except Exception as e:
-        raise HTTPException(status_code=500, detail=f"执行Agent时出错: {str(e)}")
+        # NOTE(review): second `except Exception` on the same try, after an identical one; marks the request failed (status 3)
+        _update_expansion_status(request.requestId, 3)
+        # Remove the request from the running set
+        async with RUNNING_LOCK:
+            if hasattr(app.state, 'expansion_requests'):
+                app.state.expansion_requests.discard(request.requestId)
+        raise HTTPException(status_code=500, detail=f"启动扩展查询任务失败: {str(e)}")
 def update_extract_status(request_id: str, status: int):
     try:
         from utils.mysql_db import MysqlHelper