jihuaqiang il y a 1 mois
Parent
commit
fb56961924
1 fichiers modifiés avec 50 ajouts et 117 suppressions
  1. 50 117
      agent.py

+ 50 - 117
agent.py

@@ -193,7 +193,7 @@ async def restore_extraction_processes():
             request_id = row[0]
             query = row[1] if len(row) > 1 else ""
             try:
-                # 调用 /extract 接口,带重试机制
+                # 直接调用提取函数,带重试机制(函数内部会处理状态更新)
                 await call_extract_with_retry(request_id, query)
                 logger.info(f"✅ 恢复提取流程成功: request_id={request_id}")
             except Exception as e:
@@ -221,7 +221,7 @@ async def restore_expansion_processes():
             request_id = row[0]
             query = row[1] if len(row) > 1 else ""
             try:
-                # 调用 /expand 接口,带重试机制
+                # 直接调用扩展函数,带重试机制(函数内部会处理状态更新)
                 await call_expand_with_retry(request_id, query)
                 logger.info(f"✅ 恢复扩展流程成功: request_id={request_id}")
             except Exception as e:
@@ -231,157 +231,90 @@ async def restore_expansion_processes():
         logger.error(f"❌ 恢复扩展流程时发生错误: {e}")
 
 async def call_parse_async_with_retry(request_id: str, max_retries: int = 3):
-    """调用 /parse/async 接口,带重试机制"""
+    """直接调用解析函数,带重试机制"""
     for attempt in range(max_retries):
         try:
-            import httpx
-            
-            # 创建异步HTTP客户端
-            async with httpx.AsyncClient(timeout=30.0) as client:
-                response = await client.post(
-                    "http://localhost:8080/parse/async",
-                    json={"requestId": request_id}
-                )
-
-                print(f"response={response.json()}")
-                
-                if response.status_code == 200:
-                    result = response.json()
-                    logger.info(f"调用 /parse/async 成功: request_id={request_id}, result={result}")
-                    return
-                else:
-                    logger.warning(f"调用 /parse/async 失败: request_id={request_id}, status_code={response.status_code}, attempt={attempt+1}")
+            # 直接调用后台处理函数
+            await process_request_background(request_id)
+            logger.info(f"直接调用解析函数成功: request_id={request_id}")
+            return
                     
         except Exception as e:
-            logger.warning(f"调用 /parse/async 异常: request_id={request_id}, error={e}, attempt={attempt+1}")
+            logger.warning(f"直接调用解析函数异常: request_id={request_id}, error={e}, attempt={attempt+1}")
         
         # 如果不是最后一次尝试,等待后重试
         if attempt < max_retries - 1:
             await asyncio.sleep(2 ** attempt)  # 指数退避
     
-    logger.error(f"调用 /parse/async 最终失败: request_id={request_id}, 已重试{max_retries}次")
+    logger.error(f"直接调用解析函数最终失败: request_id={request_id}, 已重试{max_retries}次")
 
 async def call_extract_with_retry(request_id: str, query: str, max_retries: int = 3):
-    """调用 /extract 接口,带重试机制"""
+    """直接调用提取函数,带重试机制"""
     for attempt in range(max_retries):
         try:
-            import httpx
+            # 更新状态为处理中
+            update_extract_status(request_id, 1)
             
