|
|
@@ -1,8 +1,13 @@
|
|
|
import asyncio
|
|
|
from readline import insert_text
|
|
|
-
|
|
|
+import urllib
|
|
|
import requests
|
|
|
import json
|
|
|
+import warnings
|
|
|
+
|
|
|
+# 禁用 urllib3 InsecureRequestWarning 警告
|
|
|
+warnings.filterwarnings('ignore', message='Unverified HTTPS request')
|
|
|
+
|
|
|
|
|
|
from core.utils.feishu_data_async import FeishuDataAsync
|
|
|
|
|
|
@@ -25,8 +30,11 @@ class WechatDomainFetcher:
|
|
|
}
|
|
|
|
|
|
try:
|
|
|
- response = requests.get(url, params=params)
|
|
|
- print(response)
|
|
|
+ response = requests.get(url, params=params,verify= False)
|
|
|
+ resp = response.json()
|
|
|
+ if resp.get("errcode",0) != 0:
|
|
|
+ print(f"获取微信令牌失败: {resp}")
|
|
|
+ return None
|
|
|
return response.json().get("access_token")
|
|
|
except Exception as e:
|
|
|
print(f"获取微信令牌失败: {str(e)}")
|
|
|
@@ -35,13 +43,13 @@ class WechatDomainFetcher:
|
|
|
def get_domain_info(self, appid, secret):
|
|
|
"""获取小程序域名信息"""
|
|
|
access_token = self.get_access_token(appid, secret)
|
|
|
- if not access_token:
|
|
|
- return None
|
|
|
|
|
|
url = f"https://api.weixin.qq.com/wxa/getwxadevinfo?access_token={access_token}"
|
|
|
|
|
|
try:
|
|
|
- response = requests.get(url)
|
|
|
+ response = requests.get(url,verify= False)
|
|
|
+ if not response.json():
|
|
|
+ print(response.json())
|
|
|
return response.json()
|
|
|
except Exception as e:
|
|
|
print(f"获取域名信息失败: {str(e)}")
|
|
|
@@ -54,19 +62,18 @@ async def main():
|
|
|
domain_fetcher = WechatDomainFetcher()
|
|
|
|
|
|
for _,app_name, appid, secret in config[1:]:
|
|
|
+ print(f"正在获取小程序 {app_name} 的域名信息...")
|
|
|
if not appid or not secret:
|
|
|
+ print(f"小程序 {app_name} 缺少 appid 或 secret")
|
|
|
continue
|
|
|
- domain_info = domain_fetcher.get_domain_info(appid, secret)
|
|
|
- if not domain_info:
|
|
|
- continue
|
|
|
+ domain_info = domain_fetcher.get_domain_info(appid.strip(), secret.strip())
|
|
|
bizdomain = domain_info.get('bizdomain')
|
|
|
if bizdomain:
|
|
|
insert_data = list(map(lambda x: [app_name, x], bizdomain))
|
|
|
- print(insert_data)
|
|
|
async with FeishuDataAsync() as feishu_data:
|
|
|
await feishu_data.insert_values("TxA2wpGZHiuLl2kMMokcaU9Mnlb", "d3a349", "A2:B", insert_data)
|
|
|
else:
|
|
|
- print(f"小程序 {app_name} 无法获取域名信息")
|
|
|
+ print(f"小程序 {app_name} 无法获取域名信息{domain_info}")
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|