# 微信指数采集系统 ## 项目概述 这是一个用于采集微信指数数据的自动化系统,支持多关键词、定时采集、数据入库等功能。 ## 系统架构 ### 核心组件 1. **配置管理 (config.py)** - 管理数据库、API、监控等配置 - 使用 `.env` 文件存储敏感信息 2. **数据模型 (db_model.py)** - [TaskKeyword](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/models/db_model.py#L5-L10): 关键词任务表,存储待采集的关键词 - [WxTrendData](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/models/db_model.py#L12-L18): 趋势数据表,存储采集到的指数数据 3. **数据仓库 (index_repo.py)** - 提供数据库操作接口 - 支持批量插入和更新操作 4. **采集服务 (collector.py)** - 负责调用外部API获取数据 - 数据验证和转换 5. **调度器 (main.py)** - 基于APScheduler的定时任务系统 - 支持周期性数据采集 ## 数据库结构 ### wx_trend_keywords 表 - [id](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/models/db_model.py#L14-L14): 主键 - [keyword](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/models/db_model.py#L15-L15): 关键词 (唯一) - [is_active](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/models/db_model.py#L9-L9): 是否启用 - [priority](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/models/db_model.py#L10-L10): 优先级 - `create_time`: 创建时间 ### wx_trend_data 表 - [id](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/models/db_model.py#L14-L14): 主键 - `keyword_id`: 关联关键词ID - [ymd](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/models/db_model.py#L16-L16): 日期 (YYYYMMDD) - [channel_score](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/models/db_model.py#L17-L17): 渠道评分 (JSON格式) - `create_time`: 创建时间 - `update_time`: 更新时间 ## 核心流程 ### 数据采集流程 1. 从 `wx_trend_keywords` 表获取所有 `is_active=1` 的关键词 2. 按优先级和创建时间排序 3. 遍历每个关键词,调用外部API获取数据 4. 将获取的数据批量插入/更新到 `wx_trend_data` 表 5. 记录采集日志和监控指标 ### 数据更新策略 - 仅当 [channel_score](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/models/db_model.py#L17-L17) 数据发生变化时才更新 `update_time` - 使用 `ON DUPLICATE KEY UPDATE` 实现幂等写入 - 防止重复数据录入 ## 扩展指南 ### 1. 添加新的数据采集源 #### 步骤1: 创建新的服务类 在 `app/services/` 目录下创建新的服务文件,例如 `new_collector.py`: #### 步骤2: 创建数据模型 在 `app/models/` 目录下创建对应的数据模型和API Schema #### 步骤3: 创建数据仓库 在 `app/repository/` 目录下创建对应的数据访问层 #### 步骤4: 配置调度 在 [main.py](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/main.py) 中添加新的调度任务 ### 2. 添加新的数据处理逻辑 #### 数据验证 - 在 [api_schema.py](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/models/api_schema.py) 中定义数据验证模型 - 使用 Pydantic 进行数据校验 #### 数据转换 - 在采集服务中实现数据转换逻辑 - 确保数据格式符合目标表结构 ### 3. 扩展监控能力 #### 自定义指标 - 在 `ali_log.py` 中添加新的监控指标 - 使用 `put_metric` 方法上报自定义数据 #### 报警规则 - 配置基于指标的报警规则 - 设置异常情况的通知机制 ### 4. 性能优化建议 #### 数据库优化 - 为常用查询字段添加索引 - 合理设置连接池大小 - 使用批量操作减少数据库交互次数 #### 网络优化 - 合理设置请求超时时间 - 实现请求重试机制 - 使用连接复用减少握手开销 #### 并发优化 - 使用异步IO提高并发处理能力 - 合理控制并发请求数量 - 实现请求频率限制防止被封 ## 配置说明 ### 环境变量 - [DB_HOST](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/core/config.py#L7-L7): 数据库主机 - [DB_PORT](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/core/config.py#L8-L8): 数据库端口 - [DB_USER](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/core/config.py#L9-L9): 数据库用户名 - [DB_PASSWORD](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/core/config.py#L10-L10): 数据库密码 - [DB_NAME](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/core/config.py#L11-L11): 数据库名 - [API_URL](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/core/config.py#L14-L14): 数据源API地址 - [CRON_HOUR](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/core/config.py#L17-L17): 采集任务小时 - [CRON_MINUTE](file:///Users/zhangliang/Documents/piaoquan/WeIndex/app/core/config.py#L18-L18): 采集任务分钟 ### 调度配置 - 默认每天指定时间执行一次 - 支持启动时立即执行测试 - 可配置多个调度规则 ## 部署说明 ### Docker部署 使用提供的 `docker-compose.yml` 文件进行容器化部署 ### 环境要求 - Python 3.8+ - MySQL 5.7+ - 依赖包见 `requirements.txt` ## 故障排查 ### 常见问题 1. 数据库连接失败:检查连接参数和网络连通性 2. API请求失败:检查API地址和认证信息 3. 调度任务不执行:检查Cron表达式和时区设置 ### 日志查看 - 系统日志:`logs/` 目录下 - 错误信息:重点关注ERROR级别日志 - 调试信息:DEBUG级别提供详细过程信息