authorspider.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. from datetime import datetime, timedelta
  2. from spiders.basespider import BaseSpider
  3. from typing import Optional, List, Dict
  4. import aiohttp
  5. from core.utils.extractors import safe_extract
  6. class AuthorSpider(BaseSpider):
  7. """账号模式爬虫:从用户列表爬取"""
  8. def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
  9. super().__init__(rule_dict, user_list, env)
  10. # 账号模式特有状态
  11. self.user_list_from_db = [] # 数据库用户列表
  12. self.current_user_index = 0 # 当前用户索引
  13. self.current_cursor = "" # 当前分页游标(初始为空)
  14. self.next_cursor_last = ""
  15. async def before_run(self):
  16. """运行前:获取用户列表"""
  17. self.user_list_from_db = await self.fetch_user_list()
  18. if not self.user_list_from_db:
  19. self.logger.warning("用户列表为空,终止账号模式")
  20. self.logger.info(f"{self.platform}获取用户列表完成,共 {len(self.user_list_from_db)} 个用户")
  21. async def core_loop(self):
  22. """核心循环:处理每个用户的视频"""
  23. async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
  24. while self.current_user_index < len(self.user_list_from_db):
  25. if self.is_less_than_3_minutes():
  26. return
  27. # 检查数量限制
  28. if not await self.is_video_count_sufficient():
  29. return
  30. # 当前用户
  31. user = self.user_list_from_db[self.current_user_index]
  32. crawler_user_uid = user.get("link") # 数据库中的link字段
  33. self.logger.info(
  34. f"处理用户 uid={crawler_user_uid}(第{self.current_user_index + 1}个),"
  35. f"当前cursor: {self.current_cursor or '0'}"
  36. )
  37. # 构建请求体:注入uid和cursor
  38. request_body = self._build_request_body(user)
  39. # 获取当前用户视频
  40. raw_data = await self.crawl_user_videos(session, request_body, crawler_user_uid)
  41. if not raw_data:
  42. # 切换到下一个用户
  43. self.current_user_index += 1
  44. continue
  45. # 处理数据
  46. if self.platform == "xiaoniangao":
  47. self.user_list = [user]
  48. await self.process_data(raw_data)
  49. if self.current_user_index == len(self.user_list_from_db)-1:
  50. self.current_cursor = self.next_cursor_last
  51. self.current_user_index = 0
  52. continue
  53. self.current_user_index += 1
  54. await self.wait()
  55. def _build_request_body(self, user: Dict) -> Dict:
  56. """构建请求体:将用户link和当前cursor注入"""
  57. # 准备"虚拟数据",键名对应你的配置路径($.uid 和 $.cursor)
  58. virtual_data = {
  59. "uid": str(user.get("link")), # 对应配置中的 $.uid
  60. "cursor": self.current_cursor # 对应配置中的 $.cursor
  61. }
  62. return self.request_preparer.prepare(
  63. request_body_config=self.request_body_template,
  64. response_data=virtual_data
  65. )
  66. async def fetch_user_list(self) -> List[Dict]:
  67. """获取待爬取的用户列表(从数据库)"""
  68. return []
  69. async def crawl_user_videos(self, session, request_body: Dict, user_uid: str) -> Optional[List[Dict]]:
  70. """请求用户视频接口"""
  71. response = await self.request_client.request(
  72. session=session,
  73. method=self.method,
  74. url=self.url,
  75. headers=self.headers,
  76. json=request_body
  77. )
  78. # has_more = safe_extract(response,self.has_more)
  79. # 解析用户视频列表
  80. data_list = safe_extract(response, self.data_path)
  81. if safe_extract(response, self.next_cursor):
  82. self.next_cursor_last = safe_extract(response, self.next_cursor)
  83. if not data_list:
  84. self.logger.info(f"用户 {user_uid} 无更多视频数据")
  85. return None
  86. return data_list
  87. async def fetch_detail(self, item: Dict) -> Dict:
  88. """账号模式:补充视频详情(子类自行实现)"""
  89. return item # 默认返回原数据
  90. def is_less_than_3_minutes(self):
  91. now = datetime.now()
  92. tomorrow = now.date() + timedelta(days=1)
  93. midnight = datetime.combine(tomorrow, datetime.min.time())
  94. time_left = midnight - now
  95. return time_left.total_seconds() < 3 * 60