authorspider.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. # spiders/authorspider.py
  2. from datetime import datetime, timedelta
  3. from typing import List, Dict, Optional
  4. from core.utils.helpers import is_near_next_day
  5. from spiders.basespider import BaseSpider
  6. from core.utils.extractors import safe_extract
  7. class AuthorSpider(BaseSpider):
  8. """作者模式爬虫 """
  9. def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
  10. super().__init__(rule_dict, user_list, env)
  11. # 账号模式特有状态
  12. self.user_list_from_db = [] # 数据库用户列表
  13. self.current_user_index = 0 # 当前用户索引
  14. self.current_cursor = "" # 当前分页游标(初始为空)
  15. async def before_run(self):
  16. """运行前:获取用户列表 """
  17. await super().before_run()
  18. self.user_list_from_db = await self.fetch_user_list()
  19. if not self.user_list_from_db:
  20. self.logger.warning("用户列表为空,终止账号模式")
  21. self.logger.info(f"{self.platform}获取用户列表完成,共 {len(self.user_list_from_db)} 个用户")
  22. async def execute(self):
  23. """执行核心逻辑 - 使用 make_request 方法"""
  24. if not await self.is_video_count_sufficient():
  25. self.logger.info("视频数量已达到上限,跳过执行")
  26. return
  27. while await self.is_video_count_sufficient():
  28. # 检查时间条件
  29. if await is_near_next_day():
  30. self.logger.info(f"距离第二天不足3分钟,停止执行")
  31. return
  32. user = self.user_list_from_db[self.current_user_index]
  33. crawler_user_uid = user.get("link")
  34. self.logger.info(
  35. f"处理用户 uid={crawler_user_uid}(第{self.current_user_index + 1}个),"
  36. f"当前cursor: {self.current_cursor or '0'}"
  37. )
  38. # 构建请求体
  39. request_body = self._build_request_body(user)
  40. # 获取当前用户视频
  41. raw_data = await self.crawl_user_videos(request_body, crawler_user_uid)
  42. if not raw_data:
  43. # 当前用户无数据,切换到下一个用户
  44. self.current_user_index += 1
  45. self.current_cursor = "" # 重置游标
  46. await self.wait_between_iterations()
  47. continue
  48. # 处理数据
  49. if self.platform == "xiaoniangao":
  50. self.user_list = [user] # 特殊逻辑
  51. pass_video = await self.process_data(raw_data)
  52. # 根据是否有通过的数据和下一页游标判断是否继续当前用户
  53. if pass_video == 0 and not self.current_cursor:
  54. # 没有通过数据或没有更多数据,切换到下一个用户
  55. self.current_user_index += 1
  56. self.current_cursor = ""
  57. self.logger.info(
  58. f"用户 {crawler_user_uid} 获取到 {pass_video} 个通过视频,继续扫描第{self.current_cursor}页")
  59. # 检查是否所有用户处理完毕
  60. if self.current_user_index >= len(self.user_list_from_db):
  61. self.current_user_index = 0 # 重置索引
  62. self.current_cursor = ""
  63. await self.wait_between_iterations()
  64. def _build_request_body(self, user: Dict) -> Dict:
  65. """构建请求体"""
  66. virtual_data = {
  67. "uid": str(user.get("link")),
  68. "cursor": self.current_cursor
  69. }
  70. return self.request_preparer.prepare(
  71. request_body_config=self.request_body_template,
  72. response_data=virtual_data
  73. )
  74. async def crawl_user_videos(self, request_body: Dict, user_uid: str) -> Optional[List[Dict]]:
  75. """请求用户视频接口 - 使用 make_request 方法"""
  76. # 使用基类的 make_request 方法发送请求
  77. response = await self.make_request(request_body)
  78. if not response:
  79. self.logger.info(f"用户 {user_uid} 请求失败")
  80. return None
  81. # 游标处理逻辑
  82. if safe_extract(response, self.next_cursor):
  83. self.current_cursor = safe_extract(response, self.next_cursor)
  84. data_list = safe_extract(response, self.data_path)
  85. if not data_list:
  86. self.logger.info(f"用户 {user_uid} 无更多视频数据")
  87. return None
  88. return data_list
  89. async def fetch_user_list(self) -> List[Dict]:
  90. """获取待爬取的用户列表(从数据库)- 子类实现"""
  91. return self.user_list # 默认返回传入的列表
  92. async def fetch_detail(self, item: Dict) -> Dict:
  93. """账号模式:补充视频详情(子类自行实现)"""
  94. return item # 默认返回原数据