#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Agent service built on FastAPI.

Exposes HTTP endpoints that fetch the crawled items belonging to a request ID,
run each one through ``IdentifyTool`` and persist the result through
``StructureTool.store_parsing_result``.
"""

import json
import sys
import os
import time
from typing import Any, Dict, List, Optional, Tuple
from contextlib import asynccontextmanager

# Make sibling project modules (utils, agent_tools) importable when this file
# is run directly.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
import uvicorn

from utils.logging_config import get_logger
from agent_tools import QueryDataTool, IdentifyTool, StructureTool

logger = get_logger('AgentFastAPI')


# Request body for the /parse endpoints.
# (Plain comments rather than class docstrings: Pydantic copies class
# docstrings into the published JSON schema.)
class TriggerRequest(BaseModel):
    requestId: str = Field(..., description="请求ID")


# Summary returned by POST /parse.
class TriggerResponse(BaseModel):
    requestId: str
    processed: int  # number of items fetched for the request
    success: int  # items whose store call reported a positive affected count
    details: List[Dict[str, Any]]  # one entry per item, in fetch order


# Shared IdentifyTool instance. It is created in ``lifespan`` at startup, so it
# stays None until the application has started.
identify_tool = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared IdentifyTool on startup and log on shutdown."""
    global identify_tool
    identify_tool = IdentifyTool()
    logger.info("Agent 服务启动完成")
    yield
    logger.info("Agent 服务正在关闭")


app = FastAPI(
    title="Knowledge Agent API",
    description="智能内容识别和结构化处理服务",
    version="1.0.0",
    lifespan=lifespan
)


# NOTE: FastAPI publishes endpoint docstrings as OpenAPI descriptions, so the
# endpoint docstrings below are kept verbatim.
@app.get("/")
async def root():
    """根路径,返回服务信息"""
    return {
        "service": "Knowledge Agent API",
        "version": "1.0.0",
        "status": "running",
        "endpoints": {
            "parse": "/parse",
            "health": "/health",
            "docs": "/docs"
        }
    }


@app.get("/health")
async def health_check():
    """健康检查接口"""
    return {"status": "healthy", "timestamp": time.time()}


def _identify_and_store(request_id: str, item: Dict[str, Any]) -> Tuple[bool, Any]:
    """Identify one crawled item and store the structured result.

    Both calls are synchronous, so async handlers invoke this through
    ``run_in_threadpool`` to keep the event loop responsive.
    NOTE(review): this means ``identify_tool`` may now be used from several
    worker threads at once for concurrent requests — confirm IdentifyTool is
    safe for concurrent use.

    Args:
        request_id: ID the stored result is associated with.
        item: Row from ``QueryDataTool.fetch_crawl_data_list``; its
            ``crawl_data`` and ``raw`` keys are read.

    Returns:
        ``(ok, identify_result)``. ``ok`` is True only when
        ``store_parsing_result`` returned a non-None count greater than 0.
    """
    crawl_data = item.get('crawl_data') or {}
    # Non-dict payloads are replaced by an empty dict before identification.
    identify_result = identify_tool.run(
        crawl_data if isinstance(crawl_data, dict) else {}
    )
    affected = StructureTool.store_parsing_result(
        request_id,
        item.get('raw') or {},
        identify_result
    )
    ok = affected is not None and affected > 0
    return ok, identify_result


@app.post("/parse", response_model=TriggerResponse)
async def parse_processing(request: TriggerRequest, background_tasks: BackgroundTasks):
    """
    解析内容处理

    - **requestId**: 请求ID,用于标识处理任务
    """
    # Synchronously processes every item for the request and returns a
    # per-item summary. A failure on one item is recorded in ``details`` and
    # does not stop the loop; any other failure becomes an HTTP 500.
    # ``background_tasks`` is unused but kept so the signature is unchanged.
    try:
        logger.info(f"收到解析请求: requestId={request.requestId}")

        # Blocking fetch; run it off the event loop.
        items = await run_in_threadpool(
            QueryDataTool.fetch_crawl_data_list, request.requestId
        )
        if not items:
            return TriggerResponse(
                requestId=request.requestId,
                processed=0,
                success=0,
                details=[]
            )

        success_count = 0
        details: List[Dict[str, Any]] = []
        for idx, item in enumerate(items, start=1):
            try:
                ok, identify_result = await run_in_threadpool(
                    _identify_and_store, request.requestId, item
                )
                if ok:
                    success_count += 1
                details.append({
                    "index": idx,
                    "dbInserted": ok,
                    "identifyError": identify_result.get('error'),
                    "status": "success" if ok else "failed"
                })
            except Exception as e:
                logger.exception(f"处理第 {idx} 项时出错: {e}")
                details.append({
                    "index": idx,
                    "dbInserted": False,
                    "identifyError": str(e),
                    "status": "error"
                })

        result = TriggerResponse(
            requestId=request.requestId,
            processed=len(items),
            success=success_count,
            details=details
        )
        logger.info(f"处理完成: requestId={request.requestId}, processed={len(items)}, success={success_count}")
        return result

    except Exception as e:
        logger.exception(f"处理请求失败: {e}")
        raise HTTPException(status_code=500, detail=f"处理失败: {str(e)}") from e


@app.post("/parse/async")
async def parse_processing_async(request: TriggerRequest, background_tasks: BackgroundTasks):
    """
    异步解析内容处理(后台任务)

    - **requestId**: 请求ID,用于标识处理任务
    """
    # Schedules ``process_request_background`` and returns immediately.
    try:
        logger.info(f"收到异步解析请求: requestId={request.requestId}")

        background_tasks.add_task(process_request_background, request.requestId)

        return {
            "requestId": request.requestId,
            "status": "processing",
            "message": "任务已提交到后台处理"
        }

    except Exception as e:
        logger.exception(f"提交异步任务失败: {e}")
        raise HTTPException(status_code=500, detail=f"提交任务失败: {str(e)}") from e


async def process_request_background(request_id: str):
    """Process all items for ``request_id`` in the background, logging progress.

    Errors are logged and never raised: a failing item is skipped, and any
    other failure ends the task with an error log entry.
    """
    try:
        logger.info(f"开始后台处理: requestId={request_id}")

        # Blocking fetch; run it off the event loop.
        items = await run_in_threadpool(
            QueryDataTool.fetch_crawl_data_list, request_id
        )
        if not items:
            logger.info(f"后台处理完成: requestId={request_id}, 无数据需要处理")
            return

        success_count = 0
        for idx, item in enumerate(items, start=1):
            try:
                ok, _ = await run_in_threadpool(_identify_and_store, request_id, item)
                if ok:
                    success_count += 1
                # Use the same success criterion as ``success_count``; the old
                # truthiness check reported negative counts as success.
                logger.info(f"后台处理进度: {idx}/{len(items)} - {'成功' if ok else '失败'}")
            except Exception as e:
                logger.exception(f"后台处理第 {idx} 项时出错: {e}")

        logger.info(f"后台处理完成: requestId={request_id}, processed={len(items)}, success={success_count}")

    except Exception as e:
        logger.exception(f"后台处理失败: requestId={request_id}, error={e}")


if __name__ == "__main__":
    # "agent:app" means module ``agent``, attribute ``app``; this assumes the
    # file is saved as agent.py. reload=True is a development-mode setting.
    uvicorn.run(
        "agent:app",
        host="0.0.0.0",
        port=8080,
        reload=True,
        log_level="info"
    )