import asyncio
import json
# NOTE(review): insert_text appears unused in this file, and readline is not
# available on every platform -- confirm before removing this import.
from readline import insert_text

import requests

from core.utils.feishu_data_async import FeishuDataAsync

# Seconds to wait for a WeChat API response; without a timeout a stalled
# connection would block this one-off task forever.
REQUEST_TIMEOUT = 10

WECHAT_TOKEN_URL = "https://api.weixin.qq.com/cgi-bin/token"
WECHAT_DEV_INFO_URL = "https://api.weixin.qq.com/wxa/getwxadevinfo"

# Spreadsheet the mini-program credentials are read from.
CONFIG_SPREADSHEET = "HJlWwCCzwis5KIk24DOc0dtjnhh"
CONFIG_SHEET = "5178f1"

# Spreadsheet the (app name, domain) rows are written to.
RESULT_SPREADSHEET = "TxA2wpGZHiuLl2kMMokcaU9Mnlb"
RESULT_SHEET = "d3a349"
RESULT_RANGE = "A2:B"


class WechatDomainFetcher:
    """Fetch domain information for WeChat mini programs (one-off task)."""

    def __init__(self):
        pass

    def get_access_token(self, appid, secret):
        """Request a WeChat API access token.

        Args:
            appid: Mini-program app id.
            secret: Mini-program app secret.

        Returns:
            The ``access_token`` value from the response, or ``None`` when
            the request fails or the response carries no token.
        """
        params = {
            "grant_type": "client_credential",
            "appid": appid,
            "secret": secret,
        }
        try:
            response = requests.get(
                WECHAT_TOKEN_URL, params=params, timeout=REQUEST_TIMEOUT
            )
            payload = response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"获取微信令牌失败: {str(e)}")
            return None

        token = payload.get("access_token")
        if not token:
            # The response had no token; print the payload so the failure
            # is visible instead of silently returning None.
            print(f"获取微信令牌失败: {payload}")
        return token

    def get_domain_info(self, appid, secret):
        """Fetch the mini program's domain information.

        Args:
            appid: Mini-program app id.
            secret: Mini-program app secret.

        Returns:
            The decoded JSON response of the ``getwxadevinfo`` endpoint, or
            ``None`` when no access token could be obtained or the request
            fails.
        """
        access_token = self.get_access_token(appid, secret)
        if not access_token:
            return None

        try:
            # Pass the token via params so it is URL-encoded correctly.
            response = requests.get(
                WECHAT_DEV_INFO_URL,
                params={"access_token": access_token},
                timeout=REQUEST_TIMEOUT,
            )
            return response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"获取域名信息失败: {str(e)}")
            return None


async def main():
    """Read app credentials from the config sheet and record each app's bizdomain list.

    Rows after the first one are unpacked as (ignored, app name, appid,
    secret). For every app whose response contains a non-empty ``bizdomain``
    value, one ``[app_name, domain]`` row per domain is written to the
    result sheet.
    """
    async with FeishuDataAsync() as feishu_data:
        config = await feishu_data.get_values(CONFIG_SPREADSHEET, CONFIG_SHEET)

        # Create the WeChat domain fetcher.
        domain_fetcher = WechatDomainFetcher()

        # The first row is skipped.
        for _, app_name, appid, secret in config[1:]:
            if not appid or not secret:
                continue

            # requests is synchronous, so this call blocks the event loop;
            # rows are processed one at a time here.
            domain_info = domain_fetcher.get_domain_info(appid, secret)
            if not domain_info:
                continue

            bizdomain = domain_info.get('bizdomain')
            if not bizdomain:
                print(f"小程序 {app_name} 无法获取域名信息")
                continue

            insert_data = [[app_name, domain] for domain in bizdomain]
            print(insert_data)
            # A separate client is opened per insert, as before; it gets its
            # own name so it no longer shadows the outer client.
            async with FeishuDataAsync() as writer:
                await writer.insert_values(
                    RESULT_SPREADSHEET, RESULT_SHEET, RESULT_RANGE, insert_data
                )


if __name__ == '__main__':
    asyncio.run(main())