123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- """
- @author: luojunhui
- """
- from tenacity import (
- retry,
- stop_after_attempt,
- wait_exponential,
- retry_if_exception_type,
- )
- from requests.exceptions import RequestException
- import requests
- import json
- from typing import Optional, Dict
- COMMON_RETRY = dict(
- stop=stop_after_attempt(3), # 总共尝试3次
- wait=wait_exponential(min=2, max=30),
- retry=retry_if_exception_type((RequestException, TimeoutError)),
- reraise=True # 重试耗尽后重新抛出异常
- )
- class WechatChannelAPI:
- """
- wechat channel api by gw
- """
- def __init__(self, base_url: str, token: str, app_id: str):
- self.base_url = base_url
- self.token = token
- self.app_id = app_id
- @retry(**COMMON_RETRY)
- def search(
- self,
- search_key: str,
- search_type: int,
- page: int = 0,
- cookie: str = "",
- search_id: str = "",
- offset: int = 0,
- ) -> Optional[Dict]:
- """
- 搜索微信视频号内容(支持重试)
- :param search_key: 搜索关键字
- :param search_type: 搜索类型,1: 搜索所有视频,2: 搜索视频号账号
- :param page: 页码
- :param cookie: 登录后的cookie
- :param search_id: 搜索id
- :param offset: 偏移量
- :return: 返回搜索结果字典,失败时返回None
- """
- url = f"{self.base_url}/gewe/v2/api/finder/search"
- payload = {
- "appId": self.app_id,
- "proxyIp": "",
- "content": search_key,
- "category": search_type,
- "filter": 0,
- "page": page,
- "cookie": cookie,
- "searchId": search_id,
- "offset": offset,
- }
- headers = {"X-GEWE-TOKEN": self.token, "Content-Type": "application/json"}
- try:
- response = requests.post(url, headers=headers, json=payload, timeout=60)
- response.raise_for_status()
- return response.json()
- except RequestException as e:
- print(f"API请求失败: {e}")
- except json.JSONDecodeError as e:
- print(f"响应解析失败: {e}")
- return None
- @retry(**COMMON_RETRY)
- def get_channel_video_list(
- self, user_id: str, last_buffer: str = ""
- ) -> Optional[Dict]:
- """
- 获取视频号账号的视频列表(支持重试)
- :param user_id: 视频号账号ID
- :param last_buffer: 分页标记,用于获取下一页数据
- :return: 返回视频列表字典,失败时返回None
- """
- url = f"{self.base_url}/gewe/v2/api/finder/userPage"
- payload = {
- "appId": self.app_id,
- "proxyIp": "",
- "lastBuffer": last_buffer,
- "toUserName": user_id,
- "maxId": 0,
- }
- headers = {"X-GEWE-TOKEN": self.token, "Content-Type": "application/json"}
- try:
- response = requests.post(url, headers=headers, json=payload, timeout=60)
- response.raise_for_status()
- return response.json()
- except RequestException as e:
- print(f"获取视频列表请求失败: {e}")
- except json.JSONDecodeError as e:
- print(f"响应解析失败: {e}")
- return None
|