zhangliang 1 week ago
parent
commit
74cbd0b5dd

+ 3 - 2
README.md

@@ -20,7 +20,7 @@
 │   │   ├── aliyun_logger.py  # 阿里云SLS日志适配器
 │   │   └── local_logger.py   # 本地文件日志(按天滚动)
 │   └── spider/            # 爬虫核心组件
-│       ├── base_spider.py    # 爬虫基类(定义run/parse等抽象方法)
+│       ├── basespider.py     # 爬虫基类(定义run/parse等抽象方法)
 │       ├── registry.py       # 爬虫注册中心(动态加载子类)
 │       └── pipeline.py       # 数据处理流水线(清洗/去重/存储)
 ├── spiders/               # 业务爬虫实现
@@ -144,4 +144,5 @@ benshanzhufu:
 - 每条消息创建一个 UniversalCrawler 实例,执行 `.run()`,完成后再 ACK
 - 失败或超时不会阻塞其他任务
 
-pip freeze > requirements.txt
+pip freeze > requirements.txt
+pip install -r requirements.txt

+ 37 - 0
config/spiders_config.yaml

@@ -57,3 +57,40 @@ yuannifuqimanmanrecommend:
       video_url: "$.video_url"
       out_video_id: "$.nid"
 
+xiaoniangaoauthor:
+  platform: xiaoniangao
+  mode: author
+  path: /crawler/xiao_nian_gao_plus/blogger
+  method: post
+  request_body:
+      cursor: "{{next_cursor}}"
+      account_id: "{{uid}}" # 数据库的uid
+  loop_times: 100
+  loop_interval:
+    min: 30
+    max: 60
+  feishu_sheetid: "golXy9"
+  response_parse:
+    uid: "$.uid" # 数据库的uid
+    next_cursor: "$.cursor"
+    data: "$.data"
+    has_more: "$.data.has_more"
+    data_path: "$.data.data"
+    fields:
+      video_title: "$.title"
+      duration: "$.du"
+      play_cnt: "$.play_pv"
+      like_cnt: "$.favor.total"
+      comment_cnt: "$.comment_count"
+      share_cnt: "$.share"
+      width: "$.w"
+      height: "$.h"
+      avatar_url: "$.user.hurl"
+      cover_url: "$.url"
+      video_url: "$.v_url"
+      out_user_id: "$.user.mid"
+      out_video_id: "$.vid"
+
+
+
+
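The `{{next_cursor}}` and `{{uid}}` placeholders above are filled from values pulled out of the previous response by the response_parse JSONPaths (or, in author mode, from the virtual data built in AuthorSpider._build_request_body). A minimal sketch of that substitution, assuming RequestPreparer works roughly this way (the real implementation in core/utils/request_preparer.py may differ):

import re
from typing import Any, Dict


def render_request_body(template: Dict[str, Any], values: Dict[str, Any]) -> Dict[str, Any]:
    # Replace every {{name}} in string values with the extracted value; missing keys become "".
    def render(v: Any) -> Any:
        if isinstance(v, str):
            return re.sub(r"\{\{(\w+)\}\}", lambda m: str(values.get(m.group(1), "")), v)
        if isinstance(v, dict):
            return {k: render(x) for k, x in v.items()}
        return v
    return render(template)


# First page: cursor is empty, uid comes from the database
body = render_request_body(
    {"cursor": "{{next_cursor}}", "account_id": "{{uid}}"},
    {"next_cursor": "", "uid": "123456"},
)
# -> {"cursor": "", "account_id": "123456"}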

+ 20 - 6
core/base/async_request_client.py

@@ -42,12 +42,25 @@ class AsyncRequestClient:
                     if resp.get('code') != 0:
                         retries += 1
                         if self.logger:
-                            self.logger.info(f"响应 {resp}, 重试 {retries}/{self.max_retries}")
+                            self.logger.info(f"{url} 响应 {resp}, 重试 {retries}/{self.max_retries}")
+                        if retries >= self.max_retries:
+                            error_msg = f"请求响应code非0且达到最大重试次数 {self.max_retries}"
+                            if self.logger:
+                                self.logger.error(error_msg)
+                            if self.aliyun_log:
+                                self.aliyun_log.logging(
+                                    code="9006",
+                                    message=error_msg,
+                                    data={
+                                        "url": url,
+                                        "method": method,
+                                        "requestBody": kwargs,
+                                        "response": resp
+                                    }
+                                )
                         await asyncio.sleep(5)
                         continue
-                    self.logger.info(f"响应: {resp}")
-
-
+                    self.logger.info(f"{url} 响应: {resp}")
                     return resp
             except Exception as e:
                 retries += 1
@@ -57,8 +70,9 @@ class AsyncRequestClient:
                     if self.aliyun_log:
                         self.aliyun_log.logging(
                             code="9006",
-                            message=f"请求异常达到最大重试次数",
-                            data={"url": url,
+                            message="请求异常达到最大重试次数",
+                            data={
+                                  "url": url,
                                   "method": method,
                                   "requestBody": kwargs,
                                   "response": resp

+ 1 - 0
core/utils/log/log_codes.py

@@ -73,6 +73,7 @@ CODES = {
     "9022": "子进程内部异常",
     "9023": "请求返回code非0",
     "9024": "字段提取失败",
+    "9025": "接口返回为空",
 
     # 系统致命错误 (99xx)
     "9900": "数据库连接失败",

+ 29 - 5
core/utils/path_utils.py

@@ -1,16 +1,32 @@
 import os
+import sys
 
 
-def get_project_path() -> str:
+def find_project_root(marker: str = ".env") -> str:
     """
-    获取 AutoScraperX 项目根路径
-    支持从任何子模块中调用而不会路径错乱
+    查找项目根目录。它通过向上搜索一个标记文件(如 '.env'、'.git'、'pyproject.toml')来工作。
+    :param marker: 用于识别项目根目录的标记文件或目录名。
+    :return: 项目根目录的绝对路径。
+    :raises FileNotFoundError: 如果找不到任何标记文件,意味着项目结构可能不标准。
     """
-    return os.path.dirname(os.path.abspath(__file__)).split("AutoScraperX")[0] + "AutoScraperX"
+    current_path = os.path.abspath(__file__)  # 获取当前文件的绝对路径
+    while True:
+        if os.path.exists(os.path.join(current_path, marker)):
+            # 找到了标记文件,当前目录即为项目根目录
+            return current_path
+
+        parent_path = os.path.dirname(current_path)
+
+        if parent_path == current_path:
+            error_msg = f"项目根目录标记文件 '{marker}' 未在任何父目录中找到,从 {os.path.abspath(__file__)} 向上查找。"
+            raise FileNotFoundError(error_msg)
+        current_path = parent_path  # 继续向上级目录搜索
+
+
 
 
 # 项目根目录
-project_root = get_project_path()
+project_root = find_project_root()
 
 # 配置路径
 config_dir = os.path.join(project_root, "config")
@@ -18,6 +34,7 @@ spiders_config_path = os.path.join(config_dir, "spiders_config.yaml")
 
 # 日志路径
 log_dir = os.path.join(project_root, "logs")
+os.makedirs(log_dir, exist_ok=True)
 
 
 __all__ = [
@@ -25,4 +42,11 @@ __all__ = [
     "config_dir",
     "spiders_config_path",
     "log_dir",
+    "get_project_path"
 ]
+
+
+def get_project_path() -> str:
+    """
+    获取 AutoScraperX 项目根路径
+    支持从任何子模块中调用而不会路径错乱
+    """
+    return project_root
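A quick usage sketch of the marker-based lookup (assumes the repository keeps a `.env` file at its root, per the default marker):

from core.utils.path_utils import find_project_root, project_root, spiders_config_path

# Resolve the root from any submodule without depending on the directory name
root = find_project_root()            # same value as the module-level project_root
alt_root = find_project_root(".git")  # another marker works too, if present
print(root == project_root, spiders_config_path)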

+ 9 - 0
services/async_mysql_service.py

@@ -160,6 +160,11 @@ class AsyncMysqlService:
         result = await self.fetch_one(sql, [self.platform, self.mode])
         return result["cnt"] if result else 0
 
+    async def get_xng_mid(self) -> list:
+        sql = """select DISTINCT(uid) from xng_uid ORDER BY `data_time` DESC limit 3;"""
+        result = await self.fetch_all(sql)
+        return result if result else []
+
 
 # 全局便捷访问函数(支持None参数)
 async def get_db_service(platform: Optional[str] = None, mode: Optional[str] = None) -> AsyncMysqlService:
@@ -176,6 +181,10 @@ async def demo_usage():
         users = await default_service.get_user_list(8)
         print(f"系统配置用户数: {users}")
 
+    async with AsyncMysqlService() as default_service:
+        users = await default_service.get_xng_mid()
+        print(f"小年糕用户数: {users}")
+
     # 方式二:显式传入None
     async with AsyncMysqlService(None, None) as system_service:
         rule = await system_service.get_rule_dict(18)
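The rows come back as dicts keyed by column name (assuming fetch_all follows the same convention as fetch_one above), so callers read the uid per row; a small standalone sketch:

import asyncio

from services.async_mysql_service import AsyncMysqlService


async def show_xng_uids():
    # get_xng_mid returns one dict per row, e.g. [{"uid": 116311065}, ...] (illustrative values)
    async with AsyncMysqlService("xiaoniangao", "author") as db:
        rows = await db.get_xng_mid()
        uids = [row["uid"] for row in rows]
        print(f"最近入库的小年糕 uid: {uids}")


if __name__ == "__main__":
    asyncio.run(show_xng_uids())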

+ 97 - 0
spiders/authorspider.py

@@ -0,0 +1,97 @@
+from spiders.basespider import BaseSpider
+from typing import Optional, List, Dict
+import aiohttp
+
+from core.utils.extractors import safe_extract
+
+
+class AuthorSpider(BaseSpider):
+    """账号模式爬虫:从用户列表爬取"""
+
+    def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
+        super().__init__(rule_dict, user_list, env)
+        # 账号模式特有状态
+        self.user_list_from_db = []  # 数据库用户列表
+        self.current_user_index = 0  # 当前用户索引
+        self.current_cursor = "" # 当前分页游标(初始为空)
+
+
+    async def before_run(self):
+        """运行前:获取用户列表"""
+        self.user_list_from_db = await self.fetch_user_list()
+        if not self.user_list_from_db:
+            self.logger.warning("用户列表为空,终止账号模式")
+            return
+        self.logger.info(f"{self.platform}获取用户列表完成,共 {len(self.user_list_from_db)} 个用户")
+
+    async def core_loop(self):
+        """核心循环:处理每个用户的视频"""
+        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
+            while self.current_user_index < len(self.user_list_from_db):
+                # 检查数量限制
+                if not await self.is_video_count_sufficient():
+                    return
+                # 当前用户
+                user = self.user_list_from_db[self.current_user_index]
+                user_uid = user.get("uid")  # 数据库中的uid字段
+                self.logger.info(
+                    f"处理用户 uid={user_uid}(第{self.current_user_index + 1}个),"
+                    f"当前cursor: {self.current_cursor or '0'}"
+                )
+
+                # 构建请求体:注入uid和cursor
+                request_body = self._build_request_body(user)
+
+                # 获取当前用户视频
+                has_more, raw_data = await self.crawl_user_videos(session, request_body, user_uid)
+                if not has_more:
+                    self.logger.info(f"用户 {user_uid} 第{int(self.current_cursor or 0) + 1}页无更多视频")
+                if not raw_data:
+                    # 切换到下一个用户
+                    self.current_user_index += 1
+                    continue
+                # 处理数据
+                await self.process_raw_data(raw_data)
+                if self.current_user_index == len(self.user_list_from_db)-1:
+                    self.current_cursor = str(int(self.current_cursor or 0) + 1)
+                    self.current_user_index = 0
+                    continue
+                self.current_user_index += 1
+                await self.wait()
+
+
+    def _build_request_body(self, user: Dict) -> Dict:
+        """构建请求体:将用户uid和当前cursor注入"""
+        # 准备"虚拟数据",键名对应你的配置路径($.uid 和 $.cursor)
+        virtual_data = {
+            "uid": str(user.get("uid")),  # 对应配置中的 $.uid
+            "cursor": self.current_cursor  # 对应配置中的 $.cursor
+        }
+
+        return self.request_preparer.prepare(
+            request_body_config=self.request_body_template,
+            response_data=virtual_data
+        )
+
+    async def fetch_user_list(self) -> List[Dict]:
+        """获取待爬取的用户列表(从数据库)"""
+        return []
+
+    async def crawl_user_videos(self, session, request_body: Dict, user_uid: str) -> tuple:
+        """请求用户视频接口"""
+        response = await self.request_client.request(
+            session=session,
+            method=self.method,
+            url=self.url,
+            headers=self.headers,
+            json=request_body
+        )
+        has_more = safe_extract(response, self.has_more)
+        # 解析用户视频列表
+        data_list = safe_extract(response, self.data_path)
+        if not data_list:
+            self.logger.info(f"用户 {user_uid} 第{self.current_cursor}页无视频数据")
+            return None, None
+        return has_more, data_list
+
+    async def fetch_detail(self, item: Dict) -> Dict:
+        """账号模式:补充视频详情(子类自行实现)"""
+        return item  # 默认返回原数据

+ 0 - 297
spiders/base_spider.py

@@ -1,297 +0,0 @@
-import asyncio
-import random
-import uuid
-from typing import List, Dict, Optional, Any
-
-import aiohttp
-
-from core.models.video_item import VideoItem
-from core.utils.helpers import generate_titles
-from core.utils.request_preparer import RequestPreparer
-from core.utils.spider_config import SpiderConfig
-from core.utils.extractors import safe_extract, extract_fields
-from core.utils.log.logger_manager import LoggerManager
-from services.async_mysql_service import AsyncMysqlService
-from services.pipeline import PiaoQuanPipeline
-from core.base.async_request_client import AsyncRequestClient
-from services.async_mq_producer import AsyncMQProducer
-
-
-class BaseSpider:
-    """
-    通用爬虫基类,支持:
-    - 依赖请求参数动态替换(cursor 或其它参数)
-    - 支持单请求和依赖请求的分页抓取
-    - 统一日志、MQ推送、异常捕获、异步请求
-    子类只需根据业务重写少量方法,如 process_video/process_item。
-    """
-
-    def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
-        self.rule_dict = rule_dict
-        self.user_list = user_list
-        self.env = env
-        self.class_name = self.__class__.__name__.lower()
-
-        # --- 1. 初始化核心组件 ---
-        self._setup_configuration()
-        self._setup_logging()
-        self._setup_services()
-        self._setup_state()
-
-    #  初始化辅助方法
-    def _setup_configuration(self):
-        """加载并设置爬虫的核心配置。"""
-        self.platform_config = SpiderConfig.get_platform_config(classname=self.class_name)
-        if not self.platform_config:
-            raise ValueError(f"找不到爬虫配置: {self.class_name}")
-
-        self.platform = self.platform_config.platform
-        self.mode = self.platform_config.mode
-        self.url = self.platform_config.url
-        self.method = self.platform_config.method.upper()
-        self.headers = self.platform_config.headers or {}
-
-        # 请求和解析相关的配置
-        self.request_body_template = self.platform_config.request_body or {}
-        self.response_parse_config = self.platform_config.response_parse or {}
-        self.data_path = self.response_parse_config.get("data_path")
-        # self.next_cursor_path = self.response_parse_config.get("next_cursor")
-        self.field_map = self.response_parse_config.get("fields", {})
-
-        # 爬取行为相关的配置
-        self.loop_times = self.platform_config.loop_times or 100
-        self.loop_interval = self.platform_config.loop_interval
-        self.timeout = self.platform_config.request_timeout or 30
-        self.max_retries = self.platform_config.max_retries or 3
-        self.feishu_sheetid = self.platform_config.feishu_sheetid
-
-    def _setup_logging(self):
-        """初始化日志记录器。"""
-        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
-        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
-        self.logger.info(f"爬虫 '{self.platform}/{self.mode}' 初始化...")
-        self.logger.info(f"最大循环次数: {self.loop_times}, 循环间隔时间: {self.loop_interval}")
-
-    def _setup_services(self):
-        """初始化外部服务客户端。"""
-        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_log)
-        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
-        self.mq_producer = AsyncMQProducer(topic_name="topic_crawler_etl_prod_v2", platform=self.platform,
-                                           mode=self.mode)
-
-    def _setup_state(self):
-        """初始化爬虫的内部状态。"""
-        self.last_response_data = {}
-        self.request_preparer = RequestPreparer(
-            response_parse_config=self.response_parse_config,
-            logger=self.logger,
-            aliyun_log=self.aliyun_log
-        )
-
-
-
-    async def run(self):
-        """ 爬虫主流程 """
-        await self.before_run()
-        total_success, total_fail = 0, 0
-        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
-            for loop_index in range(self.loop_times):
-                if not await self.is_video_count_sufficient():
-                    self.logger.info(f"视频抓取数量已达上限,停止爬取")
-                    return
-                succ, fail = await self.run_single_loop(session)
-                total_success += succ
-                total_fail += fail
-                await self._wait_for_next_loop(loop_index + 1)
-
-            self.logger.info(f"爬虫完成 成功:{total_success} 失败:{total_fail}")
-
-    async def run_single_loop(self, session) -> (int, int):
-        """
-        执行单轮的请求、解析和处理。
-        返回: (本轮成功处理的数量, 本轮失败处理的数量)
-        """
-        success_count, fail_count = 0, 0
-        try:
-            # 爬取数据
-            videos = await self.crawl_data(session)
-            if not videos:
-                self.logger.info(f"无数据返回,停止本轮")
-                return success_count, fail_count
-
-            for video in videos:
-                # 依赖接口请求
-                video_obj = await self.fetch_dependent_data(video)
-                res = await self.process_and_push_video(video_obj)
-                if res:
-                    success_count += 1
-                else:
-                    fail_count += 1
-            self.logger.info(f"接口返回<{len(videos)}>条视频,处理成功<{success_count}>条,处理失败:<{fail_count}>")
-            await self.after_run()
-
-        except Exception as e:
-            self.logger.exception(f"运行异常: {e}")
-
-        return success_count, fail_count
-
-    async def fetch_dependent_data(self, video: Dict) -> Dict:
-        """
-        可在子类重写以实现依赖请求,用返回结果补充原有 video。
-        默认不做处理。
-        """
-        return video
-
-    async def crawl_data(self, session) -> Optional[List[Dict]]:
-        """
-        请求接口,自动渲染动态参数,自动更新游标
-        支持单请求和多请求(分页)逻辑。
-        """
-        request_body = self.request_preparer.prepare(self.request_body_template,
-                                                     self.last_response_data)
-        # 发送请求
-        response = await self.request_client.request(
-            session=session,
-            method=self.method,
-            url=self.url,
-            headers=self.headers,
-            json = request_body
-        )
-
-        if not response:
-            self.logger.error(f"响应为空")
-            return
-
-        self.last_response_data = response
-        # 解析数据列表
-        data_list = safe_extract(response, self.data_path)
-        if not data_list:
-            self.logger.info(f"接口返回视频列表为空{response}")
-            self.aliyun_log.logging(
-                code="9021",
-                message="接口返回视频列表为空",
-                data= response
-            )
-            return
-
-        return data_list
-
-    async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
-        """
-        数据处理完整流程(字段映射 -> 校验 -> 推送)
-        子类可重写 process_video 或 filter_data 来定制处理和校验逻辑
-        """
-        try:
-            # 字段映射
-            video_obj = await self.process_video(video)
-            if not video_obj:
-                return False
-
-            if not await self.filter_data(video_obj):
-                return False
-
-            await self.integrated_video_handling(video_obj)
-            pushed = await self.push_to_etl(video_obj)
-            return pushed
-        except Exception as e:
-            self.logger.exception(f"视频处理异常: {e}")
-            return False
-
-    async def process_video(self, video: Dict) -> Optional[Dict]:
-        """
-        统一字段抽取及 VideoItem 初始化
-        子类可重写或扩展以定制字段映射、过滤等
-        """
-        self.logger.debug(f"处理视频数据: {video.get('title', '无标题')}")
-        if self.user_list:
-            import random
-            publish_user = random.choice(self.user_list)
-        else:
-            self.logger.error(f"未获取到用户列表数据{self.user_list}")
-            return
-
-        item_kwargs = extract_fields(video, self.field_map, logger=self.logger,aliyun_log=self.aliyun_log)
-        item_kwargs.update({
-            "user_id": publish_user.get("uid"),
-            "user_name": publish_user.get("nick_name"),
-            "platform": self.platform,
-            "strategy": self.mode,
-        })
-
-        try:
-            item = VideoItem(**item_kwargs)
-            video_dict = await item.produce_item()
-            if not video_dict:
-                self.logger.warning(f"VideoItem 校验失败")
-                return None
-            return video_dict
-        except Exception as e:
-            self.logger.error(f"VideoItem 初始化失败: {e}")
-            return None
-
-    async def filter_data(self, video: Dict) -> bool:
-        """
-        数据校验过滤,默认使用 PiaoQuanPipeline
-        子类可重写此方法实现自定义过滤
-        """
-        pipeline = PiaoQuanPipeline(
-            platform=self.platform,
-            mode=self.mode,
-            rule_dict=self.rule_dict,
-            env=self.env,
-            item=video,
-            trace_id=self.platform + str(uuid.uuid1())
-        )
-        return await pipeline.process_item()
-
-    async def integrated_video_handling(self, video: Dict) -> None:
-        """
-        钩子函数:可在此实现自动生成标题或其他业务逻辑
-        """
-        await generate_titles(self.feishu_sheetid, video)
-
-    async def push_to_etl(self, video: Dict) -> bool:
-        """
-        推送消息到消息队列ETL
-        """
-        try:
-            await self.mq_producer.send_msg(video)
-            self.logger.info(f"成功推送视频至ETL")
-            return True
-        except Exception as e:
-            self.logger.exception(f"推送ETL失败: {e}")
-            return False
-
-    async def is_video_count_sufficient(self) -> bool:
-        """
-        判断当天抓取的视频是否已达到上限,达到则停止继续抓取
-        """
-        max_count = self.rule_dict.get("videos_cnt", {}).get("min", 0)
-        if max_count <= 0:
-            return True
-        async with AsyncMysqlService(self.platform, self.mode) as mysql:
-            current_count = await mysql.get_today_videos()
-            if current_count >= max_count:
-                self.aliyun_log.logging(code="1011", message="视频数量达到当日最大值", data=f"<今日视频数量>{current_count}")
-                return False
-            self.logger.info(f"{self.platform} 今日入库视频数: {current_count}/{max_count}")
-            self.aliyun_log.logging(code="1012",
-                                    message=f"目前入库量{current_count}",
-                                    data=f"{current_count}/{max_count}"
-                                    )
-            return True
-
-    async def _wait_for_next_loop(self, current_loop: int) -> None:
-        """等待下次循环"""
-        if current_loop < self.loop_times:
-            wait_time = random.randint(self.loop_interval["min"], self.loop_interval["max"])
-            self.logger.info(f"等待 {wait_time} 秒后进行下一次请求")
-            await asyncio.sleep(wait_time)
-
-    async def before_run(self):
-        """运行前预处理钩子,子类可覆盖"""
-        pass
-
-    async def after_run(self):
-        """运行后处理钩子,子类可覆盖"""
-        pass

+ 224 - 0
spiders/basespider.py

@@ -0,0 +1,224 @@
+import asyncio
+import random
+import traceback
+import uuid
+from typing import List, Dict, Optional, Any
+from abc import ABC, abstractmethod
+
+from core.models.video_item import VideoItem
+from core.utils.helpers import generate_titles
+from core.utils.request_preparer import RequestPreparer
+from core.utils.spider_config import SpiderConfig
+from core.utils.extractors import safe_extract, extract_fields
+from core.utils.log.logger_manager import LoggerManager
+from services.async_mysql_service import AsyncMysqlService
+from services.pipeline import PiaoQuanPipeline
+from core.base.async_request_client import AsyncRequestClient
+from services.async_mq_producer import AsyncMQProducer
+
+
+class BaseSpider(ABC):
+    """通用爬虫基类"""
+
+    def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
+        self.rule_dict = rule_dict
+        self.user_list = user_list
+        self.env = env
+        self.class_name = self.__class__.__name__.lower()
+
+        # 初始化核心组件
+        self._setup_configuration()
+        self._setup_logging()
+        self._setup_services()
+        self._setup_state()
+
+        # 通用状态
+        self.total_success = 0
+        self.total_fail = 0
+
+    def _setup_configuration(self):
+        self.platform_config = SpiderConfig.get_platform_config(classname=self.class_name)
+        if not self.platform_config:
+            raise ValueError(f"找不到爬虫配置: {self.class_name}")
+        self.platform = self.platform_config.platform
+        self.mode = self.platform_config.mode
+        self.url = self.platform_config.url
+        self.method = self.platform_config.method.upper()
+        self.headers = self.platform_config.headers or {}
+        self.request_body_template = self.platform_config.request_body or {}
+        self.response_parse_config = self.platform_config.response_parse or {}
+        self.data_path = self.response_parse_config.get("data_path")
+        self.has_more = self.response_parse_config.get("has_more")
+        self.field_map = self.response_parse_config.get("fields", {})
+        self.loop_times = self.platform_config.loop_times or 100
+        self.loop_interval = self.platform_config.loop_interval or {"min": 2, "max": 5}
+        self.timeout = self.platform_config.request_timeout or 30
+        self.max_retries = self.platform_config.max_retries or 3
+        self.feishu_sheetid = self.platform_config.feishu_sheetid
+
+    def _setup_logging(self):
+        self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
+        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
+        self.logger.info(f"爬虫 '{self.platform}/{self.mode}' 初始化...")
+
+    def _setup_services(self):
+        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_log)
+        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
+        self.mq_producer = AsyncMQProducer(topic_name="topic_crawler_etl_prod_v2", platform=self.platform, mode=self.mode)
+
+    def _setup_state(self):
+        self.last_response_data = {}
+        self.request_preparer = RequestPreparer(
+            response_parse_config=self.response_parse_config,
+            logger=self.logger,
+            aliyun_log=self.aliyun_log
+        )
+
+    # 核心入口(统一流程)
+    async def run(self):
+        """主流程:初始化→核心循环→收尾"""
+        self.logger.info(f"开始运行爬虫: {self.platform}/{self.mode}")
+        await self.before_run()
+        try:
+            await self.core_loop()  # 子类实现具体模式逻辑
+        except Exception as e:
+            tb = traceback.format_exc()
+            self.logger.exception(f"运行异常: {e},堆栈信息:{tb}")
+        finally:
+            await self.after_run()
+            self.logger.info(f"总统计:成功{self.total_success},失败{self.total_fail}")
+
+    @abstractmethod
+    async def core_loop(self):
+        """子类必须实现:模式特有核心循环(推荐/账号)"""
+        pass
+
+    async def fetch_detail(self, item: Dict) -> Dict:
+        """子类选择实现:补充详情(完全由子类控制)"""
+        return item
+
+    # 通用数据处理流程
+    async def process_raw_data(self, raw_data: List[Dict]):
+        """处理原始数据列表(清洗→过滤→推送)"""
+        for item in raw_data:
+            try:
+                # 补充详情(完全由子类实现)
+                detail_data = await self.fetch_detail(item)
+                # 处理并推送
+                result = await self.process_and_push_video(detail_data)
+                if result:
+                    self.total_success += 1
+                else:
+                    self.total_fail += 1
+            except Exception as e:
+                self.logger.exception(f"处理单条数据失败: {e}")
+                self.total_fail += 1
+
+    async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
+        try:
+            video_obj = await self.process_video(video)
+            if not video_obj:
+                return False
+            if not await self.filter_data(video_obj):
+                return False
+            await self.integrated_video_handling(video_obj)
+            return await self.push_to_etl(video_obj)
+        except Exception as e:
+            self.logger.exception(f"视频处理异常: {e}")
+            return False
+
+    async def process_video(self, video: Dict) -> Optional[Dict]:
+        """
+        字段映射
+        统一字段抽取及 VideoItem 初始化
+        """
+        self.logger.info(f"处理视频数据: {video}")
+        if self.user_list:
+            publish_user = random.choice(self.user_list)
+        else:
+            self.logger.error(f"未获取到用户列表数据{self.user_list}")
+            return
+        item_kwargs = extract_fields(video, self.field_map, logger=self.logger, aliyun_log=self.aliyun_log)
+        item_kwargs.update({
+            "user_id": publish_user.get("uid"),
+            "user_name": publish_user.get("nick_name"),
+            "platform": self.platform,
+            "strategy": self.mode,
+        })
+        try:
+            item = VideoItem(**item_kwargs)
+            video_dict = await item.produce_item()
+            if not video_dict:
+                self.logger.warning(f"VideoItem 校验失败")
+                return None
+            return video_dict
+        except Exception as e:
+            self.logger.error(f"VideoItem 初始化失败: {e}")
+            return None
+
+    async def filter_data(self, video: Dict) -> bool:
+        """
+           数据校验过滤,默认使用 PiaoQuanPipeline
+           子类可重写此方法实现自定义过滤
+        """
+        pipeline = PiaoQuanPipeline(
+            platform=self.platform,
+            mode=self.mode,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            item=video,
+            trace_id=self.platform + str(uuid.uuid1())
+        )
+        return await pipeline.process_item()
+
+    async def integrated_video_handling(self, video: Dict) -> None:
+        """
+          钩子函数:可在此实现自动生成标题或其他业务逻辑
+        """
+        await generate_titles(self.feishu_sheetid, video)
+
+    async def push_to_etl(self, video: Dict) -> bool:
+        try:
+            await self.mq_producer.send_msg(video)
+            self.logger.info(f"成功推送视频至ETL: {video}")
+            return True
+        except Exception as e:
+            self.logger.exception(f"推送ETL失败: {e}")
+            return False
+
+    async def is_video_count_sufficient(self) -> bool:
+        """
+        校验当日视频是否达到最大爬取量
+        True未达到
+        False达到最大量
+        :return:True/False
+        """
+        max_count = self.rule_dict.get("videos_cnt", {}).get("min", 0)
+        if max_count <= 0:
+            self.logger.info(f"{self.platform} 未限制视频入库量,跳过检测")
+            return True
+        current_count = await self.db_service.get_today_videos()
+        if current_count >= max_count:
+            self.logger.info(f"{self.platform} 视频数量达到当日最大值: {current_count}/{max_count}")
+            self.aliyun_log.logging(code="1011", message="视频数量达到当日最大值", data=f"{current_count}")
+            return False
+        self.logger.info(f"{self.platform} 今日入库视频数: {current_count}/{max_count}")
+        self.aliyun_log.logging(code="1012",
+                                message=f"目前入库量{current_count}",
+                                data=f"{current_count}/{max_count}"
+                                )
+        return True
+
+    async def wait(self):
+        wait_time = random.randint(self.loop_interval["min"], self.loop_interval["max"])
+        self.logger.info(f"等待 {wait_time} 秒后继续")
+        await asyncio.sleep(wait_time)
+
+    async def before_run(self):
+        """运行前钩子(子类可重写)"""
+        pass
+
+    async def after_run(self):
+        """运行后钩子(子类可重写)"""
+        pass
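Since BaseSpider is now an ABC, a subclass has to implement core_loop before it can be instantiated. A minimal sketch (DummySpider is hypothetical and would still need a matching entry in spiders_config.yaml, because _setup_configuration looks the lowercased class name up there):

from spiders.basespider import BaseSpider


class DummySpider(BaseSpider):
    async def core_loop(self):
        # one no-op pass; real spiders page through an API here
        self.logger.info("dummy core_loop executed")


# BaseSpider({}, []) itself would raise TypeError: can't instantiate an abstract class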

+ 3 - 3
spiders/benshanzhufu_recommend.py

@@ -1,14 +1,14 @@
 import asyncio
 
-from spiders.base_spider import BaseSpider
+from spiders.recommendspider import RecommendSpider
 
 
-class BenshanzhufuRecommend(BaseSpider):
+class BenshanzhufuRecommend(RecommendSpider):
     pass
 
 
 async def main():
-    rule_dict = {}
+    rule_dict = {"videos_cnt":{"min":500}}
     user_list = [{'uid': 20631262, 'link': 'recommend_2060', 'nick_name': '人老心不老'},
                  {'uid': 20631263, 'link': 'recommend_2061', 'nick_name': '荷花朵朵'},
                  {'uid': 20631264, 'link': 'recommend_2062', 'nick_name': '战友情'},

+ 49 - 0
spiders/recommendspider.py

@@ -0,0 +1,49 @@
+from spiders.basespider import BaseSpider
+from typing import Optional, List, Dict
+import aiohttp
+
+from core.utils.extractors import safe_extract
+
+
+class RecommendSpider(BaseSpider):
+    """推荐模式爬虫:从推荐接口分页爬取"""
+
+    async def core_loop(self):
+        """核心循环:分页请求推荐接口"""
+        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
+            for loop_index in range(self.loop_times):
+                # 检查数量限制
+                self.logger.info(f"检测{self.platform}当日入库视频量")
+                if not await self.is_video_count_sufficient():
+                   return
+                # 获取推荐列表数据
+                self.logger.info(f"开始获取{self.platform}推荐列表数据")
+                raw_data = await self.crawl_data(session)
+                if not raw_data:
+                    self.logger.info("视频列表为空,开始下次请求")
+                    await self.wait()
+                    continue
+                # 处理数据
+                await self.process_raw_data(raw_data)
+                # 等待下一轮
+                await self.wait()
+
+    async def crawl_data(self, session) -> Optional[List[Dict]]:
+        """请求推荐接口(适配推荐模式)"""
+        request_body = self.request_preparer.prepare(self.request_body_template, self.last_response_data)
+        response = await self.request_client.request(
+            session=session,
+            method=self.method,
+            url=self.url,
+            headers=self.headers,
+            json=request_body
+        )
+
+        self.last_response_data = response
+        # 解析推荐列表
+        data_list = safe_extract(response, self.data_path)
+        if not data_list:
+            self.logger.info(f"接口返回视频列表为空: {response}")
+            self.aliyun_log.logging(code="9021", message="接口返回视频列表为空", data=response)
+            return
+        return data_list

+ 3 - 1
spiders/spider_registry.py

@@ -2,8 +2,9 @@
 """爬虫注册表模块:维护topic到爬虫类的映射关系"""
 
 from core.utils.log.logger_manager import LoggerManager
-from spiders.base_spider import BaseSpider
+from spiders.basespider import BaseSpider
 from spiders.benshanzhufu_recommend import BenshanzhufuRecommend
+from spiders.xiaoniangao_author import XiaoniangaoAuthor
 from spiders.yuannifuqimanman_recommend import YuannifuqimanmanRecommend
 
 logger = LoggerManager.get_logger()
@@ -14,6 +15,7 @@ aliyun_log = LoggerManager.get_aliyun_logger()
 SPIDER_CLASS_MAP = {
     "bszf_recommend_prod": BenshanzhufuRecommend,
     "ynfqmm_recommend_prod": YuannifuqimanmanRecommend,
+    "xng_author_prod": XiaoniangaoAuthor,
     # 新增爬虫时在此添加映射
 }
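A sketch of how a consumer resolves and runs a spider from this map (handle_message is hypothetical; the real dispatch lives in the MQ consumer):

from spiders.spider_registry import SPIDER_CLASS_MAP


async def handle_message(topic: str, rule_dict: dict, user_list: list):
    spider_cls = SPIDER_CLASS_MAP.get(topic)
    if spider_cls is None:
        raise ValueError(f"未注册的 topic: {topic}")
    spider = spider_cls(rule_dict, user_list)
    await spider.run()  # the consumer ACKs the message only after run() finishes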
 

+ 0 - 122
spiders/universal_crawler.py

@@ -1,122 +0,0 @@
-import random
-import time
-import uuid
-from typing import Dict, List, Optional
-
-import requests
-from application.config.common import MQ
-from application.functions import MysqlService
-from application.items import VideoItem
-from application.pipeline import PiaoQuanPipeline
-from config.config import base_url
-from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, RetryCallState
-
-from core.utils import safe_extract
-from spiders.base_spider import BaseSpider  # 抽象基类导入
-
-
-def before_send_log(retry_state: RetryCallState) -> None:
-    attempt = retry_state.attempt_number
-    last_result = retry_state.outcome
-    if last_result.failed:
-        exc = last_result.exception()
-        logger = retry_state.kwargs.get('logger')
-        url = retry_state.args[0] if retry_state.args else "unknown"
-        if logger:
-            logger.warning(f"请求失败,准备重试 ({attempt}/3): {url}, 错误: {str(exc)}")
-
-
-class UniversalCrawler(BaseSpider):
-    def __init__(self, platform_config: Dict, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
-        super().__init__(platform_config, rule_dict, user_list, trace_id, env)
-        self.mq = MQ(topic_name=f"topic_crawler_etl_{env}")
-        self.base_api = base_url
-        self.has_enough_videos = False
-        self.download_cnt = 0
-        self.loop_times = self.platform_config.get('loop_times', 1)
-
-        self.request_method = self.platform_config["method"].upper()
-        self.request_url = self.platform_config["url"]
-        self.request_headers = self.platform_config.get("headers", {})
-        self.request_body = self.platform_config.get("request_body", {})
-        self.response_data_path = self.platform_config["response_parse"]["data_path"]
-        self.video_fields_map = self.platform_config["response_parse"]["fields"]
-
-    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2),
-           retry=retry_if_exception_type((requests.RequestException, ValueError)), before=before_send_log)
-    def _send_request(self, url: str, method: str = None, headers: Dict = None, payload: Dict = None,
-                      timeout: int = 30) -> Optional[Dict]:
-        method = method or self.request_method
-        headers = headers or self.request_headers
-        payload = payload or self.request_body
-
-        response = requests.request(method=method, url=url, headers=headers, json=payload, timeout=timeout)
-        response.raise_for_status()
-        resp = response.json()
-        if resp.get("code") == 0:
-            return resp
-        raise ValueError(f"API响应错误: {resp}")
-
-    def fetch_video_data(self) -> Optional[List[Dict]]:
-        self.logger.info(f"{self.trace_id}--请求视频数据: {self.request_url}")
-        try:
-            response = self._send_request(self.request_url)
-            return safe_extract(response, self.response_data_path) or []
-        except Exception as e:
-            self.logger.error(f"{self.trace_id}--请求失败: {e}")
-            return []
-
-    def is_video_qualified(self, video: Dict) -> bool:
-        if not self.rule_dict:
-            return True
-
-        rule_duration = self.rule_dict.get("duration")
-        if rule_duration:
-            video_url = safe_extract(video, self.video_fields_map.get("video_url"))
-            duration = self.get_video_duration(video_url)
-            if not (rule_duration['min'] <= duration <= rule_duration['max']):
-                return False
-
-        rule_videos_cnt = self.rule_dict.get("videos_cnt")
-        if rule_videos_cnt:
-            video_count = MysqlService(self.platform, self.mode, self.trace_id).get_today_videos()
-            if video_count >= rule_videos_cnt.get("min", 0):
-                return False
-
-        return True
-
-    def transform_to_etl_item(self, video: Dict) -> Optional[Dict]:
-        item = VideoItem()
-        for field, path in self.video_fields_map.items():
-            val = safe_extract(video, path) if isinstance(path, str) and path.startswith("$") else path
-            item.add_video_info(field, val)
-
-        item.add_video_info("classname", self.platform)
-        item.add_video_info("strategy", self.mode)
-        item.add_video_info("session", f"{self.platform}-{int(time.time())}")
-        user = random.choice(self.user_list)
-        item.add_video_info("user_id", user["uid"])
-        item.add_video_info("user_name", user["nick_name"])
-
-        return item.produce_item()
-
-    def push_to_etl(self, item: Dict) -> bool:
-        trace_id = f"{self.platform}-{uuid.uuid4()}"
-        pipeline = PiaoQuanPipeline(
-            platform=self.platform,
-            mode=self.mode,
-            rule_dict=self.rule_dict,
-            env=self.env,
-            item=item,
-            trace_id=trace_id,
-        )
-        if pipeline.process_item():
-            self.download_cnt += 1
-            self.mq.send_msg(item)
-            self.aliyun_log.logging(code="1002", message="成功发送至ETL", data=item, trace_id=self.trace_id)
-            if self.download_cnt >= self.download_min_limit:
-                self.has_enough_videos = True
-                self.aliyun_log.logging(code="2000", message=f"达到下载限制: {self.download_min_limit}",
-                                        trace_id=self.trace_id)
-            return True
-        return False

+ 31 - 0
spiders/xiaoniangao_author.py

@@ -0,0 +1,31 @@
+import asyncio
+from typing import List, Dict
+
+from spiders.authorspider import AuthorSpider
+
+
+class XiaoniangaoAuthor(AuthorSpider):
+    async def fetch_user_list(self) -> List[Dict]:
+        """获取待爬取的用户列表(从数据库)"""
+        datas = await self.db_service.get_xng_mid()
+        return datas
+
+
+
+
+
+async def main():
+    rule_dict = {"videos_cnt":{"min":1500}}
+    user_list = [{'uid': 58527261, 'link': '116311065', 'nick_name': '像我这样'},
+                 {'uid': 58527262, 'link': '104703232', 'nick_name': '不再厌倦'},
+                 {'uid': 58527263, 'link': '609255292', 'nick_name': '好演技'}]
+
+    xng = XiaoniangaoAuthor(rule_dict, user_list)
+    await xng.run()
+
+
+
+
+if __name__ == '__main__':
+    asyncio.run(main())  # 异步入口

+ 3 - 2
spiders/yuannifuqimanman_recommend.py

@@ -1,9 +1,10 @@
 import asyncio
 
-from spiders.base_spider import BaseSpider
+from spiders.recommendspider import RecommendSpider
 
 
-class YuannifuqimanmanRecommend(BaseSpider):
+class YuannifuqimanmanRecommend(RecommendSpider):
     pass