
结构调整 (structural refactor)

zhangliang 3 days ago
commit d4b85fe52d
44 changed files with 748 additions and 509 deletions
  1. README.md (+63 -24)
  2. config/__init__.py (+1 -1)
  3. config/base.py (+7 -12)
  4. config/prod.py (+1 -1)
  5. config/settings.py (+0 -98)
  6. config/spiders_config.yaml (+0 -4)
  7. config/topic_map.yaml (+0 -2)
  8. core/base/async_mysql_client.py (+127 -74)
  9. core/base/async_request_client.py (+34 -0)
  10. core/base/async_rocketmq_consumer.py (+12 -12)
  11. core/models/spiders_config_models.py (+22 -0)
  12. core/models/video_item.py (+3 -2)
  13. core/utils/codes.py (+62 -0)
  14. core/utils/env_loader.py (+4 -1)
  15. core/utils/extractors.py (+17 -0)
  16. core/utils/feishu/__init__.py (+1 -1)
  17. core/utils/feishu/feishu.py (+2 -0)
  18. core/utils/feishu/feishu_data.py (+1 -1)
  19. core/utils/feishu/feishu_insert.py (+0 -3)
  20. core/utils/feishu/feishu_utils.py (+3 -4)
  21. core/utils/gpt/__init__.py (+1 -1)
  22. core/utils/gpt/gpt4o_mini_help.py (+18 -16)
  23. core/utils/log/__init__.py (+1 -1)
  24. core/utils/log/aliyun_log.py (+7 -2)
  25. core/utils/log/local_log.py (+38 -22)
  26. core/utils/log/log_codes.py (+73 -0)
  27. core/utils/log/logger_manager.py (+36 -7)
  28. core/utils/path_utils.py (+2 -5)
  29. core/utils/spider_config.py (+17 -13)
  30. core/utils/trace_utils.py (+1 -0)
  31. main.py (+14 -1)
  32. scheduler/async_consumer.py (+56 -10)
  33. scheduler/process_manager.py (+3 -1)
  34. services/async_mysql_service.py (+16 -15)
  35. services/clean_title.py (+0 -3)
  36. services/pipeline.py (+14 -9)
  37. services/rocketmq_consumer.py (+7 -8)
  38. spiders/base_spider.py (+51 -132)
  39. spiders/benshanzhufu_recommend.py (+11 -1)
  40. spiders/spider_registry.py (+4 -3)
  41. spiders/universal_crawler.py (+13 -9)
  42. tests/test1.py (+2 -5)
  43. tests/test_config.py (+0 -3)
  44. tests/test_video_item.py (+3 -2)

+ 63 - 24
README.md

@@ -8,30 +8,69 @@
 ## 🧠 项目结构简介
 
 ```bash
-AutoScraperX/
-├── main1.py                           # 项目入口:监听 MQ 消息,调度 UniversalCrawler
-├── spiders/
-│   ├── universal_crawler.py         # 通用爬虫主类,读取配置并执行爬虫逻辑
-│   └── rabbitmq_consumer.py         # 多线程消费 MQ 中的消息并执行任务
-├── config/
-│   ├── spiders_config.yaml         # 各平台爬虫规则配置文件(含 JsonPath)
-│   └── topic_map.yaml              # topic 与平台名映射关系配置
-├── application/
-│   ├── common/                     # 各种工具模块(日志、GPT、FFmpeg、代理池、Feishu 等)
-│   ├── config/                     # 全局配置
-│   ├── mysql/redis/                # 数据库与缓存工具模块
-│   ├── messageQueue/               # MQ 消费/ACK 工具模块
-│   ├── functions/                  # 各类通用工具函数与日志记录
-│   ├── items/                      # 爬取的数据结构定义
-│   └── pipeline/                   # 数据处理与入库管道
-├── scheduler/
-│   └── scheduler_main.py           # 预留任务调度扩展入口(支持定时任务)
-├── utils/
-│   ├── config_loader.py            # YAML 配置加载工具
-│   └── project_paths.py            # 自动定位 config、log 路径等
-```
-
----
+├── core/                  # 核心框架模块
+│   ├── common/            # 公共工具类
+│   │   ├── config_loader.py  # 配置加载(YAML→Pydantic模型)
+│   │   ├── exception.py      # 自定义异常体系(DataError/NetError等)
+│   │   └── utils.py          # 通用工具(时间戳/哈希/正则)
+│   ├── database/          # 数据库访问层
+│   │   ├── base.py           # 异步DB基类(连接池管理)
+│   │   └── mysql.py          # MySQL具体实现(CRUD封装)
+│   ├── log/               # 日志系统
+│   │   ├── aliyun_logger.py  # 阿里云SLS日志适配器
+│   │   └── local_logger.py   # 本地文件日志(按天滚动)
+│   └── spider/            # 爬虫核心组件
+│       ├── base_spider.py    # 爬虫基类(定义run/parse等抽象方法)
+│       ├── registry.py       # 爬虫注册中心(动态加载子类)
+│       └── pipeline.py       # 数据处理流水线(清洗/去重/存储)
+├── spiders/               # 业务爬虫实现
+│   ├── wechat_official/     # 微信公众号爬虫
+│   ├── video_account/       # 视频号爬虫
+│   └── news_website.py      # 新闻网站示例爬虫
+├── config/                # 配置文件
+│   ├── __init__.py          # 配置模型初始化
+│   ├── dev.yaml             # 开发环境配置(本地MySQL/日志级别DEBUG)
+│   └── prod.yaml            # 生产环境配置(阿里云RDS/日志级别INFO)
+├── tests/                 # 测试用例
+│   ├── test_spider.py       # 爬虫基类功能测试
+│   └── test_pipeline.py     # 数据清洗流水线测试
+├── scripts/               # 运维脚本
+│   ├── manage.py            # 爬虫管理工具(启动/监控/清洗)
+│   └── deploy.sh            # 生产环境部署脚本
+├── .env.example           # 环境变量示例(敏感信息占位符)
+├── requirements.txt       # 依赖库清单(含版本约束)
+├── pyproject.toml         # PEP 621项目元数据(poetry管理)
+└── README.md              # 项目说明(当前文件)
+```
+
+4. 添加新爬虫
+4.1 实现爬虫类
+# spiders/tech_blog.py
+from core.spider.base_spider import BaseSpider
+
+class TechBlogSpider(BaseSpider):
+    name = "tech_blog"
+    
+    async def parse(self, response):
+        articles = []
+        for item in response.html.select("div.article"):
+            title = item.select_one("h2.title").text.strip()
+            link = item.select_one("a")["href"]
+            articles.append({"title": title, "link": link})
+        return articles
+4.2 注册爬虫
+# spiders/__init__.py
+from spiders.tech_blog import TechBlogSpider
+
+SPIDER_REGISTRY = {
+    cls.name: cls for cls in BaseSpider.__subclasses__()
+}
+
+4.3 配置MQ主题
+# config/prod.yaml
+spider:
+  topic: "custom_tech_blog_topic"
+
+核心流程
 
 ## 🚀 功能特性
 

+ 1 - 1
config/__init__.py

@@ -1,3 +1,3 @@
 from .prod import settings  # 使用生产环境配置
 
-__all__ = ['settings']
+__all__ = ['settings']

+ 7 - 12
config/base.py

@@ -1,24 +1,22 @@
 import os
 from pathlib import Path
-from typing import Optional
-from pydantic import BaseSettings, Field, AnyUrl, validator
+
 from dotenv import load_dotenv
-import os
+from core.utils.path_utils import project_root,log_dir
+from pydantic import BaseSettings, Field, AnyUrl
 
 # 在 Settings 类之前加载 .env 文件
-load_dotenv(os.path.join(os.path.dirname(__file__), '../.env'))
 
-class Settings(BaseSettings):
+load_dotenv(os.path.join(project_root,".env"))
+
 
+class Settings(BaseSettings):
     # 环境标识
     ENV: str = "prod"  # prod/dev
     ENABLE_ALIYUN_LOG: bool = True
 
-    # 路径配置
-    PROJECT_ROOT: Path = Path(__file__).parent.parent.resolve()
-    LOG_DIR: Path = PROJECT_ROOT / "logs"
-
     # 日志配置
+    LOG_DIR: str = log_dir
     LOG_LEVEL: str = "INFO"
 
     # 阿里云数据库配置 (RDS)
@@ -59,6 +57,3 @@ class Settings(BaseSettings):
 
 
 settings = Settings()
-
-# 自动创建日志目录
-os.makedirs(settings.LOG_DIR, exist_ok=True)

+ 1 - 1
config/prod.py

@@ -13,4 +13,4 @@ class ProdSettings(Settings):
         env_prefix = "PROD_"
 
 
-settings = ProdSettings()
+settings = ProdSettings()

+ 0 - 98
config/settings.py

@@ -1,98 +0,0 @@
-from pydantic import BaseSettings, Field, validator, BaseModel
-from typing import Dict, Any, Optional, List
-
-
-class DatabaseConfig(BaseSettings):
-    """数据库配置"""
-    host: str = Field(..., env="DB_HOST")
-    port: int = Field(..., env="DB_PORT")
-    user: str = Field(..., env="DB_USER")
-    password: str = Field(..., env="DB_PASSWORD")
-    db: str = Field(..., env="DB_NAME")
-    charset: str = Field("utf8mb4", env="DB_CHARSET")
-
-    class Config:
-        env_file = ".env"
-        env_file_encoding = "utf-8"
-
-
-class RocketMQConfig(BaseSettings):
-    """阿里云 RocketMQ 配置"""
-    endpoint: str = Field(..., env="ROCKETMQ_ENDPOINT")
-    access_key_id: str = Field(..., env="ROCKETMQ_ACCESS_KEY_ID")
-    access_key_secret: str = Field(..., env="ROCKETMQ_ACCESS_KEY_SECRET")
-    instance_id: str = Field(..., env="ROCKETMQ_INSTANCE_ID")
-    wait_seconds: int = Field(10, env="ROCKETMQ_WAIT_SECONDS")
-    batch: int = Field(1, env="ROCKETMQ_BATCH")
-
-    class Config:
-        env_file = ".env"
-
-
-class GlobalConfig(BaseSettings):
-    """全局配置"""
-    env: str = Field("prod", env="ENV")
-    base_url: str = Field("https://api.production.com", env="BASE_URL")
-    request_timeout: int = Field(30, env="REQUEST_TIMEOUT")
-    log_level: str = Field("INFO", env="LOG_LEVEL")
-    enable_aliyun_log: bool = Field(True, env="ENABLE_ALIYUN_LOG")
-
-    class Config:
-        env_file = ".env"
-
-
-class ResponseParse(BaseModel):
-    """数据解析配置"""
-    next_cursor: str = Field(..., description="下一页游标路径")
-    data_path: str = Field(..., description="数据主体路径")
-    fields: Dict[str, str] = Field(..., description="字段映射规则")
-
-
-class PlatformConfig(BaseModel):
-    """平台配置"""
-    platform: str
-    mode: str
-    path: str
-    method: str = "POST"
-    request_body: Dict[str, Any] = {}
-    loop_times: int = 1
-    loop_interval: int = 0
-    response_parse: ResponseParse
-    etl_hook: Optional[str] = None
-    post_actions: Optional[List[PostAction]] = None
-
-    @validator("etl_hook", pre=True)
-    def resolve_etl_hook(cls, v):
-        """动态加载钩子函数"""
-        if not v:
-            return None
-        module_name, func_name = v.rsplit(".", 1)
-        return getattr(importlib.import_module(module_name), func_name)
-
-
-class SpiderConfig(BaseModel):
-    """全局配置容器"""
-    default: dict = Field(...)  # 全局默认配置
-    platforms: Dict[str, PlatformConfig] = {}
-
-    @classmethod
-    def load(cls):
-        """从 YAML 加载配置"""
-        with open("config/config.yaml") as f:
-            raw_config = yaml.safe_load(f)
-        return cls(
-            default=raw_config["default"],
-            platforms=raw_config["platforms"]
-        )
-
-
-class SpiderConfig(BaseSettings):
-    """全局配置容器"""
-    default: GlobalConfig
-    database: DatabaseConfig
-    mq: RocketMQConfig
-
-    class Config:
-        env_file = ".env"
-        env_prefix = "SPIDER_"  # 环境变量前缀
-        case_sensitive = False  # 环境变量不区分大小写

+ 0 - 4
config/spiders_config.yaml

@@ -27,10 +27,6 @@ benshanzhufurecommend:
       video_url: "$.video_url"
       out_video_id: "$.nid"
 
-
-
-
-
 xngtjl_recommend_prod:
   platform: xiaoniangaotuijianliu
   mode: recommend

+ 0 - 2
config/topic_map.yaml

@@ -1,2 +0,0 @@
-topics:
-  - bszf_recommend_prod

+ 127 - 74
core/base/async_mysql_client.py

@@ -1,47 +1,52 @@
 """
-文件功能:
-    异步 MySQL 客户端封装(基于 asyncmy):
-    - 自动管理连接池
-    - 支持 async with 上下文管理
-    - 提供常见方法:fetch_all, fetch_one, execute, executemany
-    - 内部单例复用,避免重复创建连接池
+AsyncMySQLClient (基于 asyncmy) + 项目统一日志正式上线版
 
-适用场景:
-    - 高并发异步任务系统
-    - 通用业务数据库访问组件
+- 使用 LoggerManager 管理本地及阿里云日志,便于定位问题
+- 自动管理连接池,避免重复初始化和内存泄漏
+- 支持 async with 上下文管理
+- 高并发协程安全
 """
 
+from typing import List, Dict, Any, Optional, Tuple
+import asyncio
 import asyncmy
-from typing import List, Dict, Any, Optional
 
+from core.utils.log.logger_manager import LoggerManager
 
 class AsyncMySQLClient:
-    """
-    通用异步 MySQL 客户端,基于 asyncmy 实现
-    """
+    # 类变量:同配置单例管理连接池实例
+    _instances: Dict[Tuple, "AsyncMySQLClient"] = {}
 
-    # 类变量用于单例连接池
-    _instance: Optional["AsyncMySQLClient"] = None
-
-    def __new__(cls, *args, **kwargs):
+    def __new__(cls,
+                host: str,
+                port: int,
+                user: str,
+                password: str,
+                db: str,
+                charset: str,
+                minsize: int = 1,
+                maxsize: int = 5):
         """
-        单例模式,确保同一配置只创建一个连接池实例
+        单例模式:同配置共享同一连接池实例
         """
-        if cls._instance is None:
-            cls._instance = super().__new__(cls)
-        return cls._instance
+        key = (host, port, user, db)
+        if key not in cls._instances:
+            instance = super().__new__(cls)
+            cls._instances[key] = instance
+        return cls._instances[key]
 
-    def __init__(
-        self,
-        host: str,
-        port: int,
-        user: str,
-        password: str,
-        db: str,
-        charset: str,
-        minsize: int = 1,
-        maxsize: int = 5,
-    ):
+    def __init__(self,
+                 host: str,
+                 port: int,
+                 user: str,
+                 password: str,
+                 db: str,
+                 charset: str,
+                 minsize: int = 1,
+                 maxsize: int = 5):
+        """
+        初始化配置,延迟创建连接池
+        """
         self._db_settings = {
             "host": host,
             "port": port,
@@ -50,13 +55,19 @@ class AsyncMySQLClient:
             "db": db,
             "autocommit": True,
             "charset": charset,
+            "connect_timeout": 5,
         }
         self._minsize = minsize
         self._maxsize = maxsize
         self._pool: Optional[asyncmy.Pool] = None
+        self._lock = asyncio.Lock()  # 防止并发初始化
+
+        # 引入统一日志体系
+        self.logger = LoggerManager.get_logger(platform=db, mode="mysql")
+        self.aliyun_logger = LoggerManager.get_aliyun_logger(platform=db, mode="mysql")
 
     async def __aenter__(self):
-        """支持 async with 上下文初始化连接池"""
+        """支持 async with 自动初始化连接池"""
         await self.init_pool()
         return self
 
@@ -66,14 +77,27 @@ class AsyncMySQLClient:
 
     async def init_pool(self):
         """
-        初始化连接池(如未初始化
+        初始化连接池(懒加载 + 并发锁保护
         """
-        if not self._pool:
-            self._pool = await asyncmy.create_pool(
-                **self._db_settings,
-                minsize=self._minsize,
-                maxsize=self._maxsize,
-            )
+        if self._pool:
+            return
+        async with self._lock:
+            if self._pool:
+                return
+            try:
+                self._pool = await asyncmy.create_pool(
+                    **self._db_settings,
+                    minsize=self._minsize,
+                    maxsize=self._maxsize,
+                )
+                msg = f"[AsyncMySQLClient] 连接池初始化成功: {self._db_settings['host']}:{self._db_settings['db']}"
+                self.logger.info(msg)
+                self.aliyun_logger.logging(code="2000", message=msg)
+            except Exception as e:
+                msg = f"[AsyncMySQLClient] 连接池初始化失败: {e}"
+                self.logger.error(msg)
+                self.aliyun_logger.logging(code="9001", message=msg, data={"error": str(e)})
+                raise
 
     async def close(self):
         """
@@ -82,54 +106,83 @@ class AsyncMySQLClient:
         if self._pool:
             self._pool.close()
             await self._pool.wait_closed()
+            msg = "[AsyncMySQLClient] 连接池已关闭"
+            self.logger.info(msg)
+            self.aliyun_logger.logging(code="2001", message=msg)
             self._pool = None
 
     async def fetch_all(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
         """
-        查询多行数据,返回字典列表
+        查询多行,返回字典列表
         """
-        async with self._pool.acquire() as conn:
-            async with conn.cursor() as cur:
-                await cur.execute(sql, params or [])
-                rows = await cur.fetchall()
-                columns = [desc[0] for desc in cur.description]  # 获取字段名列表
-                # 转换每一行为字典
-                result = [dict(zip(columns, row)) for row in rows]
-                return result
+        await self.init_pool()
+        try:
+            async with self._pool.acquire() as conn:
+                async with conn.cursor() as cur:
+                    await cur.execute(sql, params or [])
+                    rows = await cur.fetchall()
+                    columns = [desc[0] for desc in cur.description]
+                    result = [dict(zip(columns, row)) for row in rows]
+                    self.logger.info(f"[AsyncMySQLClient] fetch_all 执行成功: {sql}")
+                    return result
+        except Exception as e:
+            msg = f"[AsyncMySQLClient] fetch_all 执行失败: {e} | SQL: {sql}"
+            self.logger.error(msg)
+            self.aliyun_logger.logging(code="9002", message=msg)
+            raise
 
     async def fetch_one(self, sql: str, params: Optional[List[Any]] = None) -> Optional[Dict[str, Any]]:
         """
-        查询单行数据,返回字典
+        查询单行,返回字典
         """
-        async with self._pool.acquire() as conn:
-            async with conn.cursor() as cur:
-                await cur.execute(sql, params or [])
-                row = await cur.fetchone()
-                if row is None:
-                    return None
-                columns = [desc[0] for desc in cur.description]
-                return dict(zip(columns, row))
+        await self.init_pool()
+        try:
+            async with self._pool.acquire() as conn:
+                async with conn.cursor() as cur:
+                    await cur.execute(sql, params or [])
+                    row = await cur.fetchone()
+                    if row is None:
+                        return None
+                    columns = [desc[0] for desc in cur.description]
+                    result = dict(zip(columns, row))
+                    self.logger.info(f"[AsyncMySQLClient] fetch_one 执行成功: {sql}")
+                    return result
+        except Exception as e:
+            msg = f"[AsyncMySQLClient] fetch_one 执行失败: {e} | SQL: {sql}"
+            self.logger.error(msg)
+            self.aliyun_logger.logging(code="9003", message=msg)
+            raise
 
     async def execute(self, sql: str, params: Optional[List[Any]] = None) -> int:
         """
-        执行单条写操作(insert/update/delete)
-        :param sql: SQL 语句
-        :param params: 参数列表
-        :return: 影响行数
+        执行单条写操作
         """
-        async with self._pool.acquire() as conn:
-            async with conn.cursor() as cur:
-                await cur.execute(sql, params or [])
-                return cur.rowcount
+        await self.init_pool()
+        try:
+            async with self._pool.acquire() as conn:
+                async with conn.cursor() as cur:
+                    await cur.execute(sql, params or [])
+                    self.logger.info(f"[AsyncMySQLClient] execute 执行成功: {sql}")
+                    return cur.rowcount
+        except Exception as e:
+            msg = f"[AsyncMySQLClient] execute 执行失败: {e} | SQL: {sql}"
+            self.logger.error(msg)
+            self.aliyun_logger.logging(code="9004", message=msg)
+            raise
 
     async def executemany(self, sql: str, params_list: List[List[Any]]) -> int:
         """
         批量执行写操作
-        :param sql: SQL 语句
-        :param params_list: 多组参数
-        :return: 总影响行数
-        """
-        async with self._pool.acquire() as conn:
-            async with conn.cursor() as cur:
-                await cur.executemany(sql, params_list)
-                return cur.rowcount
+        """
+        await self.init_pool()
+        try:
+            async with self._pool.acquire() as conn:
+                async with conn.cursor() as cur:
+                    await cur.executemany(sql, params_list)
+                    self.logger.info(f"[AsyncMySQLClient] executemany 执行成功: {sql}")
+                    return cur.rowcount
+        except Exception as e:
+            msg = f"[AsyncMySQLClient] executemany 执行失败: {e} | SQL: {sql}"
+            self.logger.error(msg)
+            self.aliyun_logger.logging(code="9005", message=msg)
+            raise
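A minimal usage sketch of the reworked pooled client. The host, credentials, and database name below are placeholders; the query columns (`out_video_id`, `video_title`) follow the `crawler_video` table referenced elsewhere in this commit.

```python
import asyncio

from core.base.async_mysql_client import AsyncMySQLClient


async def demo():
    # Placeholder connection settings; clients built with the same
    # (host, port, user, db) key share a single pool instance.
    client = AsyncMySQLClient(
        host="127.0.0.1",
        port=3306,
        user="crawler",
        password="******",
        db="piaoquan_crawler",
        charset="utf8mb4",
    )
    async with client:  # init_pool() on enter, close() on exit
        rows = await client.fetch_all(
            "SELECT out_video_id, video_title FROM crawler_video LIMIT %s", [5]
        )
        print(rows)


if __name__ == "__main__":
    asyncio.run(demo())
```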

+ 34 - 0
core/base/async_request_client.py

@@ -0,0 +1,34 @@
+import asyncio
+from typing import Optional, Dict
+
+import aiohttp
+
+
+class AsyncRequestClient:
+    """
+    可独立复用的异步请求客户端,支持重试、日志结构化和限流预留
+    """
+    def __init__(self, logger=None, max_retries=3, timeout=30):
+        self.logger = logger
+        self.max_retries = max_retries
+        self.timeout = timeout
+
+    async def request(self, session: aiohttp.ClientSession, method: str, url: str, **kwargs) -> Optional[Dict]:
+        retries = 0
+
+        while retries < self.max_retries:
+            try:
+                if self.logger:
+                    self.logger.info(f"请求 {method} {url}, 尝试 {retries+1}/{self.max_retries}")
+                async with session.request(method, url, **kwargs) as response:
+                    response.raise_for_status()
+                    return await response.json()
+            except Exception as e:
+                retries += 1
+                if retries >= self.max_retries:
+                    if self.logger:
+                        self.logger.error(f"请求失败达到最大重试次数: {e}")
+                    raise
+                if self.logger:
+                    self.logger.warning(f"请求失败 {e}, 重试 {retries}/{self.max_retries}")
+                await asyncio.sleep(1)
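A usage sketch of the new request client; the endpoint below is a placeholder, and a real spider would pass in a `LoggerManager` logger.

```python
import asyncio

import aiohttp

from core.base.async_request_client import AsyncRequestClient


async def demo():
    client = AsyncRequestClient(max_retries=3)  # logger omitted in this sketch
    async with aiohttp.ClientSession() as session:
        # Placeholder endpoint; any JSON API is handled the same way.
        data = await client.request(session, "GET", "https://httpbin.org/json")
        print(data)


asyncio.run(demo())
```

Note that `timeout` is currently stored but not applied inside `request()`; callers that need it can pass `timeout=aiohttp.ClientTimeout(total=30)` through `**kwargs`.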

+ 12 - 12
core/base/async_rocketmq_consumer.py

@@ -1,9 +1,9 @@
 import asyncio
-import json
-from typing import List, Optional
+from typing import List
+
 from mq_http_sdk.mq_client import MQClient
-from mq_http_sdk.mq_exception import MQExceptionBase
 from mq_http_sdk.mq_consumer import Message
+from mq_http_sdk.mq_exception import MQExceptionBase
 
 
 class AsyncRocketMQConsumer:
@@ -15,15 +15,15 @@ class AsyncRocketMQConsumer:
     """
 
     def __init__(
-        self,
-        endpoint: str,
-        access_key_id: str,
-        access_key_secret: str,
-        instance_id: str,
-        topic_name: str,
-        group_id: str,
-        wait_seconds: int = 3,
-        batch: int = 1,
+            self,
+            endpoint: str,
+            access_key_id: str,
+            access_key_secret: str,
+            instance_id: str,
+            topic_name: str,
+            group_id: str,
+            wait_seconds: int = 3,
+            batch: int = 1,
     ):
         self.endpoint = endpoint
         self.access_key_id = access_key_id

+ 22 - 0
core/models/spiders_config_models.py

@@ -0,0 +1,22 @@
+from pydantic import BaseModel, AnyUrl
+
+class BaseConfig(BaseModel):
+    base_url: AnyUrl = None
+    request_timeout: int = 30
+    headers: dict = {}
+
+class PlatformConfig(BaseConfig):
+    platform: str
+    mode: str
+    path: str
+    url: AnyUrl = None
+    method: str
+    request_body: dict = {}
+    loop_times: int = 1
+    loop_interval: int = 0
+    response_parse: dict = {}
+    paging: bool = False
+    max_pages: int = 0
+    parse: dict = {}
+    retry_times: int = 0
+
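A sketch of validating a merged platform entry with the new model; the values are hypothetical and only mirror field names used in `spiders_config.yaml`.

```python
from core.models.spiders_config_models import PlatformConfig

# Hypothetical merged entry (defaults + one platform block from spiders_config.yaml).
merged = {
    "platform": "benshanzhufu",
    "mode": "recommend",
    "path": "/crawler/benshanzhufu/recommend",
    "url": "https://api.example.com/crawler/benshanzhufu/recommend",
    "method": "POST",
    "request_body": {"cursor": "1"},
    "loop_times": 2,
    "response_parse": {
        "data_path": "$.data.list",
        "fields": {"video_url": "$.video_url", "out_video_id": "$.nid"},
    },
}

config = PlatformConfig(**merged)  # raises pydantic.ValidationError on bad types
print(config.platform, config.loop_times, config.url)
```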

+ 3 - 2
core/models/video_item.py

@@ -1,8 +1,9 @@
 import time
 import uuid
+from typing import Optional
 
 from pydantic import BaseModel, Field
-from typing import Optional
+
 from services import clean_title
 
 
@@ -70,7 +71,7 @@ class VideoItem(BaseModel):
 
         must_fields = [
             "video_id", "user_id", "user_name", "out_video_id", "session",
-            "video_url", "cover_url", "platform", "strategy"
+            "video_url", "cover_url", "classname", "strategy"
         ]
         for f in must_fields:
             if not getattr(self, f, None):

+ 62 - 0
core/utils/codes.py

@@ -0,0 +1,62 @@
+"""
+codes.py
+
+全局统一日志编码管理文件
+
+分类规则:
+- 1xxx: 成功流程码(用于运营指标与验证成功)
+- 4xxx: 可恢复配置错误(需修复配置)
+- 9xxx: 错误码(可恢复失败、需要排查)
+- 99xx: 系统致命错误(需告警处理)
+
+"""
+
+CODES = {
+    # 成功类 (1xxx)
+    "1000": "任务接收成功",
+    "1001": "数据采集成功",
+    "1002": "视频处理成功",
+    "1003": "爬虫执行指标汇总",
+    "1004": "未获取到数据",
+    "1005": "视频过滤成功",
+    "1006": "字段校验通过",
+    "1007": "数据校验通过",
+    "1008": "规则匹配成功",
+    "1009": "成功发送至ETL",
+    "1010": "任务执行完成",
+
+    # 配置错误类 (4xxx)
+    "4000": "爬虫配置缺失",
+    "4001": "ETL配置缺失",
+    "4002": "MQ配置缺失",
+    "4003": "数据库配置缺失",
+
+    # 可恢复失败类 (9xxx)
+    "9001": "消息处理失败",
+    "9002": "消费循环异常重启",
+    "9003": "视频处理失败",
+    "9004": "推送ETL失败",
+    "9005": "爬虫致命错误退出",
+    "9006": "请求异常重试",
+    "9007": "字段缺失校验失败",
+    "9008": "标题不符合规则",
+    "9009": "发布时间不符合规则",
+    "9010": "黑名单用户过滤",
+    "9011": "重复视频过滤",
+    "9012": "视频时长不符合规则",
+    "9013": "点赞数/播放量不符合规则",
+    "9014": "平台限制过滤",
+    "9015": "下载异常",
+    "9016": "数据库写入失败",
+    "9017": "消费超时重试",
+    "9018": "规则解析失败",
+    "9019": "任务参数缺失",
+    "9020": "过滤条件不匹配",
+
+    # 系统致命错误类 (99xx)
+    "9900": "数据库连接失败",
+    "9901": "MQ连接失败",
+    "9902": "系统内存不足",
+    "9903": "未知致命错误",
+    "9904": "配置加载失败",
+}

+ 4 - 1
core/utils/env_loader.py

@@ -1,6 +1,8 @@
 import os
+
 from dotenv import load_dotenv
 
+
 def load_env(env: str = None):
     """
     根据传入的环境名加载对应的 .env 文件,默认加载 .env
@@ -12,10 +14,12 @@ def load_env(env: str = None):
     load_dotenv(dotenv_path)
     print(f"加载环境配置文件: {dotenv_path}")
 
+
 def get_env(key: str, default: str = "") -> str:
     """获取环境变量"""
     return os.getenv(key, default)
 
+
 def get_int_env(key: str, default: int = 0) -> int:
     """获取整数类型环境变量"""
     try:
@@ -26,4 +30,3 @@ def get_int_env(key: str, default: int = 0) -> int:
 
 # 自动加载环境变量
 load_env()
-

+ 17 - 0
core/utils/extractors.py

@@ -1,5 +1,8 @@
+from typing import Dict
+
 from jsonpath_ng import parse
 
+
 def safe_extract(json_obj, path, default=None):
     """
     安全提取单个字段值,返回匹配到的第一个,否则返回默认值。
@@ -18,6 +21,7 @@ def safe_extract(json_obj, path, default=None):
         print(f"[extractor] Error extracting {path}: {e}")
     return default
 
+
 def extract_multiple(json_obj, fields: dict) -> dict:
     """
     根据字段配置提取多个字段。
@@ -27,3 +31,16 @@ def extract_multiple(json_obj, fields: dict) -> dict:
     :return: 字段名 -> 提取值的字典
     """
     return {key: safe_extract(json_obj, path) for key, path in fields.items()}
+
+
+def extract_fields(video: Dict, field_map: Dict, logger=None, trace_id=None) -> Dict:
+    result = {}
+    for field, path in field_map.items():
+        if not isinstance(path, str) or not path.startswith("$"):
+            result[field] = path
+            continue
+        value = safe_extract(video, path)
+        if value is None and logger:
+            logger.warning(f"{trace_id} 字段提取失败: {field} 路径: {path}")
+        result[field] = value
+    return result
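A sketch of the extractor helpers on a hand-written record; the JSONPath expressions mirror the style used in `spiders_config.yaml`, and literal (non-`$`) values are copied through unchanged by `extract_fields`.

```python
from core.utils.extractors import extract_fields, safe_extract

video = {"title": "哇!好美的一个视频", "nid": 12345, "video_url": "https://example.com/v.mp4"}

field_map = {
    "video_title": "$.title",
    "out_video_id": "$.nid",
    "video_url": "$.video_url",
    "platform": "benshanzhufu",  # literal, not a JSONPath
}

print(safe_extract(video, "$.title"))
print(extract_fields(video, field_map, trace_id="demo-0001"))
```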

+ 1 - 1
core/utils/feishu/__init__.py

@@ -1,3 +1,3 @@
 from .feishu import Feishu
+from .feishu_data import FsData
 from .feishu_insert import FeishuInsert
-from .feishu_data import FsData

+ 2 - 0
core/utils/feishu/feishu.py

@@ -7,12 +7,14 @@
 import json
 import os
 import sys
+
 import requests
 import urllib3
 
 sys.path.append(os.getcwd())
 
 from core.utils.log import Local
+
 proxies = {"http": None, "https": None}
 
 

+ 1 - 1
core/utils/feishu/feishu_data.py

@@ -17,4 +17,4 @@ class FsData:
 if __name__ == '__main__':
     data_rule = FsData()
     title_rule = data_rule.get_title_rule()
-    print(title_rule)
+    print(title_rule)

+ 0 - 3
core/utils/feishu/feishu_insert.py

@@ -51,6 +51,3 @@ class FeishuInsert(object):
         }
         response = requests.request("POST", url=insert_value_url, headers=headers, json=body)
         print(response.json())
-
-
-

+ 3 - 4
core/utils/feishu/feishu_utils.py

@@ -6,6 +6,7 @@
 import json
 import os
 import sys
+
 import requests
 import urllib3
 from loguru import logger
@@ -20,6 +21,7 @@ class FeishuUtils:
     编辑飞书云文档
     """
     succinct_url = "https://w42nne6hzg.feishu.cn/sheets/"
+
     # 飞书路径token
     @classmethod
     def spreadsheettoken(cls, crawler):
@@ -28,8 +30,6 @@ class FeishuUtils:
         else:
             return crawler
 
-
-
     # 获取飞书api token
     @classmethod
     def get_token(cls):
@@ -99,7 +99,6 @@ class FeishuUtils:
         values = response["data"]["valueRanges"][0]["values"]
         return values
 
-
     # 工作表,插入行或列
     @classmethod
     def insert_columns(cls, crawler, sheetid, majordimension, startindex, endindex):
@@ -225,6 +224,7 @@ class FeishuUtils:
             return r.json()["data"]["valueRange"]["values"][0]
         except Exception as e:
             logger.error("读取单元格数据异常:{}", e)
+
     # 获取表内容
     @classmethod
     def get_sheet_content(cls, crawler, sheet_id):
@@ -395,4 +395,3 @@ class FeishuUtils:
             r = requests.post(url, headers=headers, data=data, verify=False, proxies=proxies)
         except Exception as e:
             logger.error(f"bot异常:{e}\n")
-

+ 1 - 1
core/utils/gpt/__init__.py

@@ -1 +1 @@
-from .gpt4o_mini_help import GPT4oMini
+from .gpt4o_mini_help import GPT4oMini

+ 18 - 16
core/utils/gpt/gpt4o_mini_help.py

@@ -1,9 +1,10 @@
 import json
 
 import requests
-class GPT4oMini:
 
 
+class GPT4oMini:
+
     @classmethod
     def get_ai_mini_title(cls, title):
         url = "http://aigc-api.cybertogether.net//aigc/dev/test/gpt"
@@ -11,20 +12,20 @@ class GPT4oMini:
             "imageList": [],
             "model": "gpt-4o-mini-2024-07-18",
             "prompt": (
-            "针对微信平台视频类小程序场景"
-            "面向人群是中国中老年人,在单聊、群聊场景。为视频生成一个吸引人的标题。每次生成我会提供一个原标题,你通过以下规则生成一个新的标题。"
-            "生成规则:"
-            "a.生成的新标题一定一定不能包含以下任何一个或多个风险词。"
-            "风险词:请注意, 分享, 听听, 看看, 全体, 一定, 所以人, 无数人, 值得一看, 值得一听, 99 %, 震撼, 必, 必看, 必听, 必读, 全场, 听听, 一起听听, 一起, 快看, 看看, 快来, 分享, 转发, 都看看吧, 都来, 注意, 最新, 紧急, 速看, 速转, 刚刚, 事关, 赶紧, 一定要, 千万不要, 震惊, 惊人, 亿万, 无数, 百分之, 自杀, 致死, 全体国民, 全体国人, 央视, 中央, 国务院, 人民日报, 卫生部, 官方, 气象局, 世卫, 联合国, 新闻, 内部, 内幕, 最新, 医生提醒, 爆炸性消息, 九胞胎, 天大的, 连看三遍, 务必看, 终于曝光, 神药, 危害太大, 不要吃了, 大事发生, 无数国人, 再忙也要, 出大事, 关系你我, 正式确认, 好消息, 突然传出, 新规出台, 重要的消息, 重要消息, 即将失传, 打死都, 惊天, 不要再吃, 格外留心, 太危险, 可怕一幕, 身亡, 后果很严重, 寿命长短, 错过别后悔, 必看, 早点知道就好了, 不得不信, 看一次少一次, 无数人, 老美, 新华社, 新规, 最新骗局, 新型骗局, 吃的是这些, 大老虎, 官员财产, 老中医, 预言, 致命, 救命, 保命, 非常难得, 太震撼了, 快来看, 一定要看, 来看看, 所有人都, 头一次见, 新型"
-            "b.新标题字符不小于15个字,同时不超过30个字。"
-            "c.新标题最前面或最后面必须加上emoij符号。如“🔴”、“⭕️”、“🚩”、“🔥”、“💖”"
-            "d.新标题只去掉原标题里的低质词,但语句、语意都和原标题保持不变。"
-            "e.去掉低质词后,根据语意适当加字句,使新标题整句读起来简洁、通顺、有吸引力、并准确反映视频核心内容。但一定不能包含任何一个或多个风险词。"
-
-            "视频的原标题:“哇!好美的一个视频,发给您也看看!”、“晚上好,这也太美啦,发给大家一起欣赏欣赏。”、“____这段话说得真好,一起听听!每句话都很有道快分享给群友看看吧!”、“👈这段话说的真好,值得一听”、“🔴世界顶尖雪雕❗ 太真实了,太美了!忍不住发给你看看!”、“💖《等》说得真好,看看吧...”、“🔴这样的萌娃你们喜欢吗,都看看吧!”、“🔴2025金蛇纳福,这首歌送给全体群友,祝大家财运亨通永不断!”、“🔴元旦青蛇遇双春,这三件事千万别做,都看看吧!”、“💕呵呵太搞笑了!老师和家长的对话!值得一看!绝了!”、“❤️《中国知识大全》太珍贵了!值得我们每个中国人都看看!”、“六岁小女孩一首《爸》全场泪奔”、“🔴酒店招牌菜,菠菜炒鸡蛋的家常做法,快来学学!”、“这个视频,分享给我的老友,祝愿您能幸福安康”"
-
-            "请务必严格遵守上述生成规则,为原标题生成对应的新标题。"
-            f"请分析该标题,标题为:{title},返回新的标题。"
+                "针对微信平台视频类小程序场景"
+                "面向人群是中国中老年人,在单聊、群聊场景。为视频生成一个吸引人的标题。每次生成我会提供一个原标题,你通过以下规则生成一个新的标题。"
+                "生成规则:"
+                "a.生成的新标题一定一定不能包含以下任何一个或多个风险词。"
+                "风险词:请注意, 分享, 听听, 看看, 全体, 一定, 所以人, 无数人, 值得一看, 值得一听, 99 %, 震撼, 必, 必看, 必听, 必读, 全场, 听听, 一起听听, 一起, 快看, 看看, 快来, 分享, 转发, 都看看吧, 都来, 注意, 最新, 紧急, 速看, 速转, 刚刚, 事关, 赶紧, 一定要, 千万不要, 震惊, 惊人, 亿万, 无数, 百分之, 自杀, 致死, 全体国民, 全体国人, 央视, 中央, 国务院, 人民日报, 卫生部, 官方, 气象局, 世卫, 联合国, 新闻, 内部, 内幕, 最新, 医生提醒, 爆炸性消息, 九胞胎, 天大的, 连看三遍, 务必看, 终于曝光, 神药, 危害太大, 不要吃了, 大事发生, 无数国人, 再忙也要, 出大事, 关系你我, 正式确认, 好消息, 突然传出, 新规出台, 重要的消息, 重要消息, 即将失传, 打死都, 惊天, 不要再吃, 格外留心, 太危险, 可怕一幕, 身亡, 后果很严重, 寿命长短, 错过别后悔, 必看, 早点知道就好了, 不得不信, 看一次少一次, 无数人, 老美, 新华社, 新规, 最新骗局, 新型骗局, 吃的是这些, 大老虎, 官员财产, 老中医, 预言, 致命, 救命, 保命, 非常难得, 太震撼了, 快来看, 一定要看, 来看看, 所有人都, 头一次见, 新型"
+                "b.新标题字符不小于15个字,同时不超过30个字。"
+                "c.新标题最前面或最后面必须加上emoij符号。如“🔴”、“⭕️”、“🚩”、“🔥”、“💖”"
+                "d.新标题只去掉原标题里的低质词,但语句、语意都和原标题保持不变。"
+                "e.去掉低质词后,根据语意适当加字句,使新标题整句读起来简洁、通顺、有吸引力、并准确反映视频核心内容。但一定不能包含任何一个或多个风险词。"
+
+                "视频的原标题:“哇!好美的一个视频,发给您也看看!”、“晚上好,这也太美啦,发给大家一起欣赏欣赏。”、“____这段话说得真好,一起听听!每句话都很有道快分享给群友看看吧!”、“👈这段话说的真好,值得一听”、“🔴世界顶尖雪雕❗ 太真实了,太美了!忍不住发给你看看!”、“💖《等》说得真好,看看吧...”、“🔴这样的萌娃你们喜欢吗,都看看吧!”、“🔴2025金蛇纳福,这首歌送给全体群友,祝大家财运亨通永不断!”、“🔴元旦青蛇遇双春,这三件事千万别做,都看看吧!”、“💕呵呵太搞笑了!老师和家长的对话!值得一看!绝了!”、“❤️《中国知识大全》太珍贵了!值得我们每个中国人都看看!”、“六岁小女孩一首《爸》全场泪奔”、“🔴酒店招牌菜,菠菜炒鸡蛋的家常做法,快来学学!”、“这个视频,分享给我的老友,祝愿您能幸福安康”"
+
+                "请务必严格遵守上述生成规则,为原标题生成对应的新标题。"
+                f"请分析该标题,标题为:{title},返回新的标题。"
             ),
             "responseFormat": {
                 "type": "json_schema",
@@ -56,6 +57,7 @@ class GPT4oMini:
         except Exception as e:
             return None
 
+
 if __name__ == '__main__':
     title = GPT4oMini.get_ai_mini_title("🔴这位美女说的太好了!这就是我们的大中国")
-    print(title)
+    print(title)

+ 1 - 1
core/utils/log/__init__.py

@@ -1,2 +1,2 @@
+from .aliyun_log import AliyunLogger
 from .local_log import Local
-from .aliyun_log import AliyunLogger

+ 7 - 2
core/utils/log/aliyun_log.py

@@ -1,11 +1,13 @@
-
 """
 公共方法,包含:生成log
 """
 import json
-from aliyun.log import LogClient, PutLogsRequest, LogItem
 import time
+
+from aliyun.log import LogClient, PutLogsRequest, LogItem
+
 from config import settings
+from core.utils.log.log_codes import LOG_CODES
 
 proxies = {"http": None, "https": None}
 
@@ -14,6 +16,7 @@ class AliyunLogger(object):
     """
     阿里云日志方法
     """
+
     def __init__(self, platform, mode, env="prod"):
         self.platform = platform
         self.mode = mode
@@ -29,6 +32,8 @@ class AliyunLogger(object):
         正式库: https://sls.console.aliyun.com/lognext/project/crawler-log-prod/logsearch/crawler-log-prod
         """
         # 设置阿里云日志服务的访问信息
+        if message is None:
+            message = LOG_CODES.get(code, "未知错误")
         if data is None:
             data = {}
         accessKeyId = settings.ALIYUN_ACCESS_KEY_ID
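A sketch of the new code-to-message fallback, assuming `logging()` lets `message` default to `None` (its full signature is outside this hunk); valid Aliyun SLS credentials in `settings` are still required to actually ship the log.

```python
from core.utils.log import AliyunLogger

logger = AliyunLogger(platform="benshanzhufu", mode="recommend")

# With no explicit message, the text is looked up from log_codes by code,
# so "9007" is reported as "字段缺失校验失败".
logger.logging(code="9007", data={"missing_field": "video_url"})
```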

+ 38 - 22
core/utils/log/local_log.py

@@ -1,50 +1,66 @@
 import sys
-from datetime import date, datetime
-from loguru import logger
+from datetime import date
 from pathlib import Path
+
+from loguru._logger import Logger as LoguruLogger
+from loguru import logger as global_logger
+
 from config import settings
 
 
 class Local:
     """
-    恢复有效的本地日志记录器
+    本地日志记录器
     """
 
+    _initialized = set()  # 防止重复初始化相同 platform_mode
+
     @staticmethod
-    def init_logger(platform: str, mode: str, log_level: str = settings.LOG_LEVEL,
-                    log_to_console: bool = False, rotation: str = "00:00",
-                    retention: str = "10 days"):
-        """
-        初始化日志记录器 - 有效版本
-        """
+    def init_logger(platform: str,
+                    mode: str,
+                    log_level: str = settings.LOG_LEVEL,
+                    log_to_console: bool = False,
+                    rotation: str = "00:00",
+                    retention: str = "10 days") -> LoguruLogger:
+        key = f"{platform}_{mode}"
+        if key in Local._initialized:
+            return global_logger.bind(platform=platform, mode=mode)  # 已初始化,返回绑定上下文的 logger
+
         # 创建日志目录
         log_path = Path(f"{settings.LOG_DIR}/{platform}")
         log_path.mkdir(parents=True, exist_ok=True)
 
-        # 获取当前日期(动态)
+        # 动态日期
         current_date = date.today().strftime("%Y-%m-%d")
         log_filename = f"{platform}-{mode}-{current_date}.log"
         log_file_path = log_path / log_filename
 
-        # 清除默认 handler
-        logger.remove()
-
-        # 添加文件日志 handler
-        logger.add(
-            str(log_file_path),  # 使用字符串路径
+        # 添加文件日志
+        global_logger.add(
+            str(log_file_path),
             level=log_level.upper(),
-            rotation=rotation,  # 支持rotation参数
+            rotation=rotation,
             retention=retention,
             encoding="utf-8",
-            enqueue=True
+            enqueue=True,
+            backtrace=True,
+            diagnose=True
         )
 
-        # 可选:输出到控制台
         if log_to_console:
-            logger.add(
+            global_logger.add(
                 sys.stdout,
                 level=log_level.upper(),
-                format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level}</level> | {message}"
+                format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
+                       "<level>{level}</level> | "
+                       "<cyan>{extra[platform]}</cyan> | "
+                       "<cyan>{extra[mode]}</cyan> | "
+                       "{message}"
             )
 
-        return logger
+        # 绑定上下文便于日志区分来源
+        logger_with_context = global_logger.bind(platform=platform, mode=mode)
+
+        Local._initialized.add(key)
+
+        return logger_with_context

+ 73 - 0
core/utils/log/log_codes.py

@@ -0,0 +1,73 @@
+"""
+log_codes.py
+
+全局统一日志编码管理文件
+
+分类规则:
+- 1xxx: 成功类(用于正常流程验证和运营统计)
+- 15xx: 调度监控相关状态码
+- 4xxx: 可恢复配置错误(需修复配置)
+- 9xxx: 错误码(可恢复失败、需要排查)
+- 99xx: 系统致命错误(需告警处理)
+"""
+
+LOG_CODES = {
+    # 成功类 (1xxx)
+    "1000": "任务接收成功",
+    "1001": "数据采集成功",
+    "1002": "视频处理成功",
+    "1003": "爬虫执行指标汇总",
+    "1004": "未获取到数据",
+    "1005": "视频过滤成功",
+    "1006": "字段校验通过",
+    "1007": "数据校验通过",
+    "1008": "规则匹配成功",
+    "1009": "成功发送至ETL",
+    "1010": "任务执行完成",
+
+    # 调度监控 (15xx)
+    "1500": "主进程启动成功",
+    "1501": "子进程崩溃重启",
+    "1502": "子进程启动成功",
+    "1503": "子进程正常退出",
+    "1504": "进程监听循环启动",
+    "1505": "子进程分配完成",
+
+    # 可恢复配置错误 (4xxx)
+    "4000": "爬虫配置缺失",
+    "4001": "ETL配置缺失",
+    "4002": "MQ配置缺失",
+    "4003": "数据库配置缺失",
+
+    # 可恢复失败类 (9xxx)
+    "9001": "消息处理失败",
+    "9002": "消费循环异常重启",
+    "9003": "视频处理失败",
+    "9004": "推送ETL失败",
+    "9005": "爬虫致命错误退出",
+    "9006": "请求异常重试",
+    "9007": "字段缺失校验失败",
+    "9008": "标题不符合规则",
+    "9009": "发布时间不符合规则",
+    "9010": "黑名单用户过滤",
+    "9011": "重复视频过滤",
+    "9012": "视频时长不符合规则",
+    "9013": "点赞数/播放量不符合规则",
+    "9014": "平台限制过滤",
+    "9015": "下载异常",
+    "9016": "数据库写入失败",
+    "9017": "消费超时重试",
+    "9018": "规则解析失败",
+    "9019": "任务参数缺失",
+    "9020": "过滤条件不匹配",
+    "9021": "异步请求失败",
+    "9022": "子进程内部异常",
+
+    # 系统致命错误 (99xx)
+    "9900": "数据库连接失败",
+    "9901": "MQ连接失败",
+    "9902": "系统内存不足",
+    "9903": "未知致命错误",
+    "9904": "配置加载失败",
+    "9905": "进程启动失败",
+}

+ 36 - 7
core/utils/log/logger_manager.py

@@ -1,15 +1,21 @@
-from core.utils.log.local_log import Local
 from core.utils.log.aliyun_log import AliyunLogger
+from core.utils.log.local_log import Local
+from loguru._logger import Logger as LoguruLogger
+
 
 class LoggerManager:
     """
-    日志管理器 - 优化版
+    日志管理器
     """
-    _local_loggers = {}
-    _aliyun_loggers = {}
+    _local_loggers: dict[str, LoguruLogger] = {}
+    _aliyun_loggers: dict[str, AliyunLogger] = {}
 
     @staticmethod
-    def get_logger(platform: str = "system", mode: str = "crawler", log_to_console=True):
+    def get_logger(
+        platform: str = "system",
+        mode: str = "crawler",
+        log_to_console: bool = False
+    ) -> LoguruLogger:
         key = f"{platform}_{mode}"
         if key not in LoggerManager._local_loggers:
             LoggerManager._local_loggers[key] = Local.init_logger(
@@ -20,7 +26,11 @@ class LoggerManager:
         return LoggerManager._local_loggers[key]
 
     @staticmethod
-    def get_aliyun_logger(platform: str = "system", mode: str = "crawler", env: str = "prod"):
+    def get_aliyun_logger(
+        platform: str = "system",
+        mode: str = "crawler",
+        env: str = "prod"
+    ) -> AliyunLogger:
         key = f"{platform}_{mode}"
         if key not in LoggerManager._aliyun_loggers:
             LoggerManager._aliyun_loggers[key] = AliyunLogger(
@@ -28,4 +38,23 @@ class LoggerManager:
                 mode=mode,
                 env=env
             )
-        return LoggerManager._aliyun_loggers[key]
+        return LoggerManager._aliyun_loggers[key]
+
+    def log_event(
+        self,
+        code: str,
+        message: str,
+        data: dict,
+        trace_id: str = "",
+        platform: str = "system",
+        mode: str = "crawler",
+        level: str = "info"
+    ):
+        logger = self.get_logger(platform=platform, mode=mode)
+        aliyun_logger = self.get_aliyun_logger(platform=platform, mode=mode)
+
+        # 本地记录
+        getattr(logger, level.lower(), logger.info)(f"{code} | {message} | {data}")
+
+        # 阿里云记录
+        aliyun_logger.logging(code=code, message=message, data=data, trace_id=trace_id)
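A sketch of the manager's two entry points; both are cached per `(platform, mode)` key, and the code/topic values come from elsewhere in this commit.

```python
from core.utils.log.logger_manager import LoggerManager

logger = LoggerManager.get_logger(platform="benshanzhufu", mode="recommend")
logger.info("spider initialized")

aliyun = LoggerManager.get_aliyun_logger(platform="benshanzhufu", mode="recommend")
aliyun.logging(code="1000", message="任务接收成功", data={"topic": "bszf_recommend_prod"})
```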

+ 2 - 5
core/utils/path_utils.py

@@ -14,18 +14,15 @@ project_root = get_project_path()
 
 # 配置路径
 config_dir = os.path.join(project_root, "config")
-config_spiders_path = os.path.join(config_dir, "spiders_config.yaml")
+spiders_config_path = os.path.join(config_dir, "spiders_config.yaml")
 
 # 日志路径
 log_dir = os.path.join(project_root, "logs")
 
 
-# 数据库配置路径(可选)
-# db_config_path = os.path.join(config_dir, "db.yaml")
-
 __all__ = [
     "project_root",
     "config_dir",
-    "config_spiders_path",
+    "spiders_config_path",
     "log_dir",
 ]

+ 17 - 13
core/utils/config_loader.py → core/utils/spider_config.py

@@ -1,12 +1,13 @@
-import yaml
+# spider_config.py
 import os
 from urllib.parse import urljoin
-from core.utils.path_utils import config_spiders_path
-
+import yaml
+from core.utils.path_utils import spiders_config_path
+from core.models.spiders_config_models import BaseConfig, PlatformConfig
 
-class ConfigLoader:
+class SpiderConfig:
     _config = None
-    _config_path = config_spiders_path
+    _config_path = spiders_config_path
 
     @classmethod
     def _load_yaml(cls):
@@ -16,7 +17,7 @@ class ConfigLoader:
             cls._config = yaml.safe_load(f)
 
     @classmethod
-    def get_platform_config(cls, platform: str) -> dict:
+    def get_platform_config(cls, classname: str) -> PlatformConfig:
         """
         获取平台配置,并拼接完整 URL
         支持类方法调用 + 单次加载配置
@@ -24,10 +25,10 @@ class ConfigLoader:
         if cls._config is None:
             cls._load_yaml()
 
-        if platform not in cls._config:
-            raise ValueError(f"[配置错误] 未找到平台配置: {platform}")
+        if classname not in cls._config:
+            raise ValueError(f"[配置错误] 未找到平台配置: {classname}")
 
-        platform_config = cls._config.get(platform, {})
+        platform_config = cls._config.get(classname, {})
         base_config = cls._config.get("default", {})
 
         # 合并配置:平台配置覆盖默认配置
@@ -37,10 +38,13 @@ class ConfigLoader:
         if "url" not in merged and "base_url" in merged and "path" in merged:
             merged["url"] = urljoin(merged["base_url"], merged["path"])
 
-        return merged
-
+        # 使用 pydantic 进行验证
+        try:
+            return PlatformConfig(**merged)
+        except ValueError as e:
+            raise ValueError(f"[配置错误] 平台 {classname} 的配置验证失败: {e}")
 
 # 示例使用
 if __name__ == '__main__':
-    config = ConfigLoader.get_platform_config("benshanzhufu")
-    print(config)
+    config = SpiderConfig.get_platform_config("benshanzhufurecommend")
+    print(config)
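The merge and URL derivation above are easier to follow on a concrete dict; the values below are placeholders, not an actual platform entry.

```python
from urllib.parse import urljoin

# What get_platform_config() does before handing the dict to PlatformConfig:
# the platform block overrides "default", then "url" is derived from base_url + path.
default = {"base_url": "https://api.example.com", "request_timeout": 30}
platform = {"platform": "benshanzhufu", "mode": "recommend",
            "path": "/crawler/benshanzhufu/recommend", "method": "POST"}

merged = {**default, **platform}
if "url" not in merged and "base_url" in merged and "path" in merged:
    merged["url"] = urljoin(merged["base_url"], merged["path"])

print(merged["url"])  # https://api.example.com/crawler/benshanzhufu/recommend
```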

+ 1 - 0
core/utils/trace_utils.py

@@ -1,5 +1,6 @@
 import time
 import uuid
 
+
 def generate_trace_id():
     return f"{uuid.uuid4().hex[:8]}-{int(time.time() * 1000)}"

+ 14 - 1
main.py

@@ -2,13 +2,15 @@ import time
 from multiprocessing import Process, cpu_count
 from typing import Dict
 
+from core.utils.log.logger_manager import LoggerManager
 from scheduler.process_manager import split_topics, start_worker_process
 from spiders.spider_registry import SPIDER_CLASS_MAP
-from core.utils.log.logger_manager import LoggerManager
+
 
 
 def main():
     logger = LoggerManager.get_logger()
+
     aliyun_log = LoggerManager.get_aliyun_logger()
     """
     主调度入口:
@@ -39,6 +41,17 @@ def main():
             for group_id, p in list(process_map.items()):
                 if not p.is_alive():
                     logger.warning(f"[监控] 进程 {p.name} PID={p.pid} 已崩溃,正在重启...")
+                    # 上报阿里云日志
+                    aliyun_log.logging(
+                        code="1501",
+                        message=f"[主进程监控] 子进程 {p.name} 崩溃重启",
+                        data={
+                            "pid": p.pid,
+                            "exitcode": p.exitcode,
+                            "group_id": group_id,
+                            "topics": topic_groups[group_id]
+                        }
+                    )
                     time.sleep(2)
                     start_worker_process(group_id, topic_groups[group_id], process_map)
     except KeyboardInterrupt:

+ 56 - 10
scheduler/async_consumer.py

@@ -1,14 +1,16 @@
+import asyncio
 import json
 import traceback
 from typing import List
-import asyncio
+import signal
 
 from core.utils.log.logger_manager import LoggerManager
 from core.utils.trace_utils import generate_trace_id
 from services.async_mysql_service import AsyncMysqlService
 from spiders.spider_registry import get_spider_class
 
-async def async_handle_topic(topic: str):
+
+async def async_handle_topic(topic: str, stop_event: asyncio.Event):
     """
     单个 topic 的消费逻辑,运行在协程中:
     - 从 MQ 中消费消息;
@@ -30,6 +32,7 @@ async def async_handle_topic(topic: str):
             payload = json.loads(message.message_body)
             task_id = payload["id"]
 
+            logger.info(f"{trace_id} - 接收到任务消息: {task_id}")
             async with AsyncMysqlService("system", "crawler") as mysql:
                 user_list = await mysql.get_user_list(task_id)
                 rule_dict = await mysql.get_rule_dict(task_id)
@@ -45,7 +48,19 @@ async def async_handle_topic(topic: str):
             # ack 由 run 成功后执行
             await consumer.ack_message(message.receipt_handle)
 
+            logger.info(f"{trace_id} - 任务 {task_id} 执行成功并已 Ack")
+            aliyun_logger.logging(
+                code="2000",
+                message="任务执行成功",
+                trace_id=trace_id,
+                data={
+                    "task_id": task_id,
+                    "topic": topic
+                }
+            )
+
         except Exception as e:
+            logger.error(f"{trace_id} - 任务处理失败: {e}\n{traceback.format_exc()}")
             aliyun_logger.logging(
                 code="9001",
                 message=f"处理消息失败: {str(e)}",
@@ -56,18 +71,49 @@ async def async_handle_topic(topic: str):
                     "message_body": message.message_body
                 }
             )
-    # 消费循环启动
-    await consumer.run_forever(handle_single_message)
+    # 自动重启消费循环
+    while not stop_event.is_set():
+        try:
+            await consumer.run_forever(handle_single_message)
+        except Exception as e:
+            aliyun_logger.logging(
+                code="9002",
+                message=f"{topic} 消费循环异常即将重启: {str(e)}",
+                data={
+                    "error_type": type(e).__name__,
+                    "stack_trace": traceback.format_exc(),
+                }
+            )
+            logger.warning(f"[{topic}] 消费循环异常: {e},5秒后重启")
+            await asyncio.sleep(5)
 
 
 async def run_all_topics(topics: List[str]):
-    """
-    启动当前进程中所有 topic 的协程监听任务。
-    初始化全局 AsyncMysqlService 实例。
-    """
+    stop_event = asyncio.Event()
+    loop = asyncio.get_running_loop()
+
+    def shutdown():
+        print("[系统] 收到停止信号,准备优雅退出...")
+        stop_event.set()
+
+    for sig in [signal.SIGINT, signal.SIGTERM]:
+        loop.add_signal_handler(sig, shutdown)
+
+    tasks = [asyncio.create_task(async_handle_topic(topic, stop_event)) for topic in topics]
+
+    await stop_event.wait()  # 等待停止信号
+
+    print("[系统] 正在取消所有消费任务...")
+    for task in tasks:
+        task.cancel()
+
+    results = await asyncio.gather(*tasks, return_exceptions=True)
+
+    for idx, result in enumerate(results):
+        if isinstance(result, Exception):
+            print(f"[系统] 任务 {topics[idx]} 异常退出: {result}")
 
-    tasks = [asyncio.create_task(async_handle_topic(topic)) for topic in topics]
-    await asyncio.gather(*tasks)
+    print("[系统] 所有任务已退出,进程关闭")
 
 
 def handle_topic_group(topics: List[str]):
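The hunk cuts off at `handle_topic_group`; for context, a sketch (not the actual body) of how a worker process could drive `run_all_topics`, assuming each worker simply owns one event loop.

```python
import asyncio
from typing import List

from scheduler.async_consumer import run_all_topics


def worker_entry(topics: List[str]) -> None:
    # One event loop per worker process; run_all_topics returns after the
    # SIGINT/SIGTERM handler sets the stop event and all topic tasks finish cancelling.
    asyncio.run(run_all_topics(topics))


if __name__ == "__main__":
    worker_entry(["bszf_recommend_prod"])
```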

+ 3 - 1
scheduler/process_manager.py

@@ -1,6 +1,8 @@
 import multiprocessing
 from typing import List, Dict
+
 from core.utils.log.logger_manager import LoggerManager
+
 logger = LoggerManager.get_logger()
 aliyun_log = LoggerManager.get_aliyun_logger()
 
@@ -21,4 +23,4 @@ def start_worker_process(group_id: int, topic_group: List[str], process_map: Dic
     )
     p.start()
     process_map[group_id] = p
-    logger.info(f"[主进程] 启动进程 PID={p.pid} 处理 topics={topic_group}")
+    logger.info(f"[主进程] 启动进程 PID={p.pid} 处理 topics={topic_group}")

+ 16 - 15
services/async_mysql_service.py

@@ -1,11 +1,12 @@
 import asyncio
 import json
-import os
 import logging
 from typing import List, Optional, Dict, Any, Tuple
+
+from config import settings
 from core.base.async_mysql_client import AsyncMySQLClient
 from core.utils.log.logger_manager import LoggerManager
-from config import settings
+
 logger = logging.getLogger(__name__)
 
 
@@ -20,7 +21,7 @@ class AsyncMysqlService:
     - 完善的错误处理和日志记录
     """
 
-    # 存储不同配置的单例实例,键为(platform, mode)元组
+    # 存储不同配置的单例实例,键为(classname, mode)元组
     _instances: Dict[Tuple[str, str], "AsyncMysqlService"] = {}
 
     def __new__(cls, platform: Optional[str] = None, mode: Optional[str] = None):
@@ -75,16 +76,15 @@ class AsyncMysqlService:
             minsize=1,
             maxsize=10
         )
-        self.logger.info(f"创建数据库服务实例: platform={platform}, mode={mode}")
+        self.logger.info(f"创建数据库服务实例: classname={platform}, mode={mode}")
 
-    # 以下方法与原实现一致,未修改
     async def __aenter__(self):
         """支持async with上下文,初始化连接池"""
         if not self._pool_initialized:
             try:
                 await self._client.init_pool()
                 self._pool_initialized = True
-                self.logger.info(f"连接池初始化成功: platform={self._platform}, mode={self._mode}")
+                self.logger.info(f"连接池初始化成功: classname={self._platform}, mode={self._mode}")
             except Exception as e:
                 self.logger.error(f"连接池初始化失败: {str(e)}")
                 raise
@@ -96,7 +96,7 @@ class AsyncMysqlService:
             try:
                 await self._client.close()
                 self._pool_initialized = False
-                self.logger.info(f"连接池已关闭: platform={self._platform}, mode={self._mode}")
+                self.logger.info(f"连接池已关闭: classname={self._platform}, mode={self._mode}")
             except Exception as e:
                 self.logger.warning(f"连接池关闭失败: {str(e)}")
 
@@ -162,13 +162,13 @@ class AsyncMysqlService:
 
     async def get_today_videos(self) -> int:
         sql = """
-            SELECT COUNT(*) as cnt
-            FROM crawler_video 
-            WHERE DATE(create_time) = CURDATE()
-              AND platform = %s 
-              AND strategy = %s
-        """
-        self.logger.info(f"查询今日视频数量: platform={self.platform}, strategy={self.mode}")
+              SELECT COUNT(*) as cnt
+              FROM crawler_video
+              WHERE DATE(create_time) = CURDATE()
+                AND classname = %s
+                AND strategy = %s \
+              """
+        self.logger.info(f"查询今日视频数量: classname={self.platform}, strategy={self.mode}")
         result = await self.fetch_one(sql, [self.platform, self.mode])
         return result["cnt"] if result else 0
 
@@ -202,5 +202,6 @@ async def demo_usage():
     finally:
         await service.__aexit__(None, None, None)
 
+
 if __name__ == '__main__':
-    asyncio.run(demo_usage())
+    asyncio.run(demo_usage())

+ 0 - 3
services/clean_title.py

@@ -1,6 +1,3 @@
-import asyncio
-
-
 async def clean_title(strings):
     return (
         strings.strip()

+ 14 - 9
services/pipeline.py

@@ -1,12 +1,12 @@
+import os
 import re
 import sys
-import os
 import time
 from datetime import datetime
 
 from core.utils.feishu.feishu_utils import FeishuUtils
-from services.async_mysql_service import AsyncMysqlService
 from core.utils.log.logger_manager import LoggerManager
+from services.async_mysql_service import AsyncMysqlService
 
 sys.path.append(os.getcwd())
 
@@ -137,7 +137,7 @@ class PiaoQuanPipeline:
                 sql = f"""
                 SELECT create_time 
                 FROM crawler_video 
-                WHERE platform = %s AND out_video_id = %s 
+                WHERE classname = %s AND out_video_id = %s 
                   AND create_time >= DATE_SUB(NOW(), INTERVAL %s DAY)
                 """
                 rows = await db.client.fetch_all(sql, [self.platform, out_id, int(day_count)])
@@ -163,15 +163,20 @@ class PiaoQuanPipeline:
             # 标题去重逻辑(示例)
             if self.platform == "zhufuhaoyunbaofu" and self.mode == "recommend":
                 sql = """
-                    SELECT 1 FROM crawler_video 
-                    WHERE platform = %s AND out_video_id = %s AND video_title = %s
-                """
+                      SELECT 1
+                      FROM crawler_video
+                      WHERE classname = %s
+                        AND out_video_id = %s
+                        AND video_title = %s \
+                      """
                 result = await db.client.fetch_one(sql, [self.platform, out_id, title])
             else:
                 sql = """
-                    SELECT 1 FROM crawler_video 
-                    WHERE platform = %s AND out_video_id = %s
-                """
+                      SELECT 1
+                      FROM crawler_video
+                      WHERE classname = %s
+                        AND out_video_id = %s \
+                      """
                 result = await db.client.fetch_one(sql, [self.platform, out_id])
 
             if result:

+ 7 - 8
services/rocketmq_consumer.py

@@ -1,8 +1,9 @@
 import asyncio
 from typing import List, Optional, Callable
+
 from mq_http_sdk.mq_client import MQClient
-from mq_http_sdk.mq_exception import MQExceptionBase
 from mq_http_sdk.mq_consumer import Message
+from mq_http_sdk.mq_exception import MQExceptionBase
 
 from config import settings
 
@@ -16,11 +17,11 @@ class AsyncRocketMQConsumer:
     """
 
     def __init__(
-        self,
-        topic_name: Optional[str],
-        group_id: Optional[str],
-        wait_seconds: Optional[int] = None,
-        batch: Optional[int] = None,
+            self,
+            topic_name: Optional[str],
+            group_id: Optional[str],
+            wait_seconds: Optional[int] = None,
+            batch: Optional[int] = None,
     ):
         # 从环境变量读取配置
         self.endpoint = settings.ROCKETMQ_ENDPOINT
@@ -74,5 +75,3 @@ class AsyncRocketMQConsumer:
             except Exception as e:
                 print(f"[拉取失败] {e}")
                 await asyncio.sleep(2)
-
-

+ 51 - 132
spiders/base_spider.py

@@ -1,20 +1,20 @@
 import asyncio
 import random
+import time
 import traceback
 import uuid
-
-import aiohttp
 from abc import ABC
 from typing import List, Dict, Optional
-import time
-from core.utils.log.logger_manager import LoggerManager
-from services.pipeline import PiaoQuanPipeline
-from core.utils.extractors import safe_extract
-from core.utils.config_loader import ConfigLoader
-from services.async_mysql_service import AsyncMysqlService
-from core.models.video_item import VideoItem
 
+import aiohttp
 
+from core.models.video_item import VideoItem
+from core.utils.spider_config import SpiderConfig
+from core.utils.extractors import safe_extract, extract_fields
+from core.utils.log.logger_manager import LoggerManager
+from services.async_mysql_service import AsyncMysqlService
+from services.pipeline import PiaoQuanPipeline
+from core.base.async_request_client import AsyncRequestClient
 
 class BaseSpider(ABC):
     """
@@ -32,13 +32,13 @@ class BaseSpider(ABC):
         self.class_name = self.__class__.__name__  # 获取子类类名
 
         # 根据类名自动获取配置
-        self.platform_config = ConfigLoader.get_platform_config(platform=str(self.class_name.lower()))
+        self.platform_config = SpiderConfig.get_platform_config(classname=str(self.class_name.lower()))
         if not self.platform_config:
             raise ValueError(f"找不到对应配置: {self.class_name}")
 
         # 初始化日志和MQ
-        self.platform = self.platform_config.get("platform")
-        self.mode = self.platform_config.get("mode")
+        self.platform = self.platform_config.platform
+        self.mode = self.platform_config.mode
         self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
 
         self.logger.info(f"{trace_id}--初始化爬虫类: {self.class_name}")
@@ -46,90 +46,40 @@ class BaseSpider(ABC):
         self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
 
         # 请求配置
-        self.method = self.platform_config.get("method", "GET").upper()
-        self.url = self.platform_config.get("url")
-        self.headers = self.platform_config.get("headers", {})
-        self.body = self.platform_config.get("request_body", {})
-        self.field_map = self.platform_config.get("response_parse", {}).get("fields", {})
-        self.data_path = self.platform_config.get("response_parse", {}).get("data_path")
-        self.video_fields_map = self.platform_config.get("video_fields_map", {})
+        self.method = self.platform_config.method.upper()
+        self.url = self.platform_config.url
+        self.headers = self.platform_config.headers
+        self.body = self.platform_config.request_body
+
+        self.response = self.platform_config.response_parse
+        self.field_map = self.response.get("fields", {})
+        self.data_path = self.response.get("data_path")
+
 
         # 流程控制配置
-        self.loop_times = self.platform_config.get("loop_times", 1)  # 循环次数
-        self.loop_interval = self.platform_config.get("loop_interval", 0)  # 循环间隔(秒)
+        self.loop_times = self.platform_config.loop_times  # 循环次数
+        self.loop_interval = self.platform_config.loop_interval  # 循环间隔(秒)
 
         self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
+        self.request_client = AsyncRequestClient(logger=self.logger)
 
         self.logger.info(
             f"{self.trace_id}--配置: 循环{self.loop_times}次,间隔{self.loop_interval}秒")
 
         self.session = None
 
-    async def _send_async_request(self, session: aiohttp.ClientSession, method: str, url: str,
-                                  **kwargs) -> aiohttp.ClientResponse:
-        """
-        使用提供的 session 发送异步HTTP请求,支持重试机制
-        """
-        retries = 0
-        self.logger.info(f"{self.trace_id}--请求准备: {method} {url}, 参数: {kwargs}")
-
-        while retries < self.MAX_RETRIES:
-            try:
-                async with session.request(method, url, **kwargs) as response:
-                    response.raise_for_status()
-                    self.logger.info(f"{self.trace_id}--请求成功: {response.status}")
-                    return await response.json()
-            except Exception as e:
-                retries += 1
-                remaining_attempts = self.MAX_RETRIES - retries
-
-                if retries < self.MAX_RETRIES:
-                    self.logger.warning(
-                        f"{self.trace_id}--请求失败 (尝试 {retries}/{self.MAX_RETRIES}): {e}. "
-                        f"剩余尝试次数: {remaining_attempts}"
-                    )
-                    await asyncio.sleep(1)
-                else:
-                    self.aliyun_logr.logging(
-                        code="5001",
-                        message="请求失败,已达到最大重试次数",
-                        data={
-                            "url": url,
-                            "method": method,
-                            "error": str(e),
-                            "headers": kwargs.get("headers", {}),
-                            "body": kwargs.get("json", {})
-                        },
-                        trace_id=self.trace_id
-                    )
-                    self.logger.error(f"{self.trace_id}--请求失败,已达到最大重试次数: {e}")
-                    raise
-
-    async def crawl_data(self) -> Optional[List[Dict]]:
-        """异步获取视频数据"""
-        self.logger.info(f"{self.trace_id}--开始获取视频数据")
-        try:
-            response = await self._send_async_request(
-                session=self.session,
-                method=self.method,
-                url=self.url,
-                headers=self.headers,
-                json=self.body
-            )
-            self.logger.debug(f"{self.trace_id}--响应结果: {response}")
-
-            data = safe_extract(response, self.data_path)
-
-            if not data:
-                self.logger.warning(f"{self.trace_id}--未获取到数据,路径: {self.data_path}")
-                return []
 
-            self.logger.info(f"{self.trace_id}--成功获取{len(data)}条视频数据")
-            return data
-        except Exception as e:
-            traceback.extract_stack()
-            self.logger.exception(f"{self.trace_id}--获取视频数据失败: {e}")
-            return []
+    async def crawl_data(self, session) -> Optional[List[Dict]]:
+        response = await self.request_client.request(
+            session=session,
+            method=self.method,
+            url=self.url,
+            headers=self.headers,
+            json=self.body
+        )
+        data = safe_extract(response, self.data_path)
+        return data if data else []
 
     async def filter_data(self, video: Dict) -> bool:
         """校验视频是否符合规则"""
@@ -143,7 +93,6 @@ class BaseSpider(ABC):
         )
         return await pipeline.process_item()
 
-
     async def is_video_count_sufficient(self) -> bool:
         """
         校验视频是否达到当日最大量
@@ -167,41 +116,21 @@ class BaseSpider(ABC):
         """
         self.logger.debug(f"{self.trace_id}--开始处理视频: {video.get('title', '无标题')}")
         publish_user = random.choice(self.user_list)
-        try:
-            # 从 field_map 中动态构建 VideoItem 初始化参数
-            item_kwargs = {}
-            for field, path in self.field_map.items():
-                if not isinstance(path, str) or not path.startswith("$"):
-                    item_kwargs[field] = path
-                    continue
-
-                value = safe_extract(video, path)
-                if value is None:
-                    self.logger.warning(f"{self.trace_id}--字段提取失败: {field} 路径: {path}")
-                    continue
-                item_kwargs[field] = value
-
-            item_kwargs["user_id"] = publish_user["uid"]
-            item_kwargs["user_name"] = publish_user["nick_name"]
-            # 手动注入 platform 与 strategy
-            item_kwargs["platform"] = self.platform
-            item_kwargs["strategy"] = self.mode
-
-
-            try:
-                item = VideoItem(**item_kwargs)
-            except Exception as e:
-                self.logger.warning(f"{self.trace_id}--VideoItem 初始化失败: {e}, 数据: {item_kwargs}")
-                return None
+        item_kwargs = extract_fields(video, self.field_map, logger=self.logger, trace_id=self.trace_id)
+        item_kwargs["user_id"] = publish_user["uid"]
+        item_kwargs["user_name"] = publish_user["nick_name"]
+        item_kwargs["platform"] = self.platform
+        item_kwargs["strategy"] = self.mode
 
+        try:
+            item = VideoItem(**item_kwargs)
             video_dict = await item.produce_item()
             if not video_dict:
-                self.logger.warning(f"{self.trace_id}--VideoItem 校验失败")
+                self.logger.warning(f"{self.trace_id} 校验失败")
                 return None
             return video_dict
-
         except Exception as e:
-            self.logger.exception(f"{self.trace_id}--视频处理异常: {e}")
+            self.logger.error(f"{self.trace_id} VideoItem 构建或校验失败: {e}")
             return None
 
     async def push_to_etl(self, item: Dict) -> bool:
@@ -226,7 +155,7 @@ class BaseSpider(ABC):
         查询每天的爬虫爬取到的视频数量
         :return:
         """
-        video_count = self.db_service.get_today_videos()
+        video_count = await self.db_service.get_today_videos()
         return video_count
 
     async def integrated_video_handling(self):
@@ -235,6 +164,7 @@ class BaseSpider(ABC):
         :return:
         """
         pass
+
     async def run(self):
         """
         异步运行爬虫任务,严格按顺序执行
@@ -245,27 +175,16 @@ class BaseSpider(ABC):
         5. 推送到ETL
         """
         try:
-            self.logger.info(f"{self.trace_id}--[{self.platform}] 开始执行爬虫任务")
-            total_success = 0
-            total_failed = 0
-
-            async with aiohttp.ClientSession(
-                    timeout=aiohttp.ClientTimeout(total=self.TIMEOUT)
-            ) as session:  # 上下文管理
-                self.session = session
-
+            total_success, total_failed = 0, 0
+            loop_start_time = time.time()
+            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.TIMEOUT)) as session:
                 for loop_index in range(1, self.loop_times + 1):
-                    if not await self.is_video_count_sufficient():
-                        return
                     self.logger.info(f"{self.trace_id}--步骤1: 开始第 {loop_index}/{self.loop_times} 次循环请求")
-                    loop_start_time = time.time()
-
-                    video_list = await self.crawl_data()
+                    video_list = await self.crawl_data(session)
                     if not video_list:
                         self.logger.warning(f"{self.trace_id}--未获取到视频数据,跳过当前循环")
                         await self._wait_for_next_loop(loop_index)
                         continue
-
                     success_count = 0
                     fail_count = 0
 
@@ -302,7 +221,7 @@ class BaseSpider(ABC):
                     message="爬虫执行指标汇总",
                     data={
                         "trace_id": self.trace_id,
-                        "platform": self.platform,
+                        "classname": self.platform,
                         "success_count": total_success,
                         "fail_count": total_failed
                     },
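Note on the `base_spider.py` hunks above: the inline `_send_async_request` retry helper and the per-field extraction loop were removed in favour of `AsyncRequestClient.request` and `extract_fields`, whose implementations are not part of this diff. The sketch below reconstructs what they presumably do from the deleted code; the actual code in `core/base/async_request_client.py` and `core/utils/extractors.py` may differ.

```python
import asyncio
from typing import Dict, Optional

import aiohttp

from core.utils.extractors import safe_extract


class AsyncRequestClient:
    """Reconstructed sketch of the retrying JSON request helper (assumed API)."""

    def __init__(self, logger=None, max_retries: int = 3, retry_delay: float = 1.0):
        self.logger = logger
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    async def request(self, session: aiohttp.ClientSession, method: str, url: str, **kwargs) -> Optional[Dict]:
        for attempt in range(1, self.max_retries + 1):
            try:
                async with session.request(method, url, **kwargs) as resp:
                    resp.raise_for_status()
                    return await resp.json()
            except Exception as e:
                if self.logger:
                    self.logger.warning(f"request failed (attempt {attempt}/{self.max_retries}): {e}")
                if attempt == self.max_retries:
                    raise
                await asyncio.sleep(self.retry_delay)


def extract_fields(video: Dict, field_map: Dict, logger=None, trace_id=None) -> Dict:
    """Reconstructed sketch: JsonPath strings ($...) are extracted from the
    response item, literal config values are passed through unchanged."""
    result = {}
    for field, path in field_map.items():
        if not isinstance(path, str) or not path.startswith("$"):
            result[field] = path
            continue
        value = safe_extract(video, path)
        if value is None:
            if logger:
                logger.warning(f"{trace_id}--field extraction failed: {field} path: {path}")
            continue
        result[field] = value
    return result
```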

+ 11 - 1
spiders/benshanzhufu_recommend.py

@@ -1,4 +1,5 @@
 import asyncio
+
 from spiders.base_spider import BaseSpider
 
 
@@ -9,7 +10,16 @@ class BenshanzhufuRecommend(BaseSpider):
 
 async def main():
     rule_dict = {}
-    user_list = [{'uid': 20631262, 'link': 'recommend_2060', 'nick_name': '人老心不老'}, {'uid': 20631263, 'link': 'recommend_2061', 'nick_name': '荷花朵朵'}, {'uid': 20631264, 'link': 'recommend_2062', 'nick_name': '战友情'}, {'uid': 20631265, 'link': 'recommend_2063', 'nick_name': '闲人老李'}, {'uid': 20631266, 'link': 'recommend_2064', 'nick_name': '盛世白莲'}, {'uid': 20631267, 'link': 'recommend_2065', 'nick_name': '星星点灯'}, {'uid': 20631268, 'link': 'recommend_2066', 'nick_name': '老同学'}, {'uid': 20631269, 'link': 'recommend_2067', 'nick_name': '赤子之心'}, {'uid': 20631271, 'link': 'recommend_2068', 'nick_name': '缘分'}, {'uid': 20631272, 'link': 'recommend_2069', 'nick_name': '欢度余生'}]
+    user_list = [{'uid': 20631262, 'link': 'recommend_2060', 'nick_name': '人老心不老'},
+                 {'uid': 20631263, 'link': 'recommend_2061', 'nick_name': '荷花朵朵'},
+                 {'uid': 20631264, 'link': 'recommend_2062', 'nick_name': '战友情'},
+                 {'uid': 20631265, 'link': 'recommend_2063', 'nick_name': '闲人老李'},
+                 {'uid': 20631266, 'link': 'recommend_2064', 'nick_name': '盛世白莲'},
+                 {'uid': 20631267, 'link': 'recommend_2065', 'nick_name': '星星点灯'},
+                 {'uid': 20631268, 'link': 'recommend_2066', 'nick_name': '老同学'},
+                 {'uid': 20631269, 'link': 'recommend_2067', 'nick_name': '赤子之心'},
+                 {'uid': 20631271, 'link': 'recommend_2068', 'nick_name': '缘分'},
+                 {'uid': 20631272, 'link': 'recommend_2069', 'nick_name': '欢度余生'}]
     trace_id = "1321"
     bszf = BenshanzhufuRecommend(rule_dict, user_list, trace_id)
     await bszf.run()
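Since `BenshanzhufuRecommend` overrides nothing and gets its request/parse rules from the YAML entry matching its lowercased class name, adding another platform presumably follows the same pattern. A minimal sketch with a hypothetical class name and placeholder arguments:

```python
import asyncio

from spiders.base_spider import BaseSpider


class XyzRecommend(BaseSpider):
    """Hypothetical spider: behaviour would come from an 'xyzrecommend'
    section of spiders_config.yaml, so nothing needs overriding."""
    pass


async def main():
    # rule_dict, user_list and trace_id are placeholder demo values,
    # passed positionally as in the BenshanzhufuRecommend example above
    spider = XyzRecommend({}, [{"uid": 1, "link": "recommend_demo", "nick_name": "demo"}], "demo-0001")
    await spider.run()


if __name__ == "__main__":
    asyncio.run(main())
```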

+ 4 - 3
spiders/spider_registry.py

@@ -1,9 +1,10 @@
 # spider_registry.py完整注释版
 """爬虫注册表模块:维护topic到爬虫类的映射关系"""
 
-from spiders.benshanzhufu_recommend import BenshanzhufuRecommend
-from spiders.base_spider import BaseSpider
 from core.utils.log.logger_manager import LoggerManager
+from spiders.base_spider import BaseSpider
+from spiders.benshanzhufu_recommend import BenshanzhufuRecommend
+
 logger = LoggerManager.get_logger()
 aliyun_log = LoggerManager.get_aliyun_logger()
 
@@ -44,4 +45,4 @@ def get_spider_class(topic: str):
 
 def list_registered_topics():
     """获取所有已注册的topic列表"""
-    return list(SPIDER_CLASS_MAP.keys())
+    return list(SPIDER_CLASS_MAP.keys())
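The `spider_registry.py` hunk only shows the import reshuffle and the tail of `list_registered_topics`; the mapping itself is outside the diff. A sketch of the registry shape these helpers imply; the topic key below is hypothetical, and only `BenshanzhufuRecommend` is confirmed by the imports:

```python
from typing import Dict, Type

from core.utils.log.logger_manager import LoggerManager
from spiders.base_spider import BaseSpider
from spiders.benshanzhufu_recommend import BenshanzhufuRecommend

logger = LoggerManager.get_logger()

# Hypothetical contents: the topic name is illustrative, not taken from the repo.
SPIDER_CLASS_MAP: Dict[str, Type[BaseSpider]] = {
    "bszf_recommend_demo": BenshanzhufuRecommend,
}


def get_spider_class(topic: str):
    """Return the spider class registered for a topic, or None if unknown (assumed behaviour)."""
    spider_cls = SPIDER_CLASS_MAP.get(topic)
    if spider_cls is None:
        logger.warning(f"unregistered topic: {topic}")
    return spider_cls


def list_registered_topics():
    return list(SPIDER_CLASS_MAP.keys())
```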

+ 13 - 9
spiders/universal_crawler.py

@@ -1,19 +1,20 @@
 import random
 import time
 import uuid
-import requests
 from typing import Dict, List, Optional
-from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, RetryCallState
 
+import requests
 from application.config.common import MQ
-from config.config import base_url
 from application.functions import MysqlService
 from application.items import VideoItem
 from application.pipeline import PiaoQuanPipeline
-from core.utils import safe_extract
+from config.config import base_url
+from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, RetryCallState
 
+from core.utils import safe_extract
 from spiders.base_spider import BaseSpider  # 抽象基类导入
 
+
 def before_send_log(retry_state: RetryCallState) -> None:
     attempt = retry_state.attempt_number
     last_result = retry_state.outcome
@@ -41,8 +42,10 @@ class UniversalCrawler(BaseSpider):
         self.response_data_path = self.platform_config["response_parse"]["data_path"]
         self.video_fields_map = self.platform_config["response_parse"]["fields"]
 
-    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type((requests.RequestException, ValueError)), before=before_send_log)
-    def _send_request(self, url: str, method: str = None, headers: Dict = None, payload: Dict = None, timeout: int = 30) -> Optional[Dict]:
+    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2),
+           retry=retry_if_exception_type((requests.RequestException, ValueError)), before=before_send_log)
+    def _send_request(self, url: str, method: str = None, headers: Dict = None, payload: Dict = None,
+                      timeout: int = 30) -> Optional[Dict]:
         method = method or self.request_method
         headers = headers or self.request_headers
         payload = payload or self.request_body
@@ -88,7 +91,7 @@ class UniversalCrawler(BaseSpider):
             val = safe_extract(video, path) if isinstance(path, str) and path.startswith("$") else path
             item.add_video_info(field, val)
 
-        item.add_video_info("platform", self.platform)
+        item.add_video_info("classname", self.platform)
         item.add_video_info("strategy", self.mode)
         item.add_video_info("session", f"{self.platform}-{int(time.time())}")
         user = random.choice(self.user_list)
@@ -113,6 +116,7 @@ class UniversalCrawler(BaseSpider):
             self.aliyun_logr.logging(code="1002", message="成功发送至ETL", data=item, trace_id=self.trace_id)
             if self.download_cnt >= self.download_min_limit:
                 self.has_enough_videos = True
-                self.aliyun_logr.logging(code="2000", message=f"达到下载限制: {self.download_min_limit}", trace_id=self.trace_id)
+                self.aliyun_logr.logging(code="2000", message=f"达到下载限制: {self.download_min_limit}",
+                                         trace_id=self.trace_id)
             return True
-        return False
+        return False
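The retry decorator on `_send_request` uses tenacity's `before` hook (`before_send_log`) to log every attempt. A standalone sketch of that pattern; `fetch_json`, the success-code check and the URL handling are illustrative, not taken from the project:

```python
import requests
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_fixed


def log_attempt(retry_state: RetryCallState) -> None:
    # attempt_number is 1 on the first call, so anything above 1 is a real retry
    if retry_state.attempt_number > 1:
        print(f"retrying, attempt {retry_state.attempt_number}")


@retry(stop=stop_after_attempt(3), wait=wait_fixed(2),
       retry=retry_if_exception_type((requests.RequestException, ValueError)),
       before=log_attempt)
def fetch_json(url: str) -> dict:
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()          # network/HTTP errors trigger a retry
    data = resp.json()
    if data.get("code") != 0:        # assumed success convention, also retried
        raise ValueError(f"unexpected response code: {data.get('code')}")
    return data
```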

+ 2 - 5
tests/test1.py

@@ -1,7 +1,4 @@
-import asyncio
-import time
-
-topics = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]
+topics = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
 num_groups = 4
 
-print([topics[i::num_groups] for i in range(num_groups)])
+print([topics[i::num_groups] for i in range(num_groups)])
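For reference, the round-robin split this script prints: the topic at index j lands in group j % num_groups, so 15 topics over 4 groups come out as follows.

```python
topics = list(range(1, 16))
groups = [topics[i::4] for i in range(4)]
assert groups == [[1, 5, 9, 13], [2, 6, 10, 14], [3, 7, 11, 15], [4, 8, 12]]
```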

+ 0 - 3
tests/test_config.py

@@ -1,7 +1,4 @@
 from config import settings
-from services.async_mysql_service import AsyncMysqlService
-from services.rocketmq_consumer import AsyncRocketMQConsumer
 
 print("=== 配置验证 ===")
 print("DB连接:", settings.database_url)
-
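The test only exercises `settings.database_url` from the `config` package. A minimal sketch of a settings object exposing that attribute, assuming pydantic-settings; every field name and default below is hypothetical:

```python
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # hypothetical connection fields, overridable via environment variables
    db_host: str = "127.0.0.1"
    db_port: int = 3306
    db_user: str = "root"
    db_password: str = ""
    db_name: str = "crawler"

    @property
    def database_url(self) -> str:
        # DSN-style string; the exact scheme used by the project is an assumption
        return f"mysql://{self.db_user}:{self.db_password}@{self.db_host}:{self.db_port}/{self.db_name}"


settings = Settings()
```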

+ 3 - 2
tests/test_video_item.py

@@ -1,8 +1,9 @@
 import asyncio
 import time
-from core.models.video_item import VideoItem  # 你的 Pydantic 模型路径
 from pprint import pprint
 
+from core.models.video_item import VideoItem  # 你的 Pydantic 模型路径
+
 
 async def main():
     fake_video_data = {
@@ -15,7 +16,7 @@ async def main():
         "video_title": "   测试 视频 标题!!!",
         "publish_time_stamp": int(time.time()) - 86400,  # 昨天
         "strategy": "recommend",
-        "platform": "test_platform"
+        "classname": "test_platform"
     }
 
     item = VideoItem(**fake_video_data)