|
@@ -15,11 +15,6 @@ from contextlib import asynccontextmanager
|
|
|
# 保证可以导入本项目模块
|
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
|
|
-# 禁用 LangSmith 追踪,避免网络连接错误
|
|
|
-os.environ["LANGCHAIN_TRACING_V2"] = "false"
|
|
|
-os.environ["LANGCHAIN_ENDPOINT"] = ""
|
|
|
-os.environ["LANGCHAIN_API_KEY"] = ""
|
|
|
-
|
|
|
from fastapi import FastAPI, HTTPException, BackgroundTasks
|
|
|
from fastapi.responses import JSONResponse
|
|
|
from pydantic import BaseModel, Field
|
|
@@ -66,6 +61,28 @@ class TriggerResponse(BaseModel):
|
|
|
# 全局变量
|
|
|
identify_tool = None
|
|
|
|
|
|
+def update_request_status(request_id: str, status: int):
|
|
|
+ """
|
|
|
+ 更新 knowledge_request 表中的 parsing_status
|
|
|
+
|
|
|
+ Args:
|
|
|
+ request_id: 请求ID
|
|
|
+ status: 状态值 (1: 处理中, 2: 处理完成, 3: 处理失败)
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ from utils.mysql_db import MysqlHelper
|
|
|
+
|
|
|
+ sql = "UPDATE knowledge_request SET parsing_status = %s WHERE request_id = %s"
|
|
|
+ result = MysqlHelper.update_values(sql, (status, request_id))
|
|
|
+
|
|
|
+ if result is not None:
|
|
|
+ logger.info(f"更新请求状态成功: requestId={request_id}, status={status}")
|
|
|
+ else:
|
|
|
+ logger.error(f"更新请求状态失败: requestId={request_id}, status={status}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"更新请求状态异常: requestId={request_id}, status={status}, error={e}")
|
|
|
+
|
|
|
@asynccontextmanager
|
|
|
async def lifespan(app: FastAPI):
|
|
|
"""应用生命周期管理"""
|
|
@@ -104,6 +121,9 @@ def create_langgraph_workflow():
|
|
|
request_id = state["request_id"]
|
|
|
logger.info(f"开始获取数据: requestId={request_id}")
|
|
|
|
|
|
+ # 更新状态为处理中
|
|
|
+ update_request_status(request_id, 1)
|
|
|
+
|
|
|
items = QueryDataTool.fetch_crawl_data_list(request_id)
|
|
|
state["items"] = items
|
|
|
state["processed"] = len(items)
|
|
@@ -186,12 +206,16 @@ def create_langgraph_workflow():
|
|
|
def should_continue(state: AgentState) -> str:
|
|
|
"""判断是否继续处理"""
|
|
|
if state.get("error"):
|
|
|
+ # 处理失败,更新状态为3
|
|
|
+ update_request_status(state["request_id"], 3)
|
|
|
return "end"
|
|
|
|
|
|
current_index = state.get("current_index", 0)
|
|
|
items = state.get("items", [])
|
|
|
|
|
|
if current_index >= len(items):
|
|
|
+ # 所有数据处理完毕,更新状态为2
|
|
|
+ update_request_status(state["request_id"], 2)
|
|
|
return "end"
|
|
|
|
|
|
return "continue"
|
|
@@ -217,7 +241,7 @@ def create_langgraph_workflow():
|
|
|
}
|
|
|
)
|
|
|
|
|
|
- # 编译工作流,禁用 LangSmith 追踪
|
|
|
+ # 编译工作流
|
|
|
return workflow.compile()
|
|
|
|
|
|
# 全局工作流实例
|
|
@@ -298,9 +322,14 @@ async def parse_processing(request: TriggerRequest, background_tasks: Background
|
|
|
# 回退到传统模式
|
|
|
logger.info("使用传统模式处理")
|
|
|
|
|
|
+ # 更新状态为处理中
|
|
|
+ update_request_status(request.requestId, 1)
|
|
|
+
|
|
|
# 获取待处理数据
|
|
|
items = QueryDataTool.fetch_crawl_data_list(request.requestId)
|
|
|
if not items:
|
|
|
+ # 无数据需要处理,更新状态为完成
|
|
|
+ update_request_status(request.requestId, 2)
|
|
|
return TriggerResponse(
|
|
|
requestId=request.requestId,
|
|
|
processed=0,
|
|
@@ -357,12 +386,17 @@ async def parse_processing(request: TriggerRequest, background_tasks: Background
|
|
|
success=success_count,
|
|
|
details=details
|
|
|
)
|
|
|
+
|
|
|
+ # 更新状态为处理完成
|
|
|
+ update_request_status(request.requestId, 2)
|
|
|
|
|
|
logger.info(f"处理完成: requestId={request.requestId}, processed={result.processed}, success={result.success}")
|
|
|
return result
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"处理请求失败: {e}")
|
|
|
+ # 处理失败,更新状态为3
|
|
|
+ update_request_status(request.requestId, 3)
|
|
|
raise HTTPException(status_code=500, detail=f"处理失败: {str(e)}")
|
|
|
|
|
|
@app.post("/parse/async")
|
|
@@ -396,6 +430,9 @@ async def process_request_background(request_id: str):
|
|
|
|
|
|
if WORKFLOW and HAS_LANGGRAPH:
|
|
|
# 使用 LangGraph 工作流
|
|
|
+ # 更新状态为处理中
|
|
|
+ update_request_status(request_id, 1)
|
|
|
+
|
|
|
initial_state = AgentState(
|
|
|
request_id=request_id,
|
|
|
items=[],
|
|
@@ -417,9 +454,14 @@ async def process_request_background(request_id: str):
|
|
|
|
|
|
else:
|
|
|
# 传统模式
|
|
|
+ # 更新状态为处理中
|
|
|
+ update_request_status(request_id, 1)
|
|
|
+
|
|
|
items = QueryDataTool.fetch_crawl_data_list(request_id)
|
|
|
if not items:
|
|
|
logger.info(f"后台处理完成: requestId={request_id}, 无数据需要处理")
|
|
|
+ # 无数据需要处理,更新状态为完成
|
|
|
+ update_request_status(request_id, 2)
|
|
|
return
|
|
|
|
|
|
success_count = 0
|
|
@@ -450,9 +492,13 @@ async def process_request_background(request_id: str):
|
|
|
logger.error(f"后台处理第 {idx} 项时出错: {e}")
|
|
|
|
|
|
logger.info(f"传统模式后台处理完成: requestId={request_id}, processed={len(items)}, success={success_count}")
|
|
|
+ # 更新状态为处理完成
|
|
|
+ update_request_status(request_id, 2)
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"后台处理失败: requestId={request_id}, error={e}")
|
|
|
+ # 处理失败,更新状态为3
|
|
|
+ update_request_status(request_id, 3)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
# 启动服务
|