# authorspider.py
from typing import Any, Dict, List, Optional, Tuple

import aiohttp

from core.utils.extractors import safe_extract
from spiders.basespider import BaseSpider


  5. class AuthorSpider(BaseSpider):
  6. """账号模式爬虫:从用户列表爬取"""
  7. def __init__(self, rule_dict: Dict, user_list: List, env: str = "prod"):
  8. super().__init__(rule_dict, user_list, env)
  9. # 账号模式特有状态
  10. self.user_list_from_db = [] # 数据库用户列表
  11. self.current_user_index = 0 # 当前用户索引
  12. self.current_cursor = "" # 当前分页游标(初始为空)
  13. async def before_run(self):
  14. """运行前:获取用户列表"""
  15. self.user_list_from_db = await self.fetch_user_list()
  16. if not self.user_list_from_db:
  17. self.logger.warning("用户列表为空,终止账号模式")
  18. self.logger.info(f"{self.platform}获取用户列表完成,共 {len(self.user_list_from_db)} 个用户")
  19. async def core_loop(self):
  20. """核心循环:处理每个用户的视频"""
  21. async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
  22. while self.current_user_index < len(self.user_list_from_db):
  23. # 检查数量限制
  24. if not await self.is_video_count_sufficient():
  25. return
  26. # 当前用户
  27. user = self.user_list_from_db[self.current_user_index]
  28. user_uid = user.get("uid") # 数据库中的uid字段
  29. self.logger.info(
  30. f"处理用户 uid={user_uid}(第{self.current_user_index + 1}个),"
  31. f"当前cursor: {self.current_cursor or '0'}"
  32. )
  33. # 构建请求体:注入uid和cursor
  34. request_body = self._build_request_body(user)
  35. # 获取当前用户视频
  36. hase_more,raw_data = await self.crawl_user_videos(session, request_body, user_uid)
  37. if not hase_more:
  38. self.logger.info(f"用户 {user_uid} 第{int(self.current_cursor or 0) + 1}页无更多视频")
  39. if not raw_data:
  40. # 切换到下一个用户
  41. self.current_user_index += 1
  42. continue
  43. # 处理数据
  44. await self.process_raw_data(raw_data)
  45. if self.current_user_index == len(self.user_list_from_db)-1:
  46. self.current_cursor = str(int(self.current_cursor or 0) + 1)
  47. self.current_user_index = 0
  48. continue
  49. self.current_user_index += 1
  50. await self.wait()
  51. def _build_request_body(self, user: Dict) -> Dict:
  52. """构建请求体:将用户uid和当前cursor注入"""
  53. # 准备"虚拟数据",键名对应你的配置路径($.uid 和 $.cursor)
  54. virtual_data = {
  55. "uid": str(user.get("uid")), # 对应配置中的 $.uid
  56. "cursor": self.current_cursor # 对应配置中的 $.cursor
  57. }
  58. return self.request_preparer.prepare(
  59. request_body_config=self.request_body_template,
  60. response_data=virtual_data
  61. )
  62. async def fetch_user_list(self) -> List[Dict]:
  63. """获取待爬取的用户列表(从数据库)"""
  64. return []
  65. async def crawl_user_videos(self, session, request_body: Dict, user_uid: str) -> Optional[List[Dict]]:
  66. """请求用户视频接口"""
  67. response = await self.request_client.request(
  68. session=session,
  69. method=self.method,
  70. url=self.url,
  71. headers=self.headers,
  72. json=request_body
  73. )
  74. has_more = safe_extract(response,self.has_more)
  75. # 解析用户视频列表
  76. data_list = safe_extract(response, self.data_path)
  77. if not data_list:
  78. self.logger.info(f"用户 {user_uid} 第{self.current_cursor}页无视频数据")
  79. return None, None
  80. return has_more, data_list
  81. async def fetch_detail(self, item: Dict) -> Dict:
  82. """账号模式:补充视频详情(子类自行实现)"""
  83. return item # 默认返回原数据