# CrawlerClient.py
import json
import logging
from typing import List, Dict, Optional, Any

import requests

from model.automation_provide_job import DouYinSearchConfig, ChannelSearchAndDetailDTO
# ==================== Configuration and enum definitions ====================
# Logging configuration
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# logger = logging.getLogger(__name__)
  9. class CrawlerClient:
  10. def __init__(self):
  11. self.base_url = "http://crawapi.piaoquantv.com"
  12. self.base_ip = "http://8.217.190.241:8888"
  13. def keyword_search(self, search_config: DouYinSearchConfig) -> Dict[str, Any]:
  14. """
  15. 重载方法:通过DouYinSearchConfig对象进行关键词搜索
  16. """
  17. return self.keyword_search_base(
  18. keywords=search_config.search_content,
  19. contentType=search_config.content_type,
  20. sortType=search_config.sort_type,
  21. publishTime=search_config.publish_time,
  22. duration=search_config.duration,
  23. cursor=search_config.cursor,
  24. account_id=search_config.account_id
  25. )
  26. def keyword_search_base(
  27. self,
  28. keywords: str,
  29. contentType: Optional[str] = None,
  30. sortType: Optional[str] = None,
  31. publishTime: Optional[str] = None,
  32. duration: Optional[str] = None,
  33. cursor: Optional[str] = None,
  34. account_id: Optional[str] = None,
  35. ) -> Dict[str, Any]:
  36. """
  37. 基础关键词搜索方法(对应Java的keywordSearch重载方法)
  38. """
  39. if not keywords:
  40. raise RuntimeError("keywords is not empty")
  41. # 拼接API URL
  42. url = f"{self.base_url}/crawler/dou_yin/keyword"
  43. # 设置默认值
  44. content_type = contentType if contentType else "视频"
  45. sort_type = sortType if sortType else "综合排序"
  46. publish_time = publishTime if publishTime else "不限"
  47. duration_val = duration if duration else "不限"
  48. account_id = account_id if account_id else 98
  49. # 构建请求参数
  50. param_json = {
  51. "keyword": keywords,
  52. "content_type": content_type,
  53. "sort_type": sort_type,
  54. "publish_time": publish_time,
  55. "duration": duration_val,
  56. "cursor": cursor if cursor else "",
  57. "account_id": account_id
  58. }
  59. # 发送POST请求并处理响应
  60. return self._post(url, param_json)
  61. def dou_yin_keywords_search(
  62. self,
  63. search_config: DouYinSearchConfig,
  64. is_need_content_detail: bool = False,
  65. is_need_fans_portrait: bool = False
  66. ) -> List[ChannelSearchAndDetailDTO]:
  67. """
  68. 抖音关键词搜索,返回包含详情/粉丝画像的完整结果列表
  69. """
  70. search_result_json = {}
  71. try:
  72. # 执行关键词搜索
  73. search_result_json = self.keyword_search(search_config)
  74. except Exception as e:
  75. pass
  76. # logger.error(f"关键词 {search_config.search_content} 搜索异常", exc_info=e)
  77. # 解析搜索结果列表
  78. search_result = search_result_json.get("data", [])
  79. if not isinstance(search_result, list) or len(search_result) == 0:
  80. return []
  81. # logger.info(f"关键词 {search_config.search_content} 搜索视频数: {len(search_result)}")
  82. # 构建返回结果
  83. result = []
  84. for search_json in search_result:
  85. dto = ChannelSearchAndDetailDTO()
  86. # 提取基础信息
  87. channel_content_id = search_json.get("aweme_id", "")
  88. author_info = search_json.get("author", {})
  89. channel_account_id = author_info.get("sec_uid", "")
  90. # 初始化详情和画像
  91. content_detail = {}
  92. fans_portrait = {}
  93. try:
  94. if is_need_content_detail and channel_content_id:
  95. content_detail = self.get_content_detail_by_id(channel_content_id)
  96. except Exception as e:
  97. # logger.error(f"获取站外视频 {channel_content_id} 的内容详情异常", exc_info=e)
  98. pass
  99. try:
  100. if is_need_fans_portrait and channel_account_id:
  101. fans_portrait = self.get_fans_portrait_by_id(channel_account_id)
  102. except Exception as e:
  103. # logger.error(f"获取站外视频对应账号 {channel_account_id} 的粉丝画像异常", exc_info=e)
  104. pass
  105. # 填充DTO字段
  106. dto.search_content = search_config.search_content
  107. dto.search_result = search_json
  108. dto.channel_content_id = channel_content_id
  109. dto.channel_account_id = channel_account_id
  110. dto.content_detail = content_detail
  111. dto.fans_portrait = fans_portrait
  112. result.append(dto)
  113. return result
  114. def get_content_detail_by_id(self, content_id: str) -> Dict[str, Any]:
  115. """根据内容ID获取详情(无缓存)"""
  116. if not content_id:
  117. return {}
  118. url = f"{self.base_ip}/crawler/dou_yin/detail"
  119. param_json = {"content_id": content_id}
  120. return self._post(url, param_json)
  121. def get_fans_portrait_by_id(self, account_id: str) -> Dict[str, Any]:
  122. """根据账号ID获取粉丝画像(无缓存)"""
  123. if not account_id:
  124. return {}
  125. url = f"{self.base_url}/crawler/dou_yin/re_dian_bao/account_fans_portrait"
  126. param_json = {
  127. "account_id": account_id,
  128. "need_province": False,
  129. "need_city": False,
  130. "need_city_level": False,
  131. "need_gender": False,
  132. "need_age": True,
  133. "need_phone_brand": False,
  134. "need_phone_price": False
  135. }
  136. return self._post(url, param_json)
  137. @classmethod
  138. def _post(cls, url: str, params: Dict[str, Any]) -> Dict[str, Any]:
  139. """
  140. 通用POST请求方法(对应Java的post私有方法)
  141. """
  142. # logger.info(f"invoke crawler api request. url:{url}, request:{params}")
  143. # 发送POST请求
  144. response_str = requests.post(url, json.dumps(params)).text
  145. response_str = response_str if response_str else "{}"
  146. # 解析响应
  147. try:
  148. resp_json = json.loads(response_str)
  149. except json.JSONDecodeError:
  150. # logger.error(f"响应JSON解析失败: {response_str}")
  151. resp_json = {}
  152. # logger.info(f"invoke crawler api result. respJson: {resp_json}")
  153. # 检查响应码
  154. if resp_json.get("code") != "0" and resp_json.get("code") != 0:
  155. raise RuntimeError(resp_json.get("msg", "API调用失败"))
  156. # 返回data字段
  157. return resp_json.get("data", {})
  158. # ==================== 使用示例 ====================
  159. if __name__ == "__main__":
  160. pass