wx_getDomainInfo.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. import asyncio
  2. from readline import insert_text
  3. import urllib
  4. import requests
  5. import json
  6. import warnings
  7. # 禁用 urllib3 InsecureRequestWarning 警告
  8. warnings.filterwarnings('ignore', message='Unverified HTTPS request')
  9. from core.utils.feishu_data_async import FeishuDataAsync
  10. class WechatDomainFetcher:
  11. """
  12. 微信小程序域名信息获取类
  13. 一次性任务
  14. """
  15. def __init__(self):
  16. pass
  17. def get_access_token(self, appid, secret):
  18. """获取微信接口访问令牌"""
  19. url = "https://api.weixin.qq.com/cgi-bin/token"
  20. params = {
  21. "grant_type": "client_credential",
  22. "appid": appid,
  23. "secret": secret
  24. }
  25. try:
  26. response = requests.get(url, params=params,verify= False)
  27. resp = response.json()
  28. if resp.get("errcode",0) != 0:
  29. print(f"获取微信令牌失败: {resp}")
  30. return None
  31. return response.json().get("access_token")
  32. except Exception as e:
  33. print(f"获取微信令牌失败: {str(e)}")
  34. return None
  35. def get_domain_info(self, appid, secret):
  36. """获取小程序域名信息"""
  37. access_token = self.get_access_token(appid, secret)
  38. url = f"https://api.weixin.qq.com/wxa/getwxadevinfo?access_token={access_token}"
  39. try:
  40. response = requests.get(url,verify= False)
  41. if not response.json():
  42. print(response.json())
  43. return response.json()
  44. except Exception as e:
  45. print(f"获取域名信息失败: {str(e)}")
  46. return None
  47. async def main():
  48. async with FeishuDataAsync() as feishu_data:
  49. config = await feishu_data.get_values("HJlWwCCzwis5KIk24DOc0dtjnhh", "5178f1")
  50. # 初始化微信域名信息获取器
  51. domain_fetcher = WechatDomainFetcher()
  52. for _,app_name, appid, secret in config[1:]:
  53. print(f"正在获取小程序 {app_name} 的域名信息...")
  54. if not appid or not secret:
  55. print(f"小程序 {app_name} 缺少 appid 或 secret")
  56. continue
  57. domain_info = domain_fetcher.get_domain_info(appid.strip(), secret.strip())
  58. bizdomain = domain_info.get('bizdomain')
  59. if bizdomain:
  60. insert_data = list(map(lambda x: [app_name, x], bizdomain))
  61. async with FeishuDataAsync() as feishu_data:
  62. await feishu_data.insert_values("TxA2wpGZHiuLl2kMMokcaU9Mnlb", "d3a349", "A2:B", insert_data)
  63. else:
  64. print(f"小程序 {app_name} 无法获取域名信息{domain_info}")
  65. if __name__ == '__main__':
  66. asyncio.run(main())