gzh_fans.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. import random
  2. from app.infra.shared import AsyncHttpClient
  3. # 抓取公众号粉丝
  4. async def get_gzh_fans(token, cookie, cursor_id, cursor_timestamp):
  5. url = "https://mp.weixin.qq.com/cgi-bin/user_tag"
  6. params = {
  7. "action": "get_user_list",
  8. "groupid": "-2",
  9. "begin_openid": cursor_id,
  10. "begin_create_time": cursor_timestamp,
  11. "limit": "20",
  12. "offset": "0",
  13. "backfoward": "1",
  14. "token": token,
  15. "lang": "zh_CN",
  16. "f": "json",
  17. "ajax": "1",
  18. "fingerprint": "42694935a543ed714f89d4b05ef3a4e6",
  19. "random": random.random(),
  20. }
  21. headers = {
  22. "accept": "application/json, text/javascript, */*; q=0.01",
  23. "accept-language": "zh-CN,zh;q=0.9",
  24. "cache-control": "no-cache",
  25. "pragma": "no-cache",
  26. "priority": "u=1, i",
  27. "referer": f"https://mp.weixin.qq.com/cgi-bin/user_tag?action=get_all_data&lang=zh_CN&token={token}",
  28. "sec-ch-ua": '"Google Chrome";v="143", "Chromium";v="143", "Not A(Brand";v="24"',
  29. "sec-ch-ua-mobile": "?0",
  30. "sec-ch-ua-platform": '"macOS"',
  31. "sec-fetch-dest": "empty",
  32. "sec-fetch-mode": "cors",
  33. "sec-fetch-site": "same-origin",
  34. "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
  35. "x-requested-with": "XMLHttpRequest",
  36. "Cookie": cookie,
  37. }
  38. # 发送请求
  39. async with AsyncHttpClient(timeout=10) as http_client:
  40. response = await http_client.get(url, headers=headers, params=params)
  41. return response
  42. # 获取 access_token
  43. async def get_access_token(app_id, app_secret):
  44. url = f"https://api.weixin.qq.com/cgi-bin/stable_token"
  45. data = {"grant_type": "client_credential", "appid": app_id, "secret": app_secret}
  46. async with AsyncHttpClient(timeout=100) as http_client:
  47. response = await http_client.post(url, json=data)
  48. return response
  49. # 批量获取 union_id
  50. async def get_union_id_batch(access_token, user_list):
  51. url = f"https://api.weixin.qq.com/cgi-bin/user/info/batchget?access_token={access_token}"
  52. payload = {
  53. "user_list": user_list,
  54. }
  55. async with AsyncHttpClient(timeout=100) as http_client:
  56. response = await http_client.post(url, json=payload)
  57. return response