jihuaqiang 1 тиждень тому
батько
коміт
08eb2d3db8

+ 20 - 0
.env

@@ -0,0 +1,20 @@
+# Gemini配置
+GEMINI_API_KEY=AIzaSyAkt1l9Kw1CQgHFzTpla0vgt0OE53fr-BI
+GEMINI_MODEL=gemini-2.5-flash
+
+# 数据库配置
+DB_HOST=rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com
+DB_PORT=3306
+DB_USER=wqsd
+DB_PASSWORD=wqsd@2025
+DB_NAME=ai_knowledge
+DB_CHARSET=utf8
+
+# 服务配置
+HOST=0.0.0.0
+PORT=8079
+DEBUG=True
+
+# 日志配置
+LOG_LEVEL=INFO
+LOG_FILE=logs/app.log

+ 0 - 1
.gitignore

@@ -120,7 +120,6 @@ celerybeat.pid
 *.sage.py
 
 # Environments
-.env
 .venv
 env/
 venv/

+ 27 - 0
main.py

@@ -0,0 +1,27 @@
+#!/usr/bin/env python3
+"""
+知识工具服务主程序入口
+"""
+
+import os
+import uvicorn
+from dotenv import load_dotenv
+
+# 加载环境变量
+load_dotenv()
+
+
+def main():
+    """主程序入口"""
+    uvicorn.run(
+        "src.api.main:app",
+        host=os.getenv("HOST", "0.0.0.0"),
+        port=int(os.getenv("PORT", 8079)),
+        reload=os.getenv("DEBUG", "True").lower() == "true",
+        log_level=os.getenv("LOG_LEVEL", "INFO").lower(),
+        access_log=True
+    )
+
+
+if __name__ == "__main__":
+    main()

+ 284 - 0
readme.md

