# spiders/authorspider.py
from datetime import datetime, timedelta
from typing import List, Dict, Optional

from config import settings
from core.base.async_redis_client import RedisManager
from core.utils.helpers import is_near_next_day
from spiders.basespider import BaseSpider
from core.utils.extractors import safe_extract


class AuthorSpider(BaseSpider):
    """Author-mode spider."""

    def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
        super().__init__(rule_dict, user_list, env)
        # State specific to account (author) mode
        self.user_list_from_db = []   # user list loaded from the database
        self.current_user_index = 0   # index of the user currently being processed
        self.current_cursor = ""      # current pagination cursor (empty at start)

    async def before_run(self):
        """Before running: fetch the user list."""
        await super().before_run()
        # await RedisManager.init(redis_url=settings.redis_url)
        self.user_list_from_db = await self.fetch_user_list()
        if not self.user_list_from_db:
            self.logger.warning("User list is empty, stopping account mode")
            return
        # Restore the index of the user to resume from
        self.current_user_index = await self.get_crawler_user_index()
        self.logger.info(f"{self.platform} user list loaded, {len(self.user_list_from_db)} users in total")

    async def execute(self):
        """Core execution loop, built on the make_request method."""
        while await self.is_video_count_sufficient():
            # Time guard: stop shortly before the day rolls over
            if await is_near_next_day():
                self.logger.info("Less than 3 minutes until the next day, stopping execution")
                return

            user = self.user_list_from_db[self.current_user_index]
            crawler_user_uid = user.get("link")
            self.logger.info(
                f"Processing user uid={crawler_user_uid} (no. {self.current_user_index + 1}), "
                f"current cursor: {self.current_cursor or '0'}"
            )

            # Build the request body
            request_body = self._build_request_body(user)

            # Fetch the current user's videos
            raw_data = await self.crawl_user_videos(request_body, crawler_user_uid)

            # Process the data
            if self.platform == "xiaoniangao":
                self.user_list = [user]  # platform-specific special case
            pass_video = await self.process_data(raw_data)

            # Keep paging the current user only if there is data and a next-page cursor
            if raw_data and self.current_cursor:
                self.logger.info(
                    f"User {crawler_user_uid} yielded {pass_video} accepted videos, "
                    f"continuing with page {self.current_cursor}")
            else:
                # No accepted data or no more pages: switch to the next user
                self.current_user_index += 1
                self.current_cursor = ""

            # If all users have been processed, start over from the first one
            if self.current_user_index >= len(self.user_list_from_db):
                self.current_user_index = 0  # reset the index
                self.current_cursor = ""

            await self.set_crawler_user_index()
            await self.wait_between_iterations()

    def _build_request_body(self, user: Dict) -> Dict:
        """Build the request body."""
        virtual_data = {
            # "uid": "173309188",  # for testing
            "uid": str(user.get("link")),
            "next_cursor": self.current_cursor
        }
        return self.request_preparer.prepare(
            request_body_config=self.request_body_template,
            response_data=virtual_data
        )

    async def crawl_user_videos(self, request_body: Dict, user_uid: str) -> Optional[List[Dict]]:
        """Request the user-videos endpoint via the base class make_request method."""
        # Send the request through the base class
        response = await self.make_request(request_body)
        if not response:
            self.logger.info(f"Request for user {user_uid} failed")
            return None

        # Cursor handling: remember the next-page cursor if the response carries one
        next_cursor_value = safe_extract(response, self.next_cursor)
        if next_cursor_value is not None:
            self.current_cursor = str(next_cursor_value)
            self.logger.debug(f"Cursor updated: {self.current_cursor}")

        data_list = safe_extract(response, self.data_path)
        if not data_list:
            self.logger.info(f"No more video data for user {user_uid}")
            return None
        return data_list

    async def fetch_user_list(self) -> List[Dict]:
        """Fetch the list of users to crawl (from the database); overridden by subclasses."""
        return self.user_list  # by default, return the list passed in

    async def fetch_detail(self, item: Dict) -> Dict:
        """Account mode: enrich a video item with detail data (implemented by subclasses)."""
        return item  # by default, return the item unchanged

    async def get_crawler_user_index(self) -> int:
        """Read the persisted user index for this platform from Redis (defaults to 0)."""
        return await self.redis_service.get_current_user_index(self.platform) or 0

    async def set_crawler_user_index(self):
        """Persist the current user index for this platform to Redis."""
        await self.redis_service.set_current_user_index(self.platform, self.current_user_index)
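

# ---------------------------------------------------------------------------
# Minimal usage sketch (hypothetical, not part of the original module): it
# drives the spider with only the methods defined above. The rule_dict shape
# and whether a bare AuthorSpider can run at all depend on BaseSpider's
# constructor and before_run(), which this file does not show. The uid
# "173309188" is taken from the test comment in _build_request_body, and
# "link" is the key execute() reads for the author's uid.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _demo():
        spider = AuthorSpider(
            rule_dict={},                        # assumed minimal rule config
            user_list=[{"link": "173309188"}],   # one example author account
        )
        await spider.before_run()  # load the user list and restore the crawl index
        await spider.execute()     # page through each author's videos

    asyncio.run(_demo())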