agent.py 6.9 KB


  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 使用 FastAPI 重构的 Agent 服务
  5. 提供现代化的 HTTP API 接口
  6. """
  7. import json
  8. import sys
  9. import os
  10. import time
  11. from typing import Any, Dict, List, Optional
  12. from contextlib import asynccontextmanager
  13. # 保证可以导入本项目模块
  14. sys.path.append(os.path.dirname(os.path.abspath(__file__)))
  15. from fastapi import FastAPI, HTTPException, BackgroundTasks
  16. from fastapi.responses import JSONResponse
  17. from pydantic import BaseModel, Field
  18. import uvicorn
  19. from utils.logging_config import get_logger
  20. from agent_tools import QueryDataTool, IdentifyTool, StructureTool
  21. # 创建 logger
  22. logger = get_logger('AgentFastAPI')
  23. # 请求模型
  24. class TriggerRequest(BaseModel):
  25. requestId: str = Field(..., description="请求ID")
  26. # 响应模型
  27. class TriggerResponse(BaseModel):
  28. requestId: str
  29. processed: int
  30. success: int
  31. details: List[Dict[str, Any]]
  32. # 全局变量
  33. identify_tool = None
  34. @asynccontextmanager
  35. async def lifespan(app: FastAPI):
  36. """应用生命周期管理"""
  37. # 启动时初始化
  38. global identify_tool
  39. identify_tool = IdentifyTool()
  40. logger.info("Agent 服务启动完成")
  41. yield
  42. # 关闭时清理
  43. logger.info("Agent 服务正在关闭")
  44. # 创建 FastAPI 应用
  45. app = FastAPI(
  46. title="Knowledge Agent API",
  47. description="智能内容识别和结构化处理服务",
  48. version="1.0.0",
  49. lifespan=lifespan
  50. )
  51. @app.get("/")
  52. async def root():
  53. """根路径,返回服务信息"""
  54. return {
  55. "service": "Knowledge Agent API",
  56. "version": "1.0.0",
  57. "status": "running",
  58. "endpoints": {
  59. "parse": "/parse",
  60. "health": "/health",
  61. "docs": "/docs"
  62. }
  63. }
  64. @app.get("/health")
  65. async def health_check():
  66. """健康检查接口"""
  67. return {"status": "healthy", "timestamp": time.time()}
  68. @app.post("/parse", response_model=TriggerResponse)
  69. async def parse_processing(request: TriggerRequest, background_tasks: BackgroundTasks):
  70. """
  71. 解析内容处理
  72. - **requestId**: 请求ID,用于标识处理任务
  73. """
  74. try:
  75. logger.info(f"收到解析请求: requestId={request.requestId}")
  76. # 获取待处理数据
  77. items = QueryDataTool.fetch_crawl_data_list(request.requestId)
  78. if not items:
  79. return TriggerResponse(
  80. requestId=request.requestId,
  81. processed=0,
  82. success=0,
  83. details=[]
  84. )
  85. # 处理数据
  86. success_count = 0
  87. details: List[Dict[str, Any]] = []
  88. for idx, item in enumerate(items, start=1):
  89. try:
  90. crawl_data = item.get('crawl_data') or {}
  91. # Step 1: 识别
  92. identify_result = identify_tool.run(
  93. crawl_data if isinstance(crawl_data, dict) else {}
  94. )
  95. # Step 2: 结构化并入库
  96. affected = StructureTool.store_parsing_result(
  97. request.requestId,
  98. item.get('raw') or {},
  99. identify_result
  100. )
  101. ok = affected is not None and affected > 0
  102. if ok:
  103. success_count += 1
  104. details.append({
  105. "index": idx,
  106. "dbInserted": ok,
  107. "identifyError": identify_result.get('error'),
  108. "status": "success" if ok else "failed"
  109. })
  110. except Exception as e:
  111. logger.error(f"处理第 {idx} 项时出错: {e}")
  112. details.append({
  113. "index": idx,
  114. "dbInserted": False,
  115. "identifyError": str(e),
  116. "status": "error"
  117. })
  118. result = TriggerResponse(
  119. requestId=request.requestId,
  120. processed=len(items),
  121. success=success_count,
  122. details=details
  123. )
  124. logger.info(f"处理完成: requestId={request.requestId}, processed={len(items)}, success={success_count}")
  125. return result
  126. except Exception as e:
  127. logger.error(f"处理请求失败: {e}")
  128. raise HTTPException(status_code=500, detail=f"处理失败: {str(e)}")
  129. @app.post("/parse/async")
  130. async def parse_processing_async(request: TriggerRequest, background_tasks: BackgroundTasks):
  131. """
  132. 异步解析内容处理(后台任务)
  133. - **requestId**: 请求ID,用于标识处理任务
  134. """
  135. try:
  136. logger.info(f"收到异步解析请求: requestId={request.requestId}")
  137. # 添加后台任务
  138. background_tasks.add_task(process_request_background, request.requestId)
  139. return {
  140. "requestId": request.requestId,
  141. "status": "processing",
  142. "message": "任务已提交到后台处理"
  143. }
  144. except Exception as e:
  145. logger.error(f"提交异步任务失败: {e}")
  146. raise HTTPException(status_code=500, detail=f"提交任务失败: {str(e)}")
  147. async def process_request_background(request_id: str):
  148. """后台处理请求"""
  149. try:
  150. logger.info(f"开始后台处理: requestId={request_id}")
  151. # 获取待处理数据
  152. items = QueryDataTool.fetch_crawl_data_list(request_id)
  153. if not items:
  154. logger.info(f"后台处理完成: requestId={request_id}, 无数据需要处理")
  155. return
  156. # 处理数据
  157. success_count = 0
  158. for idx, item in enumerate(items, start=1):
  159. try:
  160. crawl_data = item.get('crawl_data') or {}
  161. # Step 1: 识别
  162. identify_result = identify_tool.run(
  163. crawl_data if isinstance(crawl_data, dict) else {}
  164. )
  165. # Step 2: 结构化并入库
  166. affected = StructureTool.store_parsing_result(
  167. request_id,
  168. item.get('raw') or {},
  169. identify_result
  170. )
  171. if affected is not None and affected > 0:
  172. success_count += 1
  173. logger.info(f"后台处理进度: {idx}/{len(items)} - {'成功' if affected else '失败'}")
  174. except Exception as e:
  175. logger.error(f"后台处理第 {idx} 项时出错: {e}")
  176. logger.info(f"后台处理完成: requestId={request_id}, processed={len(items)}, success={success_count}")
  177. except Exception as e:
  178. logger.error(f"后台处理失败: requestId={request_id}, error={e}")
  179. if __name__ == "__main__":
  180. # 启动服务
  181. uvicorn.run(
  182. "agent:app",
  183. host="0.0.0.0",
  184. port=8080,
  185. reload=True, # 开发模式,自动重载
  186. log_level="info"
  187. )