# AutoScraperX 一个基于 YAML 配置驱动的通用分布式爬虫系统,支持多 Topic 并发消费,按平台灵活执行爬虫逻辑,最终推送至 ETL 消费系统。 --- ## 🧠 项目结构简介 ```bash ├── core/ # 核心框架模块 │ ├── common/ # 公共工具类 │ │ ├── config_loader.py # 配置加载(YAML→Pydantic模型) │ │ ├── exception.py # 自定义异常体系(DataError/NetError等) │ │ └── utils.py # 通用工具(时间戳/哈希/正则) │ ├── database/ # 数据库访问层 │ │ ├── base.py # 异步DB基类(连接池管理) │ │ └── mysql.py # MySQL具体实现(CRUD封装) │ ├── log/ # 日志系统 │ │ ├── aliyun_logger.py # 阿里云SLS日志适配器 │ │ └── local_logger.py # 本地文件日志(按天滚动) │ └── spider/ # 爬虫核心组件 │ ├── base_spider.py # 爬虫基类(定义run/parse等抽象方法) │ ├── registry.py # 爬虫注册中心(动态加载子类) │ └── pipeline.py # 数据处理流水线(清洗/去重/存储) ├── spiders/ # 业务爬虫实现 │ ├── wechat_official/ # 微信公众号爬虫 │ ├── video_account/ # 视频号爬虫 │ └── news_website.py # 新闻网站示例爬虫 ├── config/ # 配置文件 │ ├── __init__.py # 配置模型初始化 │ ├── dev.yaml # 开发环境配置(本地MySQL/日志级别DEBUG) │ └── prod.yaml # 生产环境配置(阿里云RDS/日志级别INFO) ├── tests/ # 测试用例 │ ├── test_spider.py # 爬虫基类功能测试 │ └── test_pipeline.py # 数据清洗流水线测试 ├── scripts/ # 运维脚本 │ ├── manage.py # 爬虫管理工具(启动/监控/清洗) │ └── deploy.sh # 生产环境部署脚本 ├── .env.example # 环境变量示例(敏感信息占位符) ├── requirements.txt # 依赖库清单(含版本约束) ├── pyproject.toml # PEP 621项目元数据(poetry管理) └── README.md # 项目说明(当前文件) 4. 添加新爬虫 4.1 实现爬虫类 # spiders/tech_blog.py from core.spider.base_spider import BaseSpider class TechBlogSpider(BaseSpider): name = "tech_blog" async def parse(self, response): articles = [] for item in response.html.select("div.article"): title = item.select_one("h2.title").text.strip() link = item.select_one("a")["href"] articles.append({"title": title, "link": link}) return articles 4.2 注册爬虫 # spiders/__init__.py from spiders.tech_blog import TechBlogSpider SPIDER_REGISTRY = { cls.name: cls for cls in BaseSpider.__subclasses__() } 4.3 配置MQ主题 # config/prod.yaml spider: topic: "custom_tech_blog_topic" 核心流程 ## 🚀 功能特性 - ✅ 多 Topic 单进程并发监听消费(使用线程) - ✅ 根据消息动态获取 platform/mode,并注入 user_list、rule_dict - ✅ YAML 驱动爬虫逻辑,无需重复开发代码 - ✅ 请求支持自动重试、动态分页、字段抽取 - ✅ 视频封装为标准 `VideoItem`,统一推送到 MQ - ✅ 任务执行成功后再确认 ACK,保证一致性 --- ## 🧱 架构概览 - **main.py**:监听多个 Topic,消费 MQ 消息,解析出平台并调度 `UniversalCrawler` - **UniversalCrawler**:核心爬虫逻辑,读取配置发送请求,抽取字段,封装数据项,交由 `pipeline` 处理 - **PiaoQuanPipeline**:负责数据 ETL 入库、推送到 ETL MQ - **MQ 系统**:阿里云 MQ,支持按平台配置多个 Topic,消费完成再手动 ACK - **配置文件**: - `spiders_config.yaml`:各平台请求方式、字段映射、分页等配置 - `topic_map.yaml`:多 Topic 映射(暂不再使用 platform 字段) --- ## 🛠 使用说明 ### 1. 启动项目 ```bash python main1.py ``` > 程序将自动监听所有 Topic,消费消息后创建对应的爬虫任务并执行。 --- ## 🧩 spiders_config.yaml 示例配置 ```yaml default: base_url: http://api.xxx.com request_timeout: 30[]() headers: {"Content-Type": "application/json"} benshanzhufu: mode: recommend path: /crawler/ben_shan_zhu_fu/recommend method: post request_body: cursor: "1" paging: true max_pages: 5 etl_hook: "process_video_obj" response_parse: next_cursor: "$.data.next_cursor" data_path: "$.data.data" fields: video_id: "$.nid" video_title: "$.title" play_cnt: 0 publish_time: "$.update_time" video_url: "$.video_url" ``` --- ## 🧵 线程调度与消费机制 - 每个 topic 启一个线程进行 MQ 消费 - 每条消息创建一个 UniversalCrawler 实例,执行 `.run()`,完成后再 ACK - 失败或超时不会阻塞其他任务