流程恢复功能是 Knowledge Agent 服务的一个重要特性,它能够在服务重启后自动检测并恢复中断的处理流程,确保数据处理的连续性和完整性。
在服务运行过程中,可能会因为以下原因导致处理流程中断:
系统通过 knowledge_request
表中的状态字段跟踪每个请求的处理状态:
parsing_status
: 解析状态 (0: 未开始, 1: 处理中, 2: 处理完成, 3: 处理失败)extraction_status
: 提取状态 (0: 未开始, 1: 处理中, 2: 处理完成, 3: 处理失败)expansion_status
: 扩展状态 (0: 未开始, 1: 处理中, 2: 处理完成, 3: 处理失败)服务启动时,系统会自动执行以下恢复步骤:
async def restore_interrupted_processes():
"""启动后恢复中断的流程"""
# 1. 恢复解析中断的流程
await restore_parsing_processes()
# 2. 恢复提取中断的流程
await restore_extraction_processes()
# 3. 恢复扩展中断的流程
await restore_expansion_processes()
async def restore_parsing_processes():
"""恢复解析中断的流程"""
sql = "SELECT request_id FROM knowledge_request WHERE parsing_status = 1"
rows = MysqlHelper.get_values(sql)
for row in rows:
request_id = row[0]
await call_parse_async_with_retry(request_id)
async def restore_extraction_processes():
"""恢复提取中断的流程"""
sql = "SELECT request_id, query FROM knowledge_request WHERE extraction_status = 1"
rows = MysqlHelper.get_values(sql)
for row in rows:
request_id = row[0]
query = row[1]
await call_extract_with_retry(request_id, query)
async def restore_expansion_processes():
"""恢复扩展中断的流程"""
sql = "SELECT request_id, query FROM knowledge_request WHERE expansion_status = 1"
rows = MysqlHelper.get_values(sql)
for row in rows:
request_id = row[0]
query = row[1]
await call_expand_with_retry(request_id, query)
系统使用指数退避算法进行重试:
async def call_parse_async_with_retry(request_id: str, max_retries: int = 3):
"""调用 /parse/async 接口,带重试机制"""
for attempt in range(max_retries):
try:
# 尝试调用接口
response = await client.post("http://localhost:8080/parse/async", ...)
if response.status_code == 200:
return # 成功,退出重试
except Exception as e:
logger.warning(f"调用失败: {e}, attempt={attempt+1}")
# 指数退避等待
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt)
可以通过环境变量配置恢复行为:
# 禁用流程恢复(开发环境)
DISABLE_PROCESS_RESTORE=true
# 调整重试次数
MAX_RETRY_ATTEMPTS=5
# 调整启动延迟
STARTUP_DELAY=5
恢复过程的日志级别可以通过 LOG_LEVEL
环境变量控制:
export LOG_LEVEL=info # 显示详细信息
export LOG_LEVEL=warning # 只显示警告和错误
export LOG_LEVEL=error # 只显示错误
恢复过程会产生详细的日志输出:
🔄 开始恢复中断的流程...
📋 没有发现中断的解析流程
📋 没有发现中断的提取流程
📋 没有发现中断的扩展流程
✅ 流程恢复完成
如果恢复过程中出现错误,系统会记录详细的错误信息:
❌ 恢复解析流程失败: request_id=abc123, error=Connection refused
❌ 调用 /parse/async 最终失败: request_id=abc123, 已重试3次
使用提供的测试工具验证恢复功能:
# 创建测试数据
python3 test_restore.py --create-data
# 运行测试
python3 test_restore.py --test
# 清理测试数据
python3 test_restore.py --cleanup