-            # 创建异步HTTP客户端
-            async with httpx.AsyncClient(timeout=30.0) as client:
-                response = await client.post(
-                    "http://localhost:8080/extract",
-                    json={"requestId": request_id, "query": query}
+            # 直接调用提取函数(同步函数,在线程池中执行)
+            from agents.clean_agent.agent import execute_agent_with_api
+            import concurrent.futures
+            
+            # 在线程池中执行同步函数
+            loop = asyncio.get_event_loop()
+            with concurrent.futures.ThreadPoolExecutor() as executor:
+                result = await loop.run_in_executor(
+                    executor, 
+                    execute_agent_with_api, 
+                    json.dumps({"query_word": query, "request_id": request_id})
                 )
-                
-                if response.status_code == 200:
-                    result = response.json()
-                    logger.info(f"调用 /extract 成功: request_id={request_id}, result={result}")
-                    return
-                else:
-                    logger.warning(f"调用 /extract 失败: request_id={request_id}, status_code={response.status_code}, attempt={attempt+1}")
+            
+            # 更新状态为处理完成
+            update_extract_status(request_id, 2)
+            logger.info(f"直接调用提取函数成功: request_id={request_id}, result={result}")
+            return
                     
         except Exception as e:
-            logger.warning(f"调用 /extract 异常: request_id={request_id}, error={e}, attempt={attempt+1}")
+            logger.warning(f"直接调用提取函数异常: request_id={request_id}, error={e}, attempt={attempt+1}")
+            # 更新状态为处理失败
+            update_extract_status(request_id, 3)
         
         # 如果不是最后一次尝试,等待后重试
         if attempt < max_retries - 1:
             await asyncio.sleep(2 ** attempt)  # 指数退避
     
-    logger.error(f"调用 /extract 最终失败: request_id={request_id}, 已重试{max_retries}次")
+    logger.error(f"直接调用提取函数最终失败: request_id={request_id}, 已重试{max_retries}次")
 
 async def call_expand_with_retry(request_id: str, query: str, max_retries: int = 3):
-    """调用 /expand 接口,带重试机制"""
+    """直接调用扩展函数,带重试机制"""
     for attempt in range(max_retries):
         try:
-            import httpx
+            # 直接调用扩展函数(同步函数,在线程池中执行)
+            from agents.expand_agent.agent import execute_expand_agent_with_api
+            import concurrent.futures
             
-            # 创建异步HTTP客户端
-            async with httpx.AsyncClient(timeout=30.0) as client:
-                response = await client.post(
-                    "http://localhost:8080/expand",
-                    json={"requestId": request_id, "query": query}
+            # 在线程池中执行同步函数
+            loop = asyncio.get_event_loop()
+            with concurrent.futures.ThreadPoolExecutor() as executor:
+                result = await loop.run_in_executor(
+                    executor, 
+                    execute_expand_agent_with_api, 
+                    request_id, 
+                    query
                 )
-                
-                if response.status_code == 200:
-                    result = response.json()
-                    logger.info(f"调用 /expand 成功: request_id={request_id}, result={result}")
-                    return
-                else:
-                    logger.warning(f"调用 /expand 失败: request_id={request_id}, status_code={response.status_code}, attempt={attempt+1}")
+            
+            logger.info(f"直接调用扩展函数成功: request_id={request_id}")
+            return
                     
         except Exception as e:
-            logger.warning(f"调用 /expand 异常: request_id={request_id}, error={e}, attempt={attempt+1}")
+            logger.warning(f"直接调用扩展函数异常: request_id={request_id}, error={e}, attempt={attempt+1}")
         
         # 如果不是最后一次尝试,等待后重试
         if attempt < max_retries - 1:
             await asyncio.sleep(2 ** attempt)  # 指数退避
     
-    logger.error(f"调用 /expand 最终失败: request_id={request_id}, 已重试{max_retries}次")
-
-async def call_parse_async(request_id: str):
-    """调用 /parse/async 接口"""
-    try:
-        import httpx
-        import asyncio
-        
-        # 创建异步HTTP客户端
-        async with httpx.AsyncClient(timeout=30.0) as client:
-            response = await client.post(
-                "http://localhost:8080/parse/async",
-                json={"requestId": request_id}
-            )
-            
-            if response.status_code == 200:
-                result = response.json()
-                logger.info(f"调用 /parse/async 成功: request_id={request_id}, result={result}")
-            else:
-                logger.error(f"调用 /parse/async 失败: request_id={request_id}, status_code={response.status_code}")
-                
-    except Exception as e:
-        logger.error(f"调用 /parse/async 异常: request_id={request_id}, error={e}")
-
-async def call_extract(request_id: str, query: str):
-    """调用 /extract 接口"""
-    try:
-        import httpx
-        
-        # 创建异步HTTP客户端
-        async with httpx.AsyncClient(timeout=30.0) as client:
-            response = await client.post(
-                "http://localhost:8080/extract",
-                json={"requestId": request_id, "query": query}
-            )
-            
-            if response.status_code == 200:
-                result = response.json()
-                logger.info(f"调用 /extract 成功: request_id={request_id}, result={result}")
-            else:
-                logger.error(f"调用 /extract 失败: request_id={request_id}, status_code={response.status_code}")
-                
-    except Exception as e:
-        logger.error(f"调用 /extract 异常: request_id={request_id}, error={e}")
+    logger.error(f"直接调用扩展函数最终失败: request_id={request_id}, 已重试{max_retries}次")
 
-async def call_expand(request_id: str, query: str):
-    """调用 /expand 接口"""
-    try:
-        import httpx
-        
-        # 创建异步HTTP客户端
-        async with httpx.AsyncClient(timeout=30.0) as client:
-            response = await client.post(
-                "http://localhost:8080/expand",
-                json={"requestId": request_id, "query": query}
-            )
-            
-            if response.status_code == 200:
-                result = response.json()
-                logger.info(f"调用 /expand 成功: request_id={request_id}, result={result}")
-            else:
-                logger.error(f"调用 /expand 失败: request_id={request_id}, status_code={response.status_code}")
-                
-    except Exception as e:
-        logger.error(f"调用 /expand 异常: request_id={request_id}, error={e}")
+# 这些函数已被删除,因为我们现在直接调用相应的函数而不是通过HTTP请求
 
 # 创建 FastAPI 应用
 app = FastAPI(