@@ -0,0 +1,284 @@
+# 知识工具服务
+
+这个项目主要完成以下的任务:
+1. 可通过sh脚本启动,停止服务
+2. 这个服务对外暴露一个接口,可以接收一个question字段,将question转化为若干可以进行爬虫任务的query词
+3. 具体转化的任务交给一个agent即可,可以使用langgraph构建,通过执行tools定义的 suggestQuery,根据一个Prompt模版,将传入的问题拆解为若干的query词
+4. 注意模块的封装,尽量拆的独立一些
+5. 支持异步任务处理,提高系统响应性能
+
+## 项目结构
+
+```
+knowledge-tools/
+├── src/                    # 源代码目录
+│   ├── api/               # API接口模块
+│   │   └── main.py        # FastAPI主应用
+│   ├── agent/             # LangGraph Agent模块
+│   │   └── query_agent.py # 查询生成Agent
+│   ├── tools/             # 工具模块
+│   │   ├── query_tool.py  # 查询工具
+│   │   ├── prompts.py     # Prompt模板
+│   │   └── scheduler.py   # 任务调度器
+│   ├── database/          # 数据库模块
+│   │   ├── connection.py  # 数据库连接管理
+│   │   └── models.py      # 数据库模型和DAO
+│   └── models/            # 数据模型
+│       └── schemas.py     # Pydantic模型
+├── logs/                  # 日志目录
+├── start.sh              # 启动脚本
+├── stop.sh               # 停止脚本
+├── main.py               # 主程序入口
+├── requirements.txt      # Python依赖
+├── app.pid              # 进程ID文件
+└── .env                 # 环境变量配置文件
+```
+
+## 功能特性
+
+- 🚀 **RESTful API**: 基于FastAPI构建的高性能API服务
+- 🤖 **智能Agent**: 使用LangGraph构建的查询生成Agent
+- 🗄️ **数据库支持**: MySQL数据库存储任务记录和状态跟踪
+- ⚡ **异步处理**: 任务异步处理,提高系统响应性能
+- ⏰ **定时调度**: 每分钟自动处理待执行任务
+- 🔧 **模块化设计**: 清晰的模块分离,便于维护和扩展
+- 📝 **详细日志**: 完整的日志记录和错误处理
+- 🛠️ **便捷脚本**: 一键启动/停止服务脚本
+
+## 快速开始
+
+### 1. 环境准备
+
+```bash
+# 克隆项目(如果从git仓库)
+git clone <repository-url>
+cd knowledge-tools
+
+# 创建虚拟环境
+python3 -m venv venv
+source venv/bin/activate  # Linux/Mac
+# 或
+venv\Scripts\activate     # Windows
+```
+
+### 2. 安装依赖
+
+```bash
+pip install -r requirements.txt
+```
+
+### 3. 配置环境变量
+
+```bash
+# 复制环境变量示例文件
+cp env.example .env
+
+# 编辑.env文件,设置你的OpenAI API密钥
+vim .env
+```
+
+### 4. 启动服务
+
+```bash
+# 使用脚本启动(推荐)
+./start.sh
+
+# 或直接使用Python
+python main.py
+```
+
+### 5. 测试服务
+
+```bash
+# 健康检查
+curl http://localhost:8079/health
+
+# 生成查询词(异步处理)
+curl -X POST "http://localhost:8079/generate-queries" \
+     -H "Content-Type: application/json" \
+     -d '{"question": "如何学习Python编程"}'
+
+# 查询任务状态
+curl http://localhost:8079/task/1703123456789
+
+# 运行异步处理测试(需要先创建测试脚本)
+# python test_async_processing.py
+```
+
+## API接口
+
+### 健康检查
+- **GET** `/health`
+- 返回服务状态
+
+### 生成查询词(异步处理)
+- **POST** `/generate-queries`
+- 请求体:
+  ```json
+  {
+    "question": "用户的问题"
+  }
+  ```
+- 响应(立即返回,状态为待执行):
+  ```json
+  {
+    "task_id": 1703123456789,
+    "queries": [],
+    "original_question": "原始问题",
+    "total_count": 0,
+    "status": 0
+  }
+  ```
+- 说明:接口立即返回任务ID,实际处理由后台定时任务完成
+
+### 获取任务信息
+- **GET** `/task/{task_id}`
+- 响应:
+  ```json
+  {
+    "task_id": 1703123456789,
+    "question": "用户的问题",
+    "queries": ["查询词1", "查询词2", "..."],
+    "status": 2,
+    "status_text": "成功"
+  }
+  ```
+- 状态说明:
+  - `0` - 待执行:任务已创建,等待后台处理
+  - `1` - 执行中:任务正在处理中
+  - `2` - 成功:任务处理完成,已生成查询词
+  - `3` - 失败:任务处理失败
+
+### 获取任务列表
+- **GET** `/tasks?status=2&limit=10`
+- 参数:
+  - `status`: 任务状态筛选(0-待执行,1-执行中,2-成功,3-失败)
+  - `limit`: 限制数量
+- 响应:
+  ```json
+  {
+    "tasks": [...],
+    "total": 10
+  }
+  ```
+
+## 服务管理
+
+### 启动服务
+```bash
+./start.sh
+```
+
+### 停止服务
+```bash
+./stop.sh
+```
+
+### 查看日志
+```bash
+tail -f logs/app.log
+```
+
+## 异步任务处理
+
+### 工作流程
+
+1. **任务提交**: 调用 `/generate-queries` 接口提交问题
+2. **立即返回**: 接口立即返回任务ID和待执行状态
+3. **后台处理**: 定时调度器每分钟检查并处理待执行任务
+4. **状态查询**: 通过 `/task/{task_id}` 接口查询任务处理状态
+
+### 任务状态
+
+- `0` - 待执行: 任务已创建,等待处理
+- `1` - 执行中: 任务正在处理中
+- `2` - 成功: 任务处理完成,已生成查询词
+- `3` - 失败: 任务处理失败
+
+### 测试异步处理
+
+可以创建测试脚本验证异步处理功能:
+
+```bash
+# 创建测试脚本后运行
+python test_async_processing.py
+```
+
+## 配置说明
+
+主要配置项在 `.env` 文件中:
+
+```env
+# Gemini API配置
+GEMINI_API_KEY=your_gemini_api_key_here
+GEMINI_MODEL=gemini-1.5-pro
+
+# 数据库配置
+DB_HOST=rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com
+DB_PORT=3306
+DB_USER=wqsd
+DB_PASSWORD=wqsd@2025
+DB_NAME=ai_knowledge
+DB_CHARSET=utf8
+
+# 服务配置
+HOST=0.0.0.0
+PORT=8079
+DEBUG=True
+
+# 日志配置
+LOG_LEVEL=INFO
+LOG_FILE=logs/app.log
+```
+
+## 开发说明
+
+### 模块说明
+
+1. **API模块** (`src/api/`): 负责HTTP接口和路由处理
+2. **Agent模块** (`src/agent/`): 包含LangGraph Agent逻辑
+3. **工具模块** (`src/tools/`): 定义各种工具、Prompt模板和任务调度器
+4. **数据库模块** (`src/database/`): 数据库连接、模型和DAO操作
+5. **模型模块** (`src/models/`): 定义数据模型和Schema
+
+### 扩展开发
+
+- 添加新工具:在 `src/tools/` 目录下创建新的工具类
+- 修改Prompt:编辑 `src/tools/prompts.py` 文件
+- 调整Agent逻辑:修改 `src/agent/query_agent.py` 文件
+- 添加新接口:在 `src/api/main.py` 中添加新的路由
+- 调整调度策略:修改 `src/tools/scheduler.py` 文件
+
+## 数据库表结构
+
+项目使用MySQL数据库存储任务信息,需要创建以下表:
+
+```sql
+CREATE TABLE `knowledge_suggest_query` (
+  `question` text COMMENT '问题',
+  `task_id` bigint(20) NOT NULL COMMENT '任务id',
+  `querys` mediumtext COMMENT '产出的query词,json格式',
+  `status` int(11) DEFAULT NULL COMMENT '0:待执行;1:执行中;2:成功; 3:失败;',
+  PRIMARY KEY (`task_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='生成query词任务';
+```
+
+## 注意事项
+
+1. 确保已正确配置Gemini API密钥
+2. 确保数据库连接配置正确,并已创建相应的表
+3. 服务默认运行在8079端口,可通过环境变量修改
+4. 日志文件保存在 `logs/app.log`
+5. 生产环境建议设置 `DEBUG=False`
+6. 异步任务处理:任务提交后立即返回,实际处理由后台定时任务完成
+7. 定时调度器每分钟检查一次待执行任务,每次处理一条
+
+## 故障排除
+
+1. **服务启动失败**: 检查端口是否被占用,查看日志文件
+2. **API调用失败**: 确认Gemini API密钥是否正确配置
+3. **数据库连接失败**: 检查数据库配置和网络连接
+4. **依赖安装失败**: 确保Python版本 >= 3.8
+5. **任务处理失败**: 检查Agent配置和API密钥,查看任务状态
+6. **定时任务不工作**: 检查调度器是否正常启动,查看日志文件
+7. **进程无法停止**: 使用 `./stop.sh` 脚本,或手动查找并终止相关进程

+ 12 - 0
requirements.txt

@@ -0,0 +1,12 @@
+fastapi==0.116.1
+uvicorn==0.35.0
+pydantic==2.11.7
+pydantic-settings==2.6.1
+langgraph==0.6.6
+langchain==0.3.27
+langchain-openai==0.3.31
+python-dotenv==1.1.1
+python-multipart==0.0.6
+httpx[socks]==0.28.1
+pymysql==1.1.0
+sqlalchemy==2.0.43

+ 1 - 0
src/__init__.py

@@ -0,0 +1 @@
+# 知识工具包

+ 1 - 0
src/agent/__init__.py

@@ -0,0 +1 @@
+# Agent模块

+ 195 - 0
src/agent/query_agent.py

@@ -0,0 +1,195 @@
+from typing import List, Dict, Any, TypedDict, Annotated
+from langgraph.graph import StateGraph, END
+from langchain_google_genai import ChatGoogleGenerativeAI
+from langchain.prompts import ChatPromptTemplate
+from langchain.schema import HumanMessage, SystemMessage
+
+from ..tools.query_tool import SuggestQueryTool
+from ..tools.prompts import QUERY_GENERATION_PROMPT, QUERY_REFINEMENT_PROMPT
+from ..database.models import QueryTaskDAO, QueryTaskStatus, logger
+
+
+class AgentState(TypedDict):
+    """Agent状态定义"""
+    question: str
+    task_id: int
+    initial_queries: List[str]
+    refined_queries: List[str]
+    context: str
+    iteration_count: int
+
+
+class QueryGenerationAgent:
+    """查询词生成Agent"""
+    
+    def __init__(self, gemini_api_key: str, model_name: str = "gemini-1.5-pro"):
+        """
+        初始化Agent
+        
+        Args:
+            gemini_api_key: Gemini API密钥
+            model_name: 使用的模型名称
+        """
+        self.llm = ChatGoogleGenerativeAI(
+            google_api_key=gemini_api_key,
+            model=model_name,
+            temperature=0.7
+        )
+        
+        self.query_tool = SuggestQueryTool()
+        self.task_dao = QueryTaskDAO()
+        
+        # 创建状态图
+        self.graph = self._create_graph()
+    
+    def _create_graph(self) -> StateGraph:
+        """创建LangGraph状态图"""
+        workflow = StateGraph(AgentState)
+        
+        # 添加节点
+        workflow.add_node("analyze_question", self._analyze_question)
+        workflow.add_node("generate_initial_queries", self._generate_initial_queries)
+        workflow.add_node("refine_queries", self._refine_queries)
+        workflow.add_node("validate_queries", self._validate_queries)
+        
+        # 设置入口点
+        workflow.set_entry_point("analyze_question")
+        
+        # 添加边
+        workflow.add_edge("analyze_question", "generate_initial_queries")
+        workflow.add_edge("generate_initial_queries", "refine_queries")
+        workflow.add_edge("refine_queries", "validate_queries")
+        workflow.add_edge("validate_queries", END)
+        
+        return workflow.compile()
+    
+    def _analyze_question(self, state: AgentState) -> AgentState:
+        """分析问题节点"""
+        question = state["question"]
+        
+        # 分析问题的复杂度和类型
+        analysis_prompt = ChatPromptTemplate.from_messages([
+            SystemMessage(content="你是一个问题分析专家。请分析用户问题的类型和复杂度。"),
+            HumanMessage(content=f"请分析这个问题:{question}\n\n分析要点:1.问题类型 2.复杂度 3.关键词 4.需要的查询角度")
+        ])
+        
+        try:
+            response = self.llm.invoke(analysis_prompt.format_messages())
+            logger.info(f"问题分析结果: {response.content}")
+            context = response.content
+        except Exception as e:
+            context = f"问题分析失败: {str(e)}"
+        
+        state["context"] = context
+        state["iteration_count"] = 0
+        
+        return state
+    
+    def _generate_initial_queries(self, state: AgentState) -> AgentState:
+        """生成初始查询词节点"""
+        question = state["question"]
+        task_id = state["task_id"]
+        context = state.get("context", "")
+        
+        # 使用工具生成查询词
+        try:
+            initial_queries = self.query_tool._run(question, context, task_id)
+        except Exception as e:
+            # 如果工具失败,使用LLM生成
+            prompt = ChatPromptTemplate.from_messages([
+                SystemMessage(content=QUERY_GENERATION_PROMPT),
+                HumanMessage(content=question)
+            ])
+            
+            try:
+                response = self.llm.invoke(prompt.format_messages())
+                queries_text = response.content
+                initial_queries = [q.strip() for q in queries_text.split('\n') if q.strip()]
+            except Exception:
+                initial_queries = [question]  # 降级处理
+        
+        state["initial_queries"] = initial_queries
+        return state
+    
+    def _refine_queries(self, state: AgentState) -> AgentState:
+        """优化查询词节点"""
+        question = state["question"]
+        initial_queries = state["initial_queries"]
+        
+        if not initial_queries:
+            state["refined_queries"] = [question]
+            return state
+        
+        # 使用LLM优化查询词
+        queries_text = '\n'.join(initial_queries)
+        prompt = ChatPromptTemplate.from_messages([
+            SystemMessage(content=QUERY_REFINEMENT_PROMPT),
+            HumanMessage(content=f"问题:{question}\n查询词:{queries_text}")
+        ])
+        
+        try:
+            response = self.llm.invoke(prompt.format_messages())
+            logger.info(f"查询词优化结果: {response.content}")
+            refined_text = response.content
+            refined_queries = [q.strip() for q in refined_text.split('\n') if q.strip()]
+        except Exception as e:
+            # 如果优化失败,使用原始查询词
+            refined_queries = initial_queries
+        
+        state["refined_queries"] = refined_queries
+        return state
+    
+    def _validate_queries(self, state: AgentState) -> AgentState:
+        """验证查询词节点"""
+        refined_queries = state["refined_queries"]
+        
+        # 基本验证:去重、过滤空字符串、限制长度
+        validated_queries = []
+        seen = set()
+        
+        for query in refined_queries:
+            if query and len(query.strip()) > 0 and len(query.strip()) < 100:
+                if query.strip() not in seen:
+                    validated_queries.append(query.strip())
+                    seen.add(query.strip())
+        
+        # 限制最终数量
+        if len(validated_queries) > 10:
+            validated_queries = validated_queries[:10]
+        
+        # 确保至少有一个查询词
+        if not validated_queries:
+            validated_queries = [state["question"]]
+        logger.info(f"查询词验证结果: {validated_queries}")
+        state["refined_queries"] = validated_queries
+        return state
+    
+    async def generate_queries(self, question: str, task_id: int = 0) -> List[str]:
+        """
+        生成查询词的主入口
+        
+        Args:
+            question: 用户问题
+            task_id: 任务ID
+            
+        Returns:
+            生成的查询词列表
+        """
+        initial_state = {
+            "question": question,
+            "task_id": task_id,
+            "initial_queries": [],
+            "refined_queries": [],
+            "context": "",
+            "iteration_count": 0
+        }
+        
+        try:
+            result = await self.graph.ainvoke(initial_state)
+            return result["refined_queries"]
+        except Exception as e:
+            # 更新任务状态为失败
+            if task_id > 0:
+                self.task_dao.update_task_status(task_id, QueryTaskStatus.FAILED)
+            # 降级处理:返回原始问题
+            return [question]

+ 1 - 0
src/api/__init__.py

@@ -0,0 +1 @@
+# API模块

+ 276 - 0
src/api/main.py

@@ -0,0 +1,276 @@
+import os
+import logging
+from fastapi import FastAPI, HTTPException
+from fastapi.middleware.cors import CORSMiddleware
+from contextlib import asynccontextmanager
+from dotenv import load_dotenv
+
+from ..models.schemas import QuestionRequest, QueryResponse, HealthResponse
+from ..agent.query_agent import QueryGenerationAgent
+from ..database.connection import init_database_manager, get_db_manager
+from ..database.models import QueryTaskDAO, QueryTaskStatus, get_query_task_dao
+from ..tools.scheduler import create_scheduler, get_scheduler
+import time
+
+# 加载环境变量
+load_dotenv()
+
+# 配置日志
+log_level = os.getenv("LOG_LEVEL", "INFO")
+log_file = os.getenv("LOG_FILE", "logs/app.log")
+
+logging.basicConfig(
+    level=getattr(logging, log_level),
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.FileHandler(log_file),
+        logging.StreamHandler()
+    ]
+)
+
+logger = logging.getLogger(__name__)
+
+# 全局Agent实例
+agent = None
+task_dao = None
+scheduler = None
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    """应用生命周期管理"""
+    global agent, task_dao, scheduler
+    
+    # 启动时初始化数据库
+    logger.info("正在初始化数据库连接...")
+    try:
+        init_database_manager(
+            host=os.getenv("DB_HOST", "localhost"),
+            port=int(os.getenv("DB_PORT", 3306)),
+            user=os.getenv("DB_USER", "root"),
+            password=os.getenv("DB_PASSWORD", ""),
+            database=os.getenv("DB_NAME", "ai_knowledge"),
+            charset=os.getenv("DB_CHARSET", "utf8")
+        )
+        task_dao = get_query_task_dao()
+        logger.info("数据库初始化成功")
+    except Exception as e:
+        logger.error(f"数据库初始化失败: {e}")
+        raise
+    
+    # 启动时初始化Agent
+    logger.info("正在初始化查询生成Agent...")
+    try:
+        agent = QueryGenerationAgent(
+            gemini_api_key=os.getenv("GEMINI_API_KEY", ""),
+            model_name=os.getenv("GEMINI_MODEL", "gemini-1.5-pro")
+        )
+        logger.info("Agent初始化成功")
+    except Exception as e:
+        logger.error(f"Agent初始化失败: {e}")
+        raise
+    
+    # 启动时初始化调度器
+    logger.info("正在启动任务调度器...")
+    try:
+        scheduler = create_scheduler(agent, task_dao)
+        await scheduler.start()
+        logger.info("任务调度器启动成功")
+    except Exception as e:
+        logger.error(f"任务调度器启动失败: {e}")
+        raise
+    
+    yield
+    
+    # 关闭时清理资源
+    logger.info("正在关闭服务...")
+    
+    # 停止调度器
+    if scheduler:
+        try:
+            await scheduler.stop()
+            logger.info("任务调度器已停止")
+        except Exception as e:
+            logger.error(f"停止任务调度器失败: {e}")
+
+
+# 创建FastAPI应用
+app = FastAPI(
+    title="知识工具API",
+    description="将问题转换为查询词的服务",
+    version="1.0.0",
+    lifespan=lifespan
+)
+
+# 添加CORS中间件
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+
+@app.get("/health", response_model=HealthResponse)
+async def health_check():
+    """健康检查接口"""
+    return HealthResponse(
+        status="healthy",
+        message="服务运行正常"
+    )
+
+
+@app.post("/generate-queries", response_model=QueryResponse)
+async def generate_queries(request: QuestionRequest):
+    """
+    生成查询词接口
+    
+    Args:
+        request: 包含question字段的请求
+        
+    Returns:
+        任务信息(状态为待执行)
+    """
+    global task_dao
+    
+    if task_dao is None:
+        raise HTTPException(status_code=503, detail="数据库未初始化")
+    
+    try:
+        logger.info(f"收到问题: {request.question}")
+        
+        # 生成任务ID(使用时间戳)
+        task_id = int(time.time() * 1000)
+        
+        # 创建任务记录,状态设置为0(待执行)
+        task_dao.create_task(task_id, request.question)
+        logger.info(f"创建任务: {task_id},状态: 待执行")
+        
+        # 立即返回待执行状态
+        return QueryResponse(
+            task_id=task_id,
+            queries=[],  # 空列表,因为还未处理
+            original_question=request.question,
+            total_count=0,  # 0个查询词,因为还未处理
+            status=QueryTaskStatus.PENDING  # 待执行状态
+        )
+        
+    except Exception as e:
+        logger.error(f"创建任务失败: {e}")
+        raise HTTPException(status_code=500, detail=f"创建任务失败: {str(e)}")
+
+
+@app.get("/task/{task_id}")
+async def get_task(task_id: int):
+    """
+    获取任务信息接口
+    
+    Args:
+        task_id: 任务ID
+        
+    Returns:
+        任务信息
+    """
+    global task_dao
+    
+    if task_dao is None:
+        raise HTTPException(status_code=503, detail="数据库未初始化")
+    
+    try:
+        task = task_dao.get_task(task_id)
+        if task is None:
+            raise HTTPException(status_code=404, detail="任务不存在")
+        
+        return {
+            "task_id": task.task_id,
+            "question": task.question,
+            "queries": task.querys,
+            "status": task.status,
+            "status_text": {
+                0: "待执行",
+                1: "执行中", 
+                2: "成功",
+                3: "失败"
+            }.get(task.status, "未知")
+        }
+        
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"获取任务信息失败: {e}")
+        raise HTTPException(status_code=500, detail=f"获取任务信息失败: {str(e)}")
+
+
+@app.get("/tasks")
+async def get_tasks(status: int = None, limit: int = 100):
+    """
+    获取任务列表接口
+    
+    Args:
+        status: 任务状态筛选(可选)
+        limit: 限制数量
+        
+    Returns:
+        任务列表
+    """
+    global task_dao
+    
+    if task_dao is None:
+        raise HTTPException(status_code=503, detail="数据库未初始化")
+    
+    try:
+        if status is not None:
+            tasks = task_dao.get_tasks_by_status(status, limit)
+        else:
+            # 获取所有状态的任务(这里简化处理,实际可能需要更复杂的查询)
+            all_tasks = []
+            for s in [0, 1, 2, 3]:
+                tasks_by_status = task_dao.get_tasks_by_status(s, limit // 4)
+                all_tasks.extend(tasks_by_status)
+            tasks = all_tasks[:limit]
+        
+        return {
+            "tasks": [
+                {
+                    "task_id": task.task_id,
+                    "question": task.question,
+                    "queries": task.querys,
+                    "status": task.status,
+                    "status_text": {
+                        0: "待执行",
+                        1: "执行中",
+                        2: "成功", 
+                        3: "失败"
+                    }.get(task.status, "未知")
+                }
+                for task in tasks
+            ],
+            "total": len(tasks)
+        }
+        
+    except Exception as e:
+        logger.error(f"获取任务列表失败: {e}")
+        raise HTTPException(status_code=500, detail=f"获取任务列表失败: {str(e)}")
+
+
+@app.get("/")
+async def root():
+    """根路径"""
+    return {
+        "message": "知识工具API服务",
+        "version": "1.0.0",
+        "docs": "/docs"
+    }
+
+
+if __name__ == "__main__":
+    import uvicorn
+    
+    uvicorn.run(
+        "src.api.main:app",
+        host=os.getenv("HOST", "0.0.0.0"),
+        port=int(os.getenv("PORT", 8079)),
+        reload=os.getenv("DEBUG", "True").lower() == "true",
+        log_level=os.getenv("LOG_LEVEL", "INFO").lower()
+    )

+ 1 - 0
src/database/__init__.py

@@ -0,0 +1 @@
+# 数据库模块

+ 116 - 0
src/database/connection.py

@@ -0,0 +1,116 @@
+import pymysql
+import logging
+from typing import Optional
+from contextlib import contextmanager
+
+logger = logging.getLogger(__name__)
+
+
+class DatabaseManager:
+    """数据库管理器"""
+    
+    def __init__(self, host: str, port: int, user: str, password: str, database: str, charset: str = "utf8"):
+        """
+        初始化数据库管理器
+        
+        Args:
+            host: 数据库主机
+            port: 数据库端口
+            user: 用户名
+            password: 密码
+            database: 数据库名
+            charset: 字符集
+        """
+        self.host = host
+        self.port = port
+        self.user = user
+        self.password = password
+        self.database = database
+        self.charset = charset
+    
+    def get_connection(self) -> pymysql.Connection:
+        """
+        获取数据库连接
+        
+        Returns:
+            数据库连接对象
+        """
+        try:
+            connection = pymysql.connect(
+                host=self.host,
+                port=self.port,
+                user=self.user,
+                passwd=self.password,
+                db=self.database,
+                charset=self.charset,
+                connect_timeout=30,
+                read_timeout=30,
+                write_timeout=30,
+                autocommit=False
+            )
+            return connection
+        except Exception as e:
+            logger.error(f"数据库连接失败: {e}")
+            raise
+    
+    @contextmanager
+    def get_cursor(self):
+        """
+        获取数据库游标的上下文管理器
+        
+        Yields:
+            数据库游标对象
+        """
+        connection = None
+        cursor = None
+        try:
+            connection = self.get_connection()
+            cursor = connection.cursor(pymysql.cursors.DictCursor)
+            yield cursor
+            connection.commit()
+        except Exception as e:
+            if connection:
+                connection.rollback()
+            logger.error(f"数据库操作失败: {e}")
+            raise
+        finally:
+            if cursor:
+                cursor.close()
+            if connection:
+                connection.close()
+
+
+# 全局数据库管理器实例
+db_manager: Optional[DatabaseManager] = None
+
+
+def init_database_manager(host: str, port: int, user: str, password: str, database: str, charset: str = "utf8"):
+    """
+    初始化全局数据库管理器
+    
+    Args:
+        host: 数据库主机
+        port: 数据库端口
+        user: 用户名
+        password: 密码
+        database: 数据库名
+        charset: 字符集
+    """
+    global db_manager
+    db_manager = DatabaseManager(host, port, user, password, database, charset)
+    logger.info("数据库管理器初始化成功")
+
+
+def get_db_manager() -> DatabaseManager:
+    """
+    获取数据库管理器实例
+    
+    Returns:
+        数据库管理器实例
+        
+    Raises:
+        RuntimeError: 如果数据库管理器未初始化
+    """
+    if db_manager is None:
+        raise RuntimeError("数据库管理器未初始化")
+    return db_manager

+ 211 - 0
src/database/models.py

@@ -0,0 +1,211 @@
+from typing import List, Optional, Dict, Any
+from datetime import datetime
+import json
+import logging
+from .connection import get_db_manager
+
+logger = logging.getLogger(__name__)
+
+
+class QueryTaskStatus:
+    """查询任务状态常量"""
+    PENDING = 0      # 待执行
+    RUNNING = 1      # 执行中
+    SUCCESS = 2      # 成功
+    FAILED = 3       # 失败
+
+
+class KnowledgeSuggestQuery:
+    """知识查询建议模型"""
+    
+    def __init__(self, task_id: int, question: str, querys: Optional[List[str]] = None, status: int = QueryTaskStatus.PENDING):
+        """
+        初始化查询任务
+        
+        Args:
+            task_id: 任务ID
+            question: 问题
+            querys: 查询词列表
+            status: 任务状态
+        """
+        self.task_id = task_id
+        self.question = question
+        self.querys = querys or []
+        self.status = status
+    
+    def to_dict(self) -> Dict[str, Any]:
+        """转换为字典"""
+        return {
+            'task_id': self.task_id,
+            'question': self.question,
+            'querys': json.dumps(self.querys, ensure_ascii=False) if self.querys else None,
+            'status': self.status
+        }
+    
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> 'KnowledgeSuggestQuery':
+        """从字典创建实例"""
+        querys = None
+        if data.get('querys'):
+            try:
+                querys = json.loads(data['querys'])
+            except json.JSONDecodeError:
+                querys = []
+        
+        return cls(
+            task_id=data['task_id'],
+            question=data['question'],
+            querys=querys,
+            status=data['status']
+        )
+
+
+class QueryTaskDAO:
+    """查询任务数据访问对象"""
+    
+    def __init__(self):
+        self.db_manager = get_db_manager()
+    
+    def create_task(self, task_id: int, question: str) -> bool:
+        """
+        创建新的查询任务
+        
+        Args:
+            task_id: 任务ID
+            question: 问题
+            
+        Returns:
+            是否创建成功
+        """
+        try:
+            with self.db_manager.get_cursor() as cursor:
+                sql = """
+                INSERT INTO knowledge_suggest_query (task_id, question, status)
+                VALUES (%s, %s, %s)
+                ON DUPLICATE KEY UPDATE
+                question = VALUES(question),
+                status = VALUES(status),
+                querys = NULL
+                """
+                cursor.execute(sql, (task_id, question, QueryTaskStatus.PENDING))
+                return True
+        except Exception as e:
+            logger.error(f"创建任务失败: {e}")
+            return False
+    
+    def update_task_status(self, task_id: int, status: int) -> bool:
+        """
+        更新任务状态
+        
+        Args:
+            task_id: 任务ID
+            status: 新状态
+            
+        Returns:
+            是否更新成功
+        """
+        try:
+            with self.db_manager.get_cursor() as cursor:
+                sql = "UPDATE knowledge_suggest_query SET status = %s WHERE task_id = %s"
+                cursor.execute(sql, (status, task_id))
+                return cursor.rowcount > 0
+        except Exception as e:
+            logger.error(f"更新任务状态失败: {e}")
+            return False
+    
+    def update_task_results(self, task_id: int, querys: List[str], status: int = QueryTaskStatus.SUCCESS) -> bool:
+        """
+        更新任务结果
+        
+        Args:
+            task_id: 任务ID
+            querys: 查询词列表
+            status: 任务状态
+            
+        Returns:
+            是否更新成功
+        """
+        try:
+            with self.db_manager.get_cursor() as cursor:
+                sql = "UPDATE knowledge_suggest_query SET querys = %s, status = %s WHERE task_id = %s"
+                querys_json = json.dumps(querys, ensure_ascii=False)
+                cursor.execute(sql, (querys_json, status, task_id))
+                return cursor.rowcount > 0
+        except Exception as e:
+            logger.error(f"更新任务结果失败: {e}")
+            return False
+    
+    def get_task(self, task_id: int) -> Optional[KnowledgeSuggestQuery]:
+        """
+        获取任务信息
+        
+        Args:
+            task_id: 任务ID
+            
+        Returns:
+            任务对象,如果不存在返回None
+        """
+        try:
+            with self.db_manager.get_cursor() as cursor:
+                sql = "SELECT * FROM knowledge_suggest_query WHERE task_id = %s"
+                cursor.execute(sql, (task_id,))
+                result = cursor.fetchone()
+                
+                if result:
+                    return KnowledgeSuggestQuery.from_dict(result)
+                return None
+        except Exception as e:
+            logger.error(f"获取任务失败: {e}")
+            return None
+    
+    def get_tasks_by_status(self, status: int, limit: int = 100) -> List[KnowledgeSuggestQuery]:
+        """
+        根据状态获取任务列表
+        
+        Args:
+            status: 任务状态
+            limit: 限制数量
+            
+        Returns:
+            任务列表
+        """
+        try:
+            with self.db_manager.get_cursor() as cursor:
+                sql = "SELECT * FROM knowledge_suggest_query WHERE status = %s ORDER BY task_id DESC LIMIT %s"
+                cursor.execute(sql, (status, limit))
+                results = cursor.fetchall()
+                
+                return [KnowledgeSuggestQuery.from_dict(row) for row in results]
+        except Exception as e:
+            logger.error(f"获取任务列表失败: {e}")
+            return []
+    
+    def delete_task(self, task_id: int) -> bool:
+        """
+        删除任务
+        
+        Args:
+            task_id: 任务ID
+            
+        Returns:
+            是否删除成功
+        """
+        try:
+            with self.db_manager.get_cursor() as cursor:
+                sql = "DELETE FROM knowledge_suggest_query WHERE task_id = %s"
+                cursor.execute(sql, (task_id,))
+                return cursor.rowcount > 0
+        except Exception as e:
+            logger.error(f"删除任务失败: {e}")
+            return False
+
+
+# 全局DAO实例
+query_task_dao = None
+
+def get_query_task_dao() -> QueryTaskDAO:
+    """获取QueryTaskDAO实例"""
+    global query_task_dao
+    if query_task_dao is None:
+        query_task_dao = QueryTaskDAO()
+    return query_task_dao

+ 1 - 0
src/models/__init__.py

@@ -0,0 +1 @@
+# 数据模型模块

+ 22 - 0
src/models/schemas.py

@@ -0,0 +1,22 @@
+from typing import List, Optional
+from pydantic import BaseModel, Field
+
+
+class QuestionRequest(BaseModel):
+    """问题请求模型"""
+    question: str = Field(..., description="用户提出的问题", min_length=1, max_length=1000)
+
+
+class QueryResponse(BaseModel):
+    """查询词响应模型"""
+    task_id: int = Field(..., description="任务ID")
+    queries: List[str] = Field(..., description="生成的查询词列表")
+    original_question: str = Field(..., description="原始问题")
+    total_count: int = Field(..., description="生成的查询词数量")
+    status: int = Field(..., description="任务状态:0-待执行,1-执行中,2-成功,3-失败")
+
+
+class HealthResponse(BaseModel):
+    """健康检查响应模型"""
+    status: str = Field(..., description="服务状态")
+    message: str = Field(..., description="状态消息")

+ 1 - 0
src/tools/__init__.py

@@ -0,0 +1 @@
+# 工具模块

+ 47 - 0
src/tools/prompts.py

@@ -0,0 +1,47 @@
+"""Prompt模板定义"""
+
+QUERY_GENERATION_PROMPT = """
+你是一个专业的搜索查询词生成助手。你的任务是根据用户的问题,生成多个相关的查询词,这些查询词将用于网络爬虫任务来收集相关信息。
+
+用户问题:{question}
+
+请按照以下要求生成查询词:
+1. 生成5-10个相关的查询词
+2. 查询词应该简洁明了,便于搜索
+3. 包含不同的角度和层次(如:基础概念、具体方法、实际应用等)
+4. 避免过于宽泛或过于狭窄的查询词
+5. 考虑同义词和相关术语
+6. 不要直接复述原问题,要生成具体的搜索关键词
+7. 每个查询词应该是独立的搜索词,而不是完整的问题
+
+输出格式:直接输出查询词,每行一个,不要编号或其他格式,也不需要其他额外的说明,仅输出查询词即可。
+
+示例:
+如果问题是"如何学习Python编程"
+输出可能是:
+Python编程入门
+Python基础教程
+Python学习路径
+Python编程实践
+Python语法学习
+Python开发环境搭建
+Python项目实战
+Python算法实现
+Python Web开发
+Python数据分析
+"""
+
+QUERY_REFINEMENT_PROMPT = """
+你是一个查询词优化助手。请对以下查询词进行评估和优化:
+
+原始问题:{question}
+生成的查询词:{queries}
+
+请:
+1. 评估每个查询词的相关性和有效性
+2. 移除重复或过于相似的查询词
+3. 优化表达不清晰的查询词
+4. 确保查询词覆盖问题的不同方面
+
+输出优化后的查询词列表,每行一个,无需其他说明。
+"""

+ 98 - 0
src/tools/query_tool.py

@@ -0,0 +1,98 @@
+from typing import List, Dict, Any
+from langchain.tools import BaseTool
+from pydantic import BaseModel, Field
+import logging
+from langchain_google_genai import ChatGoogleGenerativeAI
+from langchain.prompts import ChatPromptTemplate
+from langchain.schema import HumanMessage, SystemMessage
+
+from ..database.models import QueryTaskDAO, QueryTaskStatus
+from .prompts import QUERY_GENERATION_PROMPT
+
+logger = logging.getLogger(__name__)
+
+
+class QueryToolInput(BaseModel):
+    """查询工具输入模型"""
+    question: str = Field(..., description="用户的问题")
+    context: str = Field(default="", description="额外的上下文信息")
+    task_id: int = Field(..., description="任务ID")
+
+
+class SuggestQueryTool(BaseTool):
+    """建议查询词工具"""
+    
+    name: str = "suggest_query"
+    description: str = "根据用户问题生成多个相关的查询词,用于爬虫任务"
+    args_schema: type = QueryToolInput
+    task_dao: QueryTaskDAO = None
+    llm: ChatGoogleGenerativeAI = None
+    
+    def __init__(self):
+        super().__init__()
+        self.task_dao = QueryTaskDAO()
+        # 初始化 LLM
+        import os
+        self.llm = ChatGoogleGenerativeAI(
+            google_api_key=os.getenv("GEMINI_API_KEY", ""),
+            model=os.getenv("GEMINI_MODEL", "gemini-1.5-pro"),
+            temperature=0.7
+        )
+    
+    def _run(
+        self, 
+        question: str, 
+        context: str = "",
+        task_id: int = 0,
+        run_manager = None,
+        **kwargs: Any,
+    ) -> List[str]:
+        """
+        根据问题生成查询词列表
+        
+        Args:
+            question: 用户问题
+            context: 额外上下文
+            task_id: 任务ID
+            run_manager: 运行管理器
+            
+        Returns:
+            查询词列表
+        """
+        try:
+            # 使用 LLM 和 QUERY_GENERATION_PROMPT 生成查询词
+            prompt = ChatPromptTemplate.from_messages([
+                SystemMessage(content=QUERY_GENERATION_PROMPT),
+                HumanMessage(content=question)
+            ])
+            
+            try:
+                response = self.llm.invoke(prompt.format_messages())
+                queries_text = response.content
+                queries = [q.strip() for q in queries_text.split('\n') if q.strip()]
+                
+                # 去重并限制数量
+                unique_queries = list(dict.fromkeys(queries))[:10]
+                
+                return unique_queries
+                
+            except Exception as e:
+                logger.error(f"LLM 生成查询词失败: {e}")
+                # 降级处理:返回原始问题
+                return [question]
+            
+        except Exception as e:
+            logger.error(f"生成查询词失败: {e}")
+            # 返回降级结果
+            return [question]
+    
+    async def _arun(
+        self, 
+        question: str, 
+        context: str = "",
+        task_id: int = 0,
+        run_manager = None,
+        **kwargs: Any,
+    ) -> List[str]:
+        """异步运行版本"""
+        return self._run(question, context, task_id, run_manager, **kwargs)

+ 117 - 0
src/tools/scheduler.py

@@ -0,0 +1,117 @@
+import asyncio
+import logging
+from typing import Optional
+from ..database.models import QueryTaskDAO, QueryTaskStatus, get_query_task_dao
+from ..agent.query_agent import QueryGenerationAgent
+
+logger = logging.getLogger(__name__)
+
+
+class TaskScheduler:
+    """任务调度器,负责处理待执行的任务"""
+    
+    def __init__(self, agent: QueryGenerationAgent, task_dao: QueryTaskDAO):
+        """
+        初始化调度器
+        
+        Args:
+            agent: 查询生成代理
+            task_dao: 任务数据访问对象
+        """
+        self.agent = agent
+        self.task_dao = task_dao
+        self.running = False
+        self.task = None
+    
+    async def start(self):
+        """启动调度器"""
+        if self.running:
+            logger.warning("调度器已经在运行中")
+            return
+        
+        self.running = True
+        self.task = asyncio.create_task(self._run_scheduler())
+        logger.info("任务调度器已启动")
+    
+    async def stop(self):
+        """停止调度器"""
+        if not self.running:
+            logger.warning("调度器未运行")
+            return
+        
+        self.running = False
+        if self.task:
+            self.task.cancel()
+            try:
+                await self.task
+            except asyncio.CancelledError:
+                pass
+        logger.info("任务调度器已停止")
+    
+    async def _run_scheduler(self):
+        """调度器主循环"""
+        while self.running:
+            try:
+                await self._process_pending_tasks()
+                # 等待60秒
+                await asyncio.sleep(60)
+            except asyncio.CancelledError:
+                logger.info("调度器被取消")
+                break
+            except Exception as e:
+                logger.error(f"调度器运行出错: {e}")
+                # 出错后等待30秒再继续
+                await asyncio.sleep(30)
+    
+    async def _process_pending_tasks(self):
+        """处理待执行的任务"""
+        try:
+            # 获取一条待执行的任务
+            pending_tasks = self.task_dao.get_tasks_by_status(QueryTaskStatus.PENDING, limit=1)
+            
+            if not pending_tasks:
+                logger.info("没有待执行的任务")
+                return
+            
+            task = pending_tasks[0]
+            logger.info(f"开始处理任务: {task.task_id}")
+            
+            # 更新任务状态为执行中
+            self.task_dao.update_task_status(task.task_id, QueryTaskStatus.RUNNING)
+            
+            try:
+                # 使用Agent生成查询词
+                queries = await self.agent.generate_queries(task.question, task.task_id)
+                
+                # 更新任务结果
+                success = self.task_dao.update_task_results(task.task_id, queries, QueryTaskStatus.SUCCESS)
+                
+                if success:
+                    logger.info(f"任务 {task.task_id} 处理成功,生成 {len(queries)} 个查询词")
+                else:
+                    logger.error(f"任务 {task.task_id} 结果更新失败")
+                    self.task_dao.update_task_status(task.task_id, QueryTaskStatus.FAILED)
+                    
+            except Exception as e:
+                logger.error(f"任务 {task.task_id} 处理失败: {e}")
+                # 更新任务状态为失败
+                self.task_dao.update_task_status(task.task_id, QueryTaskStatus.FAILED)
+                
+        except Exception as e:
+            logger.error(f"处理待执行任务时出错: {e}")
+
+
+# 全局调度器实例
+scheduler: Optional[TaskScheduler] = None
+
+
+def get_scheduler() -> Optional[TaskScheduler]:
+    """获取调度器实例"""
+    return scheduler
+
+
+def create_scheduler(agent: QueryGenerationAgent, task_dao: QueryTaskDAO) -> TaskScheduler:
+    """创建调度器实例"""
+    global scheduler
+    scheduler = TaskScheduler(agent, task_dao)
+    return scheduler

+ 81 - 0
start.sh

@@ -0,0 +1,81 @@
+#!/bin/bash
+
+# 知识工具服务启动脚本
+
+set -e
+
+# 颜色定义
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[1;33m'
+NC='\033[0m' # No Color
+
+# 项目根目录
+PROJECT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+PID_FILE="$PROJECT_DIR/app.pid"
+LOG_FILE="$PROJECT_DIR/logs/app.log"
+
+echo -e "${GREEN}启动知识工具服务...${NC}"
+
+# 检查是否已经运行
+if [ -f "$PID_FILE" ]; then
+    PID=$(cat "$PID_FILE")
+    if ps -p "$PID" > /dev/null 2>&1; then
+        echo -e "${YELLOW}服务已经在运行中 (PID: $PID)${NC}"
+        exit 1
+    else
+        echo -e "${YELLOW}发现旧的PID文件,正在清理...${NC}"
+        rm -f "$PID_FILE"
+    fi
+fi
+
+# 创建日志目录
+mkdir -p "$PROJECT_DIR/logs"
+
+# 检查环境变量文件
+if [ ! -f "$PROJECT_DIR/.env" ]; then
+    echo -e "${YELLOW}警告: 未找到.env文件,请创建配置文件${NC}"
+    echo "示例配置:"
+    echo "OPENAI_API_KEY=your_api_key_here"
+    echo "HOST=0.0.0.0"
+    echo "PORT=8000"
+    echo "DEBUG=True"
+fi
+
+# 检查Python依赖
+if [ ! -d "$PROJECT_DIR/venv" ]; then
+    echo -e "${YELLOW}未找到虚拟环境,正在创建...${NC}"
+    python3 -m venv "$PROJECT_DIR/venv"
+fi
+
+# 激活虚拟环境
+source "$PROJECT_DIR/venv/bin/activate"
+
+# 安装依赖
+echo -e "${GREEN}安装Python依赖...${NC}"
+pip install -r requirements.txt
+
+# 启动服务
+echo -e "${GREEN}启动服务...${NC}"
+nohup python main.py > "$LOG_FILE" 2>&1 &
+PID=$!
+
+# 保存PID
+echo $PID > "$PID_FILE"
+
+# 等待服务启动
+sleep 3
+
+# 检查服务是否启动成功
+if ps -p "$PID" > /dev/null 2>&1; then
+    echo -e "${GREEN}服务启动成功!${NC}"
+    echo -e "${GREEN}PID: $PID${NC}"
+    echo -e "${GREEN}日志文件: $LOG_FILE${NC}"
+    echo -e "${GREEN}API文档: http://localhost:8079/docs${NC}"
+    echo -e "${GREEN}健康检查: http://localhost:8079/health${NC}"
+else
+    echo -e "${RED}服务启动失败!${NC}"
+    echo -e "${RED}请检查日志文件: $LOG_FILE${NC}"
+    rm -f "$PID_FILE"
+    exit 1
+fi

+ 121 - 0
stop.sh

@@ -0,0 +1,121 @@
+#!/bin/bash
+
+# 知识工具服务停止脚本
+
+set -e
+
+# 颜色定义
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[1;33m'
+NC='\033[0m' # No Color
+
+# 项目根目录
+PROJECT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+PID_FILE="$PROJECT_DIR/app.pid"
+
+echo -e "${GREEN}停止知识工具服务...${NC}"
+
+# 检查PID文件是否存在
+if [ ! -f "$PID_FILE" ]; then
+    echo -e "${YELLOW}未找到PID文件,服务可能未运行${NC}"
+    exit 0
+fi
+
+# 读取PID
+PID=$(cat "$PID_FILE")
+
+# 检查进程是否存在
+if ! ps -p "$PID" > /dev/null 2>&1; then
+    echo -e "${YELLOW}进程不存在,清理PID文件${NC}"
+    rm -f "$PID_FILE"
+    exit 0
+fi
+
+# 停止进程
+echo -e "${GREEN}正在停止进程 (PID: $PID)...${NC}"
+
+# 获取进程组ID
+PGID=$(ps -o pgid= -p "$PID" 2>/dev/null | tr -d ' ')
+
+# 尝试优雅停止整个进程组
+if [ -n "$PGID" ]; then
+    echo -e "${GREEN}停止进程组 (PGID: $PGID)...${NC}"
+    kill -TERM -"$PGID" 2>/dev/null || kill -TERM "$PID"
+else
+    kill -TERM "$PID"
+fi
+
+# 额外检查并停止可能的后台任务
+echo -e "${GREEN}检查并停止相关后台任务...${NC}"
+
+# 查找可能的Python进程
+PYTHON_PIDS=$(ps aux | grep -E "(python.*main\.py|uvicorn.*main)" | grep -v grep | awk '{print $2}')
+if [ -n "$PYTHON_PIDS" ]; then
+    echo -e "${YELLOW}发现相关Python进程: $PYTHON_PIDS${NC}"
+    for py_pid in $PYTHON_PIDS; do
+        if [ "$py_pid" != "$PID" ]; then
+            echo -e "${GREEN}停止Python进程: $py_pid${NC}"
+            kill -TERM "$py_pid" 2>/dev/null || true
+        fi
+    done
+fi
+
+# 等待进程停止
+for i in {1..15}; do
+    if ! ps -p "$PID" > /dev/null 2>&1; then
+        echo -e "${GREEN}服务已成功停止${NC}"
+        rm -f "$PID_FILE"
+        # 检查端口是否已释放
+        if command -v lsof >/dev/null 2>&1; then
+            if ! lsof -i :8079 >/dev/null 2>&1; then
+                echo -e "${GREEN}端口8079已释放${NC}"
+            else
+                echo -e "${YELLOW}警告: 端口8079仍被占用${NC}"
+            fi
+        fi
+        exit 0
+    fi
+    echo -e "${YELLOW}等待进程停止... ($i/15)${NC}"
+    sleep 1
+done
+
+# 如果优雅停止失败,强制停止整个进程组
+echo -e "${YELLOW}优雅停止失败,强制停止进程组...${NC}"
+if [ -n "$PGID" ]; then
+    kill -KILL -"$PGID" 2>/dev/null || kill -KILL "$PID"
+else
+    kill -KILL "$PID"
+fi
+
+# 强制停止所有相关Python进程
+PYTHON_PIDS=$(ps aux | grep -E "(python.*main\.py|uvicorn.*main)" | grep -v grep | awk '{print $2}')
+if [ -n "$PYTHON_PIDS" ]; then
+    echo -e "${YELLOW}强制停止所有相关Python进程...${NC}"
+    for py_pid in $PYTHON_PIDS; do
+        echo -e "${YELLOW}强制停止Python进程: $py_pid${NC}"
+        kill -KILL "$py_pid" 2>/dev/null || true
+    done
+fi
+
+# 再次等待
+sleep 3
+
+if ! ps -p "$PID" > /dev/null 2>&1; then
+    echo -e "${GREEN}服务已强制停止${NC}"
+    rm -f "$PID_FILE"
+    # 检查端口是否已释放
+    if command -v lsof >/dev/null 2>&1; then
+        if ! lsof -i :8079 >/dev/null 2>&1; then
+            echo -e "${GREEN}端口8079已释放${NC}"
+        else
+            echo -e "${YELLOW}警告: 端口8079仍被占用,可能需要手动处理${NC}"
+        fi
+    fi
+    exit 0
+else
+    echo -e "${RED}无法停止服务,请手动处理${NC}"
+    echo -e "${RED}可以尝试: kill -9 $PID${NC}"
+    echo -e "${RED}或检查端口占用: lsof -i :8079${NC}"
+    exit 1
+fi