Keine Beschreibung

zhangliang 24a385151c 修复mq消息消费 vor 5 Stunden
config bf43893498 修复mq消息消费 vor 6 Stunden
core a1368591a6 修复mq消息消费 vor 6 Stunden
scheduler a1368591a6 修复mq消息消费 vor 6 Stunden
services 24a385151c 修复mq消息消费 vor 5 Stunden
spiders 45590c4f1c 动态更新请求变量 vor 22 Stunden
tests fd03f01990 更新 vor 2 Tagen
.env fd03f01990 更新 vor 2 Tagen
.gitignore 3822dc89a8 更新 vor 1 Tag
README.md d4b85fe52d 结构调整 vor 3 Tagen
deploy.sh 08901d7fc7 更新 vor 1 Tag
main.py a1368591a6 修复mq消息消费 vor 6 Stunden
requirements.txt 070af55b07 更新 vor 1 Tag
run.sh fc8c21b105 整体改为进程+协程运行模式 vor 1 Woche

README.md

AutoScraperX

一个基于 YAML 配置驱动的通用分布式爬虫系统,支持多 Topic 并发消费,按平台灵活执行爬虫逻辑,最终推送至 ETL 消费系统。


🧠 项目结构简介

├── core/                  # 核心框架模块
│   ├── common/            # 公共工具类
│   │   ├── config_loader.py  # 配置加载(YAML→Pydantic模型)
│   │   ├── exception.py      # 自定义异常体系(DataError/NetError等)
│   │   └── utils.py          # 通用工具(时间戳/哈希/正则)
│   ├── database/          # 数据库访问层
│   │   ├── base.py           # 异步DB基类(连接池管理)
│   │   └── mysql.py          # MySQL具体实现(CRUD封装)
│   ├── log/               # 日志系统
│   │   ├── aliyun_logger.py  # 阿里云SLS日志适配器
│   │   └── local_logger.py   # 本地文件日志(按天滚动)
│   └── spider/            # 爬虫核心组件
│       ├── base_spider.py    # 爬虫基类(定义run/parse等抽象方法)
│       ├── registry.py       # 爬虫注册中心(动态加载子类)
│       └── pipeline.py       # 数据处理流水线(清洗/去重/存储)
├── spiders/               # 业务爬虫实现
│   ├── wechat_official/     # 微信公众号爬虫
│   ├── video_account/       # 视频号爬虫
│   └── news_website.py      # 新闻网站示例爬虫
├── config/                # 配置文件
│   ├── __init__.py          # 配置模型初始化
│   ├── dev.yaml             # 开发环境配置(本地MySQL/日志级别DEBUG)
│   └── prod.yaml            # 生产环境配置(阿里云RDS/日志级别INFO)
├── tests/                 # 测试用例
│   ├── test_spider.py       # 爬虫基类功能测试
│   └── test_pipeline.py     # 数据清洗流水线测试
├── scripts/               # 运维脚本
│   ├── manage.py            # 爬虫管理工具(启动/监控/清洗)
│   └── deploy.sh            # 生产环境部署脚本
├── .env.example           # 环境变量示例(敏感信息占位符)
├── requirements.txt       # 依赖库清单(含版本约束)
├── pyproject.toml         # PEP 621项目元数据(poetry管理)
└── README.md              # 项目说明(当前文件)

4. 添加新爬虫
4.1 实现爬虫类
# spiders/tech_blog.py
from core.spider.base_spider import BaseSpider

class TechBlogSpider(BaseSpider):
    name = "tech_blog"
    
    async def parse(self, response):
        articles = []
        for item in response.html.select("div.article"):
            title = item.select_one("h2.title").text.strip()
            link = item.select_one("a")["href"]
            articles.append({"title": title, "link": link})
        return articles
4.2 注册爬虫
# spiders/__init__.py
from spiders.tech_blog import TechBlogSpider

SPIDER_REGISTRY = {
    cls.name: cls for cls in BaseSpider.__subclasses__()
}

4.3 配置MQ主题
# config/prod.yaml
spider:
  topic: "custom_tech_blog_topic"

核心流程

## 🚀 功能特性

- ✅ 多 Topic 单进程并发监听消费(使用线程)
- ✅ 根据消息动态获取 platform/mode,并注入 user_list、rule_dict
- ✅ YAML 驱动爬虫逻辑,无需重复开发代码
- ✅ 请求支持自动重试、动态分页、字段抽取
- ✅ 视频封装为标准 `VideoItem`,统一推送到 MQ
- ✅ 任务执行成功后再确认 ACK,保证一致性

---

## 🧱 架构概览

- **main.py**:监听多个 Topic,消费 MQ 消息,解析出平台并调度 `UniversalCrawler`
- **UniversalCrawler**:核心爬虫逻辑,读取配置发送请求,抽取字段,封装数据项,交由 `pipeline` 处理
- **PiaoQuanPipeline**:负责数据 ETL 入库、推送到 ETL MQ
- **MQ 系统**:阿里云 MQ,支持按平台配置多个 Topic,消费完成再手动 ACK
- **配置文件**:
  - `spiders_config.yaml`:各平台请求方式、字段映射、分页等配置
  - `topic_map.yaml`:多 Topic 映射(暂不再使用 platform 字段)

---

## 🛠 使用说明

### 1. 启动项目

```bash
python main1.py

程序将自动监听所有 Topic,消费消息后创建对应的爬虫任务并执行。


🧩 spiders_config.yaml 示例配置

default:
  base_url: http://api.xxx.com
  request_timeout: 30[]()
  headers:
    {"Content-Type": "application/json"}

benshanzhufu:
  mode: recommend
  path: /crawler/ben_shan_zhu_fu/recommend
  method: post
  request_body:
    cursor: "1"
  paging: true
  max_pages: 5
  etl_hook: "process_video_obj"
  response_parse:
    next_cursor: "$.data.next_cursor"
    data_path: "$.data.data"
    fields:
      video_id: "$.nid"
      video_title: "$.title"
      play_cnt: 0
      publish_time: "$.update_time"
      video_url: "$.video_url"

🧵 线程调度与消费机制

  • 每个 topic 启一个线程进行 MQ 消费
  • 每条消息创建一个 UniversalCrawler 实例,执行 .run(),完成后再 ACK
  • 失败或超时不会阻塞其他任务