- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 使用 FastAPI + LangGraph 重构的 Agent 服务
- 提供强大的工作流管理和状态控制
- """
- import json
- import sys
- import os
- import time
- from typing import Any, Dict, List, Optional, TypedDict, Annotated
- from contextlib import asynccontextmanager
- # 保证可以导入本项目模块
- sys.path.append(os.path.dirname(os.path.abspath(__file__)))
- from fastapi import FastAPI, HTTPException, BackgroundTasks
- from fastapi.responses import JSONResponse
- from pydantic import BaseModel, Field
- import uvicorn
- # LangGraph 相关导入
- try:
- from langgraph.graph import StateGraph, END
- HAS_LANGGRAPH = True
- except ImportError:
- HAS_LANGGRAPH = False
- print("警告: LangGraph 未安装,将使用传统模式")
- from utils.logging_config import get_logger
- from agent_tools import QueryDataTool, IdentifyTool, StructureTool
- # 创建 logger
- logger = get_logger('Agent')
- # 状态定义
# Workflow state shared between LangGraph nodes.
class AgentState(TypedDict):
    request_id: str                            # id of the crawl request being processed
    items: List[Dict[str, Any]]                # items fetched by QueryDataTool.fetch_crawl_data_list
    details: List[Dict[str, Any]]              # per-item processing result records
    processed: int                             # total number of items fetched
    success: int                               # number of items stored successfully
    current_index: int                         # index of the next item to process
    current_item: Optional[Dict[str, Any]]     # item currently being handled
    identify_result: Optional[Dict[str, Any]]  # IdentifyTool output for the current item
    error: Optional[str]                       # fatal error message set by fetch_data, if any
    status: str                                # workflow phase marker (e.g. "data_fetched", "completed")
# Request model for the /parse endpoints.
class TriggerRequest(BaseModel):
    # Identifies the batch of crawl data to process.
    requestId: str = Field(..., description="请求ID")
# Response model returned by /parse.
class TriggerResponse(BaseModel):
    requestId: str                 # echoes the request id
    processed: int                 # total items seen
    success: int                   # items stored successfully
    details: List[Dict[str, Any]]  # per-item result records
# Module-level singleton, populated once during application startup.
identify_tool = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage startup and shutdown of the FastAPI application."""
    global identify_tool
    # Startup: build the identification tool once for the whole process.
    identify_tool = IdentifyTool()
    logger.info("Agent 服务启动完成")

    yield

    # Shutdown: nothing to release, just record it.
    logger.info("Agent 服务正在关闭")
# Create the FastAPI application (lifespan hook initializes identify_tool).
app = FastAPI(
    title="Knowledge Agent API",
    description="基于 LangGraph 的智能内容识别和结构化处理服务",
    version="2.0.0",
    lifespan=lifespan
)
- # =========================
- # LangGraph 工作流定义
- # =========================
def create_langgraph_workflow():
    """Build and compile the LangGraph processing workflow.

    Graph shape: fetch_data -> process_item -> (loop via conditional edge
    until all items are handled or a fetch error occurred) -> END.

    Returns:
        The compiled workflow application, or None when LangGraph is not
        installed (callers fall back to the traditional sequential mode).
    """
    if not HAS_LANGGRAPH:
        return None

    # ---- workflow nodes ----

    def fetch_data(state: AgentState) -> AgentState:
        """Load all crawl items for the request into the workflow state."""
        try:
            request_id = state["request_id"]
            logger.info(f"开始获取数据: requestId={request_id}")

            items = QueryDataTool.fetch_crawl_data_list(request_id)
            state["items"] = items
            state["processed"] = len(items)
            state["status"] = "data_fetched"

            logger.info(f"数据获取完成: requestId={request_id}, 数量={len(items)}")
            return state

        except Exception as e:
            # A fetch failure aborts the whole workflow (see should_continue).
            logger.error(f"获取数据失败: {e}")
            state["error"] = str(e)
            state["status"] = "error"
            return state

    def process_item(state: AgentState) -> AgentState:
        """Identify and store a single item, appending a per-item detail record."""
        # BUGFIX: hoisted out of the try block. Previously current_index was
        # first read inside the try, so an exception raised before that line
        # made the except handler itself crash with NameError.
        current_index = state.get("current_index", 0)
        try:
            items = state["items"]

            if current_index >= len(items):
                state["status"] = "completed"
                return state

            item = items[current_index]
            state["current_item"] = item
            state["current_index"] = current_index + 1

            crawl_data = item.get('crawl_data') or {}

            # Step 1: identification
            identify_result = identify_tool.run(
                crawl_data if isinstance(crawl_data, dict) else {}
            )
            state["identify_result"] = identify_result

            # Step 2: structure the result and persist it
            affected = StructureTool.store_parsing_result(
                state["request_id"],
                item.get('raw') or {},
                identify_result
            )

            # "At least one row affected" counts as success.
            ok = affected is not None and affected > 0
            if ok:
                state["success"] += 1

            detail = {
                "index": current_index + 1,  # 1-based position for reporting
                "dbInserted": ok,
                "identifyError": identify_result.get('error'),
                "status": "success" if ok else "failed"
            }
            state["details"].append(detail)

            state["status"] = "item_processed"
            logger.info(f"处理进度: {current_index + 1}/{len(items)} - {'成功' if ok else '失败'}")

            return state

        except Exception as e:
            logger.error(f"处理第 {current_index + 1} 项时出错: {e}")
            # BUGFIX: always advance the cursor. If the exception fired before
            # the increment above, the conditional edge would otherwise
            # re-enter this node with the same index forever.
            state["current_index"] = current_index + 1
            detail = {
                "index": current_index + 1,
                "dbInserted": False,
                "identifyError": str(e),
                "status": "error"
            }
            state["details"].append(detail)
            state["status"] = "item_error"
            return state

    def should_continue(state: AgentState) -> str:
        """Route to 'continue' while unprocessed items remain, else 'end'."""
        if state.get("error"):
            return "end"

        current_index = state.get("current_index", 0)
        items = state.get("items", [])

        if current_index >= len(items):
            return "end"

        return "continue"

    # ---- graph wiring ----
    workflow = StateGraph(AgentState)

    workflow.add_node("fetch_data", fetch_data)
    workflow.add_node("process_item", process_item)

    workflow.set_entry_point("fetch_data")

    workflow.add_edge("fetch_data", "process_item")
    workflow.add_conditional_edges(
        "process_item",
        should_continue,
        {
            "continue": "process_item",
            "end": END
        }
    )

    # NOTE(review): each item consumes one graph step; large batches may hit
    # LangGraph's default recursion_limit (25) — confirm and raise it via
    # invoke(..., config={"recursion_limit": ...}) if needed.
    return workflow.compile()
