process_restore.md 5.8 KB

流程恢复功能说明

概述

流程恢复功能是 Knowledge Agent 服务的一个重要特性,它能够在服务重启后自动检测并恢复中断的处理流程,确保数据处理的连续性和完整性。

问题背景

在服务运行过程中,可能会因为以下原因导致处理流程中断:

  1. 服务重启: 系统维护、配置更新等原因导致服务重启
  2. 网络中断: 网络连接不稳定导致请求失败
  3. 资源不足: 内存、CPU等资源不足导致处理中断
  4. 异常错误: 代码异常或外部服务异常导致处理失败

解决方案

状态跟踪

系统通过 knowledge_request 表中的状态字段跟踪每个请求的处理状态:

  • parsing_status: 解析状态 (0: 未开始, 1: 处理中, 2: 处理完成, 3: 处理失败)
  • extraction_status: 提取状态 (0: 未开始, 1: 处理中, 2: 处理完成, 3: 处理失败)
  • expansion_status: 扩展状态 (0: 未开始, 1: 处理中, 2: 处理完成, 3: 处理失败)

自动恢复机制

服务启动时,系统会自动执行以下恢复步骤:

  1. 等待服务启动: 延迟3秒确保服务完全启动
  2. 查询中断流程: 分别查询三种状态为"处理中"的请求
  3. 重新调用接口: 对每个中断的请求重新调用相应的处理接口
  4. 重试机制: 使用指数退避算法进行重试,最多重试3次

实现细节

核心函数

async def restore_interrupted_processes():
    """启动后恢复中断的流程"""
    # 1. 恢复解析中断的流程
    await restore_parsing_processes()
    # 2. 恢复提取中断的流程
    await restore_extraction_processes()
    # 3. 恢复扩展中断的流程
    await restore_expansion_processes()

恢复逻辑

解析流程恢复

async def restore_parsing_processes():
    """恢复解析中断的流程"""
    sql = "SELECT request_id FROM knowledge_request WHERE parsing_status = 1"
    rows = MysqlHelper.get_values(sql)
    
    for row in rows:
        request_id = row[0]
        await call_parse_async_with_retry(request_id)

提取流程恢复

async def restore_extraction_processes():
    """恢复提取中断的流程"""
    sql = "SELECT request_id, query FROM knowledge_request WHERE extraction_status = 1"
    rows = MysqlHelper.get_values(sql)
    
    for row in rows:
        request_id = row[0]
        query = row[1]
        await call_extract_with_retry(request_id, query)

扩展流程恢复

async def restore_expansion_processes():
    """恢复扩展中断的流程"""
    sql = "SELECT request_id, query FROM knowledge_request WHERE expansion_status = 1"
    rows = MysqlHelper.get_values(sql)
    
    for row in rows:
        request_id = row[0]
        query = row[1]
        await call_expand_with_retry(request_id, query)

重试机制

系统使用指数退避算法进行重试:

async def call_parse_async_with_retry(request_id: str, max_retries: int = 3):
    """调用 /parse/async 接口,带重试机制"""
    for attempt in range(max_retries):
        try:
            # 尝试调用接口
            response = await client.post("http://localhost:8080/parse/async", ...)
            if response.status_code == 200:
                return  # 成功,退出重试
        except Exception as e:
            logger.warning(f"调用失败: {e}, attempt={attempt+1}")
        
        # 指数退避等待
        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)

配置选项

环境变量

可以通过环境变量配置恢复行为:

# 禁用流程恢复(开发环境)
DISABLE_PROCESS_RESTORE=true

# 调整重试次数
MAX_RETRY_ATTEMPTS=5

# 调整启动延迟
STARTUP_DELAY=5

日志级别

恢复过程的日志级别可以通过 LOG_LEVEL 环境变量控制:

export LOG_LEVEL=info    # 显示详细信息
export LOG_LEVEL=warning # 只显示警告和错误
export LOG_LEVEL=error   # 只显示错误

监控和调试

日志输出

恢复过程会产生详细的日志输出:

🔄 开始恢复中断的流程...
📋 没有发现中断的解析流程
📋 没有发现中断的提取流程
📋 没有发现中断的扩展流程
✅ 流程恢复完成

错误处理

如果恢复过程中出现错误,系统会记录详细的错误信息:

❌ 恢复解析流程失败: request_id=abc123, error=Connection refused
❌ 调用 /parse/async 最终失败: request_id=abc123, 已重试3次

测试工具

使用提供的测试工具验证恢复功能:

# 创建测试数据
python3 test_restore.py --create-data

# 运行测试
python3 test_restore.py --test

# 清理测试数据
python3 test_restore.py --cleanup

最佳实践

1. 定期监控

  • 定期检查日志中的恢复信息
  • 监控中断流程的数量和频率
  • 分析中断原因并优化系统

2. 配置优化

  • 根据系统负载调整重试次数和间隔
  • 合理设置超时时间
  • 配置适当的日志级别

3. 故障排除

  • 检查数据库连接状态
  • 验证API接口可用性
  • 确认网络连接稳定

注意事项

  1. 并发安全: 系统使用锁机制避免重复处理同一请求
  2. 资源消耗: 恢复过程会消耗一定的系统资源,建议在低峰期进行
  3. 数据一致性: 确保数据库中的状态信息准确反映实际处理状态
  4. 网络依赖: 恢复过程依赖内部API调用,确保网络连接稳定

故障排除

常见问题

  1. 恢复失败: 检查服务是否正常启动,API接口是否可用
  2. 重复处理: 检查并发锁机制是否正常工作
  3. 性能问题: 调整重试间隔和并发数量

调试步骤

  1. 检查服务启动日志
  2. 验证数据库连接
  3. 测试API接口可用性
  4. 查看详细的错误日志