123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 使用 FastAPI 重构的 Agent 服务
- 提供现代化的 HTTP API 接口
- """
- import json
- import sys
- import os
- import time
- from typing import Any, Dict, List, Optional
- from contextlib import asynccontextmanager
- # 保证可以导入本项目模块
- sys.path.append(os.path.dirname(os.path.abspath(__file__)))
- from fastapi import FastAPI, HTTPException, BackgroundTasks
- from fastapi.responses import JSONResponse
- from pydantic import BaseModel, Field
- import uvicorn
- from utils.logging_config import get_logger
- from agent_tools import QueryDataTool, IdentifyTool, StructureTool
# Module-level logger shared by every handler in this file.
logger = get_logger("AgentFastAPI")
# Request model: the JSON body accepted by the parse endpoints.
# (No class docstring on purpose: pydantic would publish it as the schema
# description, which would change the generated API docs.)
class TriggerRequest(BaseModel):
    # Required (no default); identifies the processing task.
    requestId: str = Field(..., description="请求ID")
# Response model returned by the synchronous parse endpoint.
# (No class docstring on purpose: pydantic would publish it as the schema
# description, which would change the generated API docs.)
class TriggerResponse(BaseModel):
    # Request ID the response belongs to.
    requestId: str
    # Number of items that were fetched and run through processing.
    processed: int
    # Number of items whose result was stored successfully.
    success: int
    # One dictionary per processed item describing its outcome.
    details: List[Dict[str, Any]]
# Module-level state: the shared IdentifyTool instance.
# Stays None until the application lifespan handler assigns it on startup.
identify_tool = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    On startup an ``IdentifyTool`` is created and published through the
    module-level ``identify_tool`` variable; control is then yielded to the
    application. The code after ``yield`` runs when the context exits
    normally and only logs that the service is shutting down.
    """
    global identify_tool

    # Startup: build the tool first, then expose it to the request handlers.
    tool = IdentifyTool()
    identify_tool = tool
    logger.info("Agent 服务启动完成")

    yield

    # Shutdown: nothing to release here, just record the event.
    logger.info("Agent 服务正在关闭")
# Create the FastAPI application; startup/shutdown work is delegated to
# the ``lifespan`` context manager.
app = FastAPI(
    title="Knowledge Agent API",
    description="智能内容识别和结构化处理服务",
    version="1.0.0",
    lifespan=lifespan,
)
@app.get("/")
async def root():
    """根路径,返回服务信息"""
    # The docstring above is kept verbatim: FastAPI publishes handler
    # docstrings as the operation description in the OpenAPI schema.
    #
    # Returns static service metadata together with a map of the main routes.
    endpoints = {
        "parse": "/parse",
        "health": "/health",
        "docs": "/docs",
    }
    service_info = {
        "service": "Knowledge Agent API",
        "version": "1.0.0",
        "status": "running",
        "endpoints": endpoints,
    }
    return service_info
@app.get("/health")
async def health_check():
    """健康检查接口"""
    # The docstring above is kept verbatim: FastAPI publishes handler
    # docstrings as the operation description in the OpenAPI schema.
    #
    # Health probe: always reports "healthy" plus the current Unix timestamp.
    now = time.time()
    return {"status": "healthy", "timestamp": now}
@app.post("/parse", response_model=TriggerResponse)
async def parse_processing(request: TriggerRequest, background_tasks: BackgroundTasks):
    """
    解析内容处理

    - **requestId**: 请求ID,用于标识处理任务
    """
    # The docstring above is kept verbatim: FastAPI publishes handler
    # docstrings as the operation description in the OpenAPI schema.
    #
    # Purpose: fetch every item queued for ``request.requestId``, run each one
    # through ``identify_tool.run`` and hand the result to
    # ``StructureTool.store_parsing_result``.
    #
    # Returns: a ``TriggerResponse`` with the number of items processed, the
    # number stored successfully and one detail entry per item.
    # Raises: ``HTTPException`` (status 500) when anything outside the
    # per-item handling fails; per-item failures are reported in ``details``
    # instead of aborting the batch.
    #
    # ``background_tasks`` is not used; it is kept so the signature is unchanged.
    #
    # NOTE(review): the tool calls below are ordinary (non-awaited) calls
    # inside an ``async def`` handler, so they execute on the event loop
    # thread. If they block on I/O, other requests wait until this one is
    # done - confirm whether that is acceptable before changing it.
    try:
        logger.info(f"收到解析请求: requestId={request.requestId}")

        # Fetch the items waiting to be processed for this request.
        items = QueryDataTool.fetch_crawl_data_list(request.requestId)
        if not items:
            return TriggerResponse(
                requestId=request.requestId,
                processed=0,
                success=0,
                details=[],
            )

        success_count = 0
        details: List[Dict[str, Any]] = []

        for idx, item in enumerate(items, start=1):
            try:
                crawl_data = item.get('crawl_data') or {}

                # Step 1: identify. Anything that is not a dict is replaced
                # by an empty dict before being passed to the tool.
                identify_result = identify_tool.run(
                    crawl_data if isinstance(crawl_data, dict) else {}
                )

                # Step 2: structure the result and store it.
                affected = StructureTool.store_parsing_result(
                    request.requestId,
                    item.get('raw') or {},
                    identify_result,
                )

                # An item counts as stored only for a positive affected count.
                ok = affected is not None and affected > 0

                # Read the error defensively: a non-dict identify result used
                # to raise here *after* the item had already been counted as a
                # success, leaving ``success`` and ``details`` contradictory.
                identify_error = (
                    identify_result.get('error')
                    if isinstance(identify_result, dict)
                    else None
                )

                details.append({
                    "index": idx,
                    "dbInserted": ok,
                    "identifyError": identify_error,
                    "status": "success" if ok else "failed",
                })

                # Count only once the detail entry is recorded, so the counter
                # can never disagree with the per-item report.
                if ok:
                    success_count += 1

            except Exception as e:
                # Keep going with the remaining items; log with traceback.
                logger.exception(f"处理第 {idx} 项时出错: {e}")
                details.append({
                    "index": idx,
                    "dbInserted": False,
                    "identifyError": str(e),
                    "status": "error",
                })

        result = TriggerResponse(
            requestId=request.requestId,
            processed=len(items),
            success=success_count,
            details=details,
        )

        logger.info(f"处理完成: requestId={request.requestId}, processed={len(items)}, success={success_count}")
        return result

    except Exception as e:
        logger.exception(f"处理请求失败: {e}")
        # Chain the original exception so the cause is preserved for debugging.
        raise HTTPException(status_code=500, detail=f"处理失败: {str(e)}") from e
