微信指数采集系统
项目概述
这是一个用于采集微信指数数据的自动化系统,支持多关键词、定时采集、数据入库等功能。
系统架构
核心组件
配置管理 (config.py)
- 管理数据库、API、监控等配置
- 使用
.env 文件存储敏感信息
数据模型 (db_model.py)
- TaskKeyword: 关键词任务表,存储待采集的关键词
- WxTrendData: 趋势数据表,存储采集到的指数数据
数据仓库 (index_repo.py)
采集服务 (collector.py)
调度器 (main.py)
- 基于APScheduler的定时任务系统
- 支持周期性数据采集
数据库结构
wx_trend_keywords 表
- id: 主键
- keyword: 关键词 (唯一)
- is_active: 是否启用
- priority: 优先级
create_time: 创建时间
wx_trend_data 表
- id: 主键
keyword_id: 关联关键词ID
- ymd: 日期 (YYYYMMDD)
- channel_score: 渠道评分 (JSON格式)
create_time: 创建时间
update_time: 更新时间
核心流程
数据采集流程
- 从
wx_trend_keywords 表获取所有 is_active=1 的关键词
- 按优先级和创建时间排序
- 遍历每个关键词,调用外部API获取数据
- 将获取的数据批量插入/更新到
wx_trend_data 表
- 记录采集日志和监控指标
数据更新策略
- 仅当 channel_score 数据发生变化时才更新
update_time
- 使用
ON DUPLICATE KEY UPDATE 实现幂等写入
- 防止重复数据录入
扩展指南
1. 添加新的数据采集源
步骤1: 创建新的服务类
在 app/services/ 目录下创建新的服务文件,例如 new_collector.py:
步骤2: 创建数据模型
在 app/models/ 目录下创建对应的数据模型和API Schema
步骤3: 创建数据仓库
在 app/repository/ 目录下创建对应的数据访问层
步骤4: 配置调度
在 main.py 中添加新的调度任务
2. 添加新的数据处理逻辑
数据验证
- 在 api_schema.py 中定义数据验证模型
- 使用 Pydantic 进行数据校验
数据转换
- 在采集服务中实现数据转换逻辑
- 确保数据格式符合目标表结构
3. 扩展监控能力
自定义指标
- 在
ali_log.py 中添加新的监控指标
- 使用
put_metric 方法上报自定义数据
报警规则
4. 性能优化建议
数据库优化
- 为常用查询字段添加索引
- 合理设置连接池大小
- 使用批量操作减少数据库交互次数
网络优化
- 合理设置请求超时时间
- 实现请求重试机制
- 使用连接复用减少握手开销
并发优化
- 使用异步IO提高并发处理能力
- 合理控制并发请求数量
- 实现请求频率限制防止被封
配置说明
环境变量
- DB_HOST: 数据库主机
- DB_PORT: 数据库端口
- DB_USER: 数据库用户名
- DB_PASSWORD: 数据库密码
- DB_NAME: 数据库名
- API_URL: 数据源API地址
- CRON_HOUR: 采集任务小时
- CRON_MINUTE: 采集任务分钟
调度配置
- 默认每天指定时间执行一次
- 支持启动时立即执行测试
- 可配置多个调度规则
部署说明
Docker部署
使用提供的 docker-compose.yml 文件进行容器化部署
环境要求
- Python 3.8+
- MySQL 5.7+
- 依赖包见
requirements.txt
故障排查
常见问题
- 数据库连接失败:检查连接参数和网络连通性
- API请求失败:检查API地址和认证信息
- 调度任务不执行:检查Cron表达式和时区设置
日志查看
- 系统日志:
logs/ 目录下
- 错误信息:重点关注ERROR级别日志
- 调试信息:DEBUG级别提供详细过程信息