gewe_api.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. """
  2. @author: luojunhui
  3. """
  4. from tenacity import (
  5. retry,
  6. stop_after_attempt,
  7. wait_exponential,
  8. retry_if_exception_type,
  9. )
  10. from requests.exceptions import RequestException
  11. import requests
  12. import json
  13. from typing import Optional, Dict
  14. COMMON_RETRY = dict(
  15. stop=stop_after_attempt(3), # 总共尝试3次
  16. wait=wait_exponential(min=2, max=30),
  17. retry=retry_if_exception_type((RequestException, TimeoutError)),
  18. reraise=True # 重试耗尽后重新抛出异常
  19. )
  20. class WechatChannelAPI:
  21. """
  22. wechat channel api by gw
  23. """
  24. def __init__(self, base_url: str, token: str, app_id: str):
  25. self.base_url = base_url
  26. self.token = token
  27. self.app_id = app_id
  28. @retry(**COMMON_RETRY)
  29. def search(
  30. self,
  31. search_key: str,
  32. search_type: int,
  33. page: int = 0,
  34. cookie: str = "",
  35. search_id: str = "",
  36. offset: int = 0,
  37. ) -> Optional[Dict]:
  38. """
  39. 搜索微信视频号内容(支持重试)
  40. :param search_key: 搜索关键字
  41. :param search_type: 搜索类型,1: 搜索所有视频,2: 搜索视频号账号
  42. :param page: 页码
  43. :param cookie: 登录后的cookie
  44. :param search_id: 搜索id
  45. :param offset: 偏移量
  46. :return: 返回搜索结果字典,失败时返回None
  47. """
  48. url = f"{self.base_url}/gewe/v2/api/finder/search"
  49. payload = {
  50. "appId": self.app_id,
  51. "proxyIp": "",
  52. "content": search_key,
  53. "category": search_type,
  54. "filter": 0,
  55. "page": page,
  56. "cookie": cookie,
  57. "searchId": search_id,
  58. "offset": offset,
  59. }
  60. headers = {"X-GEWE-TOKEN": self.token, "Content-Type": "application/json"}
  61. try:
  62. response = requests.post(url, headers=headers, json=payload, timeout=60)
  63. response.raise_for_status()
  64. return response.json()
  65. except RequestException as e:
  66. print(f"API请求失败: {e}")
  67. except json.JSONDecodeError as e:
  68. print(f"响应解析失败: {e}")
  69. return None
  70. @retry(**COMMON_RETRY)
  71. def get_channel_video_list(
  72. self, user_id: str, last_buffer: str = ""
  73. ) -> Optional[Dict]:
  74. """
  75. 获取视频号账号的视频列表(支持重试)
  76. :param user_id: 视频号账号ID
  77. :param last_buffer: 分页标记,用于获取下一页数据
  78. :return: 返回视频列表字典,失败时返回None
  79. """
  80. url = f"{self.base_url}/gewe/v2/api/finder/userPage"
  81. payload = {
  82. "appId": self.app_id,
  83. "proxyIp": "",
  84. "lastBuffer": last_buffer,
  85. "toUserName": user_id,
  86. "maxId": 0,
  87. }
  88. headers = {"X-GEWE-TOKEN": self.token, "Content-Type": "application/json"}
  89. try:
  90. response = requests.post(url, headers=headers, json=payload, timeout=60)
  91. response.raise_for_status()
  92. return response.json()
  93. except RequestException as e:
  94. print(f"获取视频列表请求失败: {e}")
  95. except json.JSONDecodeError as e:
  96. print(f"响应解析失败: {e}")
  97. return None