|
|
@@ -0,0 +1,73 @@
|
|
|
+import asyncio
|
|
|
+from readline import insert_text
|
|
|
+
|
|
|
+import requests
|
|
|
+import json
|
|
|
+
|
|
|
+from core.utils.feishu_data_async import FeishuDataAsync
|
|
|
+
|
|
|
+class WechatDomainFetcher:
|
|
|
+ """
|
|
|
+ 微信小程序域名信息获取类
|
|
|
+ 一次性任务
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ pass
|
|
|
+
|
|
|
+ def get_access_token(self, appid, secret):
|
|
|
+ """获取微信接口访问令牌"""
|
|
|
+ url = "https://api.weixin.qq.com/cgi-bin/token"
|
|
|
+ params = {
|
|
|
+ "grant_type": "client_credential",
|
|
|
+ "appid": appid,
|
|
|
+ "secret": secret
|
|
|
+ }
|
|
|
+
|
|
|
+ try:
|
|
|
+ response = requests.get(url, params=params)
|
|
|
+ print(response)
|
|
|
+ return response.json().get("access_token")
|
|
|
+ except Exception as e:
|
|
|
+ print(f"获取微信令牌失败: {str(e)}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ def get_domain_info(self, appid, secret):
|
|
|
+ """获取小程序域名信息"""
|
|
|
+ access_token = self.get_access_token(appid, secret)
|
|
|
+ if not access_token:
|
|
|
+ return None
|
|
|
+
|
|
|
+ url = f"https://api.weixin.qq.com/wxa/getwxadevinfo?access_token={access_token}"
|
|
|
+
|
|
|
+ try:
|
|
|
+ response = requests.get(url)
|
|
|
+ return response.json()
|
|
|
+ except Exception as e:
|
|
|
+ print(f"获取域名信息失败: {str(e)}")
|
|
|
+ return None
|
|
|
+
|
|
|
+async def main():
|
|
|
+ async with FeishuDataAsync() as feishu_data:
|
|
|
+ config = await feishu_data.get_values("HJlWwCCzwis5KIk24DOc0dtjnhh", "5178f1")
|
|
|
+ # 初始化微信域名信息获取器
|
|
|
+ domain_fetcher = WechatDomainFetcher()
|
|
|
+
|
|
|
+ for _,app_name, appid, secret in config[1:]:
|
|
|
+ if not appid or not secret:
|
|
|
+ continue
|
|
|
+ domain_info = domain_fetcher.get_domain_info(appid, secret)
|
|
|
+ if not domain_info:
|
|
|
+ continue
|
|
|
+ bizdomain = domain_info.get('bizdomain')
|
|
|
+ if bizdomain:
|
|
|
+ insert_data = list(map(lambda x: [app_name, x], bizdomain))
|
|
|
+ print(insert_data)
|
|
|
+ async with FeishuDataAsync() as feishu_data:
|
|
|
+ await feishu_data.insert_values("TxA2wpGZHiuLl2kMMokcaU9Mnlb", "d3a349", "A2:B", insert_data)
|
|
|
+ else:
|
|
|
+ print(f"小程序 {app_name} 无法获取域名信息")
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ asyncio.run(main())
|