123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- from spiders.basespider import BaseSpider
- from typing import Optional, List, Dict
- import aiohttp
- from core.utils.extractors import safe_extract
class AuthorSpider(BaseSpider):
    """Account-mode spider: crawls videos user by user from a user list.

    The spider walks ``user_list_from_db`` round-robin.  Every user in a
    round is requested with the same ``current_cursor``; once the last user
    of the round has been processed, the cursor is replaced by the most
    recently seen "next cursor" and the walk restarts from the first user.
    """

    def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
        """Initialise base-spider state plus account-mode bookkeeping.

        Args:
            rule_dict: Crawl rule configuration, forwarded to ``BaseSpider``.
            user_list: User list, forwarded to ``BaseSpider``.
            env: Environment name, forwarded to ``BaseSpider``.
        """
        super().__init__(rule_dict, user_list, env)
        # Account-mode specific state.
        self.user_list_from_db = []   # users returned by fetch_user_list()
        self.current_user_index = 0   # index of the user being crawled
        self.current_cursor = ""      # pagination cursor (empty on first page)
        self.next_cursor_last = ""    # last non-empty "next cursor" seen

    async def before_run(self):
        """Load the user list before the core loop starts.

        When the list is empty a warning is logged and the method returns;
        ``core_loop`` then has nothing to iterate over.
        """
        self.user_list_from_db = await self.fetch_user_list()
        if not self.user_list_from_db:
            self.logger.warning("用户列表为空,终止账号模式")
            # Skip the "fetched N users" message: it would contradict the
            # warning that was just emitted.
            return
        self.logger.info(f"{self.platform}获取用户列表完成,共 {len(self.user_list_from_db)} 个用户")

    async def core_loop(self):
        """Core loop: fetch and process the videos of each user in turn.

        The loop ends when ``is_video_count_sufficient()`` is falsy or when
        the user index runs past the end of the list (which only happens
        when the last user of a round returns no data).
        """
        timeout = aiohttp.ClientTimeout(total=self.timeout)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            while self.current_user_index < len(self.user_list_from_db):
                # Stop as soon as the video quota check fails.
                if not await self.is_video_count_sufficient():
                    return

                # Current user; the database "link" field is used as the uid.
                user = self.user_list_from_db[self.current_user_index]
                crawler_user_uid = user.get("link")
                self.logger.info(
                    f"处理用户 uid={crawler_user_uid}(第{self.current_user_index + 1}个),"
                    f"当前cursor: {self.current_cursor or '0'}"
                )

                # Build the request body with uid and cursor injected.
                request_body = self._build_request_body(user)

                # Fetch this user's videos.
                raw_data = await self.crawl_user_videos(session, request_body, crawler_user_uid)
                if not raw_data:
                    # Nothing returned for this user: move on to the next one.
                    self.current_user_index += 1
                    continue

                # Process the data.  For xiaoniangao the current user is
                # exposed via self.user_list before processing.
                if self.platform == "xiaoniangao":
                    self.user_list = [user]
                await self.process_data(raw_data)

                if self.current_user_index == len(self.user_list_from_db) - 1:
                    # End of a round: advance the cursor and restart from the
                    # first user.
                    # NOTE(review): the cursor is shared by all users and comes
                    # from whichever response last carried one - confirm this
                    # is valid for the target API.
                    # NOTE(review): if the cursor never advances, the next
                    # round repeats the same requests; termination then relies
                    # solely on is_video_count_sufficient().
                    self.current_cursor = self.next_cursor_last
                    self.current_user_index = 0
                else:
                    self.current_user_index += 1

                # Throttle between requests, including across the round
                # wrap-around (previously skipped by an early ``continue``).
                await self.wait()

    def _build_request_body(self, user: Dict) -> Dict:
        """Build the request body with the user's link and current cursor.

        Args:
            user: User record; its ``"link"`` value is used as the uid.

        Returns:
            The value returned by ``self.request_preparer.prepare``.
        """
        # "Virtual" response data whose keys match the configured paths
        # ($.uid and $.cursor) in the request body template.
        virtual_data = {
            "uid": str(user.get("link")),   # maps to $.uid
            "cursor": self.current_cursor,  # maps to $.cursor
        }
        return self.request_preparer.prepare(
            request_body_config=self.request_body_template,
            response_data=virtual_data,
        )

    async def fetch_user_list(self) -> List[Dict]:
        """Return the users to crawl.

        This default implementation returns an empty list; presumably
        concrete spiders override it to read from the database.
        """
        return []

    async def crawl_user_videos(self, session, request_body: Dict, user_uid: str) -> Optional[List[Dict]]:
        """Request the user-videos endpoint and extract the video list.

        Side effect: when the response carries a truthy "next cursor" it is
        stored in ``self.next_cursor_last``.

        Args:
            session: The ``aiohttp`` client session used for the request.
            request_body: JSON body to send.
            user_uid: Identifier of the user, used only for logging.

        Returns:
            The extracted video list, or ``None`` when it is empty/missing.
        """
        response = await self.request_client.request(
            session=session,
            method=self.method,
            url=self.url,
            headers=self.headers,
            json=request_body,
        )
        # Extract the user's video list.
        data_list = safe_extract(response, self.data_path)

        # Extract the next cursor once and remember it only when present.
        next_cursor = safe_extract(response, self.next_cursor)
        if next_cursor:
            self.next_cursor_last = next_cursor

        if not data_list:
            self.logger.info(f"用户 {user_uid} 无更多视频数据")
            return None
        return data_list

    async def fetch_detail(self, item: Dict) -> Dict:
        """Account mode: enrich a video item with detail data.

        The default implementation returns ``item`` unchanged; subclasses
        may override it.
        """
        return item
|