Browse source

Dynamically update request variables

zhangliang 22 hours ago
Parent
commit
45590c4f1c

+ 8 - 47
config/spiders_config.yaml

@@ -30,64 +30,25 @@ benshanzhufurecommend:
       out_video_id: "$.nid"
 
 
-
-xngtjl_recommend_prod:
-  platform: xiaoniangaotuijianliu
+yuannifuqimanmanrecommend:
+  platform: yuannifuqimanman
   mode: recommend
-  path: /crawler/ben_shan_zhu_fu/recommend
+  path: /crawler/yuan_ni_fu_qi_man_man/recommend
   method: post
   request_body:
-    cursor: "1"
-  loop_times: 2
-  etl_hook: "process_video_obj"
+    cursor: "{{next_cursor}}"
+  loop_times: 200
+  loop_interval: 5
+  feishu_sheetid: "golXy9"
   response_parse:
+    data: "$.data"
     next_cursor: "$.data.next_cursor"
     data_path: "$.data.data"
     fields:
       video_id: "$.nid"
       video_title: "$.title"
-      play_cnt: 0
-      publish_time_stamp: "$.update_time"
       out_user_id: "$.nid"
       cover_url: "$.video_cover"
-      like_cnt: 0
       video_url: "$.video_url"
       out_video_id: "$.nid"
-  post_actions:
-    - trigger: after_video_processed
-      endpoint: "http://example.com/notify"
-      payload:
-
-
-
-
-zhongqingkandian:
-  mode: recommend
-  path: "/zqkd"
-  paging: true
-  max_pages: 5
-  db_config:
-    table: "zhongqingkandian"
-  etl_hook: "process_video_obj"
-  parse:
-    data_path: "$.data[*]"
-    fields:
-      title: "$.title"
-      vid: "$.id"
-      cover: "$.cover"
-      url: "$.video_url"
-  custom_class: my_crawlers.ZhongqingKandianCrawler
-
-fuqihaoyundao:
-  url: "/fuqi"
-  method: "POST"
-  paging: false
-  retry_times: 2
-  etl_hook: "process_video_obj"
-  parse:
-    data_path: "$.videos[*]"
-    fields:
-      id: "$.id"
-      name: "$.name"
-      mp4: "$.url"
 

+ 2 - 5
core/models/spiders_config_models.py

@@ -8,16 +8,13 @@ class BaseConfig(BaseModel):
 class PlatformConfig(BaseConfig):
     platform: str
     mode: str
-    path: str
-    url: AnyUrl = None
+    path: str = None
+    url: AnyUrl
     method: str
     request_body: dict = {}
     loop_times: int = 1
     loop_interval: int = 0
     response_parse: dict = {}
-    paging: bool = False
-    max_pages: int = 0
-    parse: dict = {}
     retry_times: int = 0
     feishu_sheetid: str
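
With this swap, url becomes the required field and path is merely optional metadata, so each config entry must resolve to a full URL before validation (strictly, a None default wants an Optional[str] annotation). A hedged sketch of how the new model validates, assuming Pydantic v1-style defaults as used in the diff; the config values are illustrative:

from typing import Optional
from pydantic import BaseModel, AnyUrl

class PlatformConfigSketch(BaseModel):
    platform: str
    mode: str
    path: Optional[str] = None  # now optional
    url: AnyUrl                 # now required
    method: str
    request_body: dict = {}

cfg = PlatformConfigSketch(
    platform="yuannifuqimanman",
    mode="recommend",
    path="/crawler/yuan_ni_fu_qi_man_man/recommend",
    url="http://example.com/crawler/yuan_ni_fu_qi_man_man/recommend",
    method="post",
)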
 

+ 1 - 1
core/models/video_item.py

@@ -21,11 +21,11 @@ class VideoItem(BaseModel):
     out_user_id: Optional[str]
     video_url: str
     cover_url: str
-    video_title: str
     platform: str
     strategy: str
     session: Optional[str]
 
+    video_title: Optional[str]
     publish_time_stamp: Optional[int] = None
     update_time_stamp: Optional[int] = None
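
Moving video_title into the optional block means an item without a title now survives model construction and can be filled in later (e.g. by generate_titles in the pipeline below). A quick illustration, assuming Pydantic v1 semantics where an Optional field defaults to None; the model is a trimmed stand-in for VideoItem:

from typing import Optional
from pydantic import BaseModel

class VideoItemSketch(BaseModel):
    video_url: str
    video_title: Optional[str] = None  # previously required

item = VideoItemSketch(video_url="http://example.com/v.mp4")
assert item.video_title is None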
 

+ 174 - 120
services/pipeline.py

@@ -4,205 +4,259 @@ import sys
 import time
 from datetime import datetime
 
+sys.path.append(os.getcwd())
+
 from core.utils.feishu_data_async import FeishuDataAsync
 from core.utils.log.logger_manager import LoggerManager
 from services.async_mysql_service import AsyncMysqlService
 
-sys.path.append(os.getcwd())
-
 
 class PiaoQuanPipeline:
     """
-    Async crawler pipeline for running video rule validation
+    Full async crawler pipeline - every failed check is logged in detail, both locally and to Aliyun Log
     """
 
     def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
         self.platform = platform
         self.mode = mode
-        self.item = item
         self.rule_dict = rule_dict
         self.env = env
+        self.item = item
         self.trace_id = trace_id
         self.account = account
 
-        # Initialize the async MySQL client
         self.mysql = AsyncMysqlService(platform=platform, mode=mode)
+        self.logger = LoggerManager.get_logger(platform=platform, mode=mode)
         self.aliyun_log = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)
 
     async def feishu_time_list(self):
-        """从飞书读取天数配置"""
         async with FeishuDataAsync() as feishu_data:
-            summary = await feishu_data.get_values(spreadsheet_token="KsoMsyP2ghleM9tzBfmcEEXBnXg", sheet_id="RuLK77")
+            summary = await feishu_data.get_values(
+                spreadsheet_token="KsoMsyP2ghleM9tzBfmcEEXBnXg",
+                sheet_id="RuLK77"
+            )
+        for row in summary[1:]:
+            if row[0] == self.platform:
+                return row[1]
+        return None
+
+    async def feishu_list(self):
+        async with FeishuDataAsync() as feishu_data:
+            summary = await feishu_data.get_values(
+                spreadsheet_token="KsoMsyP2ghleM9tzBfmcEEXBnXg",
+                sheet_id="letS93"
+            )
         for row in summary[1:]:
             if row[0] == self.platform:
                 return row[1]
         return None
 
     async def publish_time_flag(self) -> bool:
-        """
-        判断发布时间是否过期
-        :return: True or False
-        """
-        publish_time_stamp = self.item["publish_time_stamp"]
-        update_time_stamp = self.item["update_time_stamp"]
+        publish_ts = self.item.get("publish_time_stamp", int(time.time()))
+        update_ts = self.item.get("update_time_stamp", int(time.time()))
+
         max_d = self.rule_dict.get("period", {}).get("max", 1000)
         min_d = self.rule_dict.get("period", {}).get("min", 1000)
         days = max(max_d, min_d)
-        days_time = await self.feishu_time_list()
-        if days_time:
-            days = int(days_time)
+
+        feishu_days = await self.feishu_time_list()
+        if feishu_days:
+            days = int(feishu_days)
 
         now_ts = int(time.time())
 
         if self.platform == "gongzhonghao":
-            if now_ts - publish_time_stamp > 86400 * days and now_ts - update_time_stamp > 86400 * days:
+            if (now_ts - publish_ts > 86400 * days) and (now_ts - update_ts > 86400 * days):
+                msg = f"[发布时间过期] now={now_ts}, publish={publish_ts}, update={update_ts}, limit_days={days}"
+                self.logger.warning(msg)
                 self.aliyun_log.logging(
                     code="2004",
                     trace_id=self.trace_id,
-                    data=self.item,
-                    message=f"发布时间超过{days}天"
+                    data={
+                        "item": self.item,
+                        "now_ts": now_ts,
+                        "publish_ts": publish_ts,
+                        "update_ts": update_ts,
+                        "days_limit": days
+                    },
+                    message=msg,
+                    account=self.account
                 )
                 return False
         else:
             if days == 0:
-                is_today = datetime.fromtimestamp(publish_time_stamp).date() == datetime.today().date()
+                is_today = datetime.fromtimestamp(publish_ts).date() == datetime.today().date()
                 if not is_today:
+                    msg = "[发布时间] 不在今日"
+                    self.logger.warning(msg)
+                    self.aliyun_log.logging(
+                        code="2004",
+                        trace_id=self.trace_id,
+                        data={
+                            "item": self.item,
+                            "publish_ts": publish_ts
+                        },
+                        message=msg,
+                        account=self.account
+                    )
                     return False
-            elif now_ts - publish_time_stamp > 86400 * days:
+            elif now_ts - publish_ts > 86400 * days:
+                msg = f"[发布时间超限制] now={now_ts}, publish={publish_ts}, limit_days={days}"
+                self.logger.warning(msg)
                 self.aliyun_log.logging(
                     code="2004",
                     trace_id=self.trace_id,
-                    data=self.item,
-                    message=f"发布时间超过{days}天"
+                    data={
+                        "item": self.item,
+                        "now_ts": now_ts,
+                        "publish_ts": publish_ts,
+                        "days_limit": days
+                    },
+                    message=msg,
+                    account=self.account
                 )
                 return False
         return True
 
     def title_flag(self) -> bool:
-        """
-        Check whether the title contains sensitive words
-        """
-        title = self.item["video_title"]
+        title = self.item.get("video_title", "")
         cleaned_title = re.sub(r"[^\w]", " ", title)
-        sensitive_words = []  # add sensitive words here
-        if any(word in cleaned_title for word in sensitive_words):
-            self.aliyun_log.logging(
-                code="2003",
-                trace_id=self.trace_id,
-                message="标题中包含敏感词",
-                data=self.item,
-                account=self.account
-            )
-            return False
+        sensitive_words = []  # configurable sensitive-word list
+
+        for word in sensitive_words:
+            if word in cleaned_title:
+                msg = f"[标题包含敏感词] {word} in {title}"
+                self.logger.warning(msg)
+                self.aliyun_log.logging(
+                    code="2003",
+                    trace_id=self.trace_id,
+                    data={
+                        "item": self.item,
+                        "title": title,
+                        "matched_word": word
+                    },
+                    message=msg,
+                    account=self.account
+                )
+                return False
         return True
 
     def download_rule_flag(self) -> bool:
-        """
-        Check each numeric download rule
-        """
-        for key in self.item:
-            if self.rule_dict.get(key):
-                max_value = int(self.rule_dict[key].get("max", 999999999))
-                if key == "peroid":
-                    continue
-                val = int(self.item.get(key, 0))
-                if not int(self.rule_dict[key]["min"]) <= val <= max_value:
-                    self.aliyun_log.logging(
-                        code="2004",
-                        trace_id=self.trace_id,
-                        message=f"{key}: 不符合规则",
-                        data=self.item,
-                        account=self.account
-                    )
-                    return False
+        for key, rule in self.rule_dict.items():
+            if key == "period":
+                continue
+            item_value = int(self.item.get(key, 0))
+            min_v = int(rule.get("min", 0))
+            max_v = int(rule.get("max", 999999999))
+            if not (min_v <= item_value <= max_v):
+                msg = f"[{key} 校验失败] value={item_value}, expected=[{min_v}, {max_v}]"
+                self.logger.warning(msg)
+                self.aliyun_log.logging(
+                    code="2004",
+                    trace_id=self.trace_id,
+                    data={
+                        "item": self.item,
+                        "field": key,
+                        "item_value": item_value,
+                        "min": min_v,
+                        "max": max_v
+                    },
+                    message=msg,
+                    account=self.account
+                )
+                return False
         return True
 
-    async def feishu_list(self):
-        """从飞书拉取天数配置,用于去重判断"""
-        async with FeishuDataAsync() as feishu_data:
-            summary = await feishu_data.get_values(spreadsheet_token="KsoMsyP2ghleM9tzBfmcEEXBnXg", sheet_id="letS93")
-        for row in summary[1:]:
-            if row[0] == self.platform:
-                return row[1]
-        return None
-
     async def repeat_video(self) -> bool:
-        """
-        Check whether the video is a duplicate (including the Feishu-configured dedup window)
-        """
-        out_id = self.item["out_video_id"]
-        title = self.item["video_title"]
-        day_count = await self.feishu_list()
+        out_id = self.item.get("out_video_id")
+        title = self.item.get("video_title", "")
 
-        async with self.mysql as db:
-            if day_count:
-                sql = f"""
-                SELECT create_time 
-                FROM crawler_video 
-                WHERE classname = %s AND out_video_id = %s 
-                  AND create_time >= DATE_SUB(NOW(), INTERVAL %s DAY)
-                """
-                rows = await db.client.fetch_all(sql, [self.platform, out_id, int(day_count)])
-                if rows:
-                    self.aliyun_log.logging(
-                        code="2002",
-                        trace_id=self.trace_id,
-                        message="重复的视频",
-                        data=self.item,
-                        account=self.account
-                    )
-                    return False
+        bypass_platforms = {
+            "zhufuniannianshunxinjixiang", "weiquanshipin", "piaoquangushi", "lepaoledong", "zhufukuaizhuan",
+            "linglingkuailezhufu", "lepaoledongdijie", "jierizhufuhuakaifugui", "haoyunzhufuduo",
+            "quzhuan", "zhufudewenhou", "jierizhufuxingfujixiang", "haoyoushipin", "xinshiquan",
+            "laonianshenghuokuaile", "laonianquan"
+        }
+
+        if self.platform in bypass_platforms or (self.platform, self.mode) in {
+            ("zhuwanwufusunew", "recommend"),
+            ("jixiangxingfu", "recommend"),
+            ("yuannifuqichangzai", "recommend"),
+            ("benshanzhufu", "recommend"),
+            ("zuihaodesongni", "recommend"),
+            ("tiantianjufuqi", "recommend")
+        }:
+            self.logger.info("[去重] 平台配置无需去重,直接通过")
+            return True
 
-            # certain platforms bypass the dedup check
-            bypass = {
-                ("zhufuniannianshunxinjixiang", "recommend"),
-                ("benshanzhufu", "recommend"),
-                ("tiantianjufuqi", "recommend"),
-            }
-            if (self.platform, self.mode) in bypass:
-                return True
-
-            # title-based dedup logic (example)
-            if self.platform == "zhufuhaoyunbaofu" and self.mode == "recommend":
-                sql = """
-                      SELECT 1
-                      FROM crawler_video
-                      WHERE classname = %s
-                        AND out_video_id = %s
-                        AND video_title = %s \
-                      """
-                result = await db.client.fetch_one(sql, [self.platform, out_id, title])
-            else:
-                sql = """
-                      SELECT 1
-                      FROM crawler_video
-                      WHERE classname = %s
-                        AND out_video_id = %s \
-                      """
-                result = await db.client.fetch_one(sql, [self.platform, out_id])
-
-            if result:
+        day_count = await self.feishu_list()
+        if day_count:
+            sql = """
+                SELECT UNIX_TIMESTAMP(create_time) as ts FROM crawler_video
+                WHERE platform = %s AND out_video_id = %s AND create_time >= DATE_SUB(NOW(), INTERVAL %s DAY)
+            """
+            rows = await self.mysql.fetch_all(sql, [self.platform, out_id, int(day_count)])
+            if rows:
+                msg = f"[去重失败] {out_id} 在 {day_count} 天内已存在"
+                self.logger.warning(msg)
                 self.aliyun_log.logging(
                     code="2002",
                     trace_id=self.trace_id,
-                    message="重复的视频",
-                    data=self.item,
+                    data={
+                        "item": self.item,
+                        "existing_timestamps": [r["ts"] for r in rows],
+                        "day_count": day_count
+                    },
+                    message=msg,
                     account=self.account
                 )
                 return False
 
+        if self.platform == "zhufuhaoyunbaofu" and self.mode == "recommend":
+            sql = """
+                SELECT 1 FROM crawler_video WHERE platform = %s AND out_video_id = %s AND video_title = %s
+            """
+            result = await self.mysql.fetch_one(sql, [self.platform, out_id, title])
+        else:
+            sql = """
+                SELECT 1 FROM crawler_video WHERE platform = %s AND out_video_id = %s
+            """
+            result = await self.mysql.fetch_one(sql, [self.platform, out_id])
+
+        if result:
+            msg = f"[去重失败] {out_id} 已存在"
+            self.logger.warning(msg)
+            self.aliyun_log.logging(
+                code="2002",
+                trace_id=self.trace_id,
+                data={
+                    "item": self.item,
+                    "out_video_id": out_id
+                },
+                message=msg,
+                account=self.account
+            )
+            return False
+
+        self.logger.info("[去重] 校验通过")
         return True
 
     async def process_item(self) -> bool:
         """
-        Run all rule checks asynchronously
+        Run the full rule flow asynchronously, with detailed local and cloud logging
         """
+        self.logger.info(f"开始校验: {self.item.get('out_video_id', '')}")
         if not await self.publish_time_flag():
+            self.logger.info("校验结束: 发布时间不符合")
             return False
         if not self.title_flag():
+            self.logger.info("校验结束: 标题不符合")
             return False
         if not await self.repeat_video():
+            self.logger.info("校验结束: 去重不符合")
             return False
         if not self.download_rule_flag():
+            self.logger.info("校验结束: 下载规则不符合")
             return False
+        self.logger.info("校验结束: 全部通过")
         return True
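
The four checks run in a fixed order (publish time, title, dedup, download rules) and short-circuit on the first failure. A hedged usage sketch; the rule_dict values and trace_id are illustrative:

from services.pipeline import PiaoQuanPipeline

async def validate(item: dict) -> bool:
    pipeline = PiaoQuanPipeline(
        platform="yuannifuqimanman",
        mode="recommend",
        rule_dict={"period": {"min": 30, "max": 30}, "duration": {"min": 30, "max": 1200}},
        env="prod",
        item=item,
        trace_id="yuannifuqimanman-demo",
    )
    return await pipeline.process_item()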

+ 198 - 140
spiders/base_spider.py

@@ -1,10 +1,5 @@
 import asyncio
-import json
-import random
-import time
-import traceback
 import uuid
-from abc import ABC
 from typing import List, Dict, Optional, Any
 
 import aiohttp
@@ -21,232 +16,295 @@ from core.base.async_request_client import AsyncRequestClient
 from services.async_mq_producer import AsyncMQProducer
 
 
-class BaseSpider(ABC):
+class BaseSpider:
     """
-    Generic spider base class with a strictly sequential execution flow
+    Generic spider base class, supporting:
+    - dynamic substitution of dependent request params (cursor or other fields)
+    - paged fetching for both single and dependent requests
+    - unified logging, MQ push, exception capture and async requests
+    Subclasses only need to override a few methods, e.g. process_video/process_item.
     """
 
-    MAX_RETRIES = 3  # max retries per request
-    TIMEOUT = 30  # request timeout (seconds)
-
     def __init__(self, rule_dict: Dict, user_list: List, trace_id: str, env: str = "prod"):
         self.trace_id = trace_id
         self.env = env
         self.user_list = user_list
         self.rule_dict = rule_dict
-        self.class_name = self.__class__.__name__  # subclass name
+        self.class_name = self.__class__.__name__.lower()
 
-        # resolve the config automatically from the class name
-        self.platform_config = SpiderConfig.get_platform_config(classname=str(self.class_name.lower()))
+        # load the config
+        self.platform_config = SpiderConfig.get_platform_config(classname=self.class_name)
         if not self.platform_config:
             raise ValueError(f"找不到对应配置: {self.class_name}")
 
-        # platform info and logger initialization
         self.platform = self.platform_config.platform
         self.mode = self.platform_config.mode
         self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
         self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
+        self.mq_producer = AsyncMQProducer(topic_name="topic_crawler_etl_prod_v2", platform=self.platform, mode=self.mode)
 
-        # MQ producer for pushing to ETL
-        self.mq_producer = AsyncMQProducer(topic_name="topic_crawler_etl_prod_v2",platform=self.platform,mode=self.mode)
-
-        # request config
         self.method = self.platform_config.method.upper()
         self.url = self.platform_config.url
-        self.headers = self.platform_config.headers
-        self.request_body = self.platform_config.request_body
-
-        # response parsing config
-        self.response =self.platform_config.response_parse
-        self.field_map =  self.response.get("fields", {})
-        self.data_path =  self.response.get("data_path")
-        self.next_cursor_path = self.response.get("next_cursor")
-        self.response_data = self.response.get("data")
+        self.headers = self.platform_config.headers or {}
+        self.request_body_template = self.platform_config.request_body or {}
 
-        # flow control config
-        self.loop_times = self.platform_config.loop_times  # loop count
-        self.loop_interval = self.platform_config.loop_interval  # loop interval (seconds)
-
-        # database and request clients
-        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
-        self.request_client = AsyncRequestClient(logger=self.logger,aliyun_log=self.aliyun_logr)
+        self.response_parse = self.platform_config.response_parse or {}
+        self.next_cursor_path = self.response_parse.get("next_cursor")
+        self.data_path = self.response_parse.get("data_path")
+        self.field_map = self.response_parse.get("fields", {})
 
+        self.loop_times = self.platform_config.loop_times or 100
+        self.loop_interval = self.platform_config.loop_interval or 5
         self.feishu_sheetid = self.platform_config.feishu_sheetid
 
+        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
+        self.request_client = AsyncRequestClient(logger=self.logger, aliyun_log=self.aliyun_logr)
 
         self.timeout = 30
-
-
         self.max_retries = 3
-        self.resolved_body =  resolve_request_body_template(self.request_body)
 
+        # dynamic request params, empty by default; values here are substituted into any templated request_body field (e.g. cursor)
+        self.dynamic_params = {key: "" for key in self.request_body_template.keys()}
+        # subclasses may override to support multiple cursors and other complex cases
+        self.current_cursor = ""
+
+        self.download_cnt = 0
+        self.limit_flag = False
 
 
     async def run(self):
-        """
-        爬虫入口,执行完整循环(抓取、处理、推送)
-        """
+        """ 爬虫主流程 """
         await self.before_run()
-        total_success, total_failed = 0, 0
+
+        total_success, total_fail = 0, 0
         async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
-            for loop_index in range(1, self.loop_times + 1):
-                # check whether today's fetch cap has been reached
+            for loop_index in range(self.loop_times):
+                if self.limit_flag:
+                    self.logger.info(f"{self.trace_id} 已达到抓取限制,停止爬虫")
+                    break
+
                 if not await self.is_video_count_sufficient():
-                    return
-                success_count, fail_count = await self.run_single_loop(session)
-                total_success += success_count
-                total_failed += fail_count
-                if loop_index < self.loop_times:
-                    await asyncio.sleep(self.loop_interval)
+                    self.logger.info(f"{self.trace_id} 视频抓取数量已达上限,提前结束")
+                    break
+
+                succ, fail = await self.run_single_loop(session)
+                total_success += succ
+                total_fail += fail
+
+                await self._wait_for_next_loop(loop_index + 1)
+
         await self.after_run()
-        self.logger.info(f"{self.trace_id} 爬虫完成 成功:{total_success} 失败:{total_failed}")
+        self.logger.info(f"{self.trace_id} 爬虫完成 成功:{total_success} 失败:{total_fail}")
 
     async def run_single_loop(self, session) -> (int, int):
         """
-        Single fetch loop: fetch the video list, then process and push
+        One round of request and processing
         """
         success_count, fail_count = 0, 0
-        video_list = await self.crawl_data(session)
-        if not video_list:
-            self.logger.info(f"{self.trace_id} 未获取到视频")
-            return success_count, fail_count
-        for video in video_list:
-            result = await self.process_and_push_video(video)
-            if result:
-                success_count += 1
-            else:
-                fail_count += 1
+        try:
+            videos = await self.crawl_data(session)
+            if not videos:
+                self.logger.info(f"{self.trace_id} 无数据返回,停止本轮")
+                return success_count, fail_count
+
+            for video in videos:
+                # dependent API request
+                video_obj = await self.fetch_dependent_data(video)
+                res = await self.process_and_push_video(video_obj)
+                if res:
+                    success_count += 1
+                else:
+                    fail_count += 1
+                if self.limit_flag:
+                    break
+
+        except Exception as e:
+            self.logger.exception(f"{self.trace_id} 运行异常: {e}")
+
         return success_count, fail_count
 
-    async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
+    async def fetch_dependent_data(self, video: Dict) -> Dict:
         """
-        Per-video flow: field extraction -> validation/filtering -> title handling -> push to ETL
+        Override in subclasses to make dependent requests and enrich the original video with the results.
+        No-op by default.
         """
-        try:
-            video_obj = await self.process_video(video)
-            if not video_obj:
-                return False
-            if not await self.filter_data(video_obj):
-                return False
-            await self.integrated_video_handling(video_obj)
-            return await self.push_to_etl(video_obj)
-        except Exception as e:
-            self.logger.exception(f"{self.trace_id} 视频处理异常 {e}")
-            return False
+        return video
 
-    async def crawl_data(self,session) -> Optional[List[Dict]]:
+    async def crawl_data(self, session) -> Optional[List[Dict]]:
         """
-        Fetch data with automatic retry and automatic paging
-        :param session:
-        :param dynamic_variables:
-        :return:
+        Call the API, rendering dynamic params and updating the cursor automatically.
+        Supports both single-request and multi-request (paged) logic.
         """
+        # render the request body from the template with the current dynamic params
+        resolved_body = self._render_request_body()
+
+        # send the request
         response = await self.request_client.request(
             session=session,
             method=self.method,
             url=self.url,
             headers=self.headers,
-            json=self.resolved_body
+            json=resolved_body
         )
-        print(safe_extract(response, self.response_data))
-        self.resolved_body = resolve_request_body_template(self.request_body,safe_extract(response, self.response_data) )
 
-        data = safe_extract(response, self.data_path)
-        return data if data else []
+        if not response:
+            self.logger.error(f"{self.trace_id} 响应为空")
+            return []
 
-    async def filter_data(self, video: Dict) -> bool:
-        """校验视频是否符合规则"""
-        pipeline = PiaoQuanPipeline(
-            platform=self.platform,
-            mode=self.mode,
-            rule_dict=self.rule_dict,
-            env=self.env,
-            item=video,
-            trace_id=self.platform + str(uuid.uuid1())
-        )
-        return await pipeline.process_item()
+        # update the cursor (dynamic param update supported)
+        if self.next_cursor_path:
+            next_cursor = safe_extract(response, self.next_cursor_path) or ""
+            self._update_cursor(next_cursor)
 
-    async def is_video_count_sufficient(self) -> bool:
+        # parse the data list
+        data_list = safe_extract(response, self.data_path)
+        if not data_list:
+            self.logger.info(f"{self.trace_id} 未获取到有效数据")
+            return []
+
+        return data_list
+
+    def _render_request_body(self) -> Dict:
         """
-        Check whether the daily max video count has been reached
-        :return: True or False
+        Render the request body template with the current dynamic params; supports multi-param substitution
         """
-        rule_videos_cnt = self.rule_dict.get("videos_cnt")
-        if not rule_videos_cnt:
-            return True
-        async with AsyncMysqlService(self.platform, self.mode) as mysql:
-            video_count = await mysql.get_today_videos()
-        if video_count >= rule_videos_cnt.get("min", 200):
-            self.logger.info(f"{self.trace_id}--今日视频已达到最大量{video_count}")
-            self.aliyun_logr.logging(
-                code="1011",
-                message=f"视频数量达到当日最大值",
-                data=f"<今日视频数量>{video_count}"
-            )
+        body = {}
+        for k, v in self.request_body_template.items():
+            if isinstance(v, str) and v.startswith("{{") and v.endswith("}}"):
+                key = v.strip("{} ")
+                body[k] = self.dynamic_params.get(key, "")
+            else:
+                body[k] = v
+        return body
+
+    def _update_cursor(self, cursor_value: str):
+        """
+        Update the paging cursor and the dynamic params for the next request
+        """
+        self.current_cursor = cursor_value
+        # sync both the template variable name ({{next_cursor}}) and any literal cursor param
+        self.dynamic_params["next_cursor"] = cursor_value
+        if "cursor" in self.dynamic_params:
+            self.dynamic_params["cursor"] = cursor_value
+
+    async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
+        """
+        Full processing flow (field mapping -> validation -> push)
+        Subclasses may override process_video or filter_data to customize processing and validation
+        """
+        try:
+            video_obj = await self.process_video(video)
+            if not video_obj:
+                return False
+
+            if not await self.filter_data(video_obj):
+                return False
+
+            await self.integrated_video_handling(video_obj)
+            pushed = await self.push_to_etl(video_obj)
+
+            if pushed:
+                self.download_cnt += 1
+
+            # download limit reached, stop fetching
+            if self.rule_dict.get("videos_cnt", {}).get("min") and \
+                    self.download_cnt >= self.rule_dict["videos_cnt"]["min"]:
+                self.limit_flag = True
+
+            return pushed
+        except Exception as e:
+            self.logger.exception(f"{self.trace_id} 视频处理异常: {e}")
             return False
-        self.logger.info(f"{self.trace_id}--今日视频已入库{video_count}")
-        return True
 
     async def process_video(self, video: Dict) -> Optional[Dict]:
         """
-        Process one video record: field mapping and required-field checks
-        :param video:
-        :return:
+        Unified field extraction and VideoItem initialization
+        Subclasses may override or extend to customize field mapping, filtering, etc.
         """
-        self.logger.debug(f"{self.trace_id}--开始处理视频: {video.get('title', '无标题')}")
-        publish_user = random.choice(self.user_list)
-        item_kwargs = extract_fields(video, self.field_map, logger=self.logger, trace_id=self.trace_id,aliyun_log=self.aliyun_logr)
-        item_kwargs["user_id"] = publish_user["uid"]
-        item_kwargs["user_name"] = publish_user["nick_name"]
-        item_kwargs["platform"] = self.platform
-        item_kwargs["strategy"] = self.mode
+        self.logger.debug(f"{self.trace_id} 处理视频数据: {video.get('title', '无标题')}")
+        publish_user = None
+        if self.user_list:
+            import random
+            publish_user = random.choice(self.user_list)
+        else:
+            publish_user = {"uid": "default", "nick_name": "default_user"}
+
+        item_kwargs = extract_fields(video, self.field_map, logger=self.logger, trace_id=self.trace_id, aliyun_log=self.aliyun_logr)
+        item_kwargs.update({
+            "user_id": publish_user.get("uid"),
+            "user_name": publish_user.get("nick_name"),
+            "platform": self.platform,
+            "strategy": self.mode
+        })
 
         try:
             item = VideoItem(**item_kwargs)
             video_dict = await item.produce_item()
             if not video_dict:
-                self.logger.warning(f"{self.trace_id} 校验失败")
+                self.logger.warning(f"{self.trace_id} VideoItem 校验失败")
                 return None
             return video_dict
         except Exception as e:
             self.logger.error(f"{self.trace_id} VideoItem 初始化失败: {e}")
             return None
 
-    async def push_to_etl(self, video: Dict[str, Any]) -> bool:
+    async def filter_data(self, video: Dict) -> bool:
         """
-        Push the processed video to ETL
+        Validation and filtering; uses PiaoQuanPipeline by default
+        Subclasses may override this method for custom filtering
+        """
+        pipeline = PiaoQuanPipeline(
+            platform=self.platform,
+            mode=self.mode,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            item=video,
+            trace_id=self.platform + str(uuid.uuid1())
+        )
+        return await pipeline.process_item()
+
+    async def integrated_video_handling(self, video: Dict) -> None:
+        """
+        Hook: implement automatic title generation or other business logic here
+        """
+        await generate_titles(self.feishu_sheetid, video)
+
+    async def push_to_etl(self, video: Dict) -> bool:
+        """
+        Push the message to the ETL message queue
         """
         try:
             await self.mq_producer.send_msg(video)
+            self.logger.info(f"{self.trace_id} 成功推送视频至ETL")
             return True
         except Exception as e:
-            self.logger.exception(f"{self.trace_id} 推送ETL失败 {e}")
+            self.logger.exception(f"{self.trace_id} 推送ETL失败: {e}")
             return False
 
-
-    async def integrated_video_handling(self,video: Dict) -> Optional[Dict]:
+    async def is_video_count_sufficient(self) -> bool:
         """
-        Video handling
-        :return:
+        Check whether today's fetched video count has reached the cap; if so, stop fetching
         """
-        await generate_titles(self.feishu_sheetid,video)
-
+        max_count = self.rule_dict.get("videos_cnt", {}).get("min", 0)
+        if max_count <= 0:
+            return True
+        async with AsyncMysqlService(self.platform, self.mode) as mysql:
+            current_count = await mysql.get_today_videos()
+        if current_count >= max_count:
+            self.logger.info(f"{self.trace_id} 今日视频已达上限: {current_count}")
+            self.aliyun_logr.logging(code="1011", message="视频数量达到当日最大值", data=f"<今日视频数量>{current_count}")
+            return False
+        return True
 
     async def _wait_for_next_loop(self, current_loop: int) -> None:
-        """等待下一次循环请求"""
+        """等待下次循环"""
         if current_loop < self.loop_times and self.loop_interval > 0:
-            self.logger.info(f"{self.trace_id}--等待 {self.loop_interval} 秒后进行下一次请求")
+            self.logger.info(f"{self.trace_id} 等待 {self.loop_interval} 秒后进行下一次请求")
             await asyncio.sleep(self.loop_interval)
 
     async def before_run(self):
-        """
-        可覆写钩子:在运行前执行,如拉取Token等
-        """
+        """运行前预处理钩子,子类可覆盖"""
         pass
 
     async def after_run(self):
-        """
-        可覆写钩子:在运行后执行,如统计汇报等
-        """
+        """运行后处理钩子,子类可覆盖"""
         pass
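
With rendering, cursor tracking and the limit flag all handled here, one run proceeds as: loop 1 renders {"cursor": ""}, _update_cursor stores the response's $.data.next_cursor, and loop 2 renders {"cursor": "<that value>"}. A subclass that needs a per-item detail request only overrides fetch_dependent_data; a hedged sketch (the enrichment shown is hypothetical):

from spiders.base_spider import BaseSpider

class ExampleDetailRecommend(BaseSpider):
    async def fetch_dependent_data(self, video: dict) -> dict:
        # hypothetical enrichment step; the base implementation returns the item unchanged
        video.setdefault("play_cnt", 0)
        return video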

+ 1 - 2
spiders/benshanzhufu_recommend.py

@@ -4,8 +4,7 @@ from spiders.base_spider import BaseSpider
 
 
 class BenshanzhufuRecommend(BaseSpider):
-    def __init__(self, rule_dict, user_list, trace_id):
-        super().__init__(rule_dict, user_list, trace_id)
+    pass
 
 
 async def main():
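
The subclass body can collapse to pass because BaseSpider now resolves its config from the lowercased class name ("benshanzhufurecommend" in spiders_config.yaml) and the inherited __init__ signature is unchanged, roughly:

class BenshanzhufuRecommend(BaseSpider):
    pass

# BaseSpider.__init__ then does, per the diff above:
#   self.class_name = self.__class__.__name__.lower()   # -> "benshanzhufurecommend"
#   self.platform_config = SpiderConfig.get_platform_config(classname=self.class_name)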

+ 2 - 0
spiders/spider_registry.py

@@ -4,6 +4,7 @@
 from core.utils.log.logger_manager import LoggerManager
 from spiders.base_spider import BaseSpider
 from spiders.benshanzhufu_recommend import BenshanzhufuRecommend
+from spiders.yuannifuqimanman_recommend import YuannifuqimanmanRecommend
 
 logger = LoggerManager.get_logger()
 aliyun_log = LoggerManager.get_aliyun_logger()
@@ -12,6 +13,7 @@ aliyun_log = LoggerManager.get_aliyun_logger()
 # Format: the key is the MQ topic name, the value is a spider class inheriting from BaseSpider
 SPIDER_CLASS_MAP = {
     "bszf_recommend_prod": BenshanzhufuRecommend,
+    "ynfqmm_recommend_prod": YuannifuqimanmanRecommend,
     # add the mapping here when adding a new spider
 }
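
The registry keys are MQ topic names, so a consumer can dispatch a message to the right spider without knowing concrete classes. A hedged dispatch sketch; the handler wiring is illustrative:

from spiders.spider_registry import SPIDER_CLASS_MAP

async def handle_message(topic: str, rule_dict: dict, user_list: list, trace_id: str):
    spider_cls = SPIDER_CLASS_MAP.get(topic)
    if spider_cls is None:
        raise ValueError(f"no spider registered for topic: {topic}")
    await spider_cls(rule_dict, user_list, trace_id).run()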
 

+ 28 - 0
spiders/yuannifuqimanman_recommend.py

@@ -0,0 +1,28 @@
+import asyncio
+
+from spiders.base_spider import BaseSpider
+
+
+class YuannifuqimanmanRecommend(BaseSpider):
+    pass
+
+
+async def main():
+    rule_dict = {}
+    user_list = [{'uid': 20631262, 'link': 'recommend_2060', 'nick_name': '人老心不老'},
+                 {'uid': 20631263, 'link': 'recommend_2061', 'nick_name': '荷花朵朵'},
+                 {'uid': 20631264, 'link': 'recommend_2062', 'nick_name': '战友情'},
+                 {'uid': 20631265, 'link': 'recommend_2063', 'nick_name': '闲人老李'},
+                 {'uid': 20631266, 'link': 'recommend_2064', 'nick_name': '盛世白莲'},
+                 {'uid': 20631267, 'link': 'recommend_2065', 'nick_name': '星星点灯'},
+                 {'uid': 20631268, 'link': 'recommend_2066', 'nick_name': '老同学'},
+                 {'uid': 20631269, 'link': 'recommend_2067', 'nick_name': '赤子之心'},
+                 {'uid': 20631271, 'link': 'recommend_2068', 'nick_name': '缘分'},
+                 {'uid': 20631272, 'link': 'recommend_2069', 'nick_name': '欢度余生'}]
+    trace_id = "yuannifuqimanman_202507021200"
+    bszf = YuannifuqimanmanRecommend(rule_dict, user_list, trace_id)
+    await bszf.run()
+
+
+if __name__ == '__main__':
+    asyncio.run(main())  # async entry point