# spiders/authorspider.py
from datetime import datetime, timedelta
from typing import Dict, List, Optional

from config import settings
from core.base.async_redis_client import RedisManager
from core.utils.extractors import safe_extract
from core.utils.helpers import is_near_next_day
from spiders.basespider import BaseSpider
  9. class AuthorSpider(BaseSpider):
  10. """作者模式爬虫 """
  11. def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
  12. super().__init__(rule_dict, user_list, env)
  13. # 账号模式特有状态
  14. self.user_list_from_db = [] # 数据库用户列表
  15. self.current_user_index = 0 # 当前用户索引
  16. self.current_cursor = "" # 当前分页游标(初始为空)
  17. async def before_run(self):
  18. """运行前:获取用户列表 """
  19. await super().before_run()
  20. # await RedisManager.init(redis_url=settings.redis_url)
  21. self.user_list_from_db = await self.fetch_user_list()
  22. if not self.user_list_from_db:
  23. self.logger.warning("用户列表为空,终止账号模式")
  24. # 获取当前用户索引
  25. self.current_user_index = await self.get_crawler_user_index()
  26. self.logger.info(f"{self.platform}获取用户列表完成,共 {len(self.user_list_from_db)} 个用户")
  27. async def execute(self):
  28. """执行核心逻辑 - 使用 make_request 方法"""
  29. while await self.is_video_count_sufficient():
  30. # 检查时间条件
  31. if await is_near_next_day():
  32. self.logger.info(f"距离第二天不足3分钟,停止执行")
  33. return
  34. user = self.user_list_from_db[self.current_user_index]
  35. crawler_user_uid = user.get("link")
  36. self.logger.info(
  37. f"处理用户 uid={crawler_user_uid}(第{self.current_user_index + 1}个),"
  38. f"当前cursor: {self.current_cursor or '0'}"
  39. )
  40. # 构建请求体
  41. request_body = self._build_request_body(user)
  42. # 获取当前用户视频
  43. raw_data = await self.crawl_user_videos(request_body, crawler_user_uid)
  44. # 处理数据
  45. if self.platform == "xiaoniangao":
  46. self.user_list = [user] # 特殊逻辑
  47. pass_video = await self.process_data(raw_data)
  48. # 根据是否有更多数据和下一页游标判断是否继续当前用户
  49. if raw_data and self.current_cursor:
  50. self.logger.info(
  51. f"用户 {crawler_user_uid} 获取到 {pass_video} 个通过视频,继续扫描第{self.current_cursor}页")
  52. else:
  53. # 没有通过数据或没有更多数据,切换到下一个用户
  54. self.current_user_index += 1
  55. self.current_cursor = ""
  56. # 检查是否所有用户处理完毕
  57. if self.current_user_index >= len(self.user_list_from_db):
  58. self.current_user_index = 0 # 重置索引
  59. self.current_cursor = ""
  60. await self.set_crawler_user_index()
  61. await self.wait_between_iterations()
  62. def _build_request_body(self, user: Dict) -> Dict:
  63. """构建请求体"""
  64. virtual_data = {
  65. # "uid": "173309188", # 测试
  66. "uid": str(user.get("link")),
  67. "next_cursor": self.current_cursor
  68. }
  69. return self.request_preparer.prepare(
  70. request_body_config=self.request_body_template,
  71. response_data=virtual_data
  72. )
  73. async def crawl_user_videos(self, request_body: Dict, user_uid: str) -> Optional[List[Dict]]:
  74. """请求用户视频接口 - 使用 make_request 方法"""
  75. # 使用基类的 make_request 方法发送请求
  76. response = await self.make_request(request_body)
  77. if not response:
  78. self.logger.info(f"用户 {user_uid} 请求失败")
  79. return None
  80. # 游标处理逻辑
  81. next_cursor_value = safe_extract(response, self.next_cursor)
  82. if next_cursor_value is not None:
  83. self.current_cursor = str(next_cursor_value)
  84. self.logger.debug(f"更新游标: {self.current_cursor}")
  85. data_list = safe_extract(response, self.data_path)
  86. if not data_list:
  87. self.logger.info(f"用户 {user_uid} 无更多视频数据")
  88. return None
  89. return data_list
  90. async def fetch_user_list(self) -> List[Dict]:
  91. """获取待爬取的用户列表(从数据库)- 子类实现"""
  92. return self.user_list # 默认返回传入的列表
  93. async def fetch_detail(self, item: Dict) -> Dict:
  94. """账号模式:补充视频详情(子类自行实现)"""
  95. return item # 默认返回原数据
  96. async def get_crawler_user_index(self):
  97. return await self.redis_service.get_current_user_index(self.platform) or 0
  98. async def set_crawler_user_index(self):
  99. await self.redis_service.set_current_user_index(self.platform, self.current_user_index)