import json
import logging
from typing import List, Dict, Optional, Any

import requests

from model.automation_provide_job import DouYinSearchConfig, ChannelSearchAndDetailDTO

# Module-level logger. Handlers/formatting are left to the application
# (e.g. logging.basicConfig in the entry point), not configured here.
logger = logging.getLogger(__name__)


class CrawlerClient:
    """HTTP client for the crawler service's Douyin endpoints.

    Every endpoint is called with a JSON-encoded POST body and is expected to
    answer with a JSON envelope containing ``code``, ``msg`` and ``data``
    (see ``_post``).
    """

    DEFAULT_BASE_URL = "http://crawapi.piaoquantv.com"
    DEFAULT_BASE_IP = "http://8.217.190.241:8888"

    # Seconds to wait for the crawler service before giving up. Without a
    # timeout a stalled connection would block the caller indefinitely.
    # Subclasses may override this value.
    REQUEST_TIMEOUT = 60.0

    def __init__(
            self,
            base_url: str = DEFAULT_BASE_URL,
            base_ip: str = DEFAULT_BASE_IP,
    ):
        """Create a client.

        Args:
            base_url: Root URL used for the keyword-search and fans-portrait
                endpoints.
            base_ip: Root URL used for the content-detail endpoint.
        """
        self.base_url = base_url
        self.base_ip = base_ip

    def keyword_search(self, search_config: DouYinSearchConfig) -> Dict[str, Any]:
        """Run a keyword search described by a ``DouYinSearchConfig``.

        Thin adapter that unpacks the config object and delegates to
        ``keyword_search_base``.

        Returns:
            The ``data`` field of the service response.

        Raises:
            RuntimeError: If the search content is empty or the service
                reports a non-success code.
        """
        return self.keyword_search_base(
            keywords=search_config.search_content,
            contentType=search_config.content_type,
            sortType=search_config.sort_type,
            publishTime=search_config.publish_time,
            duration=search_config.duration,
            cursor=search_config.cursor,
            account_id=search_config.account_id,
        )

    def keyword_search_base(
            self,
            keywords: str,
            contentType: Optional[str] = None,
            sortType: Optional[str] = None,
            publishTime: Optional[str] = None,
            duration: Optional[str] = None,
            cursor: Optional[str] = None,
            account_id: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Call the Douyin keyword-search endpoint.

        Any falsy optional argument is replaced by the default listed below.

        Args:
            keywords: Search text; must be non-empty.
            contentType: Content type filter, default ``"视频"``.
            sortType: Sort order, default ``"综合排序"``.
            publishTime: Publish-time filter, default ``"不限"``.
            duration: Duration filter, default ``"不限"``.
            cursor: Paging cursor, default ``""``.
            account_id: Crawler account id, default ``98``.

        Returns:
            The ``data`` field of the service response.

        Raises:
            RuntimeError: If ``keywords`` is empty or the service reports a
                non-success code.
        """
        if not keywords:
            raise RuntimeError("keywords must not be empty")

        url = f"{self.base_url}/crawler/dou_yin/keyword"

        param_json = {
            "keyword": keywords,
            "content_type": contentType or "视频",
            "sort_type": sortType or "综合排序",
            "publish_time": publishTime or "不限",
            "duration": duration or "不限",
            "cursor": cursor or "",
            "accountId": account_id or 98,
        }
        return self._post(url, param_json)

    def dou_yin_keywords_search(
            self,
            search_config: DouYinSearchConfig,
            is_need_content_detail: bool = False,
            is_need_fans_portrait: bool = False
    ) -> List[ChannelSearchAndDetailDTO]:
        """Search Douyin by keyword and optionally enrich every hit.

        The method is best-effort: a failed search yields an empty list, and a
        failed detail/portrait lookup leaves that field as an empty dict. All
        failures are logged instead of being raised.

        Args:
            search_config: Search parameters.
            is_need_content_detail: Fetch the content detail for each hit that
                has an ``aweme_id``.
            is_need_fans_portrait: Fetch the fans portrait for each hit whose
                author has a ``sec_uid``.

        Returns:
            One ``ChannelSearchAndDetailDTO`` per search hit, in the order the
            service returned them.
        """
        try:
            search_payload = self.keyword_search(search_config)
        except Exception:
            logger.exception(
                "keyword search failed for %r", search_config.search_content
            )
            return []

        # The hit list is read from the "data" key of the search payload;
        # anything else (null payload, wrong type, empty list) means no hits.
        if not isinstance(search_payload, dict):
            return []
        search_result = search_payload.get("data")
        if not isinstance(search_result, list) or not search_result:
            return []

        logger.info(
            "keyword %r returned %d results",
            search_config.search_content, len(search_result),
        )

        result = []
        for search_json in search_result:
            if not isinstance(search_json, dict):
                logger.warning("skipping malformed search item: %r", search_json)
                continue

            channel_content_id = search_json.get("aweme_id", "")
            # "author" may be present but null, so fall back to {} explicitly.
            author_info = search_json.get("author") or {}
            if not isinstance(author_info, dict):
                author_info = {}
            channel_account_id = author_info.get("sec_uid", "")

            content_detail = {}
            if is_need_content_detail and channel_content_id:
                try:
                    content_detail = self.get_content_detail_by_id(channel_content_id)
                except Exception:
                    logger.exception(
                        "failed to fetch content detail for %s", channel_content_id
                    )

            fans_portrait = {}
            if is_need_fans_portrait and channel_account_id:
                try:
                    fans_portrait = self.get_fans_portrait_by_id(channel_account_id)
                except Exception:
                    logger.exception(
                        "failed to fetch fans portrait for %s", channel_account_id
                    )

            dto = ChannelSearchAndDetailDTO()
            dto.search_content = search_config.search_content
            dto.search_result = search_json
            dto.channel_content_id = channel_content_id
            dto.channel_account_id = channel_account_id
            dto.content_detail = content_detail
            dto.fans_portrait = fans_portrait
            result.append(dto)

        return result

    def get_content_detail_by_id(self, content_id: str) -> Dict[str, Any]:
        """Fetch the detail of one piece of content (no caching).

        Returns:
            The ``data`` field of the service response, or ``{}`` when
            ``content_id`` is empty.

        Raises:
            RuntimeError: If the service reports a non-success code.
        """
        if not content_id:
            return {}

        url = f"{self.base_ip}/crawler/dou_yin/detail"
        param_json = {"content_id": content_id}
        return self._post(url, param_json)

    def get_fans_portrait_by_id(self, account_id: str) -> Dict[str, Any]:
        """Fetch the fans portrait of one account (no caching).

        Only the age dimension is requested; every other dimension flag is
        sent as ``False``.

        Returns:
            The ``data`` field of the service response, or ``{}`` when
            ``account_id`` is empty.

        Raises:
            RuntimeError: If the service reports a non-success code.
        """
        if not account_id:
            return {}

        url = f"{self.base_url}/crawler/dou_yin/re_dian_bao/account_fans_portrait"
        param_json = {
            "account_id": account_id,
            "need_province": False,
            "need_city": False,
            "need_city_level": False,
            "need_gender": False,
            "need_age": True,
            "need_phone_brand": False,
            "need_phone_price": False,
        }
        return self._post(url, param_json)

    @classmethod
    def _post(cls, url: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``params`` as a JSON string and unwrap the response envelope.

        Args:
            url: Full endpoint URL.
            params: Request parameters; serialised with ``json.dumps``.

        Returns:
            The ``data`` field of the response, or ``{}`` when it is missing
            or null.

        Raises:
            RuntimeError: If the response is not a JSON object or its ``code``
                is not ``"0"`` / ``0``. The message is the response's ``msg``
                when available.
            requests.RequestException: On network failure or timeout.
        """
        logger.debug("invoke crawler api request. url:%s, request:%s", url, params)

        # The body is sent as a pre-serialised string (not via ``json=``) so
        # the bytes and headers on the wire stay exactly as before.
        response_str = requests.post(
            url, data=json.dumps(params), timeout=cls.REQUEST_TIMEOUT
        ).text or "{}"

        try:
            resp_json = json.loads(response_str)
        except json.JSONDecodeError:
            logger.error(
                "crawler api returned non-JSON response. url:%s, body:%.500s",
                url, response_str,
            )
            resp_json = {}

        # A JSON list/scalar is not a valid envelope; treat it as a failure
        # rather than crashing on ``.get`` below.
        if not isinstance(resp_json, dict):
            resp_json = {}

        logger.debug("invoke crawler api result. respJson: %s", resp_json)

        # Accept the success code whether it is encoded as "0" or 0.
        if str(resp_json.get("code")) != "0":
            raise RuntimeError(resp_json.get("msg") or "API调用失败")

        data = resp_json.get("data")
        return {} if data is None else data


if __name__ == "__main__":
    pass