| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- import asyncio
- from readline import insert_text
- import requests
- import json
- from core.utils.feishu_data_async import FeishuDataAsync
- class WechatDomainFetcher:
- """
- 微信小程序域名信息获取类
- 一次性任务
- """
- def __init__(self):
- pass
- def get_access_token(self, appid, secret):
- """获取微信接口访问令牌"""
- url = "https://api.weixin.qq.com/cgi-bin/token"
- params = {
- "grant_type": "client_credential",
- "appid": appid,
- "secret": secret
- }
- try:
- response = requests.get(url, params=params)
- print(response)
- return response.json().get("access_token")
- except Exception as e:
- print(f"获取微信令牌失败: {str(e)}")
- return None
- def get_domain_info(self, appid, secret):
- """获取小程序域名信息"""
- access_token = self.get_access_token(appid, secret)
- if not access_token:
- return None
- url = f"https://api.weixin.qq.com/wxa/getwxadevinfo?access_token={access_token}"
- try:
- response = requests.get(url)
- return response.json()
- except Exception as e:
- print(f"获取域名信息失败: {str(e)}")
- return None
- async def main():
- async with FeishuDataAsync() as feishu_data:
- config = await feishu_data.get_values("HJlWwCCzwis5KIk24DOc0dtjnhh", "5178f1")
- # 初始化微信域名信息获取器
- domain_fetcher = WechatDomainFetcher()
- for _,app_name, appid, secret in config[1:]:
- if not appid or not secret:
- continue
- domain_info = domain_fetcher.get_domain_info(appid, secret)
- if not domain_info:
- continue
- bizdomain = domain_info.get('bizdomain')
- if bizdomain:
- insert_data = list(map(lambda x: [app_name, x], bizdomain))
- print(insert_data)
- async with FeishuDataAsync() as feishu_data:
- await feishu_data.insert_values("TxA2wpGZHiuLl2kMMokcaU9Mnlb", "d3a349", "A2:B", insert_data)
- else:
- print(f"小程序 {app_name} 无法获取域名信息")
- if __name__ == '__main__':
- asyncio.run(main())
|