zhangliang 2 days ago
parent
revision
1b6b652c9c
5 changed files with 100 additions and 9 deletions
  1. config/spiders_config.yaml (+1 −1)
  2. core/base/async_request_client.py (+5 −4)
  3. core/utils/template_resolver.py (+58 −0)
  4. services/pipeline.py (+4 −2)
  5. spiders/base_spider.py (+32 −2)

+ 1 - 1
config/spiders_config.yaml

@@ -10,7 +10,7 @@ benshanzhufurecommend:
   path: /crawler/ben_shan_zhu_fu/recommend
   method: post
   request_body:
-    cursor: "{{next_cursor}}"
+    cursor: "{{next_cursor|'1'}}"
   loop_times: 200
   loop_interval: 5
   feishu_sheetid: "aTSJH4"
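
For reference, a minimal sketch of how the new |default syntax behaves, using the resolver added in core/utils/template_resolver.py below. Note that the single quotes in '1' become part of the substituted default, since the resolver treats everything after | as a literal:

    from core.utils.template_resolver import resolve_request_body_template

    body = {"cursor": "{{next_cursor|'1'}}"}
    # First page: no cursor yet, so the literal default (quotes included) is used.
    print(resolve_request_body_template(body, {}))                      # {'cursor': "'1'"}
    # Subsequent pages: an extracted cursor takes precedence over the default.
    print(resolve_request_body_template(body, {"next_cursor": "42"}))   # {'cursor': '42'}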

+ 5 - 4
core/base/async_request_client.py

@@ -20,14 +20,16 @@ class AsyncRequestClient:
         while retries < self.max_retries:
             try:
                 if self.logger:
-                    self.logger.info(f"Requesting {method} {url}, attempt {retries+1}/{self.max_retries}")
+                    self.logger.info(f"Requesting {method} {url}, request params: {kwargs}")
                 async with session.request(method, url, **kwargs) as response:
                     response.raise_for_status()
                     resp = await response.json()
                     if resp.get('code') != 0:
                         retries += 1
                         if self.logger:
-                            self.logger.warning(f"Request failed {resp}, retrying {retries}/{self.max_retries}")
+                            self.logger.warning(f"Request failed with non-zero code {resp}, retrying {retries}/{self.max_retries}")
+                        await asyncio.sleep(5)
+                        continue
                     return resp
             except Exception as e:
                 retries += 1
@@ -43,5 +45,4 @@ class AsyncRequestClient:
                     return
                 if self.logger:
                     self.logger.warning(f"Request failed {e}, retrying {retries}/{self.max_retries}")
-
-                await asyncio.sleep(1)
+                await asyncio.sleep(5)
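
A hypothetical usage sketch of the client (the constructor arguments are assumptions; this diff only shows request() taking a session, method, url, and aiohttp keyword arguments). With this change, a response whose business code is non-zero sleeps 5 seconds and retries instead of being returned immediately, and transport errors now back off 5 seconds rather than 1:

    import aiohttp
    from core.base.async_request_client import AsyncRequestClient

    async def fetch(url: str, body: dict, logger=None):
        client = AsyncRequestClient(logger=logger, max_retries=3)  # signature assumed
        async with aiohttp.ClientSession() as session:
            # Retries on exceptions and on code != 0 payloads, sleeping 5s between attempts.
            return await client.request(session=session, method="POST", url=url, json=body)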

+ 58 - 0
core/utils/template_resolver.py

@@ -0,0 +1,58 @@
+# core/utils/template_resolver.py
+
+import re
+from typing import Any, Dict
+
+PLACEHOLDER_PATTERN = re.compile(r"\{\{\s*([a-zA-Z0-9_.]+)(\|[^}]+)?\s*\}\}")
+
+def resolve_request_body_template(
+    data: Any,
+    variables: Dict[str, Any],
+) -> Any:
+    """
+    Resolve only the {{var}} templates inside request parameters.
+
+    Args:
+        data: dict/list/str
+        variables: {"next_cursor": "123", ...}
+
+    Returns:
+        The structure with placeholders substituted.
+    """
+    if isinstance(data, dict):
+        return {k: resolve_request_body_template(v, variables) for k, v in data.items()}
+    elif isinstance(data, list):
+        return [resolve_request_body_template(item, variables) for item in data]
+    elif isinstance(data, str):
+        def replacer(match):
+            var_name = match.group(1)
+            default_value = match.group(2)[1:] if match.group(2) else None
+            value = variables.get(var_name)
+            if value is not None:
+                return str(value)
+            elif default_value is not None:
+                return default_value
+            else:
+                return ""
+        return PLACEHOLDER_PATTERN.sub(replacer, data)
+    else:
+        return data
+
+if __name__ == '__main__':
+    data = {
+        "cursor": "{{next_cursor}}",
+        "page_size": 20,
+        "filters": {
+            "start_date": "{{start_date}}",
+            "end_date": "{{end_date|2025-01-01}}"
+        },
+        "tags": ["{{tag1}}", "{{tag2|default_tag}}"]
+    }
+
+    variables = {
+        "next_cursor": "1",
+        "start_date": "2025-06-30",
+        "tag1": "news"
+    }
+    result = resolve_request_body_template(data, variables)
+    print(result)
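
Running this module directly should print roughly the following: present variables win, missing ones fall back to their | defaults, and placeholders with neither resolve to empty strings:

    {'cursor': '1', 'page_size': 20, 'filters': {'start_date': '2025-06-30', 'end_date': '2025-01-01'}, 'tags': ['news', 'default_tag']}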

+ 4 - 2
services/pipeline.py

@@ -31,7 +31,8 @@ class PiaoQuanPipeline:
 
     async def feishu_time_list(self):
         """从飞书读取天数配置"""
-        summary = await FeishuDataAsync.get_values("KsoMsyP2ghleM9tzBfmcEEXBnXg", "RuLK77")
+        async with FeishuDataAsync() as feishu_data:
+            summary = await feishu_data.get_values(spreadsheet_token="KsoMsyP2ghleM9tzBfmcEEXBnXg", sheet_id="RuLK77")
         for row in summary[1:]:
             if row[0] == self.platform:
                 return row[1]
@@ -118,7 +119,8 @@ class PiaoQuanPipeline:
 
     async def feishu_list(self):
         """从飞书拉取天数配置,用于去重判断"""
-        summary = FeishuUtils.get_values_batch("KsoMsyP2ghleM9tzBfmcEEXBnXg", "letS93")
+        async with FeishuDataAsync() as feishu_data:
+            summary = await feishu_data.get_values(spreadsheet_token="KsoMsyP2ghleM9tzBfmcEEXBnXg", sheet_id="letS93")
         for row in summary[1:]:
             if row[0] == self.platform:
                 return row[1]
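
Both helpers now follow the same read pattern; a minimal refactoring sketch (the helper name is hypothetical):

    async def _read_platform_value(self, sheet_id: str):
        """Read a Feishu sheet and return the configured value for this platform."""
        async with FeishuDataAsync() as feishu_data:
            summary = await feishu_data.get_values(
                spreadsheet_token="KsoMsyP2ghleM9tzBfmcEEXBnXg",
                sheet_id=sheet_id,
            )
        for row in summary[1:]:
            if row[0] == self.platform:
                return row[1]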

+ 32 - 2
spiders/base_spider.py

@@ -13,11 +13,15 @@ from core.utils.helpers import generate_titles
 from core.utils.spider_config import SpiderConfig
 from core.utils.extractors import safe_extract, extract_fields
 from core.utils.log.logger_manager import LoggerManager
+from core.utils.template_resolver import resolve_request_body_template
 from services.async_mysql_service import AsyncMysqlService
 from services.pipeline import PiaoQuanPipeline
 from core.base.async_request_client import AsyncRequestClient
 from services.async_mq_producer import AsyncMQProducer
 
+
+import re
+PLACEHOLDER_PATTERN = re.compile(r"\{\{\s*([a-zA-Z0-9_.]+)(\|[^}]+)?\s*\}\}")
 class BaseSpider(ABC):
     """
     通用爬虫基类:支持严格顺序执行流程
@@ -56,6 +60,7 @@ class BaseSpider(ABC):
         self.response = self.platform_config.response_parse
         self.field_map = self.response.get("fields", {})
         self.data_path = self.response.get("data_path")
+        self.next_cursor_path = self.response.get("next_cursor")
 
 
         # 流程控制配置
@@ -71,15 +76,39 @@ class BaseSpider(ABC):
         self.session = None
         self.feishu_sheetid = self.platform_config.feishu_sheetid
 
+        # Dynamically detect the pagination variable name
+        self.cursor_variable_name = None
+        body_str = str(self.body)
+        match = PLACEHOLDER_PATTERN.search(body_str)
+        if match:
+            self.cursor_variable_name = match.group(1)
+            self.logger.info(f"{self.trace_id}--detected pagination variable name: {self.cursor_variable_name}")
+        else:
+            self.logger.info(f"{self.trace_id}--no pagination variable detected; pagination disabled")
+
+        self.current_cursor = None
+
+
+    async def crawl_data(self, session, dynamic_variables=None) -> Optional[List[Dict]]:
+        dynamic_variables = dynamic_variables or {}
+        if self.cursor_variable_name:
+            dynamic_variables[self.cursor_variable_name] = self.current_cursor or ""
+
+        resolved_body = resolve_request_body_template(self.body, dynamic_variables)
 
-    async def crawl_data(self,session) -> Optional[List[Dict]]:
         response = await self.request_client.request(
             session=session,
             method=self.method,
             url=self.url,
             headers=self.headers,
-            json=self.body
+            json=resolved_body
         )
+
+        if self.next_cursor_path:
+            extracted_cursor = safe_extract(response, self.next_cursor_path)
+            self.current_cursor = extracted_cursor if extracted_cursor else None
+            self.logger.info(f"{self.trace_id}--parsed next page {self.cursor_variable_name}: {self.current_cursor}")
+
         data = safe_extract(response, self.data_path)
         return data if data else []
 
@@ -200,6 +229,7 @@ class BaseSpider(ABC):
                     success_count = 0
                     fail_count = 0
 
+
                     for video in video_list:
                        # Extract the mapped video fields
                         video_obj = await self.process_video(video)
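
A minimal sketch of how the cursor now round-trips across calls (the driver loop below is hypothetical; BaseSpider's real loop control lives in its run flow with loop_times/loop_interval):

    async def crawl_pages(spider, session, max_pages=3):
        for _ in range(max_pages):
            # crawl_data() injects spider.current_cursor into the request body
            # via the detected placeholder, then refreshes it from the response
            # using response_parse.next_cursor.
            items = await spider.crawl_data(session)
            if not items:
                break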