wx_getDomainInfo.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. import asyncio
  2. from readline import insert_text
  3. import requests
  4. import json
  5. from core.utils.feishu_data_async import FeishuDataAsync
  6. class WechatDomainFetcher:
  7. """
  8. 微信小程序域名信息获取类
  9. 一次性任务
  10. """
  11. def __init__(self):
  12. pass
  13. def get_access_token(self, appid, secret):
  14. """获取微信接口访问令牌"""
  15. url = "https://api.weixin.qq.com/cgi-bin/token"
  16. params = {
  17. "grant_type": "client_credential",
  18. "appid": appid,
  19. "secret": secret
  20. }
  21. try:
  22. response = requests.get(url, params=params)
  23. print(response)
  24. return response.json().get("access_token")
  25. except Exception as e:
  26. print(f"获取微信令牌失败: {str(e)}")
  27. return None
  28. def get_domain_info(self, appid, secret):
  29. """获取小程序域名信息"""
  30. access_token = self.get_access_token(appid, secret)
  31. if not access_token:
  32. return None
  33. url = f"https://api.weixin.qq.com/wxa/getwxadevinfo?access_token={access_token}"
  34. try:
  35. response = requests.get(url)
  36. return response.json()
  37. except Exception as e:
  38. print(f"获取域名信息失败: {str(e)}")
  39. return None
  40. async def main():
  41. async with FeishuDataAsync() as feishu_data:
  42. config = await feishu_data.get_values("HJlWwCCzwis5KIk24DOc0dtjnhh", "5178f1")
  43. # 初始化微信域名信息获取器
  44. domain_fetcher = WechatDomainFetcher()
  45. for _,app_name, appid, secret in config[1:]:
  46. if not appid or not secret:
  47. continue
  48. domain_info = domain_fetcher.get_domain_info(appid, secret)
  49. if not domain_info:
  50. continue
  51. bizdomain = domain_info.get('bizdomain')
  52. if bizdomain:
  53. insert_data = list(map(lambda x: [app_name, x], bizdomain))
  54. print(insert_data)
  55. async with FeishuDataAsync() as feishu_data:
  56. await feishu_data.insert_values("TxA2wpGZHiuLl2kMMokcaU9Mnlb", "d3a349", "A2:B", insert_data)
  57. else:
  58. print(f"小程序 {app_name} 无法获取域名信息")
  59. if __name__ == '__main__':
  60. asyncio.run(main())