#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Agent service rebuilt on FastAPI + LangGraph.
Provides workflow management and state control.
"""
import json
import sys
import os
import time
from typing import Any, Dict, List, Optional, TypedDict, Annotated
from contextlib import asynccontextmanager
import asyncio

# Make sure modules from this project can be imported
# (must run before the project imports below)
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from utils.mysql_db import MysqlHelper
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
import uvicorn
from agents.clean_agent.agent import execute_agent_with_api
# _update_expansion_status is defined locally below, so only the agent
# entry point is imported here
from agents.expand_agent.agent import execute_expand_agent_with_api

# LangGraph imports (optional dependency)
try:
    from langgraph.graph import StateGraph, END
    HAS_LANGGRAPH = True
except ImportError:
    HAS_LANGGRAPH = False
    print("Warning: LangGraph is not installed")

from utils.logging_config import get_logger
from tools.agent_tools import QueryDataTool, IdentifyTool, UpdateDataTool, StructureTool

# Create the logger
logger = get_logger('Agent')

# State definition
class AgentState(TypedDict):
    request_id: str
    items: List[Dict[str, Any]]
    details: List[Dict[str, Any]]
    processed: int
    success: int
    error: Optional[str]
    status: str
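
# Illustrative shape of a fully populated state after a run. The values are
# hypothetical and shown only to document what the workflow produces:
#   {
#       "request_id": "req-123",
#       "items": [{"crawl_data": {...}, "content_id": "c1", "task_id": "t1"}],
#       "details": [{"index": 1, "dbInserted": True, "identifyError": None, "status": 2}],
#       "processed": 1,
#       "success": 1,
#       "error": None,
#       "status": "completed",
#   }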

class ExpandRequest(BaseModel):
    requestId: str = Field(..., description="Request ID")
    query: str = Field(..., description="Query term")

# Request model
class TriggerRequest(BaseModel):
    requestId: str = Field(..., description="Request ID")

# Response model
class TriggerResponse(BaseModel):
    requestId: str
    processed: int
    success: int
    details: List[Dict[str, Any]]

class ExtractRequest(BaseModel):
    requestId: str = Field(..., description="Request ID")
    query: str = Field(..., description="Query term")

# Globals
identify_tool = None

def update_request_status(request_id: str, status: int):
    """
    Update parsing_status in the knowledge_request table.

    Args:
        request_id: request ID
        status: status value (1: processing, 2: completed, 3: failed)
    """
    try:
        sql = "UPDATE knowledge_request SET parsing_status = %s WHERE request_id = %s"
        result = MysqlHelper.update_values(sql, (status, request_id))

        if result is not None:
            logger.info(f"Updated request status: requestId={request_id}, status={status}")
        else:
            logger.error(f"Failed to update request status: requestId={request_id}, status={status}")

    except Exception as e:
        logger.error(f"Error while updating request status: requestId={request_id}, status={status}, error={e}")

def _update_expansion_status(requestId: str, status: int):
    """Update expansion_status (same status convention as parsing_status)."""
    try:
        sql = "UPDATE knowledge_request SET expansion_status = %s WHERE request_id = %s"
        MysqlHelper.update_values(sql, (status, requestId))
        logger.info(f"Updated expansion status: requestId={requestId}, status={status}")
    except Exception as e:
        logger.error(f"Failed to update expansion status: requestId={requestId}, status={status}, error={e}")

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifecycle management"""
    # On startup
    logger.info("🚀 Starting Knowledge Agent service...")

    # Initialize global tools
    global identify_tool
    identify_tool = IdentifyTool()

    # Resume interrupted processes after startup.
    # Run them asynchronously so startup is not blocked.
    app.state.restore_task = asyncio.create_task(restore_interrupted_processes())

    yield

    # On shutdown
    logger.info("🛑 Shutting down Knowledge Agent service...")
    # Cancel the background restore task gracefully
    restore_task = getattr(app.state, 'restore_task', None)
    if restore_task and not restore_task.done():
        restore_task.cancel()
        try:
            await restore_task
        except asyncio.CancelledError:
            logger.info("✅ Background restore task cancelled")

async def restore_interrupted_processes():
    """
    Resume interrupted processes after startup:
    1. Find request_ids in knowledge_request with parsing_status=1 and re-run /parse/async
    2. Find request_ids and queries with extraction_status=1 and re-run /extract
    3. Find request_ids and queries with expansion_status=1 and re-run /expand
    """
    try:
        logger.info("🔄 Resuming interrupted processes...")

        # Wait until the service is fully up
        await asyncio.sleep(3)

        # 1. Resume interrupted parsing
        await restore_parsing_processes()

        # 2. Resume interrupted extraction
        await restore_extraction_processes()

        # 3. Resume interrupted expansion
        await restore_expansion_processes()

        logger.info("✅ Process restore finished")

    except Exception as e:
        logger.error(f"❌ Process restore failed: {e}")

async def restore_parsing_processes():
    """Resume interrupted parsing processes"""
    try:
        # Find requests with parsing_status=1
        sql = "SELECT request_id FROM knowledge_request WHERE parsing_status = 1"
        rows = MysqlHelper.get_values(sql)

        if not rows:
            logger.info("📋 No interrupted parsing processes found")
            return

        logger.info(f"🔄 Found {len(rows)} interrupted parsing processes, resuming...")

        for row in rows:
            request_id = row[0]
            try:
                # Re-run the parse flow, with retries
                await call_parse_async_with_retry(request_id)
                logger.info(f"✅ Resumed parsing process: request_id={request_id}")
            except Exception as e:
                logger.error(f"❌ Failed to resume parsing process: request_id={request_id}, error={e}")

    except Exception as e:
        logger.error(f"❌ Error while resuming parsing processes: {e}")

async def restore_extraction_processes():
    """Resume interrupted extraction processes"""
    try:
        # Find requests and queries with extraction_status=1
        sql = "SELECT request_id, query FROM knowledge_request WHERE extraction_status = 1"
        rows = MysqlHelper.get_values(sql)

        if not rows:
            logger.info("📋 No interrupted extraction processes found")
            return

        logger.info(f"🔄 Found {len(rows)} interrupted extraction processes, resuming...")

        for row in rows:
            request_id = row[0]
            query = row[1] if len(row) > 1 else ""
            try:
                # Call the extraction function directly, with retries
                # (status updates are handled inside the function)
                await call_extract_with_retry(request_id, query)
                logger.info(f"✅ Resumed extraction process: request_id={request_id}")
            except Exception as e:
                logger.error(f"❌ Failed to resume extraction process: request_id={request_id}, error={e}")

    except Exception as e:
        logger.error(f"❌ Error while resuming extraction processes: {e}")

async def restore_expansion_processes():
    """Resume interrupted expansion processes"""
    try:
        # Find requests and queries with expansion_status=1
        sql = "SELECT request_id, query FROM knowledge_request WHERE expansion_status = 1"
        rows = MysqlHelper.get_values(sql)

        if not rows:
            logger.info("📋 No interrupted expansion processes found")
            return

        logger.info(f"🔄 Found {len(rows)} interrupted expansion processes, resuming...")

        for row in rows:
            request_id = row[0]
            query = row[1] if len(row) > 1 else ""
            try:
                # Call the expansion function directly, with retries
                # (status updates are handled inside the function)
                await call_expand_with_retry(request_id, query)
                logger.info(f"✅ Resumed expansion process: request_id={request_id}")
            except Exception as e:
                logger.error(f"❌ Failed to resume expansion process: request_id={request_id}, error={e}")

    except Exception as e:
        logger.error(f"❌ Error while resuming expansion processes: {e}")

async def call_parse_async_with_retry(request_id: str, max_retries: int = 3):
    """Call the parse function directly, with retries"""
    for attempt in range(max_retries):
        try:
            # Call the background processing function directly
            await process_request_background(request_id)
            logger.info(f"Direct parse call succeeded: request_id={request_id}")
            return

        except Exception as e:
            logger.warning(f"Direct parse call failed: request_id={request_id}, error={e}, attempt={attempt+1}")

            # If this was not the last attempt, wait before retrying
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # exponential backoff

    logger.error(f"Direct parse call ultimately failed: request_id={request_id}, retried {max_retries} times")
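
# Illustrative backoff timing for the retry helpers above and below
# (assuming the default max_retries=3):
#   attempt 1 fails -> sleep 2**0 = 1s
#   attempt 2 fails -> sleep 2**1 = 2s
#   attempt 3 fails -> no sleep; the final error is logged and the loop ends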

async def call_extract_with_retry(request_id: str, query: str, max_retries: int = 3):
    """Call the extraction function directly, with retries"""
    for attempt in range(max_retries):
        try:
            # Set status to processing
            update_extract_status(request_id, 1)

            # Run the synchronous extraction function in a thread pool
            import concurrent.futures

            loop = asyncio.get_running_loop()
            with concurrent.futures.ThreadPoolExecutor() as executor:
                result = await loop.run_in_executor(
                    executor,
                    execute_agent_with_api,
                    json.dumps({"query_word": query, "request_id": request_id})
                )

            # Set status to completed
            update_extract_status(request_id, 2)
            logger.info(f"Direct extract call succeeded: request_id={request_id}, result={result}")
            return

        except Exception as e:
            logger.warning(f"Direct extract call failed: request_id={request_id}, error={e}, attempt={attempt+1}")
            # Set status to failed
            update_extract_status(request_id, 3)

            # If this was not the last attempt, wait before retrying
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # exponential backoff

    logger.error(f"Direct extract call ultimately failed: request_id={request_id}, retried {max_retries} times")

async def call_expand_with_retry(request_id: str, query: str, max_retries: int = 3):
    """Call the expansion function directly, with retries"""
    for attempt in range(max_retries):
        try:
            # Run the synchronous expansion function in a thread pool
            import concurrent.futures

            loop = asyncio.get_running_loop()
            with concurrent.futures.ThreadPoolExecutor() as executor:
                result = await loop.run_in_executor(
                    executor,
                    execute_expand_agent_with_api,
                    request_id,
                    query
                )

            logger.info(f"Direct expand call succeeded: request_id={request_id}")
            return

        except Exception as e:
            logger.warning(f"Direct expand call failed: request_id={request_id}, error={e}, attempt={attempt+1}")

            # If this was not the last attempt, wait before retrying
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # exponential backoff

    logger.error(f"Direct expand call ultimately failed: request_id={request_id}, retried {max_retries} times")

# The HTTP-based helpers that used to live here were removed; the functions
# above are now called directly instead of going through HTTP requests.

# Create the FastAPI app
app = FastAPI(
    title="Knowledge Agent API",
    description="LangGraph-based service for intelligent content identification and structuring",
    version="2.0.0",
    lifespan=lifespan
)

# Concurrency control: track requestIds currently being processed to prevent
# duplicate concurrent submissions
RUNNING_REQUESTS: set = set()
RUNNING_LOCK = asyncio.Lock()
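
# How the guard works: a handler atomically checks-and-adds the requestId under
# RUNNING_LOCK before spawning work, and the background wrapper removes it in a
# finally block (see /parse/async below), so finished or crashed tasks do not
# leave stale entries behind.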

# =========================
# LangGraph workflow definition
# =========================
def create_langgraph_workflow():
    """Create the LangGraph workflow"""
    if not HAS_LANGGRAPH:
        return None

    # Workflow node definitions

    def fetch_data(state: AgentState) -> AgentState:
        """Fetch the pending data"""
        try:
            request_id = state["request_id"]
            logger.info(f"Fetching data: requestId={request_id}")

            # Set status to processing
            update_request_status(request_id, 1)

            items = QueryDataTool.fetch_crawl_data_list(request_id)
            state["items"] = items
            state["processed"] = len(items)
            state["status"] = "data_fetched"

            logger.info(f"Data fetch finished: requestId={request_id}, count={len(items)}")
            return state

        except Exception as e:
            logger.error(f"Failed to fetch data: {e}")
            state["error"] = str(e)
            state["status"] = "error"
            return state

    def process_items_batch(state: AgentState) -> AgentState:
        """Process all items in a batch"""
        try:
            items = state["items"]
            if not items:
                state["status"] = "completed"
                return state

            success_count = 0
            details = []

            for idx, item in enumerate(items, start=1):
                try:
                    crawl_data = item.get('crawl_data') or {}
                    content_id = item.get('content_id') or ''
                    task_id = item.get('task_id') or ''
                    # First check whether this item has already been processed
                    check_sql = "SELECT id,status,indentify_data FROM knowledge_parsing_content WHERE request_id = %s AND content_id = %s"
                    check_result = MysqlHelper.get_values(check_sql, (state["request_id"], content_id))
                    result_status = 0
                    result_id = 0
                    result_indentify_data = {}
                    if check_result:
                        row_id, status, indentify_data = check_result[0]
                        logger.info(f"Found item pending structuring, id: {row_id}, status: {status}, indentify_data: {indentify_data}")
                        result_status = status
                        result_id = row_id
                        result_indentify_data = indentify_data
                        if status == 5:
                            success_count += 1
                            continue
                    # result_status == 0 means the item has not been processed
                    # yet and needs identification and structuring
                    if result_status == 0 or result_status == 3:
                        # Step 1: identification
                        identify_result = identify_tool.run(
                            crawl_data if isinstance(crawl_data, dict) else {}
                        )

                        # Step 2: structure and persist
                        affected = UpdateDataTool.store_indentify_result(
                            state["request_id"],
                            {
                                "content_id": content_id,
                                "task_id": task_id
                            },
                            identify_result
                        )
                    else:
                        # result_indentify_data is a JSON string; parse it into an object
                        identify_result = json.loads(result_indentify_data) if isinstance(result_indentify_data, str) else result_indentify_data
                        affected = result_id

                    # Structure the content with StructureTool
                    structure_tool = StructureTool()
                    structure_result = structure_tool.process_content_structure(identify_result)

                    # Persist the structured parsing result
                    parsing_affected = UpdateDataTool.store_parsing_result(
                        state["request_id"],
                        {
                            "id": affected,
                            "content_id": content_id,
                            "task_id": task_id
                        },
                        structure_result
                    )

                    ok = affected is not None and affected > 0 and parsing_affected is not None and parsing_affected > 0
                    if ok:
                        success_count += 1
                    else:
                        logger.error(f"Error while processing item {idx}: {identify_result.get('error') or structure_result.get('error')}")

                    # Record the processing detail
                    detail = {
                        "index": idx,
                        "dbInserted": ok,
                        "identifyError": identify_result.get('error') or structure_result.get('error'),
                        "status": 2 if ok else 3
                    }
                    details.append(detail)

                    logger.info(f"Progress: {idx}/{len(items)} - {'success' if ok else 'failure'}")

                except Exception as e:
                    logger.error(f"Error while processing item {idx}: {e}")
                    detail = {
                        "index": idx,
                        "dbInserted": False,
                        "identifyError": str(e),
                        "status": 3
                    }
                    details.append(detail)

            state["success"] = success_count
            state["details"] = details
            state["status"] = "completed"

            return state

        except Exception as e:
            logger.error(f"Batch processing failed: {e}")
            state["error"] = str(e)
            state["status"] = "error"
            return state

    def should_continue(state: AgentState) -> str:
        """Decide how the workflow should finish and record the final status."""
        if state.get("error"):
            # Processing failed: set status to 3
            update_request_status(state["request_id"], 3)
            return "end"

        # All items processed: set status to 2
        update_request_status(state["request_id"], 2)
        return "end"

    # Build the workflow graph
    workflow = StateGraph(AgentState)

    # Add nodes
    workflow.add_node("fetch_data", fetch_data)
    workflow.add_node("process_items_batch", process_items_batch)

    # Set the entry point
    workflow.set_entry_point("fetch_data")

    # Add edges; should_continue runs after the batch step so the final
    # parsing_status (2 on success, 3 on error) is always written
    workflow.add_edge("fetch_data", "process_items_batch")
    workflow.add_conditional_edges("process_items_batch", should_continue, {"end": END})

    # Compile the workflow
    return workflow.compile()

# Global workflow instance
WORKFLOW = create_langgraph_workflow() if HAS_LANGGRAPH else None
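
# Illustrative direct invocation of the compiled workflow (hypothetical request
# id, for documentation only; the endpoints below do exactly this):
#   state = AgentState(request_id="req-123", items=[], details=[], processed=0,
#                      success=0, error=None, status="started")
#   final = WORKFLOW.invoke(state, config={"configurable": {"thread_id": "thread_req-123"}})
#   print(final["processed"], final["success"])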

# =========================
# FastAPI endpoint definitions
# =========================
@app.get("/")
async def root():
    """Root path: returns service information"""
    return {
        "service": "Knowledge Agent API",
        "version": "2.0.0",
        "status": "running",
        "langgraph_enabled": HAS_LANGGRAPH,
        "endpoints": {
            "parse": "/parse",
            "parse/async": "/parse/async",
            "health": "/health",
            "docs": "/docs"
        }
    }

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "timestamp": time.time(),
        "langgraph_enabled": HAS_LANGGRAPH
    }
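
# Example health probe (assumes the default host/port from the __main__ block
# below; values in the response are illustrative):
#   curl http://localhost:8080/health
#   -> {"status": "healthy", "timestamp": 1736900000.0, "langgraph_enabled": true}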

@app.post("/parse", response_model=TriggerResponse)
async def parse_processing(request: TriggerRequest, background_tasks: BackgroundTasks):
    """
    Parse content (synchronous).

    - **requestId**: request ID identifying the processing task
    """
    try:
        logger.info(f"Received parse request: requestId={request.requestId}")

        if WORKFLOW and HAS_LANGGRAPH:
            # Use the LangGraph workflow
            logger.info("Processing with the LangGraph workflow")

            # Initialize the state
            initial_state = AgentState(
                request_id=request.requestId,
                items=[],
                details=[],
                processed=0,
                success=0,
                error=None,
                status="started"
            )

            # Run the workflow
            final_state = WORKFLOW.invoke(
                initial_state,
                config={
                    "configurable": {"thread_id": f"thread_{request.requestId}"},
                    "recursion_limit": 100  # raise the recursion limit
                }
            )

            # Build the response
            result = TriggerResponse(
                requestId=request.requestId,
                processed=final_state.get("processed", 0),
                success=final_state.get("success", 0),
                details=final_state.get("details", [])
            )
            return result

        # Without LangGraph there is no processing path; fail explicitly instead
        # of implicitly returning None (which would break the response model)
        raise HTTPException(status_code=503, detail="LangGraph is not installed; /parse is unavailable")

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to process request: {e}")
        # Processing failed: set status to 3
        update_request_status(request.requestId, 3)
        raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}")
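
# Example invocation (hypothetical requestId; the response mirrors TriggerResponse):
#   curl -X POST http://localhost:8080/parse \
#        -H "Content-Type: application/json" \
#        -d '{"requestId": "req-123"}'
#   -> {"requestId": "req-123", "processed": 2, "success": 2, "details": [...]}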

@app.post("/parse/async", status_code=200)
async def parse_processing_async(request: TriggerRequest, background_tasks: BackgroundTasks):
    """
    Parse content asynchronously (background task).

    - **requestId**: request ID identifying the processing task

    Behavior: returns 200 immediately and continues processing in the background.
    If a task for the same requestId is already running, fails immediately (status=3).
    """
    try:
        logger.info(f"Received async parse request: requestId={request.requestId}")

        # Concurrency guard: only one running task per requestId
        async with RUNNING_LOCK:
            if request.requestId in RUNNING_REQUESTS:
                return {
                    "requestId": request.requestId,
                    "status": 3,
                    "message": "A task for this request is already running, try again later",
                    "langgraph_enabled": HAS_LANGGRAPH
                }
            RUNNING_REQUESTS.add(request.requestId)

        async def _background_wrapper(rid: str):
            try:
                await process_request_background(rid)
            finally:
                async with RUNNING_LOCK:
                    RUNNING_REQUESTS.discard(rid)

        # Spawn the background task directly with asyncio
        # (does not block this response)
        asyncio.create_task(_background_wrapper(request.requestId))

        # Return immediately (non-blocking)
        return {
            "requestId": request.requestId,
            "status": 1,
            "message": "Task queued and processing in the background",
            "langgraph_enabled": HAS_LANGGRAPH
        }

    except Exception as e:
        logger.error(f"Failed to submit async task: {e}")
        raise HTTPException(status_code=500, detail=f"Task submission failed: {str(e)}")
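
# Example invocation (hypothetical requestId; returns immediately while work
# continues in the background):
#   curl -X POST http://localhost:8080/parse/async \
#        -H "Content-Type: application/json" \
#        -d '{"requestId": "req-123"}'
#   -> {"requestId": "req-123", "status": 1, "message": "...", "langgraph_enabled": true}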

async def process_request_background(request_id: str):
    """Process a request in the background"""
    try:
        logger.info(f"Background processing started: requestId={request_id}")

        if WORKFLOW and HAS_LANGGRAPH:
            # Use the LangGraph workflow
            # Set status to processing
            update_request_status(request_id, 1)

            initial_state = AgentState(
                request_id=request_id,
                items=[],
                details=[],
                processed=0,
                success=0,
                error=None,
                status="started"
            )

            final_state = WORKFLOW.invoke(
                initial_state,
                config={
                    "configurable": {"thread_id": f"thread_{request_id}"},
                    "recursion_limit": 100  # raise the recursion limit
                }
            )
            # Mirror the workflow outcome: 3 on error, 2 on success
            if final_state.get("error"):
                update_request_status(request_id, 3)
            else:
                update_request_status(request_id, 2)
            logger.info(f"LangGraph background processing finished: requestId={request_id}, processed={final_state.get('processed', 0)}, success={final_state.get('success', 0)}")
        else:
            logger.warning(f"LangGraph is unavailable; skipping background processing: requestId={request_id}")

    except Exception as e:
        logger.error(f"Background processing failed: requestId={request_id}, error={e}")
        # Processing failed: set status to 3
        update_request_status(request_id, 3)

# Track extract requests currently being processed
extraction_requests: set = set()

@app.post("/extract")
async def extract(request: ExtractRequest):
    """
    Run extraction (asynchronously).

    Args:
        request: body containing the request ID and query term

    Returns:
        dict: execution status
    """
    try:
        requestId = request.requestId
        query = request.query
        logger.info(f"Received extract request: requestId={requestId}, query={query}")

        # Concurrency guard: only one running task per requestId
        # (check-and-add under the lock to avoid races between concurrent calls)
        async with RUNNING_LOCK:
            if requestId in extraction_requests:
                return {"status": 1, "requestId": requestId, "message": "Request is already being processed"}
            extraction_requests.add(requestId)

        # Set status to processing
        update_extract_status(requestId, 1)

        # Spawn an async task that runs the agent
        async def _execute_extract_async():
            try:
                # Run the synchronous function in a thread pool
                import concurrent.futures
                loop = asyncio.get_running_loop()
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    result = await loop.run_in_executor(
                        executor,
                        execute_agent_with_api,
                        json.dumps({"query_word": query, "request_id": requestId})
                    )
                # Set status to completed
                update_extract_status(requestId, 2)
                logger.info(f"Async extract task finished: requestId={requestId}")
                return result
            except Exception as e:
                logger.error(f"Async extract task failed: requestId={requestId}, error={e}")
                # Set status to failed
                update_extract_status(requestId, 3)
                raise
            finally:
                extraction_requests.discard(requestId)

        # Spawn the task without awaiting it
        asyncio.create_task(_execute_extract_async())

        # Return immediately
        return {"status": 1, "requestId": requestId, "message": "Extract task started and processing in the background"}
    except Exception as e:
        logger.error(f"Failed to start extract task: requestId={request.requestId}, error={e}")
        # Set status to failed
        update_extract_status(request.requestId, 3)
        # Remove from the running set
        extraction_requests.discard(request.requestId)
        raise HTTPException(status_code=500, detail=f"Failed to start extract task: {str(e)}")
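
# Example invocation (hypothetical values; query is the term handed to the clean agent):
#   curl -X POST http://localhost:8080/extract \
#        -H "Content-Type: application/json" \
#        -d '{"requestId": "req-123", "query": "example topic"}'
#   -> {"status": 1, "requestId": "req-123", "message": "..."}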

@app.post("/expand")
async def expand(request: ExpandRequest):
    """
    Run expansion queries (asynchronously).

    Args:
        request: body containing the request ID and query term

    Returns:
        dict: execution status
    """
    try:
        requestId = request.requestId
        query = request.query
        logger.info(f"Received expand request: requestId={requestId}, query={query}")

        # Concurrency guard: only one running task per requestId
        async with RUNNING_LOCK:
            # Create the tracking set on first use
            if not hasattr(app.state, 'expansion_requests'):
                app.state.expansion_requests = set()
            if requestId in app.state.expansion_requests:
                return {"status": 1, "requestId": requestId, "message": "Expansion request is already being processed"}
            app.state.expansion_requests.add(requestId)

        # Set status to processing right away
        _update_expansion_status(requestId, 1)

        # Spawn an async task that runs the expand agent
        async def _execute_expand_async():
            try:
                # Run the synchronous function in a thread pool
                import concurrent.futures
                loop = asyncio.get_running_loop()
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    result = await loop.run_in_executor(
                        executor,
                        execute_expand_agent_with_api,
                        requestId,
                        query
                    )
                # Set status to completed
                _update_expansion_status(requestId, 2)
                logger.info(f"Async expand task finished: requestId={requestId}")
                return result
            except Exception as e:
                logger.error(f"Async expand task failed: requestId={requestId}, error={e}")
                # Set status to failed
                _update_expansion_status(requestId, 3)
                raise
            finally:
                # Remove from the running set whether the task succeeded or failed
                async with RUNNING_LOCK:
                    if hasattr(app.state, 'expansion_requests'):
                        app.state.expansion_requests.discard(requestId)

        # Spawn the task without awaiting it
        asyncio.create_task(_execute_expand_async())

        # Return immediately
        return {"status": 1, "requestId": requestId, "message": "Expand task started and processing in the background"}
    except Exception as e:
        logger.error(f"Failed to start expand task: requestId={request.requestId}, error={e}")
        # Set status to failed
        _update_expansion_status(request.requestId, 3)
        # Remove from the running set
        async with RUNNING_LOCK:
            if hasattr(app.state, 'expansion_requests'):
                app.state.expansion_requests.discard(request.requestId)
        raise HTTPException(status_code=500, detail=f"Failed to start expand task: {str(e)}")
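
# Example invocation (hypothetical values; same shape as /extract):
#   curl -X POST http://localhost:8080/expand \
#        -H "Content-Type: application/json" \
#        -d '{"requestId": "req-123", "query": "example topic"}'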

def update_extract_status(request_id: str, status: int):
    """Update extraction_status (same status convention as parsing_status)."""
    try:
        sql = "UPDATE knowledge_request SET extraction_status = %s WHERE request_id = %s"
        result = MysqlHelper.update_values(sql, (status, request_id))

        if result is not None:
            logger.info(f"Updated extraction status: requestId={request_id}, status={status}")
        else:
            logger.error(f"Failed to update extraction status: requestId={request_id}, status={status}")

    except Exception as e:
        logger.error(f"Error while updating extraction status: requestId={request_id}, status={status}, error={e}")

if __name__ == "__main__":
    # Read configuration from environment variables
    reload_enabled = os.getenv("RELOAD_ENABLED", "false").lower() == "true"
    log_level = os.getenv("LOG_LEVEL", "info")

    # Start the service
    uvicorn.run(
        "agent:app",
        host="0.0.0.0",
        port=8080,
        reload=reload_enabled,  # controlled via environment variable
        log_level=log_level
    )
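
# Example launch (assumes this file is saved as agent.py, matching the
# "agent:app" import string above):
#   RELOAD_ENABLED=false LOG_LEVEL=info python agent.py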