| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- import json
- import logging
- from typing import List, Dict, Optional, Any
- import requests
- from model.automation_provide_job import DouYinSearchConfig, ChannelSearchAndDetailDTO
- # ==================== 配置与枚举定义 ====================
- # 日志配置
- # logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- # logger = logging.getLogger(__name__)
class CrawlerClient:
    """Client for the Douyin endpoints of the crawler HTTP API.

    Every call is a POST whose body is a JSON-serialised dict. The response
    is expected to be a JSON object carrying ``code``, ``msg`` and ``data``
    fields; see ``_parse_response`` for how that envelope is interpreted.
    """

    # Upper bound, in seconds, applied to every HTTP request. Without a
    # timeout ``requests`` waits indefinitely on an unresponsive server.
    # Override on the class or a subclass if the crawler needs longer.
    REQUEST_TIMEOUT_SECONDS = 60

    _logger = logging.getLogger(__name__)

    def __init__(self):
        self.base_url = "http://crawapi.piaoquantv.com"
        self.base_ip = "http://8.217.190.241:8888"

    def keyword_search(self, search_config: "DouYinSearchConfig") -> Dict[str, Any]:
        """Run a keyword search described by a ``DouYinSearchConfig``.

        Args:
            search_config: Object exposing ``search_content``,
                ``content_type``, ``sort_type``, ``publish_time``,
                ``duration``, ``cursor`` and ``account_id``.

        Returns:
            The ``data`` payload of the API response.

        Raises:
            RuntimeError: If the search content is empty or the API reports
                a failure.
        """
        return self.keyword_search_base(
            keywords=search_config.search_content,
            contentType=search_config.content_type,
            sortType=search_config.sort_type,
            publishTime=search_config.publish_time,
            duration=search_config.duration,
            cursor=search_config.cursor,
            account_id=search_config.account_id,
        )

    def keyword_search_base(
        self,
        keywords: str,
        contentType: Optional[str] = None,
        sortType: Optional[str] = None,
        publishTime: Optional[str] = None,
        duration: Optional[str] = None,
        cursor: Optional[str] = None,
        account_id: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Search Douyin by keyword.

        Any falsy optional argument is replaced by its default before the
        request is sent.

        Args:
            keywords: Search text; must be non-empty.
            contentType: Content type filter; defaults to "视频" (video).
            sortType: Sort order; defaults to "综合排序" (comprehensive).
            publishTime: Publish-time filter; defaults to "不限" (no limit).
            duration: Duration filter; defaults to "不限" (no limit).
            cursor: Pagination cursor; defaults to the empty string.
            account_id: Crawler account id; defaults to 98.
                NOTE(review): the meaning of 98 is not visible here.

        Returns:
            The ``data`` payload of the API response.

        Raises:
            RuntimeError: If ``keywords`` is empty or the API reports a
                failure.
        """
        if not keywords:
            raise RuntimeError("keywords must not be empty")

        url = f"{self.base_url}/crawler/dou_yin/keyword"
        param_json = {
            "keyword": keywords,
            "content_type": contentType or "视频",
            "sort_type": sortType or "综合排序",
            "publish_time": publishTime or "不限",
            "duration": duration or "不限",
            "cursor": cursor or "",
            "account_id": account_id or 98,
        }
        return self._post(url, param_json)

    def dou_yin_keywords_search(
        self,
        search_config: "DouYinSearchConfig",
        is_need_content_detail: bool = False,
        is_need_fans_portrait: bool = False,
    ) -> List["ChannelSearchAndDetailDTO"]:
        """Search Douyin by keyword and optionally enrich every hit.

        The whole operation is best-effort: a failed search yields an empty
        list, and a failed detail / fans-portrait lookup leaves that field
        as an empty dict. Failures are logged instead of being raised.

        Args:
            search_config: Search parameters, see ``keyword_search``.
            is_need_content_detail: Also fetch the content detail of each
                hit that has an ``aweme_id``.
            is_need_fans_portrait: Also fetch the fans portrait of each hit
                whose author has a ``sec_uid``.

        Returns:
            One ``ChannelSearchAndDetailDTO`` per search hit, possibly empty.
        """
        try:
            search_result_json = self.keyword_search(search_config)
        except Exception:
            self._logger.exception(
                "keyword search failed for %r",
                getattr(search_config, "search_content", None),
            )
            return []

        # The hit list sits under a nested "data" key of the search payload.
        if not isinstance(search_result_json, dict):
            return []
        search_result = search_result_json.get("data", [])
        if not isinstance(search_result, list) or not search_result:
            return []

        result = []
        for search_json in search_result:
            if not isinstance(search_json, dict):
                self._logger.warning(
                    "skipping malformed search hit: %r", search_json
                )
                continue

            channel_content_id = search_json.get("aweme_id", "")
            # "author" may be missing or null; treat both as "no author".
            author_info = search_json.get("author")
            if not isinstance(author_info, dict):
                author_info = {}
            channel_account_id = author_info.get("sec_uid", "")

            content_detail = {}
            if is_need_content_detail and channel_content_id:
                try:
                    content_detail = self.get_content_detail_by_id(
                        channel_content_id
                    )
                except Exception:
                    self._logger.exception(
                        "failed to fetch content detail for %r",
                        channel_content_id,
                    )

            fans_portrait = {}
            if is_need_fans_portrait and channel_account_id:
                try:
                    fans_portrait = self.get_fans_portrait_by_id(
                        channel_account_id
                    )
                except Exception:
                    self._logger.exception(
                        "failed to fetch fans portrait for %r",
                        channel_account_id,
                    )

            dto = ChannelSearchAndDetailDTO()
            dto.search_content = search_config.search_content
            dto.search_result = search_json
            dto.channel_content_id = channel_content_id
            dto.channel_account_id = channel_account_id
            dto.content_detail = content_detail
            dto.fans_portrait = fans_portrait
            result.append(dto)
        return result

    def get_content_detail_by_id(self, content_id: str) -> Dict[str, Any]:
        """Fetch the detail of one piece of content (no caching).

        Returns an empty dict without calling the API when ``content_id``
        is empty.

        Raises:
            RuntimeError: If the API reports a failure.
        """
        if not content_id:
            return {}
        url = f"{self.base_ip}/crawler/dou_yin/detail"
        return self._post(url, {"content_id": content_id})

    def get_fans_portrait_by_id(self, account_id: str) -> Dict[str, Any]:
        """Fetch the fans portrait of an account (no caching).

        Only the age breakdown is requested; every other dimension is
        switched off. Returns an empty dict without calling the API when
        ``account_id`` is empty.

        Raises:
            RuntimeError: If the API reports a failure.
        """
        if not account_id:
            return {}
        url = f"{self.base_url}/crawler/dou_yin/re_dian_bao/account_fans_portrait"
        param_json = {
            "account_id": account_id,
            "need_province": False,
            "need_city": False,
            "need_city_level": False,
            "need_gender": False,
            "need_age": True,
            "need_phone_brand": False,
            "need_phone_price": False,
        }
        return self._post(url, param_json)

    @classmethod
    def _post(cls, url: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``params`` as a JSON string to ``url`` and return its data.

        Raises:
            RuntimeError: If the response is not a successful envelope.
            requests.RequestException: On transport errors, including
                exceeding ``REQUEST_TIMEOUT_SECONDS``.
        """
        # NOTE(review): the body is sent as a raw string, so no JSON
        # Content-Type header is set; kept as-is to leave the wire format
        # unchanged.
        response = requests.post(
            url,
            data=json.dumps(params),
            timeout=cls.REQUEST_TIMEOUT_SECONDS,
        )
        return cls._parse_response(response.text)

    @classmethod
    def _parse_response(cls, response_str: Optional[str]) -> Any:
        """Decode a response body and return its ``data`` field.

        A body that is empty, not valid JSON, or not a JSON object is
        treated as an empty envelope and therefore as a failure. A ``code``
        of ``"0"`` or ``0`` means success. A missing or null ``data`` is
        returned as an empty dict.

        Raises:
            RuntimeError: If ``code`` does not indicate success; the message
                is the envelope's ``msg`` when present.
        """
        try:
            resp_json = json.loads(response_str) if response_str else {}
        except json.JSONDecodeError:
            cls._logger.error(
                "crawler API returned a non-JSON body: %r", response_str[:500]
            )
            resp_json = {}
        if not isinstance(resp_json, dict):
            resp_json = {}

        # Compare as text so both "0" and 0 count as success.
        if str(resp_json.get("code")) != "0":
            raise RuntimeError(resp_json.get("msg") or "API调用失败")

        data = resp_json.get("data")
        return {} if data is None else data
# ==================== Usage example ====================
if __name__ == "__main__":
    # Intentionally empty: running this module as a script does nothing.
    pass
|