# CrawlerClient.py
import json
import logging
from typing import List, Dict, Optional, Any

import requests

from model.automation_provide_job import DouYinSearchConfig, ChannelSearchAndDetailDTO
# ==================== Configuration and enum definitions ====================
# Logging configuration
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# logger = logging.getLogger(__name__)
  10. class CrawlerClient:
  11. def __init__(self):
  12. self.base_url = "http://crawapi.piaoquantv.com"
  13. self.base_ip = "http://8.217.190.241:8888"
  14. def keyword_search(self, search_config: DouYinSearchConfig) -> Dict[str, Any]:
  15. """
  16. 重载方法:通过DouYinSearchConfig对象进行关键词搜索
  17. """
  18. return self.keyword_search_base(
  19. keywords=search_config.search_content,
  20. contentType=search_config.content_type,
  21. sortType=search_config.sort_type,
  22. publishTime=search_config.publish_time,
  23. duration=search_config.duration,
  24. cursor=search_config.cursor,
  25. account_id=search_config.account_id
  26. )
  27. def keyword_search_base(
  28. self,
  29. keywords: str,
  30. contentType: Optional[str] = None,
  31. sortType: Optional[str] = None,
  32. publishTime: Optional[str] = None,
  33. duration: Optional[str] = None,
  34. cursor: Optional[str] = None,
  35. account_id: Optional[int] = None,
  36. ) -> Dict[str, Any]:
  37. """
  38. 基础关键词搜索方法(对应Java的keywordSearch重载方法)
  39. """
  40. if not keywords:
  41. raise RuntimeError("keywords is not empty")
  42. # 拼接API URL
  43. url = f"{self.base_url}/crawler/dou_yin/keyword"
  44. # 设置默认值
  45. content_type = contentType if contentType else "视频"
  46. sort_type = sortType if sortType else "综合排序"
  47. publish_time = publishTime if publishTime else "不限"
  48. duration_val = duration if duration else "不限"
  49. account_id = account_id if account_id else 98
  50. # 构建请求参数
  51. param_json = {
  52. "keyword": keywords,
  53. "content_type": content_type,
  54. "sort_type": sort_type,
  55. "publish_time": publish_time,
  56. "duration": duration_val,
  57. "cursor": cursor if cursor else "",
  58. "account_id": account_id
  59. }
  60. # 发送POST请求并处理响应
  61. return self._post(url, param_json)
  62. def dou_yin_keywords_search(
  63. self,
  64. search_config: DouYinSearchConfig,
  65. is_need_content_detail: bool = False,
  66. is_need_fans_portrait: bool = False
  67. ) -> List[ChannelSearchAndDetailDTO]:
  68. """
  69. 抖音关键词搜索,返回包含详情/粉丝画像的完整结果列表
  70. """
  71. search_result_json = {}
  72. try:
  73. # 执行关键词搜索
  74. search_result_json = self.keyword_search(search_config)
  75. except Exception as e:
  76. pass
  77. # logger.error(f"关键词 {search_config.search_content} 搜索异常", exc_info=e)
  78. # 解析搜索结果列表
  79. search_result = search_result_json.get("data", [])
  80. if not isinstance(search_result, list) or len(search_result) == 0:
  81. return []
  82. # logger.info(f"关键词 {search_config.search_content} 搜索视频数: {len(search_result)}")
  83. # 构建返回结果
  84. result = []
  85. for search_json in search_result:
  86. dto = ChannelSearchAndDetailDTO()
  87. # 提取基础信息
  88. channel_content_id = search_json.get("aweme_id", "")
  89. author_info = search_json.get("author", {})
  90. channel_account_id = author_info.get("sec_uid", "")
  91. # 初始化详情和画像
  92. content_detail = {}
  93. fans_portrait = {}
  94. try:
  95. if is_need_content_detail and channel_content_id:
  96. content_detail = self.get_content_detail_by_id(channel_content_id)
  97. except Exception as e:
  98. # logger.error(f"获取站外视频 {channel_content_id} 的内容详情异常", exc_info=e)
  99. pass
  100. try:
  101. if is_need_fans_portrait and channel_account_id:
  102. fans_portrait = self.get_fans_portrait_by_id(channel_account_id)
  103. except Exception as e:
  104. # logger.error(f"获取站外视频对应账号 {channel_account_id} 的粉丝画像异常", exc_info=e)
  105. pass
  106. # 填充DTO字段
  107. dto.search_content = search_config.search_content
  108. dto.search_result = search_json
  109. dto.channel_content_id = channel_content_id
  110. dto.channel_account_id = channel_account_id
  111. dto.content_detail = content_detail
  112. dto.fans_portrait = fans_portrait
  113. result.append(dto)
  114. return result
  115. def get_content_detail_by_id(self, content_id: str) -> Dict[str, Any]:
  116. """根据内容ID获取详情(无缓存)"""
  117. if not content_id:
  118. return {}
  119. url = f"{self.base_ip}/crawler/dou_yin/detail"
  120. param_json = {"content_id": content_id}
  121. return self._post(url, param_json)
  122. def get_fans_portrait_by_id(self, account_id: str) -> Dict[str, Any]:
  123. """根据账号ID获取粉丝画像(无缓存)"""
  124. if not account_id:
  125. return {}
  126. url = f"{self.base_url}/crawler/dou_yin/re_dian_bao/account_fans_portrait"
  127. param_json = {
  128. "account_id": account_id,
  129. "need_province": False,
  130. "need_city": False,
  131. "need_city_level": False,
  132. "need_gender": False,
  133. "need_age": True,
  134. "need_phone_brand": False,
  135. "need_phone_price": False
  136. }
  137. return self._post(url, param_json)
  138. @classmethod
  139. def _post(cls, url: str, params: Dict[str, Any]) -> Dict[str, Any]:
  140. """
  141. 通用POST请求方法(对应Java的post私有方法)
  142. """
  143. # logger.info(f"invoke crawler api request. url:{url}, request:{params}")
  144. # 发送POST请求
  145. response_str = requests.post(url, json.dumps(params)).text
  146. response_str = response_str if response_str else "{}"
  147. # 解析响应
  148. try:
  149. resp_json = json.loads(response_str)
  150. except json.JSONDecodeError:
  151. # logger.error(f"响应JSON解析失败: {response_str}")
  152. resp_json = {}
  153. # logger.info(f"invoke crawler api result. respJson: {resp_json}")
  154. # 检查响应码
  155. if resp_json.get("code") != "0":
  156. raise RuntimeError(resp_json.get("msg", "API调用失败"))
  157. # 返回data字段
  158. return resp_json.get("data", {})
  159. # ==================== 使用示例 ====================
  160. if __name__ == "__main__":
  161. pass