| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- import asyncio
- from readline import insert_text
- import urllib
- import requests
- import json
- import warnings
- # 禁用 urllib3 InsecureRequestWarning 警告
- warnings.filterwarnings('ignore', message='Unverified HTTPS request')
- from core.utils.feishu_data_async import FeishuDataAsync
class WechatDomainFetcher:
    """Fetch domain configuration of WeChat mini programs.

    Written for one-off runs: network and decoding failures are printed to
    stdout and reported to the caller as ``None`` instead of being raised.

    Args:
        timeout: Seconds to wait for each HTTP request before giving up.
            Without a timeout ``requests`` may block forever on a stalled
            connection.
        verify: Forwarded to ``requests`` as ``verify``. Defaults to
            ``False`` because the original script disabled certificate
            verification.
    """

    TOKEN_URL = "https://api.weixin.qq.com/cgi-bin/token"
    DOMAIN_INFO_URL = "https://api.weixin.qq.com/wxa/getwxadevinfo"

    def __init__(self, timeout=10, verify=False):
        self.timeout = timeout
        # NOTE(review): verify=False disables TLS certificate checks and
        # exposes the app secret to man-in-the-middle attacks. Pass
        # verify=True unless the environment really requires otherwise.
        self.verify = verify

    @staticmethod
    def _extract_access_token(payload):
        """Return the access token from a decoded token response, or None.

        A response whose ``errcode`` is present and non-zero (or that is not
        a JSON object at all) is reported on stdout and yields ``None``.
        """
        if not isinstance(payload, dict) or payload.get("errcode", 0) != 0:
            print(f"获取微信令牌失败: {payload}")
            return None
        return payload.get("access_token")

    def get_access_token(self, appid, secret):
        """Request an access token for the given app credentials.

        Args:
            appid: App id sent as the ``appid`` query parameter.
            secret: App secret sent as the ``secret`` query parameter.

        Returns:
            The ``access_token`` value of the response, or ``None`` when the
            request fails, the body is not valid JSON, or the response
            carries a non-zero ``errcode``.
        """
        params = {
            "grant_type": "client_credential",
            "appid": appid,
            "secret": secret,
        }
        try:
            response = requests.get(
                self.TOKEN_URL,
                params=params,
                verify=self.verify,
                timeout=self.timeout,
            )
            # Decode once; the original re-parsed the body on every access.
            payload = response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"获取微信令牌失败: {str(e)}")
            return None
        return self._extract_access_token(payload)

    def get_domain_info(self, appid, secret):
        """Fetch the domain information of a mini program.

        Args:
            appid: App id used to obtain the access token.
            secret: App secret used to obtain the access token.

        Returns:
            The decoded JSON response, or ``None`` when no access token
            could be obtained, the request fails, or the body is not valid
            JSON.
        """
        access_token = self.get_access_token(appid, secret)
        if not access_token:
            # The failure was already reported by get_access_token; do not
            # send a request carrying "access_token=None".
            return None
        try:
            response = requests.get(
                self.DOMAIN_INFO_URL,
                params={"access_token": access_token},
                verify=self.verify,
                timeout=self.timeout,
            )
            domain_info = response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"获取域名信息失败: {str(e)}")
            return None
        if not domain_info:
            # Keep the original diagnostic for an empty response body.
            print(domain_info)
        return domain_info
async def main():
    """Collect ``bizdomain`` entries for every configured mini program.

    Reads the configuration rows through ``FeishuDataAsync.get_values``,
    skips the first row, and for each remaining row asks
    ``WechatDomainFetcher`` for the app's domain information. Every entry
    of the response's ``bizdomain`` list is written as an
    ``[app_name, domain]`` row through ``FeishuDataAsync.insert_values``.
    Apps with missing credentials or without domain data are reported on
    stdout and skipped.
    """
    async with FeishuDataAsync() as feishu_data:
        config = await feishu_data.get_values("HJlWwCCzwis5KIk24DOc0dtjnhh", "5178f1")

    # Initialise the WeChat domain-info fetcher.
    domain_fetcher = WechatDomainFetcher()

    # Tolerate an empty/None sheet; the first row is skipped
    # (presumably a header row - confirm against the sheet).
    rows = config[1:] if config else []
    for _, app_name, appid, secret in rows:
        print(f"正在获取小程序 {app_name} 的域名信息...")
        if not appid or not secret:
            print(f"小程序 {app_name} 缺少 appid 或 secret")
            continue

        domain_info = domain_fetcher.get_domain_info(appid.strip(), secret.strip())
        # get_domain_info returns None on failure; calling .get on it used
        # to raise AttributeError and abort the remaining apps.
        bizdomain = domain_info.get('bizdomain') if domain_info else None
        if bizdomain:
            insert_data = [[app_name, domain] for domain in bizdomain]
            # Separate name so the reader context above is not shadowed.
            async with FeishuDataAsync() as feishu_writer:
                await feishu_writer.insert_values("TxA2wpGZHiuLl2kMMokcaU9Mnlb", "d3a349", "A2:B", insert_data)
        else:
            print(f"小程序 {app_name} 无法获取域名信息{domain_info}")
# Script entry point: run the async workflow only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())
|