@app.post("/parse/async")
async def parse_processing_async(request: TriggerRequest, background_tasks: BackgroundTasks):
    """
    异步解析内容处理(后台任务)

    - **requestId**: 请求ID,用于标识处理任务
    """
    # The docstring above is kept verbatim: FastAPI publishes handler
    # docstrings as the operation description in the OpenAPI schema.
    #
    # Registers ``process_request_background`` as a background task for the
    # given request ID and answers immediately with a "processing" status.
    # Any failure while registering the task is turned into an
    # ``HTTPException`` with status 500.
    try:
        request_id = request.requestId
        logger.info(f"收到异步解析请求: requestId={request_id}")

        # Hand the actual work over to the background task queue.
        background_tasks.add_task(process_request_background, request_id)

        accepted = {
            "requestId": request_id,
            "status": "processing",
            "message": "任务已提交到后台处理",
        }
    except Exception as exc:
        logger.error(f"提交异步任务失败: {exc}")
        raise HTTPException(status_code=500, detail=f"提交任务失败: {exc!s}")
    else:
        return accepted
async def process_request_background(request_id: str) -> None:
    """Process every pending item of a request as a background task.

    Fetches the items for ``request_id`` via
    ``QueryDataTool.fetch_crawl_data_list``, runs each one through
    ``identify_tool.run`` and stores the outcome with
    ``StructureTool.store_parsing_result``. Progress and the final totals are
    written to the log only; nothing is returned.

    All exceptions are caught and logged: a failure on one item does not stop
    the remaining items, and a failure outside the loop ends the task without
    propagating.

    NOTE(review): the tool calls are ordinary (non-awaited) calls inside an
    ``async def``, so they execute on the event loop thread while the task
    runs. Confirm whether that is acceptable before changing it.
    """
    try:
        logger.info(f"开始后台处理: requestId={request_id}")

        # Fetch the items waiting to be processed for this request.
        items = QueryDataTool.fetch_crawl_data_list(request_id)
        if not items:
            logger.info(f"后台处理完成: requestId={request_id}, 无数据需要处理")
            return

        total = len(items)
        success_count = 0
        for idx, item in enumerate(items, start=1):
            try:
                crawl_data = item.get('crawl_data') or {}

                # Step 1: identify. Anything that is not a dict is replaced
                # by an empty dict before being passed to the tool.
                identify_result = identify_tool.run(
                    crawl_data if isinstance(crawl_data, dict) else {}
                )

                # Step 2: structure the result and store it.
                affected = StructureTool.store_parsing_result(
                    request_id,
                    item.get('raw') or {},
                    identify_result,
                )

                # Single success criterion shared by the counter and the
                # progress message. Previously the message used the plain
                # truthiness of ``affected``, so e.g. a negative count was
                # logged as success while not being counted as one.
                ok = affected is not None and affected > 0
                if ok:
                    success_count += 1

                logger.info(f"后台处理进度: {idx}/{total} - {'成功' if ok else '失败'}")

            except Exception as e:
                # Keep going with the remaining items; log with traceback.
                logger.exception(f"后台处理第 {idx} 项时出错: {e}")

        logger.info(f"后台处理完成: requestId={request_id}, processed={total}, success={success_count}")

    except Exception as e:
        # Nothing upstream consumes this task's result, so log and stop here.
        logger.exception(f"后台处理失败: requestId={request_id}, error={e}")
if __name__ == "__main__":
    # Start the service when this file is executed as a script.
    # NOTE(review): the import string "agent:app" presumably means this file
    # is importable as ``agent`` - verify against the actual module name.
    server_options = {
        "host": "0.0.0.0",
        "port": 8080,
        "reload": True,  # development mode: auto-reload on code changes
        "log_level": "info",
    }
    uvicorn.run("agent:app", **server_options)
|