# Global compiled workflow instance (None when LangGraph is unavailable).
WORKFLOW = create_langgraph_workflow() if HAS_LANGGRAPH else None
- # =========================
- # FastAPI 接口定义
- # =========================
@app.get("/")
async def root():
    """Service metadata endpoint exposed at the API root."""
    endpoint_map = {
        "parse": "/parse",
        "parse/async": "/parse/async",
        "health": "/health",
        "docs": "/docs",
    }
    return {
        "service": "Knowledge Agent API",
        "version": "2.0.0",
        "status": "running",
        "langgraph_enabled": HAS_LANGGRAPH,
        "endpoints": endpoint_map,
    }
@app.get("/health")
async def health_check():
    """Liveness probe: reports health status and the LangGraph feature flag."""
    now = time.time()
    return {
        "status": "healthy",
        "timestamp": now,
        "langgraph_enabled": HAS_LANGGRAPH,
    }
@app.post("/parse", response_model=TriggerResponse)
async def parse_processing(request: TriggerRequest, background_tasks: BackgroundTasks):
    """
    解析内容处理
    
    - **requestId**: 请求ID,用于标识处理任务
    """
    try:
        req_id = request.requestId
        logger.info(f"收到解析请求: requestId={req_id}")

        if WORKFLOW and HAS_LANGGRAPH:
            # LangGraph path: seed the state and run the compiled graph.
            logger.info("使用 LangGraph 工作流处理")

            seed_state = AgentState(
                request_id=req_id,
                items=[],
                details=[],
                processed=0,
                success=0,
                current_index=0,
                current_item=None,
                identify_result=None,
                error=None,
                status="started"
            )

            final_state = WORKFLOW.invoke(
                seed_state,
                config={"configurable": {"thread_id": f"thread_{req_id}"}}
            )

            result = TriggerResponse(
                requestId=req_id,
                processed=final_state.get("processed", 0),
                success=final_state.get("success", 0),
                details=final_state.get("details", [])
            )
        else:
            # Fallback path: sequential processing without LangGraph.
            logger.info("使用传统模式处理")

            items = QueryDataTool.fetch_crawl_data_list(req_id)
            if not items:
                return TriggerResponse(
                    requestId=req_id,
                    processed=0,
                    success=0,
                    details=[]
                )

            details: List[Dict[str, Any]] = []
            success_count = 0

            for idx, item in enumerate(items, start=1):
                try:
                    payload = item.get('crawl_data') or {}
                    if not isinstance(payload, dict):
                        payload = {}

                    # Step 1: identification
                    identify_result = identify_tool.run(payload)

                    # Step 2: structure the result and persist it
                    affected = StructureTool.store_parsing_result(
                        req_id,
                        item.get('raw') or {},
                        identify_result
                    )

                    ok = affected is not None and affected > 0
                    if ok:
                        success_count += 1

                    details.append({
                        "index": idx,
                        "dbInserted": ok,
                        "identifyError": identify_result.get('error'),
                        "status": "success" if ok else "failed"
                    })

                except Exception as e:
                    logger.error(f"处理第 {idx} 项时出错: {e}")
                    details.append({
                        "index": idx,
                        "dbInserted": False,
                        "identifyError": str(e),
                        "status": "error"
                    })

            result = TriggerResponse(
                requestId=req_id,
                processed=len(items),
                success=success_count,
                details=details
            )

        logger.info(f"处理完成: requestId={req_id}, processed={result.processed}, success={result.success}")
        return result

    except Exception as e:
        logger.error(f"处理请求失败: {e}")
        raise HTTPException(status_code=500, detail=f"处理失败: {str(e)}")
