# 流程恢复功能说明 ## 概述 流程恢复功能是 Knowledge Agent 服务的一个重要特性,它能够在服务重启后自动检测并恢复中断的处理流程,确保数据处理的连续性和完整性。 ## 问题背景 在服务运行过程中,可能会因为以下原因导致处理流程中断: 1. **服务重启**: 系统维护、配置更新等原因导致服务重启 2. **网络中断**: 网络连接不稳定导致请求失败 3. **资源不足**: 内存、CPU等资源不足导致处理中断 4. **异常错误**: 代码异常或外部服务异常导致处理失败 ## 解决方案 ### 状态跟踪 系统通过 `knowledge_request` 表中的状态字段跟踪每个请求的处理状态: - `parsing_status`: 解析状态 (0: 未开始, 1: 处理中, 2: 处理完成, 3: 处理失败) - `extraction_status`: 提取状态 (0: 未开始, 1: 处理中, 2: 处理完成, 3: 处理失败) - `expansion_status`: 扩展状态 (0: 未开始, 1: 处理中, 2: 处理完成, 3: 处理失败) ### 自动恢复机制 服务启动时,系统会自动执行以下恢复步骤: 1. **等待服务启动**: 延迟3秒确保服务完全启动 2. **查询中断流程**: 分别查询三种状态为"处理中"的请求 3. **重新调用接口**: 对每个中断的请求重新调用相应的处理接口 4. **重试机制**: 使用指数退避算法进行重试,最多重试3次 ## 实现细节 ### 核心函数 ```python async def restore_interrupted_processes(): """启动后恢复中断的流程""" # 1. 恢复解析中断的流程 await restore_parsing_processes() # 2. 恢复提取中断的流程 await restore_extraction_processes() # 3. 恢复扩展中断的流程 await restore_expansion_processes() ``` ### 恢复逻辑 #### 解析流程恢复 ```python async def restore_parsing_processes(): """恢复解析中断的流程""" sql = "SELECT request_id FROM knowledge_request WHERE parsing_status = 1" rows = MysqlHelper.get_values(sql) for row in rows: request_id = row[0] await call_parse_async_with_retry(request_id) ``` #### 提取流程恢复 ```python async def restore_extraction_processes(): """恢复提取中断的流程""" sql = "SELECT request_id, query FROM knowledge_request WHERE extraction_status = 1" rows = MysqlHelper.get_values(sql) for row in rows: request_id = row[0] query = row[1] await call_extract_with_retry(request_id, query) ``` #### 扩展流程恢复 ```python async def restore_expansion_processes(): """恢复扩展中断的流程""" sql = "SELECT request_id, query FROM knowledge_request WHERE expansion_status = 1" rows = MysqlHelper.get_values(sql) for row in rows: request_id = row[0] query = row[1] await call_expand_with_retry(request_id, query) ``` ### 重试机制 系统使用指数退避算法进行重试: ```python async def call_parse_async_with_retry(request_id: str, max_retries: int = 3): """调用 /parse/async 接口,带重试机制""" for attempt in range(max_retries): try: # 尝试调用接口 response = await client.post("http://localhost:8080/parse/async", ...) if response.status_code == 200: return # 成功,退出重试 except Exception as e: logger.warning(f"调用失败: {e}, attempt={attempt+1}") # 指数退避等待 if attempt < max_retries - 1: await asyncio.sleep(2 ** attempt) ``` ## 配置选项 ### 环境变量 可以通过环境变量配置恢复行为: ```bash # 禁用流程恢复(开发环境) DISABLE_PROCESS_RESTORE=true # 调整重试次数 MAX_RETRY_ATTEMPTS=5 # 调整启动延迟 STARTUP_DELAY=5 ``` ### 日志级别 恢复过程的日志级别可以通过 `LOG_LEVEL` 环境变量控制: ```bash export LOG_LEVEL=info # 显示详细信息 export LOG_LEVEL=warning # 只显示警告和错误 export LOG_LEVEL=error # 只显示错误 ``` ## 监控和调试 ### 日志输出 恢复过程会产生详细的日志输出: ``` 🔄 开始恢复中断的流程... 📋 没有发现中断的解析流程 📋 没有发现中断的提取流程 📋 没有发现中断的扩展流程 ✅ 流程恢复完成 ``` ### 错误处理 如果恢复过程中出现错误,系统会记录详细的错误信息: ``` ❌ 恢复解析流程失败: request_id=abc123, error=Connection refused ❌ 调用 /parse/async 最终失败: request_id=abc123, 已重试3次 ``` ### 测试工具 使用提供的测试工具验证恢复功能: ```bash # 创建测试数据 python3 test_restore.py --create-data # 运行测试 python3 test_restore.py --test # 清理测试数据 python3 test_restore.py --cleanup ``` ## 最佳实践 ### 1. 定期监控 - 定期检查日志中的恢复信息 - 监控中断流程的数量和频率 - 分析中断原因并优化系统 ### 2. 配置优化 - 根据系统负载调整重试次数和间隔 - 合理设置超时时间 - 配置适当的日志级别 ### 3. 故障排除 - 检查数据库连接状态 - 验证API接口可用性 - 确认网络连接稳定 ## 注意事项 1. **并发安全**: 系统使用锁机制避免重复处理同一请求 2. **资源消耗**: 恢复过程会消耗一定的系统资源,建议在低峰期进行 3. **数据一致性**: 确保数据库中的状态信息准确反映实际处理状态 4. **网络依赖**: 恢复过程依赖内部API调用,确保网络连接稳定 ## 故障排除 ### 常见问题 1. **恢复失败**: 检查服务是否正常启动,API接口是否可用 2. **重复处理**: 检查并发锁机制是否正常工作 3. **性能问题**: 调整重试间隔和并发数量 ### 调试步骤 1. 检查服务启动日志 2. 验证数据库连接 3. 测试API接口可用性 4. 查看详细的错误日志