Fix task id retrieval

zhangliang 7 hours ago
parent
commit
f0821ee29c

+ 1 - 1
config/base.py

@@ -33,7 +33,7 @@ class Settings(BaseSettings):
     ROCKETMQ_ACCESS_KEY_ID: str = Field(..., env="ROCKETMQ_ACCESS_KEY_ID")
     ROCKETMQ_ACCESS_KEY_SECRET: str = Field(..., env="ROCKETMQ_ACCESS_KEY_SECRET")
     ROCKETMQ_INSTANCE_ID: str = Field(..., env="ROCKETMQ_INSTANCE_ID")
-    ROCKETMQ_WAIT_SECONDS: int = 10
+    ROCKETMQ_WAIT_SECONDS: int = 30  # max 30s
     ROCKETMQ_BATCH: int = 1
 
     # Feishu configuration
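
The raised value is the long-poll window handed to the MQ consumer. A minimal sketch of where it typically lands, assuming an Aliyun-HTTP-style client; the `consumer` object and `receive_batch` helper are illustrative names, not code from this repository:

    # Hedged sketch: how ROCKETMQ_WAIT_SECONDS and ROCKETMQ_BATCH would feed a
    # long-poll receive. `consumer` and `receive_batch` are illustrative only.
    def receive_batch(consumer, settings):
        # Block for up to ROCKETMQ_WAIT_SECONDS waiting for messages; HTTP long
        # polling typically caps this at 30 s, which is why the setting tops out there.
        return consumer.consume_message(
            settings.ROCKETMQ_BATCH,         # up to this many messages per call
            settings.ROCKETMQ_WAIT_SECONDS,  # long-poll wait, now 30 s
        )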

+ 6 - 2
config/spiders_config.yaml

@@ -13,7 +13,9 @@ benshanzhufurecommend:
   request_body:
     cursor: "{{next_cursor}}"
   loop_times: 200
-  loop_interval: 5
+  loop_interval:
+    min: 10
+    max: 30
   feishu_sheetid: "aTSJH4"
   response_parse:
     data: "$.data"
@@ -39,7 +41,9 @@ yuannifuqimanmanrecommend:
   request_body:
     cursor: "{{next_cursor}}"
   loop_times: 200
-  loop_interval: 5
+  loop_interval:
+    min: 10
+    max: 30
   feishu_sheetid: "golXy9"
   response_parse:
     data: "$.data"
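
Both platforms trade the fixed 5-second pause for a 10–30 second range. A minimal sketch of how such a block reads back once the YAML is parsed, assuming the platform name is a top-level key as the hunk header suggests (standard `yaml.safe_load`; the file path is the one shown above):

    # Minimal sketch: the new loop_interval block as plain data after parsing.
    import random
    import yaml

    with open("config/spiders_config.yaml") as f:
        cfg = yaml.safe_load(f)

    interval = cfg["benshanzhufurecommend"]["loop_interval"]          # {"min": 10, "max": 30}
    sleep_seconds = random.randint(interval["min"], interval["max"])  # integer in [10, 30]

This is the same lookup the spider performs between requests (see the base_spider.py hunk below).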

+ 1 - 1
core/models/spiders_config_models.py

@@ -14,7 +14,7 @@ class PlatformConfig(BaseConfig):
     method: str
     request_body: dict = {}
     loop_times: int = 1
-    loop_interval: int = 0
+    loop_interval: dict = {}
     response_parse: dict = {}
     retry_times: int = 0
     feishu_sheetid: str
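
Typing loop_interval as a bare dict accepts any shape, and its empty default means a platform whose YAML was not updated would reach the random wait in base_spider.py with no "min"/"max" keys. A hedged alternative sketch, not the committed code, using a nested model with defaults (the LoopInterval class is purely illustrative):

    # Alternative sketch (illustrative, not the repo's model): a typed interval
    # with defaults, so a platform whose YAML lacks the block still gets a range.
    import random
    from pydantic import BaseModel

    class LoopInterval(BaseModel):
        min: int = 10   # defaults mirror the range added in spiders_config.yaml
        max: int = 30

        def pick(self) -> int:
            # Draw one wait time; replaces raw dict["min"] / dict["max"] access.
            return random.randint(self.min, self.max)

    # In PlatformConfig the field could then read:
    #     loop_interval: LoopInterval = LoopInterval()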

+ 1 - 1
scheduler/async_consumer.py

@@ -101,7 +101,7 @@ async def async_handle_topic(topic: str, stop_event: asyncio.Event):
                         await handle_single_message(message)
                 else:
                     # Sleep briefly when there are no messages to avoid frequent empty polling
-                    await asyncio.sleep(1)
+                    await asyncio.sleep(300)
             except Exception as e:
                 # Exceptions outside message handling (e.g. MQ connection failure): log and retry
                 logger.error(f"[{topic}] consumer loop exception: {e}", exc_info=True)
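
One side effect of the five-minute idle sleep is that, if the surrounding loop checks stop_event between iterations, shutdown can also take up to that long to register. A hedged sketch of an interruptible wait under that assumption; `idle_wait` is an illustrative helper, not existing code:

    # Hedged sketch: sleep up to `seconds` between empty polls, but wake early
    # if shutdown is requested via stop_event. `idle_wait` is illustrative only.
    import asyncio

    async def idle_wait(stop_event: asyncio.Event, seconds: float = 300) -> None:
        try:
            await asyncio.wait_for(stop_event.wait(), timeout=seconds)
        except asyncio.TimeoutError:
            pass  # normal case: no shutdown requested during the idle window

Inside the else branch, `await idle_wait(stop_event)` would replace the bare sleep while keeping the 300-second pacing.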

+ 7 - 5
spiders/base_spider.py

@@ -1,4 +1,5 @@
 import asyncio
+import random
 import uuid
 from typing import List, Dict, Optional, Any
 
@@ -59,7 +60,7 @@ class BaseSpider:
 
         # Settings related to crawl behaviour
         self.loop_times = self.platform_config.loop_times or 100
-        self.loop_interval = self.platform_config.loop_interval or 5
+        self.loop_interval = self.platform_config.loop_interval
         self.timeout = self.platform_config.request_timeout or 30
         self.max_retries = self.platform_config.max_retries or 3
         self.feishu_sheetid = self.platform_config.feishu_sheetid
@@ -69,7 +70,7 @@ class BaseSpider:
         self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
         self.aliyun_log = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
         self.logger.info(f"Spider '{self.platform}/{self.mode}' initialising...")
-        self.logger.info(f"Max loop count: {self.loop_times}, loop interval: {self.loop_interval}s")
+        self.logger.info(f"Max loop count: {self.loop_times}, loop interval: {self.loop_interval}")
 
     def _setup_services(self):
         """Initialise external service clients."""
@@ -282,9 +283,10 @@ class BaseSpider:
 
     async def _wait_for_next_loop(self, current_loop: int) -> None:
         """Wait before the next loop."""
-        if current_loop < self.loop_times and self.loop_interval > 0:
-            self.logger.info(f"Waiting {self.loop_interval} seconds before the next request")
-            await asyncio.sleep(self.loop_interval)
+        if current_loop < self.loop_times:
+            wait_time = random.randint(self.loop_interval["min"], self.loop_interval["max"])
+            self.logger.info(f"Waiting {wait_time} seconds before the next request")
+            await asyncio.sleep(wait_time)
 
     async def before_run(self):
         """Pre-run hook; subclasses may override."""