zhangliang, 1 week ago
parent
commit
5ac5a02ff2

+ 1 - 1
config/spiders_config.yaml

@@ -72,7 +72,7 @@ xiaoniangaoauthor:
   feishu_sheetid: "K0gA9Y"
   response_parse:
     uid: "$.uid" # 数据库的uid
-    next_cursor: "$.cursor"
+    next_cursor: "$.data.next_cursor"
     data: "$.data"
     has_more: "$.data.has_more"
     data_path: "$.data.data"

+ 2 - 2
core/base/async_request_client.py

@@ -135,7 +135,7 @@ class AsyncRequestClient:
                 data={
                     "url": url,
                     "method": method,
-                    "requestBody": {k: v for k, v in kwargs.items() if k != 'json'}
+                    "requestBody": {k: v for k, v in kwargs.items() if k == 'json'}
                 }
             )
 
@@ -185,7 +185,7 @@ class AsyncRequestClient:
                 data={
                     "url": url,
                     "method": method,
-                    "requestBody": {k: v for k, v in kwargs.items() if k != 'json'},
+                    "requestBody": {k: v for k, v in kwargs.items() if k == 'json'},
                     "error_type": type(error).__name__,
                     "error_message": str(error)
                 }
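Both hunks flip the filter from k != 'json' to k == 'json', so the logged requestBody now contains only the request's JSON payload instead of everything except it (headers, timeouts and other kwargs are dropped from the log record). A small illustration with made-up kwargs:

kwargs = {
    "json": {"uid": "173309188", "cursor": ""},
    "headers": {"Content-Type": "application/json"},
    "timeout": 30,
}

old_body = {k: v for k, v in kwargs.items() if k != 'json'}   # previous behaviour
new_body = {k: v for k, v in kwargs.items() if k == 'json'}   # behaviour after this change

assert "json" not in old_body                                     # the payload itself was never logged
assert new_body == {"json": {"uid": "173309188", "cursor": ""}}   # only the payload is logged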

+ 18 - 0
services/async_redis_service.py

@@ -47,3 +47,21 @@ class AsyncRedisService:
         key = self._build_key(message_id)
         pool = RedisManager.get_pool()
         await pool.set(key, "1", ex=self.ttl)
+
+    async def get_current_user_index(self, platform: str,):
+        """
+        获取当前爬虫账户下标
+        """
+        key = f"{platform}_user_crawler_index"
+        pool = RedisManager.get_pool()
+        return await pool.get(key)
+
+    async def set_current_user_index(self, platform: str, user_index: int):
+        """
+        设置爬取账号下标
+        """
+        key = f"{platform}_user_crawler_index"
+        pool = RedisManager.get_pool()
+        await pool.set(key, user_index, ex=self.ttl)
+
+
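The two new methods checkpoint the crawl position per platform under a "{platform}_user_crawler_index" key, reusing the service's TTL. A hedged usage sketch: the int() cast is an assumption, since get() returns the stored value as a string or bytes (or None when the key is missing or expired), and the pool is assumed to be initialised elsewhere, e.g. the RedisManager.init(redis_url=settings.redis_url) call hinted at in basespider.py:

import asyncio
from services.async_redis_service import AsyncRedisService

async def demo():
    redis_service = AsyncRedisService()

    # Read the saved position; fall back to 0 when nothing is stored yet.
    raw = await redis_service.get_current_user_index("xiaoniangao")
    current_index = int(raw) if raw is not None else 0   # cast is an assumption, see note above

    # ... crawl user_list_from_db[current_index] here ...

    # Persist the next position so a restarted spider resumes where it stopped;
    # the key expires after self.ttl seconds, like the dedup keys above.
    await redis_service.set_current_user_index("xiaoniangao", current_index + 1)

asyncio.run(demo())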

+ 15 - 20
spiders/authorspider.py

@@ -23,14 +23,12 @@ class AuthorSpider(BaseSpider):
         self.user_list_from_db = await self.fetch_user_list()
         if not self.user_list_from_db:
             self.logger.warning("用户列表为空,终止账号模式")
+        # 获取当前用户索引
+        self.current_user_index = await self.get_crawler_user_index()
         self.logger.info(f"{self.platform}获取用户列表完成,共 {len(self.user_list_from_db)} 个用户")
 
     async def execute(self):
         """执行核心逻辑 - 使用 make_request 方法"""
-        if not await self.is_video_count_sufficient():
-            self.logger.info("视频数量已达到上限,跳过执行")
-            return
-
         while await self.is_video_count_sufficient():
 
             # 检查时间条件
@@ -51,39 +49,30 @@ class AuthorSpider(BaseSpider):
             # 获取当前用户视频
             raw_data = await self.crawl_user_videos(request_body, crawler_user_uid)
 
-            if not raw_data:
-                # 当前用户无数据,切换到下一个用户
-                self.current_user_index += 1
-                self.current_cursor = ""  # 重置游标
-                await self.wait_between_iterations()
-                continue
-
-
             # 处理数据
             if self.platform == "xiaoniangao":
                 self.user_list = [user]  # 特殊逻辑
-
             pass_video = await self.process_data(raw_data)
             # 根据是否有通过的数据和下一页游标判断是否继续当前用户
-            if pass_video == 0 and not self.current_cursor:
+            if pass_video != 0 and self.current_cursor:
+                self.logger.info(
+                    f"用户 {crawler_user_uid} 获取到 {pass_video} 个通过视频,继续扫描第{self.current_cursor}页")
+            else:
                 # 没有通过数据或没有更多数据,切换到下一个用户
                 self.current_user_index += 1
                 self.current_cursor = ""
 
-            self.logger.info(
-                f"用户 {crawler_user_uid} 获取到 {pass_video} 个通过视频,继续扫描第{self.current_cursor}页")
-
-
             # 检查是否所有用户处理完毕
             if self.current_user_index >= len(self.user_list_from_db):
                 self.current_user_index = 0  # 重置索引
                 self.current_cursor = ""
-
+            await self.set_crawler_user_index()
             await self.wait_between_iterations()
 
     def _build_request_body(self, user: Dict) -> Dict:
         """构建请求体"""
         virtual_data = {
+            # "uid": "173309188", # 测试
             "uid": str(user.get("link")),
             "cursor": self.current_cursor
         }
@@ -119,4 +108,10 @@ class AuthorSpider(BaseSpider):
 
     async def fetch_detail(self, item: Dict) -> Dict:
         """账号模式:补充视频详情(子类自行实现)"""
-        return item  # 默认返回原数据
+        return item  # 默认返回原数据
+
+    async def get_crawler_user_index(self):
+        return await self.redis_service.get_current_user_index(self.platform) or 0
+
+    async def set_crawler_user_index(self):
+        await self.redis_service.set_current_user_index(self.platform, self.current_user_index)
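Taken together, the spider now reads its user index from Redis during setup, keeps paging the current user only when the batch produced passing videos and a next cursor exists, otherwise advances (wrapping around after the last user), and writes the index back with set_crawler_user_index() every iteration so a restart resumes from the same account. A self-contained simulation of just the index-advance rules, with stand-in data and no Redis or network involved:

users = ["user_a", "user_b", "user_c"]   # stand-in for user_list_from_db

def next_state(index, pass_video, next_cursor):
    """Mirror of the branch in execute(): keep paging on success + cursor, else advance."""
    if pass_video != 0 and next_cursor:
        return index, next_cursor          # stay on this user, follow the cursor
    index, cursor = index + 1, ""          # nothing passed or no next page: next user
    if index >= len(users):
        index, cursor = 0, ""              # all users done: wrap around to the first
    return index, cursor

assert next_state(0, 5, "page2") == (0, "page2")   # passing videos and more pages
assert next_state(0, 0, "")      == (1, "")        # nothing passed -> switch user
assert next_state(2, 0, "")      == (0, "")        # last user -> wrap to the first

Note that the value read back from Redis arrives as a string, so casting it to int before using it as a list index is assumed in the sketches here.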

+ 22 - 3
spiders/basespider.py

@@ -14,6 +14,7 @@ from core.video_processor import VideoProcessor
 from services.async_mysql_service import AsyncMysqlService
 from services.async_mq_producer import AsyncMQProducer
 from core.base.async_request_client import AsyncRequestClient
+from services.async_redis_service import AsyncRedisService
 
 
 class BaseSpider(ABC):
@@ -22,7 +23,8 @@ class BaseSpider(ABC):
     def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod",
                  request_client: AsyncRequestClient = None,
                  db_service: AsyncMysqlService = None,
-                 mq_producer: AsyncMQProducer = None):
+                 mq_producer: AsyncMQProducer = None,
+                 redis_service: AsyncRedisService = None):
         # 基础属性
         self.rule_dict = rule_dict
         self.user_list = user_list
@@ -32,6 +34,7 @@ class BaseSpider(ABC):
         self.request_client = request_client
         self.db_service = db_service
         self.mq_producer = mq_producer
+        self.redis_service = redis_service
 
         # 通过类名获取配置
         class_name = self.__class__.__name__.lower()
@@ -96,6 +99,9 @@ class BaseSpider(ABC):
                 platform=self.config.platform,
                 mode=self.config.mode
             )
+        if not self.redis_service:
+            # RedisManager.init(redis_url=settings.redis_url)
+            self.redis_service = AsyncRedisService()
 
     def _setup_from_config(self):
         """从配置中设置属性"""
@@ -137,6 +143,19 @@ class BaseSpider(ABC):
 
     async def process_data(self, data: List[Dict]):
         """处理数据"""
+        # 处理data为None的情况
+        if data is None:
+            data_length = 0
+        else:
+            data_length = len(data)
+            
+        self.aliyun_log.logging(
+            code="1001",
+            message=f"获取到的列表长度:{data_length}",
+            data=data_length,
+        )
+        if not data:
+            return 0
         success_count = 0
         for item in data:
             self.aliyun_log.logging(
@@ -153,7 +172,7 @@ class BaseSpider(ABC):
             except Exception as e:
                 self.logger.error(f"处理单条数据失败: {e}")
                 self.stats['fail'] += 1
-        self.logger.info(f"批次处理完成: 成功 {success_count}/{len(data)}")
+        self.logger.info(f"批次处理完成: 成功 {success_count}/{data_length}")
         return success_count
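process_data now computes data_length up front, logs it, returns 0 when the batch is missing or empty, and reuses data_length in the completion log, so len(None) can no longer raise a TypeError; this is also why the per-spider empty-data checks removed elsewhere in this commit are no longer needed. A tiny runnable mirror of the guard:

def batch_length(data):
    """Mirror of the added guard: report 0 for a missing batch instead of crashing."""
    return 0 if data is None else len(data)

assert batch_length(None) == 0                      # previously len(None) -> TypeError
assert batch_length([]) == 0                        # empty batch short-circuits with 0 passes
assert batch_length([{"vid": 1}, {"vid": 2}]) == 2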
 
 
@@ -211,8 +230,8 @@ class BaseSpider(ABC):
             return True
 
         current_count = await self.db_service.get_today_videos()
+        self.logger.info(f"已抓取数量: {current_count}/{max_count}")
         if current_count >= max_count:
-            self.logger.info(f"视频数量达到当日最大值: {current_count}/{max_count}")
             self.aliyun_log.logging(
                 code="1011",
                 message="视频数量达到最大值",

+ 0 - 9
spiders/recommendspider.py

@@ -14,10 +14,6 @@ class RecommendSpider(BaseSpider):
 
     async def execute(self):
         """执行核心逻辑 - 使用 make_request 方法"""
-        if not await self.is_video_count_sufficient():
-            self.logger.info("视频数量已达到上限,跳过执行")
-            return
-
         iteration = 0
         while iteration < self.loop_times and await self.is_video_count_sufficient():
             self.logger.info(f"执行第 {iteration + 1} 轮")
@@ -41,11 +37,6 @@ class RecommendSpider(BaseSpider):
 
             # 提取数据
             data_list = safe_extract(response, self.data_path)
-            if not data_list:
-                self.logger.info("未获取到数据")
-                iteration += 1
-                await self.wait_between_iterations()
-                continue
 
             # 处理数据
             await self.process_data(data_list)