# authorspider.py (~4.7 KB) — web-paste artifacts (file-size header and run of
# concatenated line numbers) removed during review.
  1. # spiders/authorspider.py
  2. from datetime import datetime, timedelta
  3. from typing import List, Dict, Optional
  4. from core.utils.helpers import is_near_next_day
  5. from spiders.basespider import BaseSpider
  6. from core.utils.extractors import safe_extract
  7. class AuthorSpider(BaseSpider):
  8. """作者模式爬虫 """
  9. def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
  10. super().__init__(rule_dict, user_list, env)
  11. # 账号模式特有状态
  12. self.user_list_from_db = [] # 数据库用户列表
  13. self.current_user_index = 0 # 当前用户索引
  14. self.current_cursor = "" # 当前分页游标(初始为空)
  15. async def before_run(self):
  16. """运行前:获取用户列表 """
  17. await super().before_run()
  18. self.user_list_from_db = await self.fetch_user_list()
  19. if not self.user_list_from_db:
  20. self.logger.warning("用户列表为空,终止账号模式")
  21. # 获取当前用户索引
  22. self.current_user_index = await self.get_crawler_user_index()
  23. self.logger.info(f"{self.platform}获取用户列表完成,共 {len(self.user_list_from_db)} 个用户")
  24. async def execute(self):
  25. """执行核心逻辑 - 使用 make_request 方法"""
  26. while await self.is_video_count_sufficient():
  27. # 检查时间条件
  28. if await is_near_next_day():
  29. self.logger.info(f"距离第二天不足3分钟,停止执行")
  30. return
  31. user = self.user_list_from_db[self.current_user_index]
  32. crawler_user_uid = user.get("link")
  33. self.logger.info(
  34. f"处理用户 uid={crawler_user_uid}(第{self.current_user_index + 1}个),"
  35. f"当前cursor: {self.current_cursor or '0'}"
  36. )
  37. # 构建请求体
  38. request_body = self._build_request_body(user)
  39. # 获取当前用户视频
  40. raw_data = await self.crawl_user_videos(request_body, crawler_user_uid)
  41. # 处理数据
  42. if self.platform == "xiaoniangao":
  43. self.user_list = [user] # 特殊逻辑
  44. pass_video = await self.process_data(raw_data)
  45. # 根据是否有通过的数据和下一页游标判断是否继续当前用户
  46. if pass_video != 0 and self.current_cursor:
  47. self.logger.info(
  48. f"用户 {crawler_user_uid} 获取到 {pass_video} 个通过视频,继续扫描第{self.current_cursor}页")
  49. else:
  50. # 没有通过数据或没有更多数据,切换到下一个用户
  51. self.current_user_index += 1
  52. self.current_cursor = ""
  53. # 检查是否所有用户处理完毕
  54. if self.current_user_index >= len(self.user_list_from_db):
  55. self.current_user_index = 0 # 重置索引
  56. self.current_cursor = ""
  57. await self.set_crawler_user_index()
  58. await self.wait_between_iterations()
  59. def _build_request_body(self, user: Dict) -> Dict:
  60. """构建请求体"""
  61. virtual_data = {
  62. # "uid": "173309188", # 测试
  63. "uid": str(user.get("link")),
  64. "cursor": self.current_cursor
  65. }
  66. return self.request_preparer.prepare(
  67. request_body_config=self.request_body_template,
  68. response_data=virtual_data
  69. )
  70. async def crawl_user_videos(self, request_body: Dict, user_uid: str) -> Optional[List[Dict]]:
  71. """请求用户视频接口 - 使用 make_request 方法"""
  72. # 使用基类的 make_request 方法发送请求
  73. response = await self.make_request(request_body)
  74. if not response:
  75. self.logger.info(f"用户 {user_uid} 请求失败")
  76. return None
  77. # 游标处理逻辑
  78. if safe_extract(response, self.next_cursor):
  79. self.current_cursor = safe_extract(response, self.next_cursor)
  80. data_list = safe_extract(response, self.data_path)
  81. if not data_list:
  82. self.logger.info(f"用户 {user_uid} 无更多视频数据")
  83. return None
  84. return data_list
  85. async def fetch_user_list(self) -> List[Dict]:
  86. """获取待爬取的用户列表(从数据库)- 子类实现"""
  87. return self.user_list # 默认返回传入的列表
  88. async def fetch_detail(self, item: Dict) -> Dict:
  89. """账号模式:补充视频详情(子类自行实现)"""
  90. return item # 默认返回原数据
  91. async def get_crawler_user_index(self):
  92. return await self.redis_service.get_current_user_index(self.platform) or 0
  93. async def set_crawler_user_index(self):
  94. await self.redis_service.set_current_user_index(self.platform, self.current_user_index)