小年糕账号 (Xiaoniangao account)

zhangliang, 1 month ago
commit 7598c2aff2

CONFIGURATION.md (+179 −0)

@@ -0,0 +1,179 @@
+# AutoScraperX Configuration Guide
+
+This document describes the configuration options of the AutoScraperX project in detail.
+
+---
+
+# Environment Configuration
+
+Environment settings live in a `.env` file; all available options are listed below:
+
+| Option | Description | Required | Default |
+|--------|------|----------|--------|
+| ENV | Runtime environment (allowed values: prod, dev) | No | prod |
+| DB_HOST | Database host address | Yes |  |
+| DB_PORT | Database port | No | 3306 |
+| DB_USER | Database username | Yes |  |
+| DB_PASSWORD | Database password | Yes |  |
+| DB_NAME | Database name | Yes |  |
+| DB_CHARSET | Database character set | Yes |  |
+| ROCKETMQ_ENDPOINT | RocketMQ endpoint | Yes |  |
+| ROCKETMQ_ACCESS_KEY_ID | RocketMQ access key ID | Yes |  |
+| ROCKETMQ_ACCESS_KEY_SECRET | RocketMQ access key secret | Yes |  |
+| FEISHU_APPID | Feishu app ID | Yes |  |
+| FEISHU_APPSECRET | Feishu app secret | Yes |  |
+| ALIYUN_ACCESS_KEY_ID | Aliyun access key ID | Yes |  |
+| ALIYUN_ACCESS_KEY_SECRET | Aliyun access key secret | Yes |  |
+| REDIS_HOST | Redis host address | Yes |  |
+| REDIS_PORT | Redis port | No | 6379 |
+| REDIS_PASSWORD | Redis password | Yes |  |
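+
+As a rough sketch of how these options are consumed (mirroring the pydantic v1 `BaseSettings` pattern used in `config/base.py` from this commit; only a subset of fields is shown):
+
+```python
+# Minimal sketch: load and validate .env options with pydantic v1 BaseSettings.
+# Field names follow the table above; only a subset is shown.
+from pydantic import BaseSettings, Field
+
+class Settings(BaseSettings):
+    ENV: str = Field("prod", env="ENV")           # optional, defaults to prod
+    DB_HOST: str = Field(..., env="DB_HOST")      # required
+    DB_PORT: int = Field(3306, env="DB_PORT")     # optional, defaults to 3306
+    REDIS_HOST: str = Field(..., env="REDIS_HOST")
+    REDIS_PORT: int = Field(6379, env="REDIS_PORT")
+
+    class Config:
+        env_file = ".env"
+        env_file_encoding = "utf-8"
+
+settings = Settings()  # raises a validation error if a required option is missing
+```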
+
+---
+
+# Spider Configuration
+
+Spiders are configured via the `config/spiders_config.yaml` file.
+
+## Example Configuration
+
+```yaml
+default:
+  base_url: http://8.217.192.46:8889
+  request_timeout: 30
+  max_retries: 3
+  headers:
+    {"Content-Type": "application/json"}
+
+benshanzhufurecommend:
+  platform: benshanzhufu
+  mode: recommend
+  path: /crawler/ben_shan_zhu_fu/recommend
+  method: post
+  request_body:
+    cursor: "{{next_cursor}}"
+  loop_times: 50
+  loop_interval:
+    min: 30
+    max: 60
+  feishu_sheetid: "aTSJH4"
+  response_parse:
+    data: "$.data"
+    next_cursor: "$.data.next_cursor"
+    data_path: "$.data.data"
+    fields:
+      video_id: "$.nid"
+      video_title: "$.title"
+      play_cnt: 0
+      publish_time_stamp: "$.update_time"
+      out_user_id: "$.nid"
+      cover_url: "$.video_cover"
+      like_cnt: 0
+      video_url: "$.video_url"
+      out_video_id: "$.nid"
+
+
+yuannifuqimanmanrecommend:
+  platform: yuannifuqimanman
+  mode: recommend
+  path: /crawler/yuan_ni_fu_qi_man_man/recommend
+  method: post
+  request_body:
+    cursor: "{{next_cursor}}"
+  loop_times: 100
+  loop_interval:
+    min: 30
+    max: 60
+  feishu_sheetid: "golXy9"
+  response_parse:
+    data: "$.data"
+    next_cursor: "$.data.next_cursor"
+    data_path: "$.data.data"
+    fields:
+      video_id: "$.nid"
+      video_title: "$.title"
+      out_user_id: "$.nid"
+      cover_url: "$.video_cover"
+      video_url: "$.video_url"
+      out_video_id: "$.nid"
+
+xiaoniangaoauthor:
+  platform: xiaoniangao
+  mode: author
+  path: /crawler/xiao_nian_gao_plus/blogger
+  method: post
+  request_body:
+      cursor: "{{next_cursor}}"
+      account_id: "{{uid}}" # uid from the database
+  loop_times: 100
+  loop_interval:
+    min: 5
+    max: 20
+  feishu_sheetid: "golXy9"
+  response_parse:
+    uid: "$.uid" # uid from the database
+    next_cursor: "$.cursor"
+    data: "$.data"
+    has_more: "$.data.has_more"
+    data_path: "$.data.data"
+    fields:
+      video_title: "$.title"
+      duration: "$.du"
+      play_cnt: "$.play_pv"
+      like_cnt: "$.favor.total"
+      comment_cnt: "$.comment_count"
+      share_cnt: "$.share"
+      width: "$.w"
+      height: "$.h"
+      avatar_url: "$.user.hurl"
+      cover_url: "$.url"
+      video_url: "$.v_url"
+      out_user_id: "$.user.mid"
+      out_video_id: "$.vid"
+      publish_time_stamp: "$.t"
+
+
+
+```
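+
+The `cursor: "{{next_cursor}}"` entries above are templates: on each loop iteration the placeholder is re-filled from the previous response, using the JSONPath registered under the same name in `response_parse`. A condensed sketch of that substitution, following `core/utils/request_preparer.py` from this commit (uses `jsonpath_ng`):
+
+```python
+from jsonpath_ng import parse
+
+def prepare_request_body(body_cfg: dict, parse_cfg: dict, last_response: dict) -> dict:
+    """Fill {{var}} placeholders from the previous response (condensed sketch)."""
+    prepared = {}
+    for key, value in body_cfg.items():
+        if isinstance(value, str) and "{{" in value and "}}" in value:
+            var_name = value.strip("{}").split("|")[0]       # e.g. "next_cursor"
+            path = parse_cfg.get(var_name)                   # e.g. "$.data.next_cursor"
+            matches = parse(path).find(last_response) if path else []
+            prepared[key] = matches[0].value if matches else ""
+        else:
+            prepared[key] = value
+    return prepared
+
+# First request: no previous response yet, so the cursor resolves to ""
+body = prepare_request_body({"cursor": "{{next_cursor}}"},
+                            {"next_cursor": "$.data.next_cursor"}, {})
+```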
+
+## Field Reference
+
+### Global Fields
+
+| Field | Description |
+|------|------|
+| base_url | Base URL, used to build the full request URL |
+| request_timeout | Request timeout (seconds) |
+| max_retries | Maximum number of retries |
+| headers | Request headers |
+
+### Platform Fields
+
+| Field | Description |
+|------|------|
+| platform | Platform name |
+| mode | Crawl mode (e.g. recommend, author) |
+| path | API path |
+| url | Full request URL |
+| method | HTTP request method |
+| request_body | Request body parameters |
+| loop_times | Number of loop iterations |
+| loop_interval | Loop interval (min/max) |
+| response_parse | Response parsing configuration |
+| feishu_sheetid | Feishu sheet ID |
+
+### Response Parsing Fields
+
+| Field | Description |
+|------|------|
+| data_path | JSONPath to the list of data items |
+| next_cursor | JSONPath to the next-page cursor |
+| has_more | JSONPath to the has-more flag |
+| fields | Field mapping configuration |
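+
+Entries under `fields` are applied per item: values starting with `$` are evaluated as JSONPath, while anything else (such as `play_cnt: 0` in the example above) is taken as a literal. A minimal sketch of that mapping step, after `core/utils/extractors.py` from this commit (the function name here is illustrative):
+
+```python
+from jsonpath_ng import parse
+
+def map_fields(item: dict, field_map: dict) -> dict:
+    """Map one raw item to output fields: JSONPath for '$...' values, literals otherwise."""
+    result = {}
+    for field, path in field_map.items():
+        if not isinstance(path, str) or not path.startswith("$"):
+            result[field] = path                       # literal, e.g. play_cnt: 0
+            continue
+        matches = parse(path).find(item)               # JSONPath lookup
+        result[field] = matches[0].value if matches else None
+    return result
+
+raw = {"nid": 123, "title": "demo", "video_url": "http://example.com/v.mp4"}
+print(map_fields(raw, {"video_id": "$.nid", "video_title": "$.title", "play_cnt": 0}))
+# {'video_id': 123, 'video_title': 'demo', 'play_cnt': 0}
+```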
+
+---
+
+## Current Configuration Status
+
+- Number of platform configs: 3
+- Runtime environment: prod
+- Config file path: /AutoScraperX/config/spiders_config.yaml

README.md (+236 −78)

@@ -1,4 +1,3 @@
-
 # AutoScraperX
 
 A YAML-configuration-driven, general-purpose distributed crawler system: it consumes multiple Topics concurrently, runs spider logic flexibly per platform, and pushes the results to the ETL consumer system.
@@ -8,69 +7,33 @@
 ## 🧠 Project Structure
 
 ```bash
+├── config/                # Configuration files
+│   ├── __init__.py        # Config initialization
+│   ├── base.py            # Environment settings definition
+│   └── spiders_config.yaml# Spider platform configs
 ├── core/                  # Core framework modules
-│   ├── common/            # Shared utilities
-│   │   ├── config_loader.py  # Config loading (YAML → Pydantic models)
-│   │   ├── exception.py      # Custom exception hierarchy (DataError/NetError, etc.)
-│   │   └── utils.py          # General helpers (timestamps/hashing/regex)
-│   ├── database/          # Database access layer
-│   │   ├── base.py           # Async DB base class (connection-pool management)
-│   │   └── mysql.py          # MySQL implementation (CRUD wrappers)
-│   ├── log/               # Logging system
-│   │   ├── aliyun_logger.py  # Aliyun SLS log adapter
-│   │   └── local_logger.py   # Local file logging (daily rotation)
-│   └── spider/            # Spider core components
-│       ├── basespider.py    # Spider base class (defines run/parse abstract methods)
-│       ├── registry.py       # Spider registry (dynamic subclass loading)
-│       └── pipeline.py       # Data-processing pipeline (cleaning/dedup/storage)
+│   ├── base/              # Base components (async clients, etc.)
+│   ├── models/            # Data models
+│   ├── utils/             # Utilities
+│   │   ├── config_manager.py      # Unified config manager
+│   │   ├── config_health_check.py # Config health check
+│   │   ├── config_documentation.py# Config doc generation
+│   │   └── spider_config.py       # Spider config loading
+│   └── __init__.py
 ├── spiders/               # Business spider implementations
-│   ├── wechat_official/     # WeChat Official Account spider
-│   ├── video_account/       # Video account spider
-│   └── news_website.py      # Example news-site spider
-├── config/                # Configuration files
-│   ├── __init__.py          # Config model initialization
-│   ├── dev.yaml             # Dev environment config (local MySQL / DEBUG log level)
-│   └── prod.yaml            # Prod environment config (Aliyun RDS / INFO log level)
+│   ├── basespider.py      # Spider base class
+│   ├── recommendspider.py # Recommend-mode spider base class
+│   ├── authorspider.py    # Author-mode spider base class
+│   └── spider_registry.py # Spider registry
+├── services/              # Business services
+│   ├── pipeline.py        # Data-processing pipeline
+│   └── async_mysql_service.py # Database service
+├── scheduler/             # Scheduler
+│   ├── process_manager.py # Process management
+│   └── async_consumer.py  # Async consumer
 ├── tests/                 # Test cases
-│   ├── test_spider.py       # Spider base-class tests
-│   └── test_pipeline.py     # Data-cleaning pipeline tests
-├── scripts/               # Ops scripts
-│   ├── manage.py            # Spider management tool (start/monitor/clean)
-│   └── deploy.sh            # Production deployment script
-├── .env.example           # Environment variable template (placeholders for secrets)
-├── requirements.txt       # Dependency list (with version constraints)
-├── pyproject.toml         # PEP 621 project metadata (managed by poetry)
-└── README.md              # Project readme (this file)
-
-4. Adding a new spider
-4.1 Implement the spider class
-# spiders/tech_blog.py
-from core.spider.base_spider import BaseSpider
-
-class TechBlogSpider(BaseSpider):
-    name = "tech_blog"
-    
-    async def parse(self, response):
-        articles = []
-        for item in response.html.select("div.article"):
-            title = item.select_one("h2.title").text.strip()
-            link = item.select_one("a")["href"]
-            articles.append({"title": title, "link": link})
-        return articles
-4.2 Register the spider
-# spiders/__init__.py
-from spiders.tech_blog import TechBlogSpider
-
-SPIDER_REGISTRY = {
-    cls.name: cls for cls in BaseSpider.__subclasses__()
-}
-
-4.3 Configure the MQ topic
-# config/prod.yaml
-spider:
-  topic: "custom_tech_blog_topic"
-
-Core workflow
+└── scripts/               # Ops scripts
+    └── config_cli.py      # Config management CLI
 
 ## 🚀 Features
 
@@ -80,18 +43,126 @@ spider:
 - ✅ Requests support automatic retries, dynamic pagination, and field extraction
 - ✅ Videos are wrapped as standard `VideoItem`s and pushed uniformly to MQ (see the sketch after this list)
 - ✅ ACK is confirmed only after a task completes successfully, guaranteeing consistency
+- ✅ Comprehensive configuration management (validation, health checks, doc generation, CLI)
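+
+A rough sketch of the `VideoItem` flow, based on `core/models/video_item.py` from this commit (the field set is abbreviated; this assumes the fields shown satisfy the model's required set):
+
+```python
+import asyncio
+from core.models.video_item import VideoItem  # added in this commit
+
+async def demo():
+    item = VideoItem(platform="benshanzhufu", mode="recommend",
+                     video_title="demo", video_url="http://example.com/v.mp4",
+                     out_video_id="123", out_user_id="123")
+    await item.prepare()         # cleans the title, fills timestamps and session
+    data = item.produce_item()   # dict when all required fields exist, None otherwise
+    if data is not None:
+        pass                     # push the dict to MQ
+
+asyncio.run(demo())
+```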
 
 ---
 
 ## 🧱 Architecture Overview
 
-- **main.py**: listens on multiple Topics, consumes MQ messages, resolves the platform, and dispatches `UniversalCrawler`
-- **UniversalCrawler**: core spider logic; reads the config, sends requests, extracts fields, wraps data items, and hands them to the `pipeline`
-- **PiaoQuanPipeline**: handles ETL into the DB and pushing to the ETL MQ
+- **main.py**: listens on multiple Topics, consumes MQ messages, resolves the platform, and dispatches the spider
+- **Spider classes**: core spider logic; read the config, send requests, extract fields, and wrap data items
+- **Pipeline**: handles data validation, deduplication, and pushing to the ETL MQ
 - **MQ system**: Aliyun MQ; multiple Topics can be configured per platform, with manual ACK after consumption
-- **Config files**:
-  - `spiders_config.yaml`: per-platform request method, field mapping, pagination, etc.
-  - `topic_map.yaml`: multi-Topic mapping (the platform field is no longer used)
+- **Config system**:
+  - Environment config: managed via the `.env` file and `config/base.py`
+  - Spider config: managed via `config/spiders_config.yaml`
+
+---
+
+## ⚙️ Configuration Management
+
+AutoScraperX uses a layered configuration system with full configuration-management tooling:
+
+### Environment Configuration
+
+Environment configuration is managed through the `.env` file and covers infrastructure settings such as the database, message queue, and logging.
+
+1. Copy `.env.example` to `.env`
+2. Fill in each option for your environment
+
+```bash
+cp .env.example .env
+# edit the .env file
+```
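+
+Once `.env` is in place, the validated settings are available as a single object (this is what `core/utils/config_health_check.py` in this commit relies on):
+
+```python
+from config import settings  # pydantic Settings loaded from .env
+
+print(settings.ENV)        # "prod" or "dev"
+print(settings.DB_HOST)    # validated at startup; missing required values fail fast
+print(settings.redis_url)  # assembled property: redis://:<password>@<host>:<port>/<db>
+```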
+
+### Spider Configuration
+
+Spider configuration is managed through `config/spiders_config.yaml` in YAML format, with a default section plus platform-specific sections.
+
+```yaml
+# Default config
+default:
+  base_url: http://api.example.com
+  request_timeout: 30
+  max_retries: 3
+
+# Platform-specific config
+platform_name:
+  platform: platform_name
+  mode: recommend
+  path: /api/path
+  method: post
+  # more options...
+```
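+
+Each platform entry is merged with `default` and validated into a `PlatformConfig` model. A short sketch of reading one entry, using `SpiderConfig` as referenced by `core/utils/config_manager.py` in this commit:
+
+```python
+from core.utils.spider_config import SpiderConfig
+
+cfg = SpiderConfig.get_platform_config("platform_name")  # merged default + platform entry
+print(cfg.url, cfg.method, cfg.loop_times)               # typed, validated fields
+```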
+
+### Configuration Validation
+
+Configs are validated with Pydantic models to ensure they are well formed (a condensed demo follows this list):
+
+- The HTTP method must be valid (GET, POST, PUT, DELETE, PATCH)
+- loop_times must be a positive number
+- loop_interval must contain min and max, and min cannot exceed max
+- response_parse must contain a data_path field
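+
+A condensed demo of what the validation rejects, using the `PlatformConfig` model added in this commit (the field values are made up to trip each rule):
+
+```python
+from core.models.spiders_config_models import PlatformConfig
+
+try:
+    PlatformConfig(platform="demo", mode="recommend", url="http://example.com",
+                   method="FETCH",                        # not an allowed HTTP method
+                   loop_interval={"min": 60, "max": 30},  # min greater than max
+                   response_parse={})                     # missing data_path
+except Exception as e:
+    print(e)  # pydantic reports every violated rule listed above
+```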
+
+### Configuration Health Check
+
+Run the following command to check configuration health:
+
+```bash
+python -m core.utils.config_health_check
+```
+
+This tool checks:
+- Environment config completeness
+- Spider config validity
+- Config file permissions
+
+### Configuration Doc Generation
+
+Run the following command to generate the configuration docs:
+
+```bash
+python -m core.utils.config_documentation
+```
+
+The generated document contains:
+- A detailed environment-config reference
+- The spider-config structure
+- Current configuration status
+
+### Configuration CLI
+
+Manage the configuration from the command line:
+
+```bash
+# check configuration health
+python scripts/config_cli.py check
+
+# generate the configuration docs
+python scripts/config_cli.py docs
+
+# list all platforms
+python scripts/config_cli.py list
+
+# show configuration statistics
+python scripts/config_cli.py stats
+
+# show a specific platform's config details
+python scripts/config_cli.py show <platform_name>
+```
+
+### Configuration Hot Reload
+
+After changing a config file, reload it without restarting the service:
+
+```bash
+# reload the config via the API (if the config API service is enabled)
+curl -X POST http://127.0.0.1:8080/config/reload
+
+# or call it from code
+from core.utils.spider_config import SpiderConfig
+SpiderConfig.reload_config()
+```
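+
+The `/config/reload` endpoint itself is not part of this commit; if you expose one, a minimal aiohttp sketch could look like this (the route, host, and port are assumptions):
+
+```python
+from aiohttp import web
+from core.utils.spider_config import SpiderConfig
+
+async def reload_config(request: web.Request) -> web.Response:
+    # hypothetical handler: clears the cached YAML and re-reads it from disk
+    SpiderConfig.reload_config()
+    return web.json_response({"success": True})
+
+app = web.Application()
+app.add_routes([web.post("/config/reload", reload_config)])
+# web.run_app(app, host="127.0.0.1", port=8080)
+```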
 
 ---
 
@@ -100,7 +171,7 @@ spider:
 ### 1. Start the project
 
 ```bash
-python main1.py
+python main.py
 ```
 
 > The program automatically listens on all Topics; after a message is consumed it creates and runs the corresponding spider task.
@@ -111,29 +182,37 @@ python main1.py
 
 ```yaml
 default:
-  base_url: http://api.xxx.com
-  request_timeout: 30[]()
+  base_url: http://8.217.192.46:8889
+  request_timeout: 30
   headers:
     {"Content-Type": "application/json"}
 
 benshanzhufu:
+  platform: benshanzhufu
   mode: recommend
   path: /crawler/ben_shan_zhu_fu/recommend
   method: post
   request_body:
-    cursor: "1"
-  paging: true
-  max_pages: 5
-  etl_hook: "process_video_obj"
+    cursor: "{{next_cursor}}"
+  loop_times: 50
+  loop_interval:
+    min: 30
+    max: 60
+  feishu_sheetid: "aTSJH4"
   response_parse:
+    data: "$.data"
     next_cursor: "$.data.next_cursor"
     data_path: "$.data.data"
     fields:
       video_id: "$.nid"
       video_title: "$.title"
       play_cnt: 0
-      publish_time: "$.update_time"
+      publish_time_stamp: "$.update_time"
+      out_user_id: "$.nid"
+      cover_url: "$.video_cover"
+      like_cnt: 0
       video_url: "$.video_url"
+      out_video_id: "$.nid"
 ```
 
 ---
@@ -141,8 +220,87 @@ benshanzhufu:
 ## 🧵 Thread Scheduling and Consumption
 
 - One MQ consumer thread is started per topic
-- Each message creates a UniversalCrawler instance, runs `.run()`, and ACKs on completion
+- Each message creates a spider instance, runs `.run()`, and ACKs on completion (a schematic loop follows this list)
 - Failures and timeouts do not block other tasks
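+
+Schematically, each topic thread runs a loop like the following (a simplified sketch; `InMemoryMQ` is a stand-in, not the real RocketMQ client):
+
+```python
+import queue
+import threading
+
+class InMemoryMQ:
+    """Illustrative stand-in for the MQ client."""
+    def __init__(self):
+        self.q = queue.Queue()
+    def receive(self, timeout=1):
+        try:
+            return self.q.get(timeout=timeout)
+        except queue.Empty:
+            return None
+    def ack(self, message):
+        print(f"ACK {message}")
+
+def consume_topic(topic, mq, run_spider):
+    """One thread per topic; one spider run per message; ACK only after success."""
+    while True:
+        message = mq.receive()
+        if message is None:
+            break                  # demo only: stop when the queue drains
+        try:
+            run_spider(message)    # one spider instance per message
+            mq.ack(message)        # ACK after success, so failures are redelivered
+        except Exception:
+            pass                   # no ACK; other tasks are not blocked
+
+mq = InMemoryMQ()
+mq.q.put({"platform": "benshanzhufu"})
+threading.Thread(target=consume_topic, args=("topic_a", mq, lambda m: None)).start()
+```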
 
-pip freeze > requirements.txt
-pip install -r requirements.txt
+---
+
+## 🧪 Tests
+
+Run the tests:
+
+```bash
+# run all tests
+pytest
+
+# run a specific test
+pytest tests/test_config.py
+```
+
+---
+
+## 📦 Deployment
+
+```bash
+# install dependencies
+pip install -r requirements.txt
+
+# start the service
+python main.py
+
+# or use the deploy script
+sh deploy.sh
+```
+
+## 🧰 Common Operations
+
+### Configuration Management
+
+1. **List all platform configs**:
+   ```bash
+   python scripts/config_cli.py list
+   ```
+
+2. **Show a specific platform's config details**:
+   ```bash
+   python scripts/config_cli.py show <platform_name>
+   ```
+
+3. **Check configuration health**:
+   ```bash
+   python scripts/config_cli.py check
+   ```
+
+4. **Generate the configuration docs**:
+   ```bash
+   python scripts/config_cli.py docs
+   ```
+
+5. **Show configuration statistics**:
+   ```bash
+   python scripts/config_cli.py stats
+   ```
+
+### Configuration Updates
+
+When the configuration needs to change:
+
+1. Edit the `config/spiders_config.yaml` file
+2. Reload the configuration from code:
+   ```python
+   # trigger a reload from within your code
+   from core.utils.spider_config import SpiderConfig
+   SpiderConfig.reload_config()
+   ```
+
+### Routine Maintenance
+
+1. **Check system health**:
+   ```bash
+   python -m core.utils.config_health_check
+   ```
+
+2. **Generate up-to-date configuration docs**:
+   ```bash
+   python -m core.utils.config_documentation
+   ```

config/base.py (+33 −3)

@@ -1,8 +1,8 @@
 import os
 
 from dotenv import load_dotenv
-from core.utils.path_utils import project_root,log_dir
-from pydantic import BaseSettings, Field, AnyUrl
+from core.utils.path_utils import project_root, log_dir
+from pydantic import BaseSettings, Field, AnyUrl, validator
 
 # Load the .env file before the Settings class is defined
 
@@ -56,9 +56,39 @@ class Settings(BaseSettings):
     REDIS_MAX_CONNECTIONS: int = Field(20, env="REDIS_MAX_CONNECTIONS")
     @property
     def redis_url(self) -> str:
-        """生成"""
+        """生成Redis连接URL"""
         return f"redis://:{self.REDIS_PASSWORD}@{self.REDIS_HOST}:{self.REDIS_PORT}/{self.REDIS_DB}"
 
+    @validator('DB_PORT', 'REDIS_PORT')
+    def validate_port(cls, v):
+        if not 1 <= v <= 65535:
+            raise ValueError('Port must be between 1 and 65535')
+        return v
+
+    @validator('DB_POOL_SIZE', 'DB_POOL_RECYCLE', 'REDIS_MAX_CONNECTIONS')
+    def validate_positive_int(cls, v, field):
+        if v <= 0:
+            raise ValueError(f'{field.name} must be positive')
+        return v
+
+    @validator('ROCKETMQ_WAIT_SECONDS')
+    def validate_rocketmq_wait_seconds(cls, v):
+        if not 1 <= v <= 30:
+            raise ValueError('ROCKETMQ_WAIT_SECONDS must be between 1 and 30')
+        return v
+
+    @validator('ROCKETMQ_BATCH')
+    def validate_rocketmq_batch(cls, v):
+        if not 1 <= v <= 16:
+            raise ValueError('ROCKETMQ_BATCH must be between 1 and 16')
+        return v
+
+    @validator('CONNECTION_TIMEOUT', 'REQUEST_TIMEOUT')
+    def validate_timeouts(cls, v, field):
+        if v <= 0:
+            raise ValueError(f'{field.name} must be positive')
+        return v
+
     class Config:
         env_file = ".env"
         env_file_encoding = 'utf-8'

config/spiders_config.yaml (+2 −2)

@@ -69,7 +69,7 @@ xiaoniangaoauthor:
   loop_interval:
     min: 5
     max: 20
-  feishu_sheetid: "golXy9"
+  feishu_sheetid: "K0gA9Y"
   response_parse:
     uid: "$.uid" # 数据库的uid
     next_cursor: "$.cursor"
@@ -90,7 +90,7 @@ xiaoniangaoauthor:
       video_url: "$.v_url"
       out_user_id: "$.user.mid"
       out_video_id: "$.vid"
-
+      publish_time_stamp: "$.t"
 
 
 

core/base/async_request_client.py (+14 −14)

@@ -11,10 +11,9 @@ class AsyncRequestClient:
     Retries when the response returns code != 0, with local logging;
     reports to Aliyun logs once the maximum retry count is reached
     """
-    def __init__(self, logger:Optional[LoggerManager.get_logger()] = None ,
-                 aliyun_log:Optional[LoggerManager.get_aliyun_logger()] = None,
-                 max_retries=3, timeout=30
-                 ):
+    def __init__(self, logger: Optional[LoggerManager.get_logger()] = None,
+                 aliyun_log: Optional[LoggerManager.get_aliyun_logger()] = None,
+                 max_retries=3, timeout=30):
         self.logger = logger
         self.aliyun_log = aliyun_log
         self.max_retries = max_retries
@@ -22,6 +21,7 @@ class AsyncRequestClient:
 
     async def request(self, session: aiohttp.ClientSession, method: str, url: str, **kwargs) -> Optional[Dict]:
         retries = 0
+        resp = None  # initialize resp so it is defined in the error path
 
         while retries < self.max_retries:
             try:
@@ -29,12 +29,11 @@ class AsyncRequestClient:
                     self.logger.info(f"请求 {method} {url}, 请求参数{kwargs}")
                 if self.aliyun_log:
                     self.aliyun_log.logging(
-                        code = "1012",
+                        code="1012",
                         message="初始化请求",
-                        data={"utl":url,
-                              "method":method,
-                              "requestBody":kwargs
-                              }
+                        data={"url": url,
+                              "method": method,
+                              "requestBody": kwargs}
                     )
                 async with session.request(method, url, **kwargs) as response:
                     response.raise_for_status()
@@ -72,11 +71,12 @@ class AsyncRequestClient:
                             code="9006",
                             message="请求异常达到最大重试次数",
                             data={
-                                  "url": url,
-                                  "method": method,
-                                  "requestBody": kwargs,
-                                  "response": resp
-                                }
+                                "url": url,
+                                "method": method,
+                                "requestBody": kwargs,
+                                "response": str(resp) if resp else str(e),
+                                "error_type": type(e).__name__
+                            }
                         )
                     return
                 if self.logger:

core/models/__init__.py (+16 −0)

@@ -0,0 +1,16 @@
+"""
+Core models module
+Contains all Pydantic data models
+"""
+
+from .video_item import VideoItem
+from .spiders_config_models import BaseConfig, PlatformConfig
+from .rule_models import RuleModel, RuleField
+
+__all__ = [
+    "VideoItem",
+    "BaseConfig", 
+    "PlatformConfig",
+    "RuleModel",
+    "RuleField"
+]

core/models/rule_models.py (+40 −0)

@@ -0,0 +1,40 @@
+from pydantic import BaseModel, validator
+from typing import Dict, Any, Optional, Union
+
+
+class RuleField(BaseModel):
+    """
+    Rule-field model; validates a single rule field
+    """
+    min: Union[int, float] = 0
+    max: Union[int, float] = 0
+
+    @validator('max')
+    def validate_min_max(cls, v, values, field):
+        if 'min' in values and v != 0 and values['min'] > v:
+            raise ValueError('min value cannot be greater than max value')
+        return v
+
+
+class RuleModel(BaseModel):
+    """
+    Rule model; validates a rule dictionary
+    """
+    period: Optional[RuleField] = None
+    duration: Optional[RuleField] = None
+    play_cnt: Optional[RuleField] = None
+    like_cnt: Optional[RuleField] = None
+    comment_cnt: Optional[RuleField] = None
+    share_cnt: Optional[RuleField] = None
+    videos_cnt: Optional[RuleField] = None
+    width: Optional[RuleField] = None
+    height: Optional[RuleField] = None
+
+    @validator('*')
+    def validate_rule_fields(cls, v):
+        if v is not None and not isinstance(v, RuleField):
+            raise ValueError('Rule fields must be of type RuleField')
+        return v
+
+    class Config:
+        extra = "allow"  # allow extra fields

core/models/spiders_config_models.py (+71 −8)

@@ -1,21 +1,84 @@
-from pydantic import BaseModel, AnyUrl
+from pydantic import BaseModel, AnyUrl, validator
+from typing import Dict, Any, Optional, Union
+
 
 class BaseConfig(BaseModel):
-    base_url: AnyUrl = None
+    base_url: Optional[AnyUrl]
     request_timeout: int = 30
     max_retries: int = 3
-    headers: dict = {}
+    headers: Dict[str, Any] = {}
+
+    @validator('request_timeout', 'max_retries')
+    def validate_positive_int(cls, v, field):
+        if v <= 0:
+            raise ValueError(f'{field.name} must be positive')
+        return v
+
 
 class PlatformConfig(BaseConfig):
     platform: str
     mode: str
-    path: str = None
+    path: Optional[str]
     url: AnyUrl
     method: str
-    request_body: dict = {}
+    request_body: Dict[str, Any] = {}
     loop_times: int = 1
-    loop_interval: dict = {}
-    response_parse: dict = {}
+    loop_interval: Dict[str, int] = {}
+    response_parse: Dict[str, Any] = {}
     retry_times: int = 0
-    feishu_sheetid: str
+    feishu_sheetid: Optional[str] = None
+
+    @validator('method')
+    def validate_method(cls, v):
+        allowed_methods = ['GET', 'POST', 'PUT', 'DELETE', 'PATCH']
+        if v.upper() not in allowed_methods:
+            raise ValueError(f'Method must be one of {", ".join(allowed_methods)}')
+        return v.upper()
+
+    @validator('loop_times')
+    def validate_loop_times(cls, v):
+        if v <= 0:
+            raise ValueError('loop_times must be positive')
+        return v
+
+    @validator('loop_interval')
+    def validate_loop_interval(cls, v):
+        if 'min' not in v or 'max' not in v:
+            raise ValueError('loop_interval must contain both min and max keys')
+        if v['min'] < 0 or v['max'] < 0:
+            raise ValueError('loop_interval values must be non-negative')
+        if v['min'] > v['max']:
+            raise ValueError('min value cannot be greater than max value')
+        return v
+
+    @validator('response_parse')
+    def validate_response_parse(cls, v):
+        if 'data_path' not in v:
+            raise ValueError('response_parse must contain data_path')
+        return v
+
+    @validator('retry_times')
+    def validate_retry_times(cls, v):
+        if v < 0:
+            raise ValueError('retry_times must be non-negative')
+        return v
 
+    @validator('request_body')
+    def validate_request_body(cls, v):
+        # ensure request_body values are primitives or dicts/lists
+        if not isinstance(v, dict):
+            raise ValueError('request_body must be a dictionary')
+        
+        def is_valid_type(value):
+            if isinstance(value, (str, int, float, bool, type(None))):
+                return True
+            elif isinstance(value, (list, tuple)):
+                return all(is_valid_type(item) for item in value)
+            elif isinstance(value, dict):
+                return all(isinstance(k, str) and is_valid_type(v) for k, v in value.items())
+            return False
+            
+        for key, value in v.items():
+            if not is_valid_type(value):
+                raise ValueError(f'Invalid type for request_body["{key}"]: {type(value)}')
+        return v

core/models/video_item.py (+46 −8)

@@ -1,8 +1,8 @@
 import time
 import uuid
-from typing import Optional
+from typing import Optional, Union
 
-from pydantic import BaseModel, Field
+from pydantic import BaseModel, Field, validator
 
 from services.clean_title import clean_title
 
@@ -26,8 +26,8 @@ class VideoItem(BaseModel):
     session: Optional[str]
 
     video_title: Optional[str]
-    publish_time_stamp: Optional[int] = None
-    update_time_stamp: Optional[int] = None
+    publish_time_stamp: Optional[Union[int, str]] = None
+    update_time_stamp: Optional[Union[int, str]] = None
 
     duration: int = 0
     play_cnt: int = 0
@@ -40,15 +40,41 @@ class VideoItem(BaseModel):
     publish_time_str: Optional[str] = None
     publish_time: Optional[str] = None
 
+    # validators ensuring the numeric fields are non-negative
+    @validator('duration', 'play_cnt', 'like_cnt', 'comment_cnt', 'share_cnt', 'width', 'height')
+    def validate_non_negative(cls, v, field):
+        if v < 0:
+            raise ValueError(f'{field.name} must be non-negative')
+        return v
+
+    @validator('video_url', 'cover_url')
+    def validate_url(cls, v, field):
+        if v and not (v.startswith('http://') or v.startswith('https://')):
+            raise ValueError(f'{field.name} must be a valid URL')
+        return v
+
     async def prepare(self):
         """
         Async pre-processing: clean the title and fill in publish/update timestamps
         """
         # clean the title
-        self.video_title = await clean_title(self.video_title)
+        if self.video_title:
+            self.video_title = await clean_title(self.video_title)
 
         # publish-time handling
-        if not self.publish_time_stamp:
+        if self.publish_time_stamp:
+            # make sure publish_time_stamp is an int (13-digit ms values become seconds)
+            if isinstance(self.publish_time_stamp, str):
+                try:
+                    if len(self.publish_time_stamp) == 13:
+                        self.publish_time_stamp = int(self.publish_time_stamp) // 1000
+                    else:
+                        self.publish_time_stamp = int(self.publish_time_stamp)
+                except ValueError:
+                    self.publish_time_stamp = int(time.time())
+            elif isinstance(self.publish_time_stamp, int) and len(str(self.publish_time_stamp)) == 13:
+                self.publish_time_stamp = self.publish_time_stamp // 1000
+        else:
             self.publish_time_stamp = int(time.time())
 
         self.publish_time_str = time.strftime(
@@ -59,6 +85,18 @@ class VideoItem(BaseModel):
         # update timestamp defaults to the current time
         if not self.update_time_stamp:
             self.update_time_stamp = int(time.time())
+        else:
+            # make sure update_time_stamp is an int (13-digit ms values become seconds)
+            if isinstance(self.update_time_stamp, str):
+                try:
+                    if len(self.update_time_stamp) == 13:
+                        self.update_time_stamp = int(self.update_time_stamp) // 1000
+                    else:
+                        self.update_time_stamp = int(self.update_time_stamp)
+                except ValueError:
+                    self.update_time_stamp = int(time.time())
+            elif isinstance(self.update_time_stamp, int) and len(str(self.update_time_stamp)) == 13:
+                self.update_time_stamp = self.update_time_stamp // 1000
 
         if not self.session:
             self.session = str(f"{self.platform}_{int(time.time())}")
@@ -75,6 +113,6 @@ class VideoItem(BaseModel):
         ]
         for f in must_fields:
             if not getattr(self, f, None):
-                return False
+                return None
 
-        return self.dict()
+        return self.dict()

core/utils/__init__.py (+2 −2)

@@ -1,3 +1,3 @@
-# from .log.log_codes import CODES
+# from .config_manager import config_manager, get_config_manager
 #
-# __all__ = ['CODES']
+# __all__ = ['config_manager', 'get_config_manager']

core/utils/config_documentation.py (+219 −0)

@@ -0,0 +1,219 @@
+"""
+Config documentation generator
+Automatically generates the configuration reference document
+"""
+import yaml
+from core.utils.config_manager import get_config_manager
+from core.utils.path_utils import spiders_config_path
+
+
+class ConfigDocumentation:
+    """
+    Config documentation generator
+    """
+    
+    def __init__(self):
+        self.config_manager = get_config_manager()
+
+    def generate_env_config_docs(self) -> str:
+        """
+        Generate the environment configuration docs
+        """
+        docs = "# 环境配置说明\n\n"
+        docs += "环境配置通过 `.env` 文件进行配置,以下为所有可配置项:\n\n"
+        
+        env_settings_info = {
+            "ENV": {
+                "description": "Runtime environment",
+                "default": "prod",
+                "options": ["prod", "dev"]
+            },
+            "DB_HOST": {
+                "description": "Database host address",
+                "required": True
+            },
+            "DB_PORT": {
+                "description": "Database port",
+                "default": 3306
+            },
+            "DB_USER": {
+                "description": "Database username",
+                "required": True
+            },
+            "DB_PASSWORD": {
+                "description": "Database password",
+                "required": True
+            },
+            "DB_NAME": {
+                "description": "Database name",
+                "required": True
+            },
+            "DB_CHARSET": {
+                "description": "Database character set",
+                "required": True
+            },
+            "ROCKETMQ_ENDPOINT": {
+                "description": "RocketMQ endpoint",
+                "required": True
+            },
+            "ROCKETMQ_ACCESS_KEY_ID": {
+                "description": "RocketMQ access key ID",
+                "required": True
+            },
+            "ROCKETMQ_ACCESS_KEY_SECRET": {
+                "description": "RocketMQ access key secret",
+                "required": True
+            },
+            "FEISHU_APPID": {
+                "description": "Feishu app ID",
+                "required": True
+            },
+            "FEISHU_APPSECRET": {
+                "description": "Feishu app secret",
+                "required": True
+            },
+            "ALIYUN_ACCESS_KEY_ID": {
+                "description": "Aliyun access key ID",
+                "required": True
+            },
+            "ALIYUN_ACCESS_KEY_SECRET": {
+                "description": "Aliyun access key secret",
+                "required": True
+            },
+            "REDIS_HOST": {
+                "description": "Redis host address",
+                "required": True
+            },
+            "REDIS_PORT": {
+                "description": "Redis port",
+                "default": 6379
+            },
+            "REDIS_PASSWORD": {
+                "description": "Redis password",
+                "required": True
+            }
+        }
+        
+        docs += "| 配置项 | 描述 | 是否必填 | 默认值 |\n"
+        docs += "|--------|------|----------|--------|\n"
+        
+        for key, info in env_settings_info.items():
+            description = info.get("description", "")
+            required = "是" if info.get("required", False) else "否"
+            default = str(info.get("default", "")) if info.get("default") is not None else ""
+            options = ", ".join(info.get("options", []))
+            if options:
+                description += f" (可选值: {options})"
+                
+            docs += f"| {key} | {description} | {required} | {default} |\n"
+            
+        return docs
+
+    def generate_spider_config_docs(self) -> str:
+        """
+        Generate the spider configuration docs
+        """
+        docs = "# 爬虫配置说明\n\n"
+        docs += "爬虫配置通过 `config/spiders_config.yaml` 文件进行配置。\n\n"
+        
+        # include the example config
+        docs += "## Example Configuration\n\n```yaml\n"
+        with open(spiders_config_path, 'r', encoding='utf-8') as f:
+            docs += f.read()
+        docs += "```\n\n"
+        
+        # field reference
+        docs += "## Field Reference\n\n"
+        
+        global_config_fields = {
+            "base_url": "Base URL, used to build the full request URL",
+            "request_timeout": "Request timeout (seconds)",
+            "max_retries": "Maximum number of retries",
+            "headers": "Request headers"
+        }
+        
+        platform_config_fields = {
+            "platform": "Platform name",
+            "mode": "Crawl mode (e.g. recommend, author)",
+            "path": "API path",
+            "url": "Full request URL",
+            "method": "HTTP request method",
+            "request_body": "Request body parameters",
+            "loop_times": "Number of loop iterations",
+            "loop_interval": "Loop interval (min/max)",
+            "response_parse": "Response parsing configuration",
+            "feishu_sheetid": "Feishu sheet ID"
+        }
+        
+        response_parse_fields = {
+            "data_path": "JSONPath to the list of data items",
+            "next_cursor": "JSONPath to the next-page cursor",
+            "has_more": "JSONPath to the has-more flag",
+            "fields": "Field mapping configuration"
+        }
+        
+        docs += "### 全局配置字段\n\n"
+        docs += "| 字段 | 描述 |\n"
+        docs += "|------|------|\n"
+        for field, description in global_config_fields.items():
+            docs += f"| {field} | {description} |\n"
+            
+        docs += "\n### 平台配置字段\n\n"
+        docs += "| 字段 | 描述 |\n"
+        docs += "|------|------|\n"
+        for field, description in platform_config_fields.items():
+            docs += f"| {field} | {description} |\n"
+            
+        docs += "\n### 响应解析字段\n\n"
+        docs += "| 字段 | 描述 |\n"
+        docs += "|------|------|\n"
+        for field, description in response_parse_fields.items():
+            docs += f"| {field} | {description} |\n"
+            
+        return docs
+
+    def generate_complete_docs(self) -> str:
+        """
+        Generate the complete configuration document
+        """
+        docs = "# AutoScraperX 配置说明\n\n"
+        docs += "本文档详细说明了AutoScraperX项目的配置项。\n\n"
+        docs += "---\n\n"
+        docs += self.generate_env_config_docs()
+        docs += "\n---\n\n"
+        docs += self.generate_spider_config_docs()
+        docs += "\n---\n\n"
+        docs += "## 当前配置状态\n\n"
+        
+        try:
+            stats = self.config_manager.get_config_stats()
+            docs += f"- 平台配置数量: {stats['total_platforms']}\n"
+            docs += f"- 运行环境: {stats['env']}\n"
+            docs += f"- 配置文件路径: {stats['config_file']}\n"
+        except Exception as e:
+            docs += f"配置状态获取失败: {e}\n"
+            
+        return docs
+
+    def save_docs(self, filepath: str = "CONFIGURATION.md"):
+        """
+        Save the document to a file
+        """
+        docs = self.generate_complete_docs()
+        with open(filepath, 'w', encoding='utf-8') as f:
+            f.write(docs)
+        return filepath
+
+
+def generate_config_docs():
+    """
+    Generate the configuration docs
+    """
+    doc_generator = ConfigDocumentation()
+    filepath = doc_generator.save_docs()
+    print(f"配置文档已保存到: {filepath}")
+    return filepath
+
+
+if __name__ == "__main__":
+    generate_config_docs()

core/utils/config_health_check.py (+184 −0)

@@ -0,0 +1,184 @@
+"""
+Configuration health-check tool
+Validates the completeness and correctness of the configuration files
+"""
+import sys
+from typing import List, Dict, Any
+from core.utils.config_manager import get_config_manager
+from core.utils.spider_config import SpiderConfig
+from config import settings
+
+
+class ConfigHealthCheck:
+    """
+    Configuration health-check tool
+    """
+    
+    def __init__(self):
+        self.config_manager = get_config_manager()
+        self.errors = []
+        self.warnings = []
+
+    def check_env_config(self) -> bool:
+        """
+        Check the environment configuration
+        """
+        try:
+            # check that the required settings exist
+            required_settings = [
+                'DB_HOST', 'DB_USER', 'DB_PASSWORD', 'DB_NAME',
+                'ROCKETMQ_ENDPOINT', 'ROCKETMQ_ACCESS_KEY_ID', 'ROCKETMQ_ACCESS_KEY_SECRET',
+                'FEISHU_APPID', 'FEISHU_APPSECRET',
+                'ALIYUN_ACCESS_KEY_ID', 'ALIYUN_ACCESS_KEY_SECRET',
+                'REDIS_HOST', 'REDIS_PASSWORD'
+            ]
+            
+            for setting in required_settings:
+                if not getattr(settings, setting, None):
+                    self.errors.append(f"Missing environment setting: {setting}")
+            
+            # check URL format
+            url_settings = ['ROCKETMQ_ENDPOINT']
+            for setting in url_settings:
+                url = getattr(settings, setting, None)
+                if url and not isinstance(url, str):
+                    self.errors.append(f"Malformed URL setting: {setting}")
+            
+            return len(self.errors) == 0
+            
+        except Exception as e:
+            self.errors.append(f"Environment config check error: {str(e)}")
+            return False
+
+    def check_spider_configs(self) -> bool:
+        """
+        Check all spider configurations
+        """
+        try:
+            platforms = self.config_manager.list_platforms()
+            if not platforms:
+                self.warnings.append("No platform configs found")
+                return True
+                
+            valid_count = 0
+            for platform in platforms:
+                try:
+                    config = self.config_manager.get_platform_config(platform)
+                    # validate config fields
+                    if not config.platform:
+                        self.errors.append(f"Platform {platform} is missing the platform field")
+                    if not config.mode:
+                        self.errors.append(f"Platform {platform} is missing the mode field")
+                    if not config.url:
+                        self.errors.append(f"Platform {platform} is missing the url field")
+                    valid_count += 1
+                except Exception as e:
+                    self.errors.append(f"Platform {platform} config validation failed: {str(e)}")
+            
+            return len(self.errors) == 0
+            
+        except Exception as e:
+            self.errors.append(f"Spider config check error: {str(e)}")
+            return False
+
+    def check_file_permissions(self) -> bool:
+        """
+        Check config file permissions
+        """
+        import os
+        from core.utils.path_utils import spiders_config_path
+        
+        try:
+            # check that the spider config file exists
+            if not os.path.exists(spiders_config_path):
+                self.errors.append(f"Spider config file does not exist: {spiders_config_path}")
+                return False
+                
+            # check that the file is readable
+            if not os.access(spiders_config_path, os.R_OK):
+                self.errors.append(f"Spider config file is not readable: {spiders_config_path}")
+                
+            return len(self.errors) == 0
+            
+        except Exception as e:
+            self.errors.append(f"File permission check error: {str(e)}")
+            return False
+
+    def run_all_checks(self) -> Dict[str, Any]:
+        """
+        Run all checks
+        """
+        self.errors.clear()
+        self.warnings.clear()
+        
+        env_ok = self.check_env_config()
+        spider_ok = self.check_spider_configs()
+        file_ok = self.check_file_permissions()
+        
+        overall_ok = env_ok and spider_ok and file_ok
+        
+        return {
+            "success": overall_ok,
+            "errors": self.errors.copy(),
+            "warnings": self.warnings.copy(),
+            "details": {
+                "env_config": env_ok,
+                "spider_configs": spider_ok,
+                "file_permissions": file_ok
+            }
+        }
+
+    def print_report(self):
+        """
+        Print the health-check report
+        """
+        result = self.run_all_checks()
+        
+        print("=" * 50)
+        print("配置健康检查报告")
+        print("=" * 50)
+        
+        if result["success"]:
+            print("✓ 所有配置检查通过")
+        else:
+            print("✗ 配置存在问题")
+            
+        print(f"\n详细信息:")
+        print(f"  环境配置: {'✓' if result['details']['env_config'] else '✗'}")
+        print(f"  爬虫配置: {'✓' if result['details']['spider_configs'] else '✗'}")
+        print(f"  文件权限: {'✓' if result['details']['file_permissions'] else '✗'}")
+        
+        if result["warnings"]:
+            print(f"\n警告:")
+            for warning in result["warnings"]:
+                print(f"  - {warning}")
+                
+        if result["errors"]:
+            print(f"\n错误:")
+            for error in result["errors"]:
+                print(f"  - {error}")
+        
+        print("\n统计信息:")
+        try:
+            stats = self.config_manager.get_config_stats()
+            print(f"  平台数量: {stats['total_platforms']}")
+            print(f"  运行环境: {stats['env']}")
+        except Exception as e:
+            print(f"  统计信息获取失败: {e}")
+            
+        print("=" * 50)
+        
+        return result
+
+
+def run_health_check():
+    """
+    Run the configuration health check
+    """
+    checker = ConfigHealthCheck()
+    return checker.print_report()
+
+
+if __name__ == "__main__":
+    result = run_health_check()
+    sys.exit(0 if result["success"] else 1)

core/utils/config_manager.py (+129 −0)

@@ -0,0 +1,129 @@
+"""
+Configuration management service
+Unified management of the environment and spider configs
+"""
+import json
+from typing import Dict, Any, Optional
+from core.utils.spider_config import SpiderConfig
+from core.models.spiders_config_models import PlatformConfig
+
+
+class ConfigManager:
+    """
+    Unified config manager
+    Provides a single access interface for the environment and spider configs
+    """
+    
+    def __init__(self):
+        # import settings lazily to avoid a circular import
+        from config import settings
+        self._env_settings = settings
+        self._spider_config = SpiderConfig
+
+    @property
+    def env_settings(self):
+        """
+        Get the environment settings
+        """
+        return self._env_settings
+
+    def get_platform_config(self, platform_name: str) -> PlatformConfig:
+        """
+        Get a platform's spider config
+        """
+        return self._spider_config.get_platform_config(platform_name)
+
+    def list_platforms(self) -> list:
+        """
+        Get the list of all platforms
+        """
+        return self._spider_config.list_all_platforms()
+
+    def reload_spider_configs(self):
+        """
+        Reload the spider configs
+        """
+        self._spider_config.reload_config()
+
+    def get_config_stats(self) -> Dict[str, Any]:
+        """
+        Get configuration statistics
+        """
+        stats = self._spider_config.get_config_stats()
+        stats["env"] = self._env_settings.ENV
+        return stats
+
+    def validate_platform_config(self, platform_name: str) -> bool:
+        """
+        Check whether a platform config is valid
+        """
+        try:
+            self.get_platform_config(platform_name)
+            return True
+        except Exception:
+            return False
+
+    def export_configs(self) -> Dict[str, Any]:
+        """
+        Export all config info (for debugging and monitoring)
+        """
+        return {
+            "env_settings": {
+                "env": self._env_settings.ENV,
+                "log_level": self._env_settings.LOG_LEVEL,
+                "db_host": self._env_settings.DB_HOST,
+                "rocketmq_endpoint": str(self._env_settings.ROCKETMQ_ENDPOINT),
+                # excludes sensitive values such as passwords and secrets
+            },
+            "spider_configs": self.list_platforms(),
+            "stats": self.get_config_stats()
+        }
+
+    def get_platform_configs_summary(self) -> Dict[str, Dict[str, Any]]:
+        """
+        Get summary info for all platform configs
+        """
+        platforms = self.list_platforms()
+        summary = {}
+        
+        for platform in platforms:
+            try:
+                config = self.get_platform_config(platform)
+                summary[platform] = {
+                    "platform": config.platform,
+                    "mode": config.mode,
+                    "method": config.method,
+                    "url": str(config.url),
+                    "loop_times": config.loop_times,
+                }
+            except Exception as e:
+                summary[platform] = {
+                    "error": str(e)
+                }
+                
+        return summary
+
+    async def reload_configs_runtime(self):
+        """
+        Reload the configuration at runtime (without restarting the service)
+        This method can be invoked on a specific signal or API call
+        """
+        try:
+            # reload the spider configs
+            self.reload_spider_configs()
+            return True
+        except Exception as e:
+            # log the error
+            print(f"Runtime config reload failed: {e}")
+            return False
+
+
+# global config-manager instance
+config_manager = ConfigManager()
+
+
+def get_config_manager() -> ConfigManager:
+    """
+    Get the config-manager instance
+    """
+    return config_manager

core/utils/extractors.py (+17 −11)

@@ -1,9 +1,9 @@
-from typing import Dict
+from typing import Dict, Any
 
 from jsonpath_ng import parse
 
 
-def safe_extract(json_obj, path, default=None):
+def safe_extract(json_obj: Any, path: str, default=None):
     """
     Safely extract a single field value; return the first match, otherwise the default.
 
@@ -12,17 +12,22 @@ def safe_extract(json_obj, path, default=None):
     :param default: the value returned when extraction fails
     :return: the extracted result or the default
     """
+    # handle a None input object
+    if json_obj is None:
+        return default
+        
     try:
         jsonpath_expr = parse(path)
         match = jsonpath_expr.find(json_obj)
         if match:
             return match[0].value
     except Exception as e:
+        # log the error but don't interrupt execution
         print(f"[extractor] Error extracting {path}: {e}")
     return default
 
 
-def extract_multiple(json_obj, fields: dict) -> dict:
+def extract_multiple(json_obj: Any, fields: dict) -> dict:
     """
     Extract multiple fields according to the field configuration.
 
@@ -33,20 +38,21 @@ def extract_multiple(json_obj, fields: dict) -> dict:
     return {key: safe_extract(json_obj, path) for key, path in fields.items()}
 
 
-def extract_fields(video: Dict, field_map: Dict, logger=None, trace_id=None,aliyun_log=None) -> Dict:
+def extract_fields(video: Dict, field_map: Dict, logger=None, trace_id=None, aliyun_log=None) -> Dict:
     result = {}
     for field, path in field_map.items():
+        # if path is not a string or not a JSONPath starting with $, use it as a literal value
         if not isinstance(path, str) or not path.startswith("$"):
             result[field] = path
             continue
         value = safe_extract(video, path)
         if value is None and logger:
             logger.warning(f"字段提取失败: {field} 路径: {path}")
-            aliyun_log.logging(
-                code="9024",
-                message=f"字段提取失败: {field} 路径: {path}",
-                data={"video": video}
-
-            )
+            if aliyun_log:
+                aliyun_log.logging(
+                    code="9024",
+                    message=f"字段提取失败: {field} 路径: {path}",
+                    data={"video": video}
+                )
         result[field] = value
-    return result
+    return result

core/utils/helpers.py (+7 −5)

@@ -19,14 +19,16 @@ async def get_title_filter_word() -> List[str]:
         feishu_data = await feishu.get_values(spreadsheet_token=spreadsheet_token, sheet_id=sheet_id)
         return feishu_data[1]
 
-async def generate_titles(sheet_id: str,video_obj: Dict):
+async def generate_titles(sheet_id: str,video_obj: Dict,logger,aliyun_log):
     title_list = await get_title_filter_word()
     title = video_obj.get("title")
     if not title:
         return
     contains_keyword = any(keyword in title for keyword in title_list)
+    logger.info(f"【{title}】标题包含过滤关键词:{contains_keyword}")
     if contains_keyword:
         new_title = await GPT4oMini.get_ai_mini_title(title)
+        logger.info(f"生成新的标题:{new_title}")
         current_time = datetime.now()
         formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
         values = [
@@ -36,10 +38,10 @@ async def generate_titles(sheet_id: str,video_obj: Dict):
                 new_title,
                 formatted_time,
         ]
-        await insert_feishu_data(sheet_id, values)
+        await insert_safe_data(sheet_id, values)
 
-async def insert_feishu_data(sheet_id: str,values: List):
-    spreadsheet_token = "KsoMsyP2ghleM9tzBfmcEEXBnXg"
+async def insert_safe_data(sheet_id: str, values: List):
+    spreadsheet_token = "U5dXsSlPOhiNNCtEfgqcm1iYnpf"
     async with FeishuDataAsync() as feishu:
         await feishu.insert_values(spreadsheet_token=spreadsheet_token, sheet_id=sheet_id,ranges="A2:Z2",values=values)
 
@@ -47,5 +49,5 @@ async def insert_feishu_data(sheet_id: str,values: List):
 
 
 if __name__ == '__main__':
-     filter_word = asyncio.run(insert_feishu_data())
+     filter_word = asyncio.run(get_title_filter_word())
      print(filter_word)

core/utils/request_preparer.py (+16 −5)

@@ -1,9 +1,7 @@
 import loguru
-
-from core.utils.extractors import safe_extract
 from typing import Dict, Any
 
-
+from core.utils.extractors import safe_extract
 
 
 class RequestPreparer:
@@ -21,7 +19,7 @@ class RequestPreparer:
         :param logger: optional logger
         :param aliyun_log: optional Aliyun log instance
         """
-        self.response_parse_config = response_parse_config
+        self.response_parse_config = response_parse_config or {}
         self.logger = logger or loguru.logger
         self.aliyun_log = aliyun_log
 
@@ -29,17 +27,30 @@ class RequestPreparer:
         """
         Build a request body ready to send, from request_body_config and the previous response_data
         """
+        if not request_body_config:
+            return {}
+            
         prepared_body = {}
         for key, value in request_body_config.items():
             if isinstance(value, str) and "{{" in value and "}}" in value:
-                var_name = value.strip("{}").split("|")[0]  # room for future default values
+                # extract the variable name (room for future default-value support)
+                var_name = value.strip("{}").split("|")[0]
                 jsonpath_expr = self.response_parse_config.get(var_name)
+                
                 if jsonpath_expr:
                     extracted_value = safe_extract(response_data, jsonpath_expr, default="")
                     prepared_body[key] = extracted_value
+                    
+                    # log extraction info (only when a logger is present)
+                    if extracted_value == "" and self.logger:
+                        self.logger.debug(f"变量 {var_name} 提取结果为空,使用默认值")
                 else:
                     # no path configured in response_parse_config; default to an empty string
                     prepared_body[key] = ""
+                    
+                    # log a warning
+                    if self.logger:
+                        self.logger.warning(f"未在response_parse_config中找到变量 {var_name} 的路径配置")
             else:
                 prepared_body[key] = value
         return prepared_body

core/utils/spider_config.py (+35 −1)

@@ -5,6 +5,7 @@ import yaml
 from core.utils.path_utils import spiders_config_path
 from core.models.spiders_config_models import PlatformConfig
 
+
 class SpiderConfig:
     _config = None
     _config_path = spiders_config_path
@@ -17,6 +18,8 @@ class SpiderConfig:
         """
         if not os.path.exists(cls._config_path):
             raise FileNotFoundError(f"[配置错误] 找不到配置文件: {cls._config_path}")
+            
+        # (re)load the YAML config from disk
         with open(cls._config_path, "r", encoding="utf-8") as f:
             cls._config = yaml.safe_load(f)
 
@@ -45,9 +48,40 @@ class SpiderConfig:
         # validate with pydantic
         try:
             return PlatformConfig(**merged)
-        except ValueError as e:
+        except Exception as e:
             raise ValueError(f"[配置错误] 平台 {classname} 的配置验证失败: {e}")
 
+    @classmethod
+    def reload_config(cls):
+        """
+        强制重新加载配置文件
+        """
+        cls._config = None
+        cls._load_yaml()
+
+    @classmethod
+    def list_all_platforms(cls):
+        """
+        获取所有平台配置名称列表
+        """
+        if cls._config is None:
+            cls._load_yaml()
+        platforms = [key for key in cls._config.keys() if key != "default"]
+        return platforms
+
+    @classmethod
+    def get_config_stats(cls):
+        """
+        获取配置统计信息
+        """
+        if cls._config is None:
+            cls._load_yaml()
+        return {
+            "total_platforms": len(cls.list_all_platforms()),
+            "last_modified": os.path.getmtime(cls._config_path) if os.path.exists(cls._config_path) else 0,
+            "config_file": cls._config_path
+        }
+
 
 # example usage
 if __name__ == '__main__':

scripts/__init__.py (+1 −0)

@@ -0,0 +1 @@
+# scripts package

scripts/config_cli.py (+110 −0)

@@ -0,0 +1,110 @@
+#!/usr/bin/env python3
+"""
+Configuration management CLI
+Provides config checking, validation, doc generation, and more
+"""
+import argparse
+import sys
+from core.utils.config_health_check import run_health_check
+from core.utils.config_documentation import generate_config_docs
+from core.utils.config_manager import get_config_manager
+
+
+def list_platforms():
+    """列出所有平台配置"""
+    config_manager = get_config_manager()
+    platforms = config_manager.list_platforms()
+    
+    print("平台配置列表:")
+    print("-" * 30)
+    for platform in platforms:
+        print(f"  - {platform}")
+    print(f"\n总计: {len(platforms)} 个平台")
+
+
+def show_platform_config(platform_name):
+    """显示特定平台的配置详情"""
+    config_manager = get_config_manager()
+    try:
+        config = config_manager.get_platform_config(platform_name)
+        print(f"平台 '{platform_name}' 配置详情:")
+        print("-" * 30)
+        print(f"平台名称: {config.platform}")
+        print(f"模式: {config.mode}")
+        print(f"URL: {config.url}")
+        print(f"方法: {config.method}")
+        print(f"请求超时: {config.request_timeout}")
+        print(f"最大重试次数: {config.max_retries}")
+        print(f"循环次数: {config.loop_times}")
+        print(f"飞书表格ID: {config.feishu_sheetid}")
+        
+        if config.request_body:
+            print("请求体:")
+            for key, value in config.request_body.items():
+                print(f"  {key}: {value}")
+                
+        if config.response_parse:
+            print("响应解析配置:")
+            for key, value in config.response_parse.items():
+                print(f"  {key}: {value}")
+    except Exception as e:
+        print(f"获取平台配置失败: {e}")
+        sys.exit(1)
+
+
+def show_stats():
+    """显示配置统计信息"""
+    config_manager = get_config_manager()
+    try:
+        stats = config_manager.get_config_stats()
+        print("配置统计信息:")
+        print("-" * 30)
+        print(f"平台数量: {stats['total_platforms']}")
+        print(f"运行环境: {stats['env']}")
+        print(f"配置文件: {stats['config_file']}")
+        print(f"最后修改时间: {stats['last_modified']}")
+    except Exception as e:
+        print(f"获取统计信息失败: {e}")
+        sys.exit(1)
+
+
+def main():
+    parser = argparse.ArgumentParser(description="AutoScraperX configuration management tool")
+    subparsers = parser.add_subparsers(dest="command", help="available commands")
+    
+    # health-check command
+    subparsers.add_parser("check", help="check configuration health")
+    
+    # doc-generation command
+    subparsers.add_parser("docs", help="generate configuration docs")
+    
+    # list-platforms command
+    subparsers.add_parser("list", help="list all platform configs")
+    
+    # show-stats command
+    subparsers.add_parser("stats", help="show configuration statistics")
+    
+    # show-platform-details command
+    show_parser = subparsers.add_parser("show", help="show a platform's config details")
+    show_parser.add_argument("platform", help="platform name")
+    
+    args = parser.parse_args()
+    
+    if args.command == "check":
+        result = run_health_check()
+        sys.exit(0 if result["success"] else 1)
+    elif args.command == "docs":
+        filepath = generate_config_docs()
+        print(f"配置文档已生成: {filepath}")
+    elif args.command == "list":
+        list_platforms()
+    elif args.command == "stats":
+        show_stats()
+    elif args.command == "show":
+        show_platform_config(args.platform)
+    else:
+        parser.print_help()
+
+
+if __name__ == "__main__":
+    main()

services/async_mysql_service.py (+2 −2)

@@ -161,7 +161,7 @@ class AsyncMysqlService:
         return result["cnt"] if result else 0
 
     async def get_xng_mid(self) -> int:
-        sql = """select link from crawler_user_v3 where task_id=21;"""
+        sql = """select uid,link,nick_name from crawler_user_v3 where task_id=21;"""
         result = await self.fetch_all(sql)
         return result if result else 0
 
@@ -178,7 +178,7 @@ async def get_db_service(platform: Optional[str] = None, mode: Optional[str] = N
 async def demo_usage():
     # Option 1: platform and mode are None; the default "system" is used
     async with AsyncMysqlService() as default_service:
-        users = await default_service.get_user_list(8)
+        users = await default_service.get_user_list(21)
         print(f"系统配置用户数: {users}")
 
     async with AsyncMysqlService() as default_service:

services/pipeline.py (+37 −28)

@@ -2,10 +2,9 @@ import os
 import re
 import sys
 import time
-from datetime import datetime
-
-sys.path.append(os.getcwd())
+from datetime import datetime, timezone
 
+from core.models.rule_models import RuleModel
 from core.utils.feishu_data_async import FeishuDataAsync
 from core.utils.log.logger_manager import LoggerManager
 from services.async_mysql_service import AsyncMysqlService
@@ -25,11 +24,19 @@ class PiaoQuanPipeline:
         self.trace_id = trace_id
         self.account = account
 
+        # validate the rule dict with the Pydantic model
+        try:
+            self.validated_rules = RuleModel(**rule_dict)
+        except Exception as e:
+            LoggerManager.get_logger(platform=platform, mode=mode).warning(f"规则验证失败: {e}")
+            self.validated_rules = None
+
         self.mysql = AsyncMysqlService(platform=platform, mode=mode)
         self.logger = LoggerManager.get_logger(platform=platform, mode=mode)
         self.aliyun_log = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)
         self.feishu_spreadsheet_token = "KsoMsyP2ghleM9tzBfmcEEXBnXg"
-
+        self.test_account = [58528285, 58527674, 58528085, 58527582, 58527601, 58527612, 58528281, 58528095, 58527323,
+                             58528071, 58527278]
     async def feishu_time_list(self):
         async with FeishuDataAsync() as feishu_data:
             summary = await feishu_data.get_values(
@@ -52,28 +59,18 @@ class PiaoQuanPipeline:
                 return row[1]
         return None
 
-    async def title_restricted_words(self):
-        async with FeishuDataAsync() as feishu_data:
-            summary = await feishu_data.get_values(
-                spreadsheet_token=self.feishu_spreadsheet_token,
-                sheet_id="BS9uyu"
-            )
-        for row in summary[1:]:
-            if row[0] == self.platform:
-                return row[1]
-        return None
 
     async def publish_time_flag(self) -> bool:
-        publish_ts = self.item.get("publish_time_stamp", int(time.time()))
-        update_ts = self.item.get("update_time_stamp", int(time.time()))
+        publish_ts = self.item.get("publish_time_stamp")
+        update_ts = self.item.get("update_time_stamp")
 
         max_d = self.rule_dict.get("period", {}).get("max", 1000)
         min_d = self.rule_dict.get("period", {}).get("min", 1000)
         days = max(max_d, min_d)
 
-        feishu_days = await self.feishu_time_list()
-        if feishu_days:
-            days = int(feishu_days)
+        # feishu_days = await self.feishu_time_list()
+        # if feishu_days:
+        #     days = int(feishu_days)
 
         now_ts = int(time.time())
 
@@ -96,8 +93,9 @@ class PiaoQuanPipeline:
                 )
                 return False
         else:
-            if days == 0:
-                is_today = datetime.fromtimestamp(publish_ts).date() == datetime.today().date()
+            if days == 0 or (self.platform == "xiaoniangao" and self.item["out_user_id"] in self.test_account) :
+                # compare in UTC to avoid timezone issues
+                is_today = datetime.fromtimestamp(publish_ts, tz=timezone.utc).date() == datetime.now(timezone.utc).date()
                 if not is_today:
                     msg = "[发布时间] 不在今日"
                     self.logger.warning(msg)
@@ -131,12 +129,23 @@ class PiaoQuanPipeline:
         return True
 
     def title_flag(self) -> bool:
+        """
+        Filter sensitive words in the title
+        :return:
+        """
         title = self.item.get("video_title", "")
-        cleaned_title = re.sub(r"[^\w]", " ", title)
-        sensitive_words = []  # 可配置敏感词列表
-
+        if not title:
+            return True
+
+        # Normalize the title by collapsing runs of whitespace
+        cleaned_title = re.sub(r"\s+", " ", title).strip()
+
+        # Restricted-word list; should be loaded from the Feishu sheet or another config source
+        sensitive_words = []
+
+        # Check the cleaned title against each restricted word
         for word in sensitive_words:
-            if word in cleaned_title:
+            if word and word in cleaned_title:
                 msg = f"[标题包含敏感词] {word} in {title}"
                 self.logger.warning(msg)
                 self.aliyun_log.logging(
@@ -200,14 +209,14 @@ class PiaoQuanPipeline:
             "laonianshenghuokuaile", "laonianquan"
         }
 
-        if self.platform in bypass_platforms or (self.platform, self.mode) in {
+        if self.platform in bypass_platforms or (self.platform, self.mode) in [
             ("zhuwanwufusunew", "recommend"),
             ("jixiangxingfu", "recommend"),
             ("yuannifuqichangzai", "recommend"),
             ("benshanzhufu", "recommend"),
             ("zuihaodesongni", "recommend"),
             ("tiantianjufuqi", "recommend")
-        }:
+        ]:
             self.logger.info("[去重] 平台配置无需去重,直接通过")
             return True
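
A note on the `{...}` to `[...]` change above: tuples are hashable, so the original set literal was already valid, and for this handful of entries the two containers behave identically under `in` (the set is O(1) per lookup, the list O(n)). For example:

```python
pairs = [("benshanzhufu", "recommend"), ("tiantianjufuqi", "recommend")]
assert ("benshanzhufu", "recommend") in pairs        # list: linear scan
assert ("benshanzhufu", "recommend") in set(pairs)   # set: hashed lookup
```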
 
@@ -281,4 +290,4 @@ class PiaoQuanPipeline:
             self.logger.info("校验结束: 下载规则不符合")
             return False
         self.logger.info("校验结束: 全部通过")
-        return True
+        return True

+ 18 - 13
spiders/authorspider.py

@@ -14,6 +14,7 @@ class AuthorSpider(BaseSpider):
         self.user_list_from_db = []  # user list loaded from the database
         self.current_user_index = 0  # index of the current user
         self.current_cursor = ""  # current pagination cursor (initially empty)
+        self.next_cursor_last = ""  # most recent next_cursor returned by the API
 
 
     async def before_run(self):
@@ -22,6 +23,8 @@ class AuthorSpider(BaseSpider):
         if not self.user_list_from_db:
             self.logger.warning("用户列表为空,终止账号模式")
         self.logger.info(f"{self.platform}获取用户列表完成,共 {len(self.user_list_from_db)} 个用户")
+
     async def core_loop(self):
         """核心循环:处理每个用户的视频"""
         async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
@@ -31,9 +34,9 @@ class AuthorSpider(BaseSpider):
                     return
                 # Current user
                 user = self.user_list_from_db[self.current_user_index]
-                user_uid = user.get("uid")  # 数据库中的uid字段
+                crawler_user_uid = user.get("link")  # 数据库中的link字段
                 self.logger.info(
-                    f"处理用户 uid={user_uid}(第{self.current_user_index + 1}个),"
+                    f"处理用户 uid={crawler_user_uid}(第{self.current_user_index + 1}个),"
                     f"当前cursor: {self.current_cursor or '0'}"
                 )
 
@@ -41,17 +44,17 @@ class AuthorSpider(BaseSpider):
                 request_body = self._build_request_body(user)
 
                 # Fetch the current user's videos
-                hase_more,raw_data = await self.crawl_user_videos(session, request_body, user_uid)
-                if not hase_more:
-                    self.logger.info(f"用户 {user_uid} 第{int(self.current_cursor or 0) + 1}页无更多视频")
+                raw_data = await self.crawl_user_videos(session, request_body, crawler_user_uid)
                 if not raw_data:
                     # Move on to the next user
                     self.current_user_index += 1
                     continue
                 # Process the fetched data
-                await self.process_raw_data(raw_data)
+                if self.platform == "xiaoniangao":
+                    self.user_list = [user]
+                await self.process_data(raw_data)
                 if self.current_user_index == len(self.user_list_from_db)-1:
-                    self.current_cursor = str(int(self.current_cursor or 0) + 1)
+                    self.current_cursor = self.next_cursor_last
                     self.current_user_index = 0
                     continue
                 self.current_user_index += 1
@@ -59,10 +62,10 @@ class AuthorSpider(BaseSpider):
 
 
     def _build_request_body(self, user: Dict) -> Dict:
-        """构建请求体:将用户uid和当前cursor注入"""
+        """构建请求体:将用户link和当前cursor注入"""
         # 准备"虚拟数据",键名对应你的配置路径($.uid 和 $.cursor)
         virtual_data = {
-            "uid": str(user.get("uid")),  # 对应配置中的 $.uid
+            "uid": str(user.get("link")),  # 对应配置中的 $.uid
             "cursor": self.current_cursor  # 对应配置中的 $.cursor
         }
 
@@ -84,13 +87,15 @@ class AuthorSpider(BaseSpider):
             headers=self.headers,
             json=request_body
         )
-        has_more = safe_extract(response,self.has_more)
+        # has_more = safe_extract(response,self.has_more)
         # Parse the user's video list
         data_list = safe_extract(response, self.data_path)
+        next_cursor_value = safe_extract(response, self.next_cursor)
+        if next_cursor_value:
+            self.next_cursor_last = next_cursor_value
         if not data_list:
-            self.logger.info(f"用户 {user_uid} 第{self.current_cursor or 0}页无视频数据")
-            return None, None
-        return has_more, data_list
+            self.logger.info(f"用户 {user_uid} 无更多视频数据")
+            return None
+        return data_list
 
     async def fetch_detail(self, item: Dict) -> Dict:
         """账号模式:补充视频详情(子类自行实现)"""

+ 53 - 19
spiders/basespider.py

@@ -19,23 +19,27 @@ from services.async_mq_producer import AsyncMQProducer
 
 class BaseSpider(ABC):
     """通用爬虫基类"""
-
-    def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
+
+    def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod",
+                 request_client: AsyncRequestClient = None,
+                 db_service: AsyncMysqlService = None,
+                 mq_producer: AsyncMQProducer = None):
         self.rule_dict = rule_dict
         self.user_list = user_list
         self.env = env
         self.class_name = self.__class__.__name__.lower()
-        print(self.class_name)
 
         # Initialize core components
         self._setup_configuration()
         self._setup_logging()
-        self._setup_services()
+        self._setup_services(request_client, db_service, mq_producer)
         self._setup_state()
 
         # Shared counters/state
         self.total_success = 0
         self.total_fail = 0
+        self.video = None
 
     def _setup_configuration(self):
         self.platform_config = SpiderConfig.get_platform_config(classname=self.class_name)
@@ -51,6 +55,7 @@ class BaseSpider(ABC):
         self.data_path = self.response_parse_config.get("data_path")
         self.has_more = self.response_parse_config.get("has_more")
         self.field_map = self.response_parse_config.get("fields", {})
+        self.next_cursor = self.response_parse_config.get("next_cursor") or ""
         self.loop_times = self.platform_config.loop_times or 100
         self.loop_interval = self.platform_config.loop_interval or {"min": 2, "max": 5}
         self.timeout = self.platform_config.request_timeout or 30
@@ -62,11 +67,23 @@ class BaseSpider(ABC):
         self.aliyun_log = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
         self.logger.info(f"爬虫 '{self.platform}/{self.mode}' 初始化...")
 
-    def _setup_services(self):
-        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_log)
-        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
-        self.mq_producer = AsyncMQProducer(topic_name="topic_crawler_etl_prod_v2", platform=self.platform, mode=self.mode)
-
+    def _setup_services(self, request_client: AsyncRequestClient = None,
+                        db_service: AsyncMysqlService = None,
+                        mq_producer: AsyncMQProducer = None):
+        """初始化服务组件"""
+        self.request_client = request_client or AsyncRequestClient(
+            logger=self.logger,
+            aliyun_log=self.aliyun_log
+        )
+        self.db_service = db_service or AsyncMysqlService(
+            platform=self.platform,
+            mode=self.mode
+        )
+        self.mq_producer = mq_producer or AsyncMQProducer(
+            topic_name="topic_crawler_etl_prod_v2",
+            platform=self.platform,
+            mode=self.mode
+        )
+
     def _setup_state(self):
         self.last_response_data = {}
         self.request_preparer = RequestPreparer(
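
The constructor change above is a dependency-injection refactor: callers may supply ready-made service instances, while production code falls back to the old defaults. The main payoff is testability. A hedged sketch with hypothetical stubs (their method names are placeholders, not the real service APIs):

```python
# Hypothetical stubs for unit tests; method names mirror how the services are
# used in this file but are placeholders, not the real APIs.
class StubRequestClient:
    async def request(self, *args, **kwargs):
        return {"data": {"next_cursor": "", "data": []}}  # canned response

class StubDbService:
    async def get_today_videos(self):
        return 0  # pretend nothing was ingested today

class StubMQProducer:
    async def send_msg(self, video):
        print("would publish:", video)

# A subclass could then be built without touching MySQL or RocketMQ:
# spider = SomeSpider(rule_dict={}, user_list=[],
#                     request_client=StubRequestClient(),
#                     db_service=StubDbService(),
#                     mq_producer=StubMQProducer())
```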
@@ -99,9 +116,9 @@ class BaseSpider(ABC):
         return item
 
     # Generic data-processing flow
-    async def process_raw_data(self, raw_data: List[Dict]):
+    async def process_data(self, video_data: List[Dict]):
         """处理原始数据列表(清洗→过滤→推送)"""
-        for item in raw_data:
+        for item in video_data:
             try:
                 # Enrich details (implemented entirely by subclasses)
                 detail_data = await self.fetch_detail(item)
@@ -117,6 +134,7 @@ class BaseSpider(ABC):
 
     async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
         try:
+            self.video = video
             video_obj = await self.process_video(video)
             if not video_obj:
                 return False
@@ -128,17 +146,28 @@ class BaseSpider(ABC):
             self.logger.exception(f"视频处理异常: {e}")
             return False
 
+    async def publish_video_user(self) -> Optional[Dict[str, Any]]:
+        """Pick a random publish user from the user list"""
+        if self.user_list:
+            return random.choice(self.user_list)
+        else:
+            self.logger.error("未获取到用户列表数据")
+            return None
+
     async def process_video(self, video: Dict) -> Optional[Dict]:
         """
         Field mapping:
         unified field extraction and VideoItem initialization
         """
         self.logger.info(f"处理视频数据: {video}")
-        if self.user_list:
-            publish_user = random.choice(self.user_list)
-        else:
-            self.logger.error(f"未获取到用户列表数据{self.user_list}")
-            return
+        publish_user = await self.publish_video_user()
+
+        # Bail out if no publish user could be selected
+        if not publish_user:
+            self.logger.error("无法获取发布用户信息")
+            return None
+
         item_kwargs = extract_fields(video, self.field_map, logger=self.logger, aliyun_log=self.aliyun_log)
         item_kwargs.update({
             "user_id": publish_user.get("uid"),
@@ -150,7 +179,7 @@ class BaseSpider(ABC):
             item = VideoItem(**item_kwargs)
             video_dict = await item.produce_item()
             if not video_dict:
-                self.logger.warning(f"VideoItem 校验失败")
+                self.logger.warning("VideoItem 校验失败")
                 return None
             return video_dict
         except Exception as e:
@@ -176,7 +205,8 @@ class BaseSpider(ABC):
         """
          Hook: implement automatic title generation or other business logic here
         """
-        await generate_titles(self.feishu_sheetid, video)
+        # Generate or refine the video title
+        await generate_titles(self.feishu_sheetid, video, self.logger, self.aliyun_log)
 
     async def push_to_etl(self, video: Dict) -> bool:
         try:
@@ -214,7 +244,11 @@ class BaseSpider(ABC):
         return True
 
     async def wait(self):
-        wait_time = random.randint(self.loop_interval["min"], self.loop_interval["max"])
+        """等待随机时间间隔"""
+        # 确保loop_interval包含min和max键
+        min_time = self.loop_interval.get("min", 1)
+        max_time = self.loop_interval.get("max", 5)
+        wait_time = random.randint(min_time, max_time)
         self.logger.info(f"等待 {wait_time} 秒后继续")
         await asyncio.sleep(wait_time)
 

+ 13 - 4
spiders/recommendspider.py

@@ -15,7 +15,8 @@ class RecommendSpider(BaseSpider):
                 # 检查数量限制
                 self.logger.info(f"检测{self.platform}当日入库视频量")
                 if not await self.is_video_count_sufficient():
-                   return
+                    return
+
                 # Fetch the recommend-feed list
                 self.logger.info(f"开始获取{self.platform}推荐列表数据")
                 raw_data = await self.crawl_data(session)
@@ -23,8 +24,10 @@ class RecommendSpider(BaseSpider):
                     self.logger.info("视频列表为空,开始下次请求")
                     await self.wait()
                     continue
+
                 # Process the data
-                await self.process_raw_data(raw_data)
+                await self.process_data(raw_data)
+
                 # Wait for the next round
                 await self.wait()
 
@@ -40,10 +43,16 @@ class RecommendSpider(BaseSpider):
         )
 
         self.last_response_data = response
+
         # Parse the recommend list
+        if not response:
+            self.logger.warning("接口响应为空")
+            return None
+
         data_list = safe_extract(response, self.data_path)
         if not data_list:
             self.logger.info(f"接口返回视频列表为空: {response}")
             self.aliyun_log.logging(code="9021", message="接口返回视频列表为空", data=response)
-            return
-        return data_list
+            return None
+
+        return data_list
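
`safe_extract` is called throughout with JSONPath-style strings such as `$.data.data`. As a mental model only, a minimal stand-in that handles simple dotted paths (the project's real helper may support full JSONPath plus logging):

```python
# Stand-in for safe_extract covering only simple "$.a.b" paths.
from typing import Any, Optional

def safe_extract_sketch(obj: Any, path: str) -> Optional[Any]:
    if not path or not path.startswith("$."):
        return None
    cur = obj
    for key in path[2:].split("."):
        if isinstance(cur, dict) and key in cur:
            cur = cur[key]
        else:
            return None  # missing key yields None instead of raising
    return cur

response = {"data": {"next_cursor": "20", "data": [{"nid": 1}]}}
assert safe_extract_sketch(response, "$.data.data") == [{"nid": 1}]
assert safe_extract_sketch(response, "$.data.missing") is None
```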

+ 3 - 9
spiders/xiaoniangao_author.py

@@ -5,16 +5,13 @@ from spiders.authorspider import AuthorSpider
 
 
 class XiaoniangaoAuthor(AuthorSpider):
+
     async def fetch_user_list(self) -> List[Dict]:
         """获取待爬取的用户列表(从数据库)"""
-        datas =await self.db_service.get_xng_mid()
-        datas = [{"uid":data["link"]} for data in datas]
+        datas = await self.db_service.get_xng_mid()
         return datas
 
 
-
-
-
 async def main():
     rule_dict = {"videos_cnt":{"min":1500}}
     user_list = [{'uid': 58527261, 'link': '116311065', 'nick_name': '像我这样'},
@@ -25,8 +22,5 @@ async def main():
     xng = XiaoniangaoAuthor(rule_dict, user_list, trace_id)
     await xng.run()
 
-
-
-
 if __name__ == '__main__':
-    asyncio.run(main())  # async entry point
+    asyncio.run(main())  # async entry point