@app.post("/parse/async")
async def parse_processing_async(request: TriggerRequest, background_tasks: BackgroundTasks):
    """
    异步解析内容处理(后台任务)
    
    - **requestId**: 请求ID,用于标识处理任务
    """
    try:
        req_id = request.requestId
        logger.info(f"收到异步解析请求: requestId={req_id}")

        # Hand the actual work off to a FastAPI background task.
        background_tasks.add_task(process_request_background, req_id)

        ack = {
            "requestId": req_id,
            "status": "processing",
            "message": "任务已提交到后台处理",
            "langgraph_enabled": HAS_LANGGRAPH,
        }
        return ack

    except Exception as e:
        logger.error(f"提交异步任务失败: {e}")
        raise HTTPException(status_code=500, detail=f"提交任务失败: {str(e)}")
async def process_request_background(request_id: str):
    """Process a request in the background (submitted by /parse/async).

    Mirrors /parse: runs the LangGraph workflow when available, otherwise
    iterates items sequentially. All outcomes are logged; this function
    never raises to the caller.

    NOTE(review): the body performs blocking (non-async) calls; when run as
    a FastAPI background task on the event loop this blocks other requests —
    consider a plain `def` so FastAPI runs it in the thread pool. Left as
    `async def` here to preserve current behavior.
    """
    try:
        logger.info(f"开始后台处理: requestId={request_id}")

        if WORKFLOW and HAS_LANGGRAPH:
            # LangGraph path: seed the state and run the compiled graph.
            initial_state = AgentState(
                request_id=request_id,
                items=[],
                details=[],
                processed=0,
                success=0,
                current_index=0,
                current_item=None,
                identify_result=None,
                error=None,
                status="started"
            )

            final_state = WORKFLOW.invoke(
                initial_state,
                config={"configurable": {"thread_id": f"thread_{request_id}"}}
            )
            logger.info(f"LangGraph 后台处理完成: requestId={request_id}, processed={final_state.get('processed', 0)}, success={final_state.get('success', 0)}")

        else:
            # Traditional sequential path.
            items = QueryDataTool.fetch_crawl_data_list(request_id)
            if not items:
                logger.info(f"后台处理完成: requestId={request_id}, 无数据需要处理")
                return

            success_count = 0
            for idx, item in enumerate(items, start=1):
                try:
                    crawl_data = item.get('crawl_data') or {}

                    identify_result = identify_tool.run(
                        crawl_data if isinstance(crawl_data, dict) else {}
                    )

                    affected = StructureTool.store_parsing_result(
                        request_id,
                        item.get('raw') or {},
                        identify_result
                    )

                    # BUGFIX: single success criterion reused for both the
                    # counter and the progress log. Previously the log tested
                    # bare truthiness of `affected`, which disagrees with the
                    # `> 0` check for edge values.
                    ok = affected is not None and affected > 0
                    if ok:
                        success_count += 1

                    logger.info(f"后台处理进度: {idx}/{len(items)} - {'成功' if ok else '失败'}")

                except Exception as e:
                    logger.error(f"后台处理第 {idx} 项时出错: {e}")

            logger.info(f"传统模式后台处理完成: requestId={request_id}, processed={len(items)}, success={success_count}")

    except Exception as e:
        logger.error(f"后台处理失败: requestId={request_id}, error={e}")
if __name__ == "__main__":
    # Start the service directly (development entry point).
    # NOTE(review): the "agent:app" import string assumes this file is named
    # agent.py — confirm against the actual filename.
    uvicorn.run(
        "agent:app",
        host="0.0.0.0",
        port=8080,
        reload=True,  # development mode: auto-reload on code changes
        log_level="info"
    )