from typing import Optional, List, Dict

import aiohttp

from spiders.basespider import BaseSpider
from core.utils.extractors import safe_extract


class AuthorSpider(BaseSpider):
    """Account-mode spider: crawls videos for each user in a user list.

    Users are visited round-robin, one request per user per round. When the
    last user of a round yields data, the pagination cursor is advanced and
    the round restarts from the first user.

    NOTE(review): a single cursor (``current_cursor``) is shared by every
    user, and ``next_cursor_last`` is overwritten by whichever response most
    recently carried a truthy cursor. Confirm this is intended rather than
    tracking one cursor per user.
    """

    def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
        """Initialise base spider state plus account-mode pagination state."""
        super().__init__(rule_dict, user_list, env)
        # Account-mode specific state.
        self.user_list_from_db = []  # users returned by fetch_user_list()
        self.current_user_index = 0  # index of the user being processed
        self.current_cursor = ""  # cursor sent with requests (empty at start)
        self.next_cursor_last = ""  # most recent truthy cursor seen in a response

    async def before_run(self) -> None:
        """Load the user list before the main loop starts.

        An empty list only produces a warning here; ``core_loop`` then exits
        immediately because its ``while`` condition is false for an empty
        list.
        """
        self.user_list_from_db = await self.fetch_user_list()
        if not self.user_list_from_db:
            self.logger.warning("用户列表为空,终止账号模式")
        self.logger.info(f"{self.platform}获取用户列表完成,共 {len(self.user_list_from_db)} 个用户")

    async def core_loop(self) -> None:
        """Main loop: request and process one page of videos per user.

        The loop ends when any of the following holds:

        * ``is_video_count_sufficient()`` returns a falsy value;
        * the user index runs past the end of the list (this happens when
          the last user returns no data);
        * a full round finished without the cursor advancing, in which case
          another round would only repeat the same requests.
        """
        client_timeout = aiohttp.ClientTimeout(total=self.timeout)
        async with aiohttp.ClientSession(timeout=client_timeout) as session:
            while self.current_user_index < len(self.user_list_from_db):
                # Stop as soon as the quota check says so.
                if not await self.is_video_count_sufficient():
                    return

                user = self.user_list_from_db[self.current_user_index]
                crawler_user_uid = user.get("link")  # the "link" field of the user record
                self.logger.info(
                    f"处理用户 uid={crawler_user_uid}(第{self.current_user_index + 1}个),"
                    f"当前cursor: {self.current_cursor or '0'}"
                )

                # Inject the user's uid and the current cursor into the request body.
                request_body = self._build_request_body(user)
                raw_data = await self.crawl_user_videos(session, request_body, crawler_user_uid)
                if not raw_data:
                    # Nothing for this user: move on to the next one (no wait).
                    self.current_user_index += 1
                    continue

                if self.platform == "xiaoniangao":
                    # For this platform the active user list is narrowed to
                    # the user currently being processed.
                    self.user_list = [user]
                await self.process_data(raw_data)

                if self.current_user_index == len(self.user_list_from_db) - 1:
                    # End of a round.
                    if self.next_cursor_last == self.current_cursor:
                        # No response produced a new cursor, so restarting
                        # would re-request exactly the same pages forever.
                        self.logger.info(
                            f"{self.platform}游标未推进(cursor: {self.current_cursor or '0'}),停止翻页"
                        )
                        self.current_user_index += 1
                        return
                    # Advance the shared cursor and restart from the first user.
                    self.current_cursor = self.next_cursor_last
                    self.current_user_index = 0
                    continue

                self.current_user_index += 1
                await self.wait()

    def _build_request_body(self, user: Dict) -> Dict:
        """Build the request body by injecting the user's link and the cursor.

        The values are passed to ``request_preparer.prepare`` as its
        ``response_data`` argument so the body template can pick them up;
        the original author notes the keys correspond to the configured
        paths ``$.uid`` and ``$.cursor``.

        Args:
            user: User record; its ``"link"`` value is stringified as the uid.

        Returns:
            Whatever ``request_preparer.prepare`` returns for the template.
        """
        virtual_data = {
            "uid": str(user.get("link")),
            "cursor": self.current_cursor,
        }
        return self.request_preparer.prepare(
            request_body_config=self.request_body_template,
            response_data=virtual_data,
        )

    async def fetch_user_list(self) -> List[Dict]:
        """Return the users to crawl; this default implementation returns an empty list."""
        return []

    async def crawl_user_videos(self, session, request_body: Dict, user_uid: str) -> Optional[List[Dict]]:
        """Request the user-video endpoint and extract the video list.

        Side effect: when the response carries a truthy next cursor it is
        stored in ``self.next_cursor_last``.

        Args:
            session: The aiohttp client session used for the request.
            request_body: JSON body to send.
            user_uid: User identifier, used only in the log message.

        Returns:
            The extracted video list, or ``None`` when it is empty or missing.
        """
        response = await self.request_client.request(
            session=session,
            method=self.method,
            url=self.url,
            headers=self.headers,
            json=request_body,
        )

        data_list = safe_extract(response, self.data_path)

        # Extract the cursor once and remember it only when present.
        next_cursor = safe_extract(response, self.next_cursor)
        if next_cursor:
            self.next_cursor_last = next_cursor

        if not data_list:
            self.logger.info(f"用户 {user_uid} 无更多视频数据")
            return None
        return data_list

    async def fetch_detail(self, item: Dict) -> Dict:
        """Enrich a video item with details; the default returns it unchanged."""
        return item