Нет описания

zhangliang 4341158f7a 修复任务id获取 17 часов назад
config 4341158f7a 修复任务id获取 17 часов назад
core f0821ee29c 修复任务id获取 19 часов назад
scheduler 4341158f7a 修复任务id获取 17 часов назад
services 4341158f7a 修复任务id获取 17 часов назад
spiders f0821ee29c 修复任务id获取 19 часов назад
tests 85590ce087 优化消息处理逻辑 1 день назад
.env 85590ce087 优化消息处理逻辑 1 день назад
.gitignore 3822dc89a8 更新 1 неделя назад
README.md 85590ce087 优化消息处理逻辑 1 день назад
deploy.sh 85590ce087 优化消息处理逻辑 1 день назад
main.py 15f1b4e2d8 bug修复 1 неделя назад
requirements.txt 85590ce087 优化消息处理逻辑 1 день назад
run.sh fc8c21b105 整体改为进程+协程运行模式 2 недель назад

README.md

AutoScraperX

一个基于 YAML 配置驱动的通用分布式爬虫系统,支持多 Topic 并发消费,按平台灵活执行爬虫逻辑,最终推送至 ETL 消费系统。


🧠 项目结构简介

├── core/                  # 核心框架模块
│   ├── common/            # 公共工具类
│   │   ├── config_loader.py  # 配置加载(YAML→Pydantic模型)
│   │   ├── exception.py      # 自定义异常体系(DataError/NetError等)
│   │   └── utils.py          # 通用工具(时间戳/哈希/正则)
│   ├── database/          # 数据库访问层
│   │   ├── base.py           # 异步DB基类(连接池管理)
│   │   └── mysql.py          # MySQL具体实现(CRUD封装)
│   ├── log/               # 日志系统
│   │   ├── aliyun_logger.py  # 阿里云SLS日志适配器
│   │   └── local_logger.py   # 本地文件日志(按天滚动)
│   └── spider/            # 爬虫核心组件
│       ├── base_spider.py    # 爬虫基类(定义run/parse等抽象方法)
│       ├── registry.py       # 爬虫注册中心(动态加载子类)
│       └── pipeline.py       # 数据处理流水线(清洗/去重/存储)
├── spiders/               # 业务爬虫实现
│   ├── wechat_official/     # 微信公众号爬虫
│   ├── video_account/       # 视频号爬虫
│   └── news_website.py      # 新闻网站示例爬虫
├── config/                # 配置文件
│   ├── __init__.py          # 配置模型初始化
│   ├── dev.yaml             # 开发环境配置(本地MySQL/日志级别DEBUG)
│   └── prod.yaml            # 生产环境配置(阿里云RDS/日志级别INFO)
├── tests/                 # 测试用例
│   ├── test_spider.py       # 爬虫基类功能测试
│   └── test_pipeline.py     # 数据清洗流水线测试
├── scripts/               # 运维脚本
│   ├── manage.py            # 爬虫管理工具(启动/监控/清洗)
│   └── deploy.sh            # 生产环境部署脚本
├── .env.example           # 环境变量示例(敏感信息占位符)
├── requirements.txt       # 依赖库清单(含版本约束)
├── pyproject.toml         # PEP 621项目元数据(poetry管理)
└── README.md              # 项目说明(当前文件)

4. 添加新爬虫
4.1 实现爬虫类
# spiders/tech_blog.py
from core.spider.base_spider import BaseSpider

class TechBlogSpider(BaseSpider):
    name = "tech_blog"
    
    async def parse(self, response):
        articles = []
        for item in response.html.select("div.article"):
            title = item.select_one("h2.title").text.strip()
            link = item.select_one("a")["href"]
            articles.append({"title": title, "link": link})
        return articles
4.2 注册爬虫
# spiders/__init__.py
from spiders.tech_blog import TechBlogSpider

SPIDER_REGISTRY = {
    cls.name: cls for cls in BaseSpider.__subclasses__()
}

4.3 配置MQ主题
# config/prod.yaml
spider:
  topic: "custom_tech_blog_topic"

核心流程

## 🚀 功能特性

- ✅ 多 Topic 单进程并发监听消费(使用线程)
- ✅ 根据消息动态获取 platform/mode,并注入 user_list、rule_dict
- ✅ YAML 驱动爬虫逻辑,无需重复开发代码
- ✅ 请求支持自动重试、动态分页、字段抽取
- ✅ 视频封装为标准 `VideoItem`,统一推送到 MQ
- ✅ 任务执行成功后再确认 ACK,保证一致性

---

## 🧱 架构概览

- **main.py**:监听多个 Topic,消费 MQ 消息,解析出平台并调度 `UniversalCrawler`
- **UniversalCrawler**:核心爬虫逻辑,读取配置发送请求,抽取字段,封装数据项,交由 `pipeline` 处理
- **PiaoQuanPipeline**:负责数据 ETL 入库、推送到 ETL MQ
- **MQ 系统**:阿里云 MQ,支持按平台配置多个 Topic,消费完成再手动 ACK
- **配置文件**:
  - `spiders_config.yaml`:各平台请求方式、字段映射、分页等配置
  - `topic_map.yaml`:多 Topic 映射(暂不再使用 platform 字段)

---

## 🛠 使用说明

### 1. 启动项目

```bash
python main1.py

程序将自动监听所有 Topic,消费消息后创建对应的爬虫任务并执行。


🧩 spiders_config.yaml 示例配置

default:
  base_url: http://api.xxx.com
  request_timeout: 30[]()
  headers:
    {"Content-Type": "application/json"}

benshanzhufu:
  mode: recommend
  path: /crawler/ben_shan_zhu_fu/recommend
  method: post
  request_body:
    cursor: "1"
  paging: true
  max_pages: 5
  etl_hook: "process_video_obj"
  response_parse:
    next_cursor: "$.data.next_cursor"
    data_path: "$.data.data"
    fields:
      video_id: "$.nid"
      video_title: "$.title"
      play_cnt: 0
      publish_time: "$.update_time"
      video_url: "$.video_url"

🧵 线程调度与消费机制

  • 每个 topic 启一个线程进行 MQ 消费
  • 每条消息创建一个 UniversalCrawler 实例,执行 .run(),完成后再 ACK
  • 失败或超时不会阻塞其他任务

pip freeze > requirements.txt