# spiders/authorspider.py from datetime import datetime, timedelta from typing import List, Dict, Optional from config import settings from core.base.async_redis_client import RedisManager from core.utils.helpers import is_near_next_day from spiders.basespider import BaseSpider from core.utils.extractors import safe_extract class AuthorSpider(BaseSpider): """作者模式爬虫 """ def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"): super().__init__(rule_dict, user_list, env) # 账号模式特有状态 self.user_list_from_db = [] # 数据库用户列表 self.current_user_index = 0 # 当前用户索引 self.current_cursor = "" # 当前分页游标(初始为空) async def before_run(self): """运行前:获取用户列表 """ await super().before_run() # await RedisManager.init(redis_url=settings.redis_url) self.user_list_from_db = await self.fetch_user_list() if not self.user_list_from_db: self.logger.warning("用户列表为空,终止账号模式") # 获取当前用户索引 self.current_user_index = await self.get_crawler_user_index() self.logger.info(f"{self.platform}获取用户列表完成,共 {len(self.user_list_from_db)} 个用户") async def execute(self): """执行核心逻辑 - 使用 make_request 方法""" while await self.is_video_count_sufficient(): # 检查时间条件 if await is_near_next_day(): self.logger.info(f"距离第二天不足3分钟,停止执行") return user = self.user_list_from_db[self.current_user_index] crawler_user_uid = user.get("link") self.logger.info( f"处理用户 uid={crawler_user_uid}(第{self.current_user_index + 1}个)," f"当前cursor: {self.current_cursor or '0'}" ) # 构建请求体 request_body = self._build_request_body(user) # 获取当前用户视频 raw_data = await self.crawl_user_videos(request_body, crawler_user_uid) # 处理数据 if self.platform == "xiaoniangao": self.user_list = [user] # 特殊逻辑 pass_video = await self.process_data(raw_data) # 根据是否有更多数据和下一页游标判断是否继续当前用户 if raw_data and self.current_cursor: self.logger.info( f"用户 {crawler_user_uid} 获取到 {pass_video} 个通过视频,继续扫描第{self.current_cursor}页") else: # 没有通过数据或没有更多数据,切换到下一个用户 self.current_user_index += 1 self.current_cursor = "" # 检查是否所有用户处理完毕 if self.current_user_index >= len(self.user_list_from_db): self.current_user_index = 0 # 重置索引 self.current_cursor = "" await self.set_crawler_user_index() await self.wait_between_iterations() def _build_request_body(self, user: Dict) -> Dict: """构建请求体""" virtual_data = { # "uid": "173309188", # 测试 "uid": str(user.get("link")), "next_cursor": self.current_cursor } return self.request_preparer.prepare( request_body_config=self.request_body_template, response_data=virtual_data ) async def crawl_user_videos(self, request_body: Dict, user_uid: str) -> Optional[List[Dict]]: """请求用户视频接口 - 使用 make_request 方法""" # 使用基类的 make_request 方法发送请求 response = await self.make_request(request_body) if not response: self.logger.info(f"用户 {user_uid} 请求失败") return None # 游标处理逻辑 next_cursor_value = safe_extract(response, self.next_cursor) if next_cursor_value is not None: self.current_cursor = str(next_cursor_value) self.logger.debug(f"更新游标: {self.current_cursor}") data_list = safe_extract(response, self.data_path) if not data_list: self.logger.info(f"用户 {user_uid} 无更多视频数据") return None return data_list async def fetch_user_list(self) -> List[Dict]: """获取待爬取的用户列表(从数据库)- 子类实现""" return self.user_list # 默认返回传入的列表 async def fetch_detail(self, item: Dict) -> Dict: """账号模式:补充视频详情(子类自行实现)""" return item # 默认返回原数据 async def get_crawler_user_index(self): return await self.redis_service.get_current_user_index(self.platform) or 0 async def set_crawler_user_index(self): await self.redis_service.set_current_user_index(self.platform, self.current_user_index)