# authorspider.py
  1. from spiders.basespider import BaseSpider
  2. from typing import Optional, List, Dict
  3. import aiohttp
  4. from core.utils.extractors import safe_extract
  5. class AuthorSpider(BaseSpider):
  6. """账号模式爬虫:从用户列表爬取"""
  7. def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
  8. super().__init__(rule_dict, user_list, env)
  9. # 账号模式特有状态
  10. self.user_list_from_db = [] # 数据库用户列表
  11. self.current_user_index = 0 # 当前用户索引
  12. self.current_cursor = "" # 当前分页游标(初始为空)
  13. self.next_cursor_last = ""
  14. async def before_run(self):
  15. """运行前:获取用户列表"""
  16. self.user_list_from_db = await self.fetch_user_list()
  17. if not self.user_list_from_db:
  18. self.logger.warning("用户列表为空,终止账号模式")
  19. self.logger.info(f"{self.platform}获取用户列表完成,共 {len(self.user_list_from_db)} 个用户")
  20. async def core_loop(self):
  21. """核心循环:处理每个用户的视频"""
  22. async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
  23. while self.current_user_index < len(self.user_list_from_db):
  24. # 检查数量限制
  25. if not await self.is_video_count_sufficient():
  26. return
  27. # 当前用户
  28. user = self.user_list_from_db[self.current_user_index]
  29. crawler_user_uid = user.get("link") # 数据库中的link字段
  30. self.logger.info(
  31. f"处理用户 uid={crawler_user_uid}(第{self.current_user_index + 1}个),"
  32. f"当前cursor: {self.current_cursor or '0'}"
  33. )
  34. # 构建请求体:注入uid和cursor
  35. request_body = self._build_request_body(user)
  36. # 获取当前用户视频
  37. raw_data = await self.crawl_user_videos(session, request_body, crawler_user_uid)
  38. if not raw_data:
  39. # 切换到下一个用户
  40. self.current_user_index += 1
  41. continue
  42. # 处理数据
  43. if self.platform == "xiaoniangao":
  44. self.user_list = [user]
  45. await self.process_data(raw_data)
  46. if self.current_user_index == len(self.user_list_from_db)-1:
  47. self.current_cursor = self.next_cursor_last
  48. self.current_user_index = 0
  49. continue
  50. self.current_user_index += 1
  51. await self.wait()
  52. def _build_request_body(self, user: Dict) -> Dict:
  53. """构建请求体:将用户link和当前cursor注入"""
  54. # 准备"虚拟数据",键名对应你的配置路径($.uid 和 $.cursor)
  55. virtual_data = {
  56. "uid": str(user.get("link")), # 对应配置中的 $.uid
  57. "cursor": self.current_cursor # 对应配置中的 $.cursor
  58. }
  59. return self.request_preparer.prepare(
  60. request_body_config=self.request_body_template,
  61. response_data=virtual_data
  62. )
  63. async def fetch_user_list(self) -> List[Dict]:
  64. """获取待爬取的用户列表(从数据库)"""
  65. return []
  66. async def crawl_user_videos(self, session, request_body: Dict, user_uid: str) -> Optional[List[Dict]]:
  67. """请求用户视频接口"""
  68. response = await self.request_client.request(
  69. session=session,
  70. method=self.method,
  71. url=self.url,
  72. headers=self.headers,
  73. json=request_body
  74. )
  75. # has_more = safe_extract(response,self.has_more)
  76. # 解析用户视频列表
  77. data_list = safe_extract(response, self.data_path)
  78. if safe_extract(response, self.next_cursor):
  79. self.next_cursor_last = safe_extract(response, self.next_cursor)
  80. if not data_list:
  81. self.logger.info(f"用户 {user_uid} 无更多视频数据")
  82. return None
  83. return data_list
  84. async def fetch_detail(self, item: Dict) -> Dict:
  85. """账号模式:补充视频详情(子类自行实现)"""
  86. return item # 默认返回原数据