
Resume tasks that were in progress before a restart

jihuaqiang committed 3 days ago
commit 758b6d349a
6 changed files with 539 additions and 9 deletions
  1. README.md (+32 −1)
  2. agent.py (+270 −4)
  3. agents/clean_agent/tools.py (+8 −4)
  4. docs/process_restore.md (+219 −0)
  5. requirements.txt (+1 −0)
  6. start_service.sh (+9 −0)

+ 32 - 1
README.md

@@ -187,7 +187,38 @@ knowledge-agent/
 5. **RESTful API**: Modern HTTP API interface
 6. **Workflow management**: Powerful flow control based on LangGraph
 7. **State management**: Complete processing-status tracking and error handling
-8. **Agent API**: Supports executing Agent processing commands directly via the API
+8. **Process restore**: Automatically resumes interrupted processing flows on service startup
+9. **Agent API**: Supports executing Agent processing commands directly via the API
+
+## 🔄 Process Restore
+
+On startup, the service automatically checks for and resumes interrupted processing flows (example calls follow the list below):
+
+### Restore mechanism
+
+1. **Parsing restore**: finds requests with `parsing_status=1` and re-invokes the `/parse/async` endpoint
+2. **Extraction restore**: finds requests with `extraction_status=1` and re-invokes the `/extract` endpoint
+3. **Expansion restore**: finds requests with `expansion_status=1` and re-invokes the `/expand` endpoint
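+
+These restore calls are plain HTTP requests against the local service, so they can be reproduced by hand. A minimal sketch (the `requestId` and `query` values are placeholders, and `localhost:8080` assumes the service's default port):
+
+```bash
+# Re-trigger parsing for one request (same payload the restore logic sends)
+curl -X POST http://localhost:8080/parse/async \
+  -H "Content-Type: application/json" \
+  -d '{"requestId": "<your-request-id>"}'
+
+# Extraction and expansion additionally carry the original query
+curl -X POST http://localhost:8080/extract \
+  -H "Content-Type: application/json" \
+  -d '{"requestId": "<your-request-id>", "query": "<your-query>"}'
+```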
+
+### Features
+
+- **Smart retries**: retries with exponential backoff, up to 3 attempts
+- **Concurrency safety**: avoids processing the same request twice
+- **Error handling**: thorough error logging and exception handling
+- **Delayed start**: waits until the service is fully up before restoring
+
+### Testing process restore
+
+```bash
+# Create test data
+python3 test_restore.py --create-data
+
+# Run the test
+python3 test_restore.py --test
+
+# Clean up test data
+python3 test_restore.py --cleanup
+```
 
 ## 📝 API Usage Examples
 

+ 270 - 4
agent.py

@@ -105,15 +105,281 @@ def _update_expansion_status(requestId: str, status: int):
 @asynccontextmanager
 async def lifespan(app: FastAPI):
     """应用生命周期管理"""
-    # 启动时初始化
+    # 启动时执行
+    logger.info("🚀 启动 Knowledge Agent 服务...")
+    
+    # 初始化全局工具
     global identify_tool
     identify_tool = IdentifyTool()
-    logger.info("Agent 服务启动完成")
+    
+    # 启动后恢复中断的流程
+    await restore_interrupted_processes()
     
     yield
     
-    # 关闭时清理
-    logger.info("Agent 服务正在关闭")
+    # 关闭时执行
+    logger.info("🛑 关闭 Knowledge Agent 服务...")
+
+async def restore_interrupted_processes():
+    """
+    Restore interrupted processes after startup.
+    1. Find request_ids in knowledge_request with parsing_status=1 and call /parse/async
+    2. Find request_ids and queries in knowledge_request with extraction_status=1 and call /extract
+    3. Find request_ids and queries in knowledge_request with expansion_status=1 and call /expand
+    """
+    try:
+        logger.info("🔄 Starting restore of interrupted processes...")
+        
+        # Wait for the service to fully start
+        await asyncio.sleep(3)
+        
+        # 1. Restore interrupted parsing processes
+        await restore_parsing_processes()
+        
+        # 2. Restore interrupted extraction processes
+        await restore_extraction_processes()
+        
+        # 3. Restore interrupted expansion processes
+        await restore_expansion_processes()
+        
+        logger.info("✅ Process restore complete")
+        
+    except Exception as e:
+        logger.error(f"❌ Process restore failed: {e}")
+
+async def restore_parsing_processes():
+    """Restore interrupted parsing processes"""
+    try:
+        from utils.mysql_db import MysqlHelper
+        
+        # Query requests with parsing_status=1
+        sql = "SELECT request_id FROM knowledge_request WHERE parsing_status = 1"
+        rows = MysqlHelper.get_values(sql)
+        
+        if not rows:
+            logger.info("📋 No interrupted parsing processes found")
+            return
+        
+        logger.info(f"🔄 Found {len(rows)} interrupted parsing processes, restoring...")
+        
+        for row in rows:
+            request_id = row[0]
+            try:
+                # Call /parse/async with retries
+                await call_parse_async_with_retry(request_id)
+                logger.info(f"✅ Restored parsing process: request_id={request_id}")
+            except Exception as e:
+                logger.error(f"❌ Failed to restore parsing process: request_id={request_id}, error={e}")
+                
+    except Exception as e:
+        logger.error(f"❌ Error while restoring parsing processes: {e}")
+
+async def restore_extraction_processes():
+    """Restore interrupted extraction processes"""
+    try:
+        from utils.mysql_db import MysqlHelper
+        
+        # Query requests and queries with extraction_status=1
+        sql = "SELECT request_id, query FROM knowledge_request WHERE extraction_status = 1"
+        rows = MysqlHelper.get_values(sql)
+        
+        if not rows:
+            logger.info("📋 No interrupted extraction processes found")
+            return
+        
+        logger.info(f"🔄 Found {len(rows)} interrupted extraction processes, restoring...")
+        
+        for row in rows:
+            request_id = row[0]
+            query = row[1] if len(row) > 1 else ""
+            try:
+                # Call /extract with retries
+                await call_extract_with_retry(request_id, query)
+                logger.info(f"✅ Restored extraction process: request_id={request_id}")
+            except Exception as e:
+                logger.error(f"❌ Failed to restore extraction process: request_id={request_id}, error={e}")
+                
+    except Exception as e:
+        logger.error(f"❌ Error while restoring extraction processes: {e}")
+
+async def restore_expansion_processes():
+    """Restore interrupted expansion processes"""
+    try:
+        from utils.mysql_db import MysqlHelper
+        
+        # Query requests and queries with expansion_status=1
+        sql = "SELECT request_id, query FROM knowledge_request WHERE expansion_status = 1"
+        rows = MysqlHelper.get_values(sql)
+        
+        if not rows:
+            logger.info("📋 No interrupted expansion processes found")
+            return
+        
+        logger.info(f"🔄 Found {len(rows)} interrupted expansion processes, restoring...")
+        
+        for row in rows:
+            request_id = row[0]
+            query = row[1] if len(row) > 1 else ""
+            try:
+                # Call /expand with retries
+                await call_expand_with_retry(request_id, query)
+                logger.info(f"✅ Restored expansion process: request_id={request_id}")
+            except Exception as e:
+                logger.error(f"❌ Failed to restore expansion process: request_id={request_id}, error={e}")
+                
+    except Exception as e:
+        logger.error(f"❌ Error while restoring expansion processes: {e}")
+
+async def call_parse_async_with_retry(request_id: str, max_retries: int = 3):
+    """Call the /parse/async endpoint with retries"""
+    for attempt in range(max_retries):
+        try:
+            import httpx
+            
+            # Create an async HTTP client
+            async with httpx.AsyncClient(timeout=30.0) as client:
+                response = await client.post(
+                    "http://localhost:8080/parse/async",
+                    json={"requestId": request_id}
+                )
+                
+                if response.status_code == 200:
+                    result = response.json()
+                    logger.info(f"Call to /parse/async succeeded: request_id={request_id}, result={result}")
+                    return
+                else:
+                    logger.warning(f"Call to /parse/async failed: request_id={request_id}, status_code={response.status_code}, attempt={attempt+1}")
+                    
+        except Exception as e:
+            logger.warning(f"Call to /parse/async raised: request_id={request_id}, error={e}, attempt={attempt+1}")
+        
+        # If this is not the last attempt, wait before retrying
+        if attempt < max_retries - 1:
+            await asyncio.sleep(2 ** attempt)  # exponential backoff
+    
+    logger.error(f"Call to /parse/async ultimately failed: request_id={request_id} after {max_retries} attempts")
+
+async def call_extract_with_retry(request_id: str, query: str, max_retries: int = 3):
+    """Call the /extract endpoint with retries"""
+    for attempt in range(max_retries):
+        try:
+            import httpx
+            
+            # Create an async HTTP client
+            async with httpx.AsyncClient(timeout=30.0) as client:
+                response = await client.post(
+                    "http://localhost:8080/extract",
+                    json={"requestId": request_id, "query": query}
+                )
+                
+                if response.status_code == 200:
+                    result = response.json()
+                    logger.info(f"Call to /extract succeeded: request_id={request_id}, result={result}")
+                    return
+                else:
+                    logger.warning(f"Call to /extract failed: request_id={request_id}, status_code={response.status_code}, attempt={attempt+1}")
+                    
+        except Exception as e:
+            logger.warning(f"Call to /extract raised: request_id={request_id}, error={e}, attempt={attempt+1}")
+        
+        # If this is not the last attempt, wait before retrying
+        if attempt < max_retries - 1:
+            await asyncio.sleep(2 ** attempt)  # exponential backoff
+    
+    logger.error(f"Call to /extract ultimately failed: request_id={request_id} after {max_retries} attempts")
+
+async def call_expand_with_retry(request_id: str, query: str, max_retries: int = 3):
+    """Call the /expand endpoint with retries"""
+    for attempt in range(max_retries):
+        try:
+            import httpx
+            
+            # Create an async HTTP client
+            async with httpx.AsyncClient(timeout=30.0) as client:
+                response = await client.post(
+                    "http://localhost:8080/expand",
+                    json={"requestId": request_id, "query": query}
+                )
+                
+                if response.status_code == 200:
+                    result = response.json()
+                    logger.info(f"Call to /expand succeeded: request_id={request_id}, result={result}")
+                    return
+                else:
+                    logger.warning(f"Call to /expand failed: request_id={request_id}, status_code={response.status_code}, attempt={attempt+1}")
+                    
+        except Exception as e:
+            logger.warning(f"Call to /expand raised: request_id={request_id}, error={e}, attempt={attempt+1}")
+        
+        # If this is not the last attempt, wait before retrying
+        if attempt < max_retries - 1:
+            await asyncio.sleep(2 ** attempt)  # exponential backoff
+    
+    logger.error(f"Call to /expand ultimately failed: request_id={request_id} after {max_retries} attempts")
+
+async def call_parse_async(request_id: str):
+    """Call the /parse/async endpoint"""
+    try:
+        import httpx
+        import asyncio
+        
+        # Create an async HTTP client
+        async with httpx.AsyncClient(timeout=30.0) as client:
+            response = await client.post(
+                "http://localhost:8080/parse/async",
+                json={"requestId": request_id}
+            )
+            
+            if response.status_code == 200:
+                result = response.json()
+                logger.info(f"Call to /parse/async succeeded: request_id={request_id}, result={result}")
+            else:
+                logger.error(f"Call to /parse/async failed: request_id={request_id}, status_code={response.status_code}")
+                
+    except Exception as e:
+        logger.error(f"Call to /parse/async raised: request_id={request_id}, error={e}")
+
+async def call_extract(request_id: str, query: str):
+    """Call the /extract endpoint"""
+    try:
+        import httpx
+        
+        # Create an async HTTP client
+        async with httpx.AsyncClient(timeout=30.0) as client:
+            response = await client.post(
+                "http://localhost:8080/extract",
+                json={"requestId": request_id, "query": query}
+            )
+            
+            if response.status_code == 200:
+                result = response.json()
+                logger.info(f"Call to /extract succeeded: request_id={request_id}, result={result}")
+            else:
+                logger.error(f"Call to /extract failed: request_id={request_id}, status_code={response.status_code}")
+                
+    except Exception as e:
+        logger.error(f"Call to /extract raised: request_id={request_id}, error={e}")
+
+async def call_expand(request_id: str, query: str):
+    """Call the /expand endpoint"""
+    try:
+        import httpx
+        
+        # Create an async HTTP client
+        async with httpx.AsyncClient(timeout=30.0) as client:
+            response = await client.post(
+                "http://localhost:8080/expand",
+                json={"requestId": request_id, "query": query}
+            )
+            
+            if response.status_code == 200:
+                result = response.json()
+                logger.info(f"Call to /expand succeeded: request_id={request_id}, result={result}")
+            else:
+                logger.error(f"Call to /expand failed: request_id={request_id}, status_code={response.status_code}")
+                
+    except Exception as e:
+        logger.error(f"Call to /expand raised: request_id={request_id}, error={e}")
 
 # Create the FastAPI application
 app = FastAPI(

+ 8 - 4
agents/clean_agent/tools.py

@@ -126,13 +126,14 @@ def batch_evaluate_content(contents: list, db: Session, request_id: str, query_w
         # Process evaluation results
         evaluation_results = []
         
-        for i, (parsing_id, score, score_reason, parsing_data) in enumerate(evaluation_results_raw):
+        for i, (parsing_id, score, score_reason, parsing_data, content_id) in enumerate(evaluation_results_raw):
             # Create a KnowledgeExtractionContent object
             extraction_content = KnowledgeExtractionContent(
                 request_id=request_id,
                 parsing_id=parsing_id,
                 score=score,
                 score_reason=score_reason,
+                content_id=content_id,
                 create_at=datetime.now()
             )
             db.add(extraction_content)
@@ -142,6 +143,7 @@ def batch_evaluate_content(contents: list, db: Session, request_id: str, query_w
                 "score": score,
                 "score_reason": score_reason,
                 "parsing_data": parsing_data,
+                "content_id": content_id,
                 "extraction_content": extraction_content
             })
             
@@ -227,7 +229,8 @@ def batch_call_llm_for_evaluation(contents: list, query_word: str) -> list:
     for content in contents:
         evaluation_contents.append({
             "query_word": query_word,
-            "content": content.parsing_data
+            "content": content.parsing_data,
+            "content_id": content.content_id
         })
     
     try:
@@ -241,11 +244,12 @@ def batch_call_llm_for_evaluation(contents: list, query_word: str) -> list:
             result = re.sub(r'(^\s*```json)|(\s*```\s*$)', '', result, flags=re.MULTILINE).strip()
             result = json.loads(result)
             parsing_id = contents[i].id
-            parsing_data = contents[i].parsing_data   
+            parsing_data = contents[i].parsing_data  
+            content_id = contents[i].content_id
             score = result.get("score", -2)
             score_reason = result.get("reason", "")
             
-            evaluation_results.append((parsing_id, score, score_reason, parsing_data))
+            evaluation_results.append((parsing_id, score, score_reason, parsing_data, content_id))
         
         return evaluation_results
         

+ 219 - 0
docs/process_restore.md

@@ -0,0 +1,219 @@
+# Process Restore
+
+## Overview
+
+Process restore is a key feature of the Knowledge Agent service: after a restart, it automatically detects and resumes interrupted processing flows, preserving the continuity and integrity of data processing.
+
+## Background
+
+While the service is running, a processing flow can be interrupted for several reasons:
+
+1. **Service restarts**: maintenance, configuration updates, and similar events
+2. **Network interruptions**: unstable connections causing requests to fail
+3. **Resource exhaustion**: insufficient memory, CPU, or other resources
+4. **Unexpected errors**: code exceptions or failures in external services
+
+## Solution
+
+### Status tracking
+
+The system tracks each request's processing state through status fields on the `knowledge_request` table (see the sketch below):
+
+- `parsing_status`: parsing state (0: not started, 1: in progress, 2: completed, 3: failed)
+- `extraction_status`: extraction state (0: not started, 1: in progress, 2: completed, 3: failed)
+- `expansion_status`: expansion state (0: not started, 1: in progress, 2: completed, 3: failed)
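+
+A minimal sketch of these status values as a Python enum (the enum itself is illustrative and not part of the codebase; only the numeric values come from the field definitions above):
+
+```python
+from enum import IntEnum
+
+class ProcessStatus(IntEnum):
+    """Status values shared by parsing_status, extraction_status, and expansion_status."""
+    NOT_STARTED = 0
+    IN_PROGRESS = 1   # restore only targets rows in this state
+    COMPLETED = 2
+    FAILED = 3
+```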
+
+### Automatic restore mechanism
+
+On startup, the system automatically performs the following steps:
+
+1. **Wait for startup**: delay 3 seconds so the service is fully up
+2. **Query interrupted flows**: query requests in each of the three "in progress" states
+3. **Re-invoke endpoints**: re-call the corresponding processing endpoint for each interrupted request
+4. **Retry**: retry with exponential backoff, up to 3 attempts
+
+## Implementation Details
+
+### Core function
+
+```python
+async def restore_interrupted_processes():
+    """Restore interrupted processes after startup"""
+    # 1. Restore interrupted parsing processes
+    await restore_parsing_processes()
+    # 2. Restore interrupted extraction processes
+    await restore_extraction_processes()
+    # 3. Restore interrupted expansion processes
+    await restore_expansion_processes()
+```
+
+### Restore logic
+
+#### Parsing restore
+
+```python
+async def restore_parsing_processes():
+    """Restore interrupted parsing processes"""
+    sql = "SELECT request_id FROM knowledge_request WHERE parsing_status = 1"
+    rows = MysqlHelper.get_values(sql)
+    
+    for row in rows:
+        request_id = row[0]
+        await call_parse_async_with_retry(request_id)
+```
+
+#### Extraction restore
+
+```python
+async def restore_extraction_processes():
+    """Restore interrupted extraction processes"""
+    sql = "SELECT request_id, query FROM knowledge_request WHERE extraction_status = 1"
+    rows = MysqlHelper.get_values(sql)
+    
+    for row in rows:
+        request_id = row[0]
+        query = row[1]
+        await call_extract_with_retry(request_id, query)
+```
+
+#### Expansion restore
+
+```python
+async def restore_expansion_processes():
+    """Restore interrupted expansion processes"""
+    sql = "SELECT request_id, query FROM knowledge_request WHERE expansion_status = 1"
+    rows = MysqlHelper.get_values(sql)
+    
+    for row in rows:
+        request_id = row[0]
+        query = row[1]
+        await call_expand_with_retry(request_id, query)
+```
+
+### Retry mechanism
+
+The system retries with exponential backoff:
+
+```python
+async def call_parse_async_with_retry(request_id: str, max_retries: int = 3):
+    """Call the /parse/async endpoint with retries"""
+    for attempt in range(max_retries):
+        try:
+            # Attempt the call
+            response = await client.post("http://localhost:8080/parse/async", ...)
+            if response.status_code == 200:
+                return  # success, stop retrying
+        except Exception as e:
+            logger.warning(f"Call failed: {e}, attempt={attempt+1}")
+        
+        # Exponential backoff before the next attempt
+        if attempt < max_retries - 1:
+            await asyncio.sleep(2 ** attempt)
+```
+
+## Configuration
+
+### Environment variables
+
+Restore behavior can be configured via environment variables:
+
+```bash
+# Disable process restore (development environments)
+DISABLE_PROCESS_RESTORE=true
+
+# Adjust the retry count
+MAX_RETRY_ATTEMPTS=5
+
+# Adjust the startup delay
+STARTUP_DELAY=5
+```
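+
+A minimal sketch of how these variables might be wired into the startup hook, assuming plain `os.getenv` lookups with defaults matching the hard-coded values above (the wiring shown here is illustrative; the actual integration lives in agent.py):
+
+```python
+import asyncio
+import os
+
+async def restore_if_enabled():
+    # Skip restore entirely when disabled (e.g. in development)
+    if os.getenv("DISABLE_PROCESS_RESTORE", "false").lower() == "true":
+        return
+    # Wait so the service is fully up before re-invoking endpoints
+    await asyncio.sleep(int(os.getenv("STARTUP_DELAY", "3")))
+    # MAX_RETRY_ATTEMPTS would be threaded through to the *_with_retry helpers
+    await restore_interrupted_processes()
+```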
+
+### Log level
+
+The verbosity of the restore process can be controlled with the `LOG_LEVEL` environment variable:
+
+```bash
+export LOG_LEVEL=info    # verbose output
+export LOG_LEVEL=warning # warnings and errors only
+export LOG_LEVEL=error   # errors only
+```
+
+## Monitoring and Debugging
+
+### Log output
+
+The restore process emits detailed log output:
+
+```
+🔄 Starting restore of interrupted processes...
+📋 No interrupted parsing processes found
+📋 No interrupted extraction processes found
+📋 No interrupted expansion processes found
+✅ Process restore complete
+```
+
+### Error handling
+
+If an error occurs during restore, the system logs detailed error information:
+
+```
+❌ Failed to restore parsing process: request_id=abc123, error=Connection refused
+❌ Call to /parse/async ultimately failed: request_id=abc123 after 3 attempts
+```
+
+### Test tooling
+
+Use the provided test script to verify the restore feature:
+
+```bash
+# Create test data
+python3 test_restore.py --create-data
+
+# Run the test
+python3 test_restore.py --test
+
+# Clean up test data
+python3 test_restore.py --cleanup
+```
+
+## Best Practices
+
+### 1. Monitor regularly
+
+- Review restore entries in the logs on a regular schedule
+- Track how many flows are interrupted and how often
+- Analyze the causes of interruptions and tune the system accordingly
+
+### 2. Tune the configuration
+
+- Adjust retry counts and intervals based on system load
+- Set sensible timeouts
+- Pick an appropriate log level
+
+### 3. Troubleshoot
+
+- Check database connectivity
+- Verify that the API endpoints are reachable
+- Confirm the network connection is stable
+
+## Caveats
+
+1. **Concurrency safety**: the system uses a locking mechanism to avoid processing the same request twice (see the sketch below)
+2. **Resource usage**: restore consumes some system resources; running it during off-peak hours is recommended
+3. **Data consistency**: make sure the status fields in the database accurately reflect the real processing state
+4. **Network dependency**: restore relies on internal API calls, so the network connection must be stable
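+
+A minimal sketch of such a deduplication guard, assuming a single-process asyncio service (names here are illustrative; the actual mechanism in the codebase may differ):
+
+```python
+import asyncio
+
+_restore_lock = asyncio.Lock()
+_in_flight: set = set()
+
+async def restore_once(request_id: str, restore_fn):
+    """Run restore_fn(request_id) unless that request is already being restored."""
+    async with _restore_lock:
+        if request_id in _in_flight:
+            return  # another task is already handling this request
+        _in_flight.add(request_id)
+    try:
+        await restore_fn(request_id)
+    finally:
+        async with _restore_lock:
+            _in_flight.discard(request_id)
+```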
+
+## Troubleshooting
+
+### Common problems
+
+1. **Restore fails**: check that the service started correctly and the API endpoints are available
+2. **Duplicate processing**: check that the concurrency lock is working as intended
+3. **Performance issues**: tune the retry interval and concurrency level
+
+### Debugging steps
+
+1. Check the service startup logs
+2. Verify the database connection
+3. Test that the API endpoints are reachable (e.g. with the probe below)
+4. Review the detailed error logs
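+
+For step 3, a small reachability probe against the running service (a sketch; the `requestId` value is a throwaway placeholder, and any HTTP response, even an error payload, confirms the endpoint is up):
+
+```python
+import asyncio
+import httpx
+
+async def endpoint_reachable(url: str = "http://localhost:8080/parse/async") -> bool:
+    """Return True if the endpoint answers at all, regardless of status code."""
+    try:
+        async with httpx.AsyncClient(timeout=5.0) as client:
+            await client.post(url, json={"requestId": "debug-check"})
+            return True
+    except httpx.HTTPError:
+        return False
+
+print(asyncio.run(endpoint_reachable()))
+```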

+ 1 - 0
requirements.txt

@@ -5,6 +5,7 @@ loguru==0.7.3
 pymysql==1.0.2
 Pillow==10.4.0
 requests==2.32.4
+httpx==0.27.0
 
 # FastAPI related dependencies
 fastapi>=0.116.0

+ 9 - 0
start_service.sh

@@ -61,6 +61,15 @@ start_service() {
         exit 1
     fi
     
+    # Check httpx
+    echo "🔍 Checking httpx..."
+    python3 -c "import httpx" 2>/dev/null
+    if [ $? -ne 0 ]; then
+        echo "❌ Error: missing httpx dependency"
+        echo "Please run: pip install httpx"
+        exit 1
+    fi
+    
     # Check LangGraph
     echo "🔍 Checking LangGraph..."
     python3 -c "import langgraph" 2>/dev/null