|
@@ -147,7 +147,7 @@ async def restore_interrupted_processes():
|
|
|
|
|
|
# 2. 恢复提取中断的流程
|
|
|
await restore_extraction_processes()
|
|
|
-
|
|
|
+
|
|
|
# 3. 恢复扩展中断的流程
|
|
|
await restore_expansion_processes()
|
|
|
|
|
@@ -297,19 +297,11 @@ async def call_expand_with_retry(request_id: str, query: str, max_retries: int =
|
|
|
"""直接调用扩展函数,带重试机制"""
|
|
|
for attempt in range(max_retries):
|
|
|
try:
|
|
|
- # 直接调用扩展函数(同步函数,在线程池中执行)
|
|
|
+ # 直接调用扩展函数
|
|
|
from agents.expand_agent.agent import execute_expand_agent_with_api
|
|
|
- import concurrent.futures
|
|
|
|
|
|
- # 在线程池中执行同步函数
|
|
|
- loop = asyncio.get_event_loop()
|
|
|
- with concurrent.futures.ThreadPoolExecutor() as executor:
|
|
|
- result = await loop.run_in_executor(
|
|
|
- executor,
|
|
|
- execute_expand_agent_with_api,
|
|
|
- request_id,
|
|
|
- query
|
|
|
- )
|
|
|
+ # 直接调用同步函数,不使用线程池
|
|
|
+ result = execute_expand_agent_with_api(request_id, query)
|
|
|
|
|
|
logger.info(f"直接调用扩展函数成功: request_id={request_id}")
|
|
|
return
|
|
@@ -700,15 +692,7 @@ async def extract(request: ExtractRequest):
|
|
|
# 创建异步任务执行Agent
|
|
|
async def _execute_extract_async():
|
|
|
try:
|
|
|
- # 在线程池中执行同步函数
|
|
|
- import concurrent.futures
|
|
|
- loop = asyncio.get_event_loop()
|
|
|
- with concurrent.futures.ThreadPoolExecutor() as executor:
|
|
|
- result = await loop.run_in_executor(
|
|
|
- executor,
|
|
|
- execute_agent_with_api,
|
|
|
- json.dumps({"query_word": query, "request_id": requestId})
|
|
|
- )
|
|
|
+ result = execute_agent_with_api(json.dumps({"query_word": query, "request_id": requestId}))
|
|
|
# 更新状态为处理完成
|
|
|
update_extract_status(requestId, 2)
|
|
|
logger.info(f"异步提取任务完成: requestId={requestId}")
|
|
@@ -766,16 +750,9 @@ async def expand(request: ExpandRequest):
|
|
|
# 创建异步任务执行扩展Agent
|
|
|
async def _execute_expand_async():
|
|
|
try:
|
|
|
- # 在线程池中执行同步函数
|
|
|
- import concurrent.futures
|
|
|
- loop = asyncio.get_event_loop()
|
|
|
- with concurrent.futures.ThreadPoolExecutor() as executor:
|
|
|
- result = await loop.run_in_executor(
|
|
|
- executor,
|
|
|
- execute_expand_agent_with_api,
|
|
|
- requestId,
|
|
|
- query
|
|
|
- )
|
|
|
+ # 直接调用同步函数,不使用线程池
|
|
|
+ from agents.expand_agent.agent import execute_expand_agent_with_api
|
|
|
+ result = execute_expand_agent_with_api(requestId, query)
|
|
|
# 更新状态为处理完成
|
|
|
_update_expansion_status(requestId, 2)
|
|
|
logger.info(f"异步扩展查询任务完成: requestId={requestId}")
|