
Dynamically update request variables

zhangliang, 1 day ago
parent commit d2f1ef6044

+ 2 - 1
config/spiders_config.yaml

@@ -10,11 +10,12 @@ benshanzhufurecommend:
   path: /crawler/ben_shan_zhu_fu/recommend
   method: post
   request_body:
-    cursor: "{{next_cursor|'1'}}"
+    cursor: "{{next_cursor}}"
   loop_times: 200
   loop_interval: 5
   feishu_sheetid: "aTSJH4"
   response_parse:
+    data: "$.data"
     next_cursor: "$.data.next_cursor"
     data_path: "$.data.data"
     fields:

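Taken together with the base_spider.py change below, the new data: "$.data" entry is what drives pagination now: after each response the request body is re-resolved against the whole $.data object, so {{next_cursor}} picks up the latest cursor automatically. A rough sketch of one iteration (the response payload is invented for illustration; safe_extract is assumed to return the object at the given JSONPath):

    # illustrative response from /crawler/ben_shan_zhu_fu/recommend
    response = {"code": 0, "data": {"next_cursor": "2", "data": [{"title": "..."}]}}

    variables = safe_extract(response, "$.data")   # {"next_cursor": "2", "data": [...]}
    next_body = resolve_request_body_template({"cursor": "{{next_cursor}}"}, variables)
    # next_body == {"cursor": "2"}, used as the JSON body of the following request
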
+ 2 - 0
core/base/async_request_client.py

@@ -1,5 +1,6 @@
 import asyncio
 from typing import Optional, Dict
+from pprint import pformat
 
 import aiohttp
 
@@ -24,6 +25,7 @@ class AsyncRequestClient:
                 async with session.request(method, url, **kwargs) as response:
                     response.raise_for_status()
                     resp = await response.json()
+                    # self.logger.info(f"{url} response:\n {pformat(resp)}")
                     if resp.get('code') != 0:
                         retries += 1
                         if self.logger:

+ 17 - 0
core/utils/helpers.py

@@ -44,6 +44,23 @@ async def insert_feishu_data(sheet_id: str,values: List):
         await feishu.insert_values(spreadsheet_token=spreadsheet_token, sheet_id=sheet_id,ranges="A2:Z2",values=values)
 
 
+import re
+
+
+def extract_variables(text: str, pattern: str = r"\{\{(\w+)\}\}") -> list:
+    """
+    Extract template variable names from a text.
+
+    Args:
+        text: the text to analyze
+        pattern: regex pattern; defaults to the "{{variable_name}}" format
+
+    Returns:
+        the list of extracted variable names
+    """
+    return re.findall(pattern, text)
+
+
 
 if __name__ == '__main__':
      filter_word = asyncio.run(insert_feishu_data())

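A quick usage note for the new helper (the request-body string below is illustrative):

    from core.utils.helpers import extract_variables

    body_str = '{"cursor": "{{next_cursor}}", "tag": "{{tag1}}"}'
    print(extract_variables(body_str))   # ['next_cursor', 'tag1']
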
+ 8 - 4
core/utils/template_resolver.py

@@ -7,7 +7,7 @@ PLACEHOLDER_PATTERN = re.compile(r"\{\{\s*([a-zA-Z0-9_.]+)(\|[^}]+)?\s*\}\}")
 
 def resolve_request_body_template(
     data: Any,
-    variables: Dict[str, Any],
+    variables: Dict[str, Any] = {},
 ) -> Any:
     """
    Resolve only the {{var}} templates in the request parameters
@@ -18,6 +18,7 @@ def resolve_request_body_template(
 
     Returns:
        the structure after substitution
+        :rtype: Any
     """
     if isinstance(data, dict):
         return {k: resolve_request_body_template(v, variables) for k, v in data.items()}
@@ -50,9 +51,12 @@ if __name__ == '__main__':
     }
 
     variables = {
-        "next_cursor":"1",
-        "start_date": "2025-06-30",
-        "tag1": "news"
+
+        # "dada":{"next_cursor":"1"},
+        # "start_date": "2025-06-30",
+        # "tag1": "news",
+        # "tag3": "news",
+        # "default_tag55": "1111",
     }
     result = resolve_request_body_template(data, variables)
     print(result)

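With the new default, callers can omit variables entirely on the first request and pass the parsed response data afterwards. A minimal sketch (the expected output assumes {{var}} placeholders are substituted from variables; the fallback for unresolved placeholders is not shown in this diff, and category_id is illustrative):

    data = {"cursor": "{{next_cursor}}", "category_id": 71}
    print(resolve_request_body_template(data, {"next_cursor": "3"}))
    # expected: {"cursor": "3", "category_id": 71}
    print(resolve_request_body_template(data))   # first call, nothing to substitute yet

Note that variables: Dict[str, Any] = {} is a mutable default argument; it stays safe only as long as the function never writes into variables.
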
+ 2 - 2
services/async_mysql_service.py

@@ -165,10 +165,10 @@ class AsyncMysqlService:
               SELECT COUNT(*) as cnt
               FROM crawler_video
               WHERE DATE (create_time) = CURDATE()
-                AND classname = %s
+                AND platform = %s
                 AND strategy = %s \
               """
-        self.logger.info(f"Querying today's video count: classname={self.platform}, strategy={self.mode}")
+        self.logger.info(f"Querying today's video count: platform={self.platform}, strategy={self.mode}")
         result = await self.fetch_one(sql, [self.platform, self.mode])
         return result["cnt"] if result else 0
 

+ 95 - 128
spiders/base_spider.py

@@ -1,15 +1,16 @@
 import asyncio
+import json
 import random
 import time
 import traceback
 import uuid
 from abc import ABC
-from typing import List, Dict, Optional
+from typing import List, Dict, Optional, Any
 
 import aiohttp
 
 from core.models.video_item import VideoItem
-from core.utils.helpers import generate_titles
+from core.utils.helpers import generate_titles, extract_variables
 from core.utils.spider_config import SpiderConfig
 from core.utils.extractors import safe_extract, extract_fields
 from core.utils.log.logger_manager import LoggerManager
@@ -20,8 +21,6 @@ from core.base.async_request_client import AsyncRequestClient
 from services.async_mq_producer import AsyncMQProducer
 
 
-import re
-PLACEHOLDER_PATTERN = re.compile(r"\{\{\s*([a-zA-Z0-9_.]+)(\|[^}]+)?\s*\}\}")
 class BaseSpider(ABC):
     """
    Generic spider base class: runs the crawl steps in strict order
@@ -42,72 +41,116 @@ class BaseSpider(ABC):
         if not self.platform_config:
            raise ValueError(f"No matching spider config found: {self.class_name}")
 
-        # Initialize logging and MQ
+        # Platform info and logger initialization
         self.platform = self.platform_config.platform
         self.mode = self.platform_config.mode
         self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
-        self.logger.info(f"{trace_id}--Initializing spider class: {self.class_name}")
         self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
+
+        # MQ producer used to push items to ETL
         self.mq_producer = AsyncMQProducer(topic_name="topic_crawler_etl_prod_v2",platform=self.platform,mode=self.mode)
 
        # Request configuration
         self.method = self.platform_config.method.upper()
         self.url = self.platform_config.url
         self.headers = self.platform_config.headers
-        self.body = self.platform_config.request_body
-
+        self.request_body = self.platform_config.request_body
 
+        # Response parsing configuration
         self.response =self.platform_config.response_parse
         self.field_map =  self.response.get("fields", {})
         self.data_path =  self.response.get("data_path")
         self.next_cursor_path = self.response.get("next_cursor")
-
+        self.response_data = self.response.get("data")
 
        # Flow control configuration
        self.loop_times = self.platform_config.loop_times  # number of loops
        self.loop_interval = self.platform_config.loop_interval  # loop interval (seconds)
 
+        # Database and request clients
         self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
         self.request_client = AsyncRequestClient(logger=self.logger,aliyun_log=self.aliyun_logr)
 
-        self.logger.info(
-            f"{self.trace_id}--Config: {self.loop_times} loops, {self.loop_interval}s interval")
-
-        self.session = None
         self.feishu_sheetid = self.platform_config.feishu_sheetid
 
-        # Dynamically detect the pagination variable name
-        self.cursor_variable_name = None
-        body_str = str(self.body)
-        match = PLACEHOLDER_PATTERN.search(body_str)
-        if match:
-            self.cursor_variable_name = match.group(1)
-            self.logger.info(f"{self.trace_id}--Detected pagination variable name: {self.cursor_variable_name}")
-        else:
-            self.logger.info(f"{self.trace_id}--No pagination variable detected, pagination disabled")
 
-        self.current_cursor = None
+        self.timeout = 30
+
 
+        self.max_retries = 3
+        self.resolved_body = resolve_request_body_template(self.request_body)
 
-    async def crawl_data(self,session,dynamic_variables=None) -> Optional[List[Dict]]:
-        dynamic_variables = dynamic_variables or {}
-        if self.cursor_variable_name:
-            dynamic_variables[self.cursor_variable_name] = self.current_cursor or ""
 
-        resolved_body = resolve_request_body_template(self.body, dynamic_variables)
+
+    async def run(self):
+        """
+        Spider entry point: run the full loop (crawl, process, push).
+        """
+        await self.before_run()
+        total_success, total_failed = 0, 0
+        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
+            for loop_index in range(1, self.loop_times + 1):
+                # Stop if today's crawl limit has been reached
+                if not await self.is_video_count_sufficient():
+                    return
+                success_count, fail_count = await self.run_single_loop(session)
+                total_success += success_count
+                total_failed += fail_count
+                if loop_index < self.loop_times:
+                    await asyncio.sleep(self.loop_interval)
+        await self.after_run()
+        self.logger.info(f"{self.trace_id} Crawl finished, success: {total_success}, failed: {total_failed}")
+
+    async def run_single_loop(self, session) -> (int, int):
+        """
+        One crawl loop: fetch the video list, then process and push each item.
+        """
+        success_count, fail_count = 0, 0
+        video_list = await self.crawl_data(session)
+        if not video_list:
+            self.logger.info(f"{self.trace_id} No videos fetched")
+            return success_count, fail_count
+        for video in video_list:
+            result = await self.process_and_push_video(video)
+            if result:
+                success_count += 1
+            else:
+                fail_count += 1
+        return success_count, fail_count
+
+    async def process_and_push_video(self, video: Dict[str, Any]) -> bool:
+        """
+        Per-video pipeline: field extraction -> validation/filtering -> title handling -> push to ETL
+        """
+        try:
+            video_obj = await self.process_video(video)
+            if not video_obj:
+                return False
+            if not await self.filter_data(video_obj):
+                return False
+            await self.integrated_video_handling(video_obj)
+            return await self.push_to_etl(video_obj)
+        except Exception as e:
+            self.logger.exception(f"{self.trace_id} Error while processing video: {e}")
+            return False
+
+    async def crawl_data(self,session) -> Optional[List[Dict]]:
+        """
+        Fetch data with automatic retries and automatic pagination.
+        :param session: shared aiohttp session
+        :return: list of raw video dicts, or [] when nothing was returned
+        """
 
         response = await self.request_client.request(
             session=session,
             method=self.method,
             url=self.url,
             headers=self.headers,
-            json=resolved_body
+            json=self.resolved_body
         )
-
-        if self.next_cursor_path:
-            extracted_cursor = safe_extract(response, self.next_cursor_path)
-            self.current_cursor = extracted_cursor if extracted_cursor else None
-            self.logger.info(f"{self.trace_id}--Parsed next page {self.cursor_variable_name}: {self.current_cursor}")
+        print(safe_extract(response, self.response_data))
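+        # Re-resolve the request body from the parsed response data so the next request picks up updated variables (e.g. next_cursor)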
+        self.resolved_body = resolve_request_body_template(self.request_body, safe_extract(response, self.response_data))
 
         data = safe_extract(response, self.data_path)
         return data if data else []
@@ -170,30 +213,17 @@ class BaseSpider(ABC):
            self.logger.error(f"{self.trace_id} VideoItem initialization failed: {e}")
             return None
 
-    async def push_to_etl(self, item: Dict) -> bool:
-        """Push data to ETL (synchronous)"""
-        self.logger.info(f"{self.trace_id}--Start pushing data to ETL: {item.get('video_id', item.get('title', 'untitled'))}")
+    async def push_to_etl(self, video: Dict[str, Any]) -> bool:
+        """
+        Push a fully processed video to ETL.
+        """
         try:
-            await self.mq_producer.send_msg(item)
-            self.aliyun_logr.logging(
-                code="1009",
-                message="Sent to ETL successfully",
-                data=item,
-                trace_id=self.trace_id
-            )
-            self.logger.info(f"{self.trace_id}--Data pushed successfully")
+            await self.mq_producer.send_msg(video)
             return True
         except Exception as e:
-            self.logger.exception(f"{self.trace_id}--Failed to push data: {e}, content: {item}")
+            self.logger.exception(f"{self.trace_id} Failed to push to ETL: {e}")
             return False
 
-    async def get_today_videos(self):
-        """
-        Query the number of videos the crawler has fetched today.
-        :return:
-        """
-        video_count = await self.db_service.get_today_videos()
-        return video_count
 
     async def integrated_video_handling(self,video: Dict) -> Optional[Dict]:
         """
@@ -202,84 +232,21 @@ class BaseSpider(ABC):
         """
         await generate_titles(self.feishu_sheetid,video)
 
-    async def run(self):
-        """
-        Run the crawl task asynchronously, in strict order:
-        1. Crawl
-        2. Process each item, validate fields
-        3. Filter (duplicates, platform rules, title, publish time)
-        4. Title handling
-        5. Push to ETL
-        """
-        try:
-            total_success,total_failed= 0,0
-            loop_start_time = time.time()
-            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.TIMEOUT)) as session:
-                for loop_index in range(1, self.loop_times + 1):
-                    # Check whether today's video count has already reached the limit
-                    if not await self.is_video_count_sufficient():
-                        return
-                    self.logger.info(f"{self.trace_id}--Step 1: starting loop request {loop_index}/{self.loop_times}")
-                    # Request the video list
-                    video_list = await self.crawl_data(session)
-                    if not video_list:
-                        self.logger.warning(f"{self.trace_id}--No video data fetched, skipping this loop")
-                        await self._wait_for_next_loop(loop_index)
-                        continue
-                    success_count = 0
-                    fail_count = 0
-
-
-                    for video in video_list:
-                        # Extract the video field mapping
-                        video_obj = await self.process_video(video)
-                        if not video_obj:
-                            self.logger.warning(f"{self.trace_id}--Video processing failed, skipped")
-                            fail_count += 1
-                            continue
-                        # Video filtering rules
-                        if not await self.filter_data(video_obj):
-                            self.logger.debug(f"{self.trace_id}--Video did not match the rules, skipped")
-                            continue
-                        # Video handling
-                        await self.integrated_video_handling(video_obj)
-
-                        if await self.push_to_etl(video_obj):
-                            success_count += 1
-                        else:
-                            fail_count += 1
-
-                    total_success += success_count
-                    total_failed += fail_count
-
-                    loop_duration = time.time() - loop_start_time
-                    self.logger.info(f"{self.trace_id}--Loop {loop_index}/{self.loop_times} finished. "
-                                     f"Success: {success_count}, failed: {fail_count}, elapsed: {loop_duration:.2f}s")
-
-                    await self._wait_for_next_loop(loop_index)
-
-                # Global metrics log
-                self.aliyun_logr.logging(
-                    code="1003",
-                    message="Crawler execution metrics summary",
-                    data={
-                        "trace_id": self.trace_id,
-                        "classname": self.platform,
-                        "success_count": total_success,
-                        "fail_count": total_failed
-                    },
-                    trace_id=self.trace_id
-                )
-
-                self.logger.info(
-                    f"{self.trace_id}--[{self.platform}] Crawl task fully finished, total success: {total_success}, total failed: {total_failed}")
-                return True
-        except Exception as e:
-            self.logger.error(f"Fatal crawler error: {e}")
-            raise
 
     async def _wait_for_next_loop(self, current_loop: int) -> None:
         """等待下一次循环请求"""
         if current_loop < self.loop_times and self.loop_interval > 0:
            self.logger.info(f"{self.trace_id}--Waiting {self.loop_interval} seconds before the next request")
             await asyncio.sleep(self.loop_interval)
+
+    async def before_run(self):
+        """
+        Overridable hook: runs before the crawl, e.g. fetching a token.
+        """
+        pass
+
+    async def after_run(self):
+        """
+        Overridable hook: runs after the crawl, e.g. reporting statistics.
+        """
+        pass
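
The before_run/after_run hooks give subclasses a place for setup and teardown without overriding run(). A minimal sketch of how a platform spider might use them (the class name, wiring and fetch_token helper are hypothetical; only the hook names come from this diff):

    class BenshanzhufuRecommendSpider(BaseSpider):
        async def before_run(self):
            # hypothetical: obtain an access token before the crawl loop starts
            self.headers["token"] = await fetch_token()

        async def after_run(self):
            # hypothetical: report that the run finished
            self.logger.info(f"{self.trace_id} run finished for {self.platform}")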