一个基于 YAML 配置驱动的通用分布式爬虫系统,支持多 Topic 并发消费,按平台灵活执行爬虫逻辑,最终推送至 ETL 消费系统。
├── core/ # 核心框架模块
│ ├── common/ # 公共工具类
│ │ ├── config_loader.py # 配置加载(YAML→Pydantic模型)
│ │ ├── exception.py # 自定义异常体系(DataError/NetError等)
│ │ └── utils.py # 通用工具(时间戳/哈希/正则)
│ ├── database/ # 数据库访问层
│ │ ├── base.py # 异步DB基类(连接池管理)
│ │ └── mysql.py # MySQL具体实现(CRUD封装)
│ ├── log/ # 日志系统
│ │ ├── aliyun_logger.py # 阿里云SLS日志适配器
│ │ └── local_logger.py # 本地文件日志(按天滚动)
│ └── spider/ # 爬虫核心组件
│ ├── base_spider.py # 爬虫基类(定义run/parse等抽象方法)
│ ├── registry.py # 爬虫注册中心(动态加载子类)
│ └── pipeline.py # 数据处理流水线(清洗/去重/存储)
├── spiders/ # 业务爬虫实现
│ ├── wechat_official/ # 微信公众号爬虫
│ ├── video_account/ # 视频号爬虫
│ └── news_website.py # 新闻网站示例爬虫
├── config/ # 配置文件
│ ├── __init__.py # 配置模型初始化
│ ├── dev.yaml # 开发环境配置(本地MySQL/日志级别DEBUG)
│ └── prod.yaml # 生产环境配置(阿里云RDS/日志级别INFO)
├── tests/ # 测试用例
│ ├── test_spider.py # 爬虫基类功能测试
│ └── test_pipeline.py # 数据清洗流水线测试
├── scripts/ # 运维脚本
│ ├── manage.py # 爬虫管理工具(启动/监控/清洗)
│ └── deploy.sh # 生产环境部署脚本
├── .env.example # 环境变量示例(敏感信息占位符)
├── requirements.txt # 依赖库清单(含版本约束)
├── pyproject.toml # PEP 621项目元数据(poetry管理)
└── README.md # 项目说明(当前文件)
4. 添加新爬虫
4.1 实现爬虫类
# spiders/tech_blog.py
from core.spider.base_spider import BaseSpider
class TechBlogSpider(BaseSpider):
name = "tech_blog"
async def parse(self, response):
articles = []
for item in response.html.select("div.article"):
title = item.select_one("h2.title").text.strip()
link = item.select_one("a")["href"]
articles.append({"title": title, "link": link})
return articles
4.2 注册爬虫
# spiders/__init__.py
from spiders.tech_blog import TechBlogSpider
SPIDER_REGISTRY = {
cls.name: cls for cls in BaseSpider.__subclasses__()
}
4.3 配置MQ主题
# config/prod.yaml
spider:
topic: "custom_tech_blog_topic"
核心流程
## 🚀 功能特性
- ✅ 多 Topic 单进程并发监听消费(使用线程)
- ✅ 根据消息动态获取 platform/mode,并注入 user_list、rule_dict
- ✅ YAML 驱动爬虫逻辑,无需重复开发代码
- ✅ 请求支持自动重试、动态分页、字段抽取
- ✅ 视频封装为标准 `VideoItem`,统一推送到 MQ
- ✅ 任务执行成功后再确认 ACK,保证一致性
---
## 🧱 架构概览
- **main.py**:监听多个 Topic,消费 MQ 消息,解析出平台并调度 `UniversalCrawler`
- **UniversalCrawler**:核心爬虫逻辑,读取配置发送请求,抽取字段,封装数据项,交由 `pipeline` 处理
- **PiaoQuanPipeline**:负责数据 ETL 入库、推送到 ETL MQ
- **MQ 系统**:阿里云 MQ,支持按平台配置多个 Topic,消费完成再手动 ACK
- **配置文件**:
- `spiders_config.yaml`:各平台请求方式、字段映射、分页等配置
- `topic_map.yaml`:多 Topic 映射(暂不再使用 platform 字段)
---
## 🛠 使用说明
### 1. 启动项目
```bash
python main1.py
程序将自动监听所有 Topic,消费消息后创建对应的爬虫任务并执行。
default:
base_url: http://api.xxx.com
request_timeout: 30[]()
headers:
{"Content-Type": "application/json"}
benshanzhufu:
mode: recommend
path: /crawler/ben_shan_zhu_fu/recommend
method: post
request_body:
cursor: "1"
paging: true
max_pages: 5
etl_hook: "process_video_obj"
response_parse:
next_cursor: "$.data.next_cursor"
data_path: "$.data.data"
fields:
video_id: "$.nid"
video_title: "$.title"
play_cnt: 0
publish_time: "$.update_time"
video_url: "$.video_url"
.run()
,完成后再 ACK