|
@@ -373,7 +373,7 @@ def process_single_item(args):
|
|
|
idx, item, request_id, api_key = args
|
|
|
try:
|
|
|
# 临时设置环境变量以使用指定的API密钥
|
|
|
- original_api_key = os.getenv('GEMINI_API_KEY')
|
|
|
+ original_api_key = os.getenv('GEMINI_API_KEY_1')
|
|
|
os.environ['GEMINI_API_KEY'] = api_key
|
|
|
|
|
|
crawl_data = item.get('crawl_data') or {}
|
|
@@ -522,7 +522,7 @@ def create_langgraph_workflow():
|
|
|
api_keys.append(api_key)
|
|
|
else:
|
|
|
logger.warning(f"未找到 GEMINI_API_KEY_{i},使用默认 GEMINI_API_KEY")
|
|
|
- api_keys.append(os.getenv('GEMINI_API_KEY'))
|
|
|
+ api_keys.append(os.getenv('GEMINI_API_KEY_1'))
|
|
|
|
|
|
# 准备多进程参数,为每个任务分配API密钥
|
|
|
process_args = []
|
|
@@ -631,55 +631,6 @@ async def health_check():
|
|
|
"langgraph_enabled": HAS_LANGGRAPH
|
|
|
}
|
|
|
|
|
|
-@app.post("/parse", response_model=TriggerResponse)
|
|
|
-async def parse_processing(request: TriggerRequest, background_tasks: BackgroundTasks):
|
|
|
- """
|
|
|
- 解析内容处理
|
|
|
-
|
|
|
- - **requestId**: 请求ID,用于标识处理任务
|
|
|
- """
|
|
|
- try:
|
|
|
- logger.info(f"收到解析请求: requestId={request.requestId}")
|
|
|
-
|
|
|
- if WORKFLOW and HAS_LANGGRAPH:
|
|
|
- # 使用 LangGraph 工作流
|
|
|
- logger.info("使用 LangGraph 工作流处理")
|
|
|
-
|
|
|
- # 初始化状态
|
|
|
- initial_state = AgentState(
|
|
|
- request_id=request.requestId,
|
|
|
- items=[],
|
|
|
- details=[],
|
|
|
- processed=0,
|
|
|
- success=0,
|
|
|
- error=None,
|
|
|
- status="started"
|
|
|
- )
|
|
|
-
|
|
|
- # 执行工作流
|
|
|
- final_state = WORKFLOW.invoke(
|
|
|
- initial_state,
|
|
|
- config={
|
|
|
- "configurable": {"thread_id": f"thread_{request.requestId}"},
|
|
|
- "recursion_limit": 100 # 增加递归限制
|
|
|
- }
|
|
|
- )
|
|
|
-
|
|
|
- # 构建响应
|
|
|
- result = TriggerResponse(
|
|
|
- requestId=request.requestId,
|
|
|
- processed=final_state.get("processed", 0),
|
|
|
- success=final_state.get("success", 0),
|
|
|
- details=final_state.get("details", [])
|
|
|
- )
|
|
|
- return result
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"处理请求失败: {e}")
|
|
|
- # 处理失败,更新状态为3
|
|
|
- update_request_status(request.requestId, 3)
|
|
|
- raise HTTPException(status_code=500, detail=f"处理失败: {str(e)}")
|
|
|
-
|
|
|
@app.post("/parse/async", status_code=200)
|
|
|
async def parse_processing_async(request: TriggerRequest, background_tasks: BackgroundTasks):
|
|
|
"""
|
|
@@ -763,19 +714,6 @@ def process_request_background_sync(request_id: str):
|
|
|
# 处理失败,更新状态为3
|
|
|
update_request_status(request_id, 3)
|
|
|
|
|
|
-async def process_request_background(request_id: str):
|
|
|
- """后台处理请求(异步版本,为了兼容性保留)"""
|
|
|
- try:
|
|
|
- # 在线程池中执行同步版本
|
|
|
- loop = asyncio.get_event_loop()
|
|
|
- with concurrent.futures.ThreadPoolExecutor() as executor:
|
|
|
- await loop.run_in_executor(executor, process_request_background_sync, request_id)
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"后台处理失败: requestId={request_id}, error={e}")
|
|
|
- # 处理失败,更新状态为3
|
|
|
- update_request_status(request_id, 3)
|
|
|
-
|
|
|
-
|
|
|
extraction_requests: set = set()
|
|
|
|
|
|
@app.post("/extract")
|