"""
@author: luojunhui
"""
import json
from typing import Dict, Optional

import requests
from requests.exceptions import RequestException
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

# Shared tenacity policy applied to the outbound HTTP call.
COMMON_RETRY = dict(
    stop=stop_after_attempt(3),  # up to 3 attempts in total
    wait=wait_exponential(min=2, max=30),
    retry=retry_if_exception_type((RequestException, TimeoutError)),
    reraise=True,  # re-raise the last exception once retries are exhausted
)


class WechatChannelAPI:
    """
    wechat channel api by gw
    """

    def __init__(self, base_url: str, token: str, app_id: str):
        """
        Store the connection settings used by every request.

        :param base_url: base URL that the endpoint paths are appended to
        :param token: value sent in the ``X-GEWE-TOKEN`` request header
        :param app_id: value sent as ``appId`` in every request payload
        """
        self.base_url = base_url
        self.token = token
        self.app_id = app_id

    @retry(**COMMON_RETRY)
    def _post(self, url: str, payload: Dict) -> requests.Response:
        """
        POST ``payload`` as JSON and return the successful response.

        The retry policy lives here, and this method deliberately does NOT
        catch anything: tenacity can only retry when the exception escapes
        the decorated function.

        :param url: full endpoint URL
        :param payload: JSON-serialisable request body
        :return: the response, guaranteed to have passed ``raise_for_status``
        :raises RequestException: when every attempt failed
        """
        headers = {"X-GEWE-TOKEN": self.token, "Content-Type": "application/json"}
        response = requests.post(url, headers=headers, json=payload, timeout=60)
        response.raise_for_status()
        return response

    def _post_json(
        self, url: str, payload: Dict, failure_label: str
    ) -> Optional[Dict]:
        """
        Call ``_post`` (with retries) and decode the JSON body.

        :param url: full endpoint URL
        :param payload: JSON-serialisable request body
        :param failure_label: prefix printed when the request itself fails
        :return: decoded response body, or None when the request or the
            decoding failed
        """
        try:
            # Decoding stays outside the retried call: only the HTTP request
            # itself is retried, not the parsing of its body.
            return self._post(url, payload).json()
        except RequestException as e:
            # Reached only after the retries in ``_post`` are exhausted.
            print(f"{failure_label}: {e}")
        except json.JSONDecodeError as e:
            print(f"响应解析失败: {e}")
        return None

    def search(
        self,
        search_key: str,
        search_type: int,
        page: int = 0,
        cookie: str = "",
        search_id: str = "",
        offset: int = 0,
    ) -> Optional[Dict]:
        """
        Search WeChat Channels content (the HTTP request is retried).

        :param search_key: search keyword
        :param search_type: search type, 1: search all videos,
            2: search channel accounts
        :param page: page number
        :param cookie: cookie obtained after login
        :param search_id: search id
        :param offset: offset
        :return: the search result dict, or None on failure
        """
        url = f"{self.base_url}/gewe/v2/api/finder/search"
        payload = {
            "appId": self.app_id,
            "proxyIp": "",
            "content": search_key,
            "category": search_type,
            "filter": 0,
            "page": page,
            "cookie": cookie,
            "searchId": search_id,
            "offset": offset,
        }
        return self._post_json(url, payload, "API请求失败")

    def get_channel_video_list(
        self, user_id: str, last_buffer: str = ""
    ) -> Optional[Dict]:
        """
        Fetch the video list of a channel account (the HTTP request is retried).

        :param user_id: channel account id
        :param last_buffer: pagination marker used to fetch the next page
        :return: the video list dict, or None on failure
        """
        url = f"{self.base_url}/gewe/v2/api/finder/userPage"
        payload = {
            "appId": self.app_id,
            "proxyIp": "",
            "lastBuffer": last_buffer,
            "toUserName": user_id,
            "maxId": 0,
        }
        return self._post_json(url, payload, "获取视频列表请求失败")