# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/2/20
"""
Capture the WeChat Index mini program's search_key and publish it to Feishu.

Setup and usage:

1. Install atomac:
    1.1 Make sure GitHub is reachable (a proxy/VPN may be required)
    1.2 pip3 install git+https://github.com/pyatom/pyatom/
2. Start Charles.exe:
    2.1 Enable Proxy - Windows Proxy
    2.2 Enable Tools - Auto Save - Enable Auto Save
3. Start the Python script:
    3.1 cd ./piaoquan_crawler/
    3.2 python ./weixinzhishu/weixinzhishu_main/search_key_mac.py
4. Every 10 seconds the latest search_key is fetched and written to Feishu:
   https://w42nne6hzg.feishu.cn/sheets/shtcnqhMRUGunIfGnGXMOBYiy4K?sheet=sVL74k
"""
from datetime import datetime
import json
import os
import sys
import time
import atomac

# Project-local packages are resolved relative to the current working directory.
sys.path.append(os.getcwd())
from common.common import Common
from common.feishu import Feishu


class SearchKey:
    """Automate WeChat to obtain the WeChat Index search_key and store it in Feishu.

    All methods are classmethods taking ``log_type`` and ``crawler``; both are
    forwarded to ``Common.logger`` / ``Feishu`` and ``crawler`` additionally
    names the ``./{crawler}/{crawler}_chlsfiles/`` capture directory.
    """

    # Bundle id used to launch and look up the WeChat desktop client.
    _WECHAT_BUNDLE_ID = "com.tencent.xinWeChat"
    # Host and request paths whose request body carries search_key / openid.
    _SEARCH_HOST = "search.weixin.qq.com"
    _KEY_PATHS = (
        "/cgi-bin/wxaweb/wxawebreport",
        "/cgi-bin/wxaweb/wxindexgetusergroup",
    )
    # Sentinel returned by get_wechat_key when no usable request was captured.
    _KEY_NOT_FOUND = "未找到wechat_key"
    # Feishu sheet that stores the captured keys.
    _SHEET_ID = 'sVL74k'

    @classmethod
    def start_wechat(cls, log_type, crawler):
        """Launch WeChat, open the WeChat Index mini program, then close both.

        Opening the mini program is what makes it issue the requests that the
        capture files are later searched for. Any failure (for example a UI
        element that cannot be found) is logged and swallowed; the method
        always returns ``None``.
        """
        try:
            # Launch the application and obtain a reference to it.
            bundle_id = cls._WECHAT_BUNDLE_ID
            Common.logger(log_type, crawler).info("启动微信")
            atomac.launchAppByBundleId(bundle_id)
            automator = atomac.getAppRefByBundleId(bundle_id)
            time.sleep(1)

            # Work on the application's first window.
            window = automator.windows()[0]
            Common.logger(log_type, crawler).info(f"当前应用window:{window.AXTitle}")

            # NOTE: an automatic login step used to live here and is disabled:
            #   login_btn = window.findFirstR(AXRole="AXButton", AXDescription="进入微信")
            #   if login_btn: login_btn.Press(); time.sleep(1)

            # Locate the "chats" tab button.
            Common.logger(log_type, crawler).info("查找聊天按钮")
            chat_btn = window.findFirstR(AXHelp="微信", AXRole="AXRadioButton")

            # Press it to show the chat list.
            Common.logger(log_type, crawler).info("点击聊天按钮")
            chat_btn.Press()
            time.sleep(1)

            # Locate the File Transfer Assistant chat (first cell of the chat list).
            Common.logger(log_type, crawler).info("查找文件传输助手")
            chat_help = window.findFirstR(AXIdentifier="MMChatsTableCellView_0", AXRole="AXCell")
            Common.logger(log_type, crawler).info(f"文件传输助手:{chat_help}")
            chat_help_position = chat_help.AXPosition
            chat_help_size = chat_help.AXSize
            # Click target: the centre of the cell.
            chat_help_coor = (abs(chat_help_position[0]) + chat_help_size[0] / 2,
                              abs(chat_help_position[1]) + chat_help_size[1] / 2)
            Common.logger(log_type, crawler).info(f"文件传输助手坐标:{chat_help_coor}")
            Common.logger(log_type, crawler).info("点击文件传输助手")
            # NOTE(review): assumes the pause follows each of the two clicks — confirm.
            for _ in range(2):
                chat_help.clickMouseButtonLeft(chat_help_coor)
                time.sleep(1)

            # Locate the WeChat Index mini program message inside the chat.
            Common.logger(log_type, crawler).info("查找微信指数小程序消息")
            weixinzhishu = window.findFirstR(AXValue="微信指数", AXRole="AXStaticText")
            Common.logger(log_type, crawler).info(f"微信指数小程序:{weixinzhishu}")
            weixinzhishu_position = weixinzhishu.AXPosition
            weixinzhishu_size = weixinzhishu.AXSize
            # Horizontally centred; vertically offset by the full height of the
            # text element (not half) — presumably to hit the message card
            # rather than its title, verify against the UI.
            weixinzhishu_coord = (abs(weixinzhishu_position[0]) + weixinzhishu_size[0] / 2,
                                  abs(weixinzhishu_position[1]) + weixinzhishu_size[1])
            Common.logger(log_type, crawler).info(f"微信指数小程序坐标:{weixinzhishu_coord}")
            Common.logger(log_type, crawler).info("点击微信指数小程序消息")
            # NOTE(review): assumes the pause follows each of the two clicks — confirm.
            for _ in range(2):
                weixinzhishu.clickMouseButtonLeft(weixinzhishu_coord)
                time.sleep(3)

            # Close the WeChat window through its close button.
            close_wechat = window.findFirstR(AXSubrole="AXCloseButton", AXRole="AXButton")
            close_wechat.Press()
            time.sleep(1)
            Common.logger(log_type, crawler).info("关闭微信成功")

            # Wait before force-killing the mini program process so that it has
            # time to run; the process is matched by "Program.app" in `ps aux`.
            Common.logger(log_type, crawler).info("关闭微信指数小程序")
            time.sleep(15)
            cmd = "ps aux | grep Program.app | grep -v grep | awk '{print $2}' | xargs kill -9"
            os.system(cmd)
            Common.logger(log_type, crawler).info("微信指数小程序关闭成功")
            time.sleep(1)
        except Exception as e:
            # Logged at error level, consistent with the other methods.
            Common.logger(log_type, crawler).error(f"start_wechat:{e}\n")

    @classmethod
    def _extract_wechat_key(cls, contents):
        """Return the key dict from the first matching captured request.

        ``contents`` is the parsed capture file: an iterable of dicts with
        ``host``, ``path`` and ``request.body.text`` (a JSON string).

        Returns ``{"search_key": ..., "openid": ...}`` for the first entry
        whose host is ``search.weixin.qq.com`` and whose path is one of the
        known key-carrying paths, or the string ``"未找到wechat_key"`` when no
        entry matches. Every entry is examined before giving up.
        """
        for content in contents:
            if content.get("host") != cls._SEARCH_HOST:
                continue
            if content.get("path") not in cls._KEY_PATHS:
                continue
            # The request body is itself JSON; parse it once.
            payload = json.loads(content['request']['body']['text'])
            return {
                "search_key": payload['search_key'],
                "openid": payload['openid'],
            }
        return cls._KEY_NOT_FOUND

    @classmethod
    def get_wechat_key(cls, log_type, crawler):
        """Read search_key and openid from the newest capture file.

        While the capture directory is empty, WeChat is driven again via
        ``start_wechat`` (after a 3 second wait) until a file appears. The
        newest file (last in sorted name order) is renamed to ``.txt`` and
        parsed as JSON.

        Returns:
            ``{"search_key": ..., "openid": ...}`` on success, the string
            ``"未找到wechat_key"`` when the file holds no matching request, or
            ``None`` when an exception occurred (it is logged).
        """
        try:
            chlsfile_path = f"./{crawler}/{crawler}_chlsfiles/"
            while True:
                # List once so the logged list and the chosen file agree.
                chlsfiles = sorted(os.listdir(chlsfile_path))
                if not chlsfiles:
                    Common.logger(log_type, crawler).info("chlsfile文件夹空,等待 3 秒")
                    time.sleep(3)
                    cls.start_wechat(log_type, crawler)
                    continue
                Common.logger(log_type, crawler).info(f"chlsfile_list:{chlsfiles}")

                # Pick the newest capture file and switch its extension to .txt.
                chlsfile = chlsfiles[-1]
                stem = os.path.splitext(chlsfile)[0]
                txt_path = os.path.join(chlsfile_path, stem + ".txt")
                os.rename(os.path.join(chlsfile_path, chlsfile), txt_path)

                with open(txt_path, encoding='utf-8-sig', errors='ignore') as f:
                    contents = json.load(f, strict=False)

                return cls._extract_wechat_key(contents)
        except Exception as e:
            Common.logger(log_type, crawler).error(f"get_wechat_key:{e}\n")
            return None

    @classmethod
    def remove_chlsfile(cls, log_type, crawler):
        """Empty the capture directory, creating it first if it is missing.

        Errors are logged and swallowed.
        """
        try:
            all_file_path = f"./{crawler}/{crawler}_chlsfiles/"
            # makedirs also creates a missing parent directory.
            os.makedirs(all_file_path, exist_ok=True)
            for file in os.listdir(all_file_path):
                os.remove(os.path.join(all_file_path, file))
        except Exception as e:
            Common.logger(log_type, crawler).error(f"remove_file异常:{e}\n")

    @classmethod
    def del_wechat_key(cls, log_type, crawler):
        """Trim the Feishu sheet until it holds at most 51 rows.

        The sheet is re-read on every iteration; while it has more than 51
        rows, ``Feishu.dimension_range`` is called for row 52 (presumably
        deleting that row — verify against the Feishu helper). A ``None``
        sheet is retried after one second.
        """
        try:
            while True:
                sheet = Feishu.get_values_batch(log_type, crawler, cls._SHEET_ID)
                if sheet is None:
                    Common.logger(log_type, crawler).info(f"wechat_key_sheet:{sheet}, 1秒后重新获取")
                    time.sleep(1)
                    continue
                if len(sheet) <= 51:
                    return
                Feishu.dimension_range(log_type, crawler, cls._SHEET_ID, 'ROWS', 52, 52)
        except Exception as e:
            Common.logger(log_type, crawler).error(f"del_wechat_key异常:{e}\n")

    @classmethod
    def write_wechat_key(cls, log_type, crawler):
        """Capture a fresh search_key and write it to row 2 of the Feishu sheet.

        The capture directory is emptied first. WeChat is then driven
        repeatedly until ``get_wechat_key`` yields a key dict. The new row is
        ``[timestamp, search_key, openid]``; afterwards surplus rows and old
        logs are cleaned up. Errors are logged and swallowed.
        """
        try:
            Common.logger(log_type, crawler).info("清空 chlsfiles 文件夹")
            cls.remove_chlsfile(log_type, crawler)
            Common.logger(log_type, crawler).info('获取 wechat_key')
            while True:
                cls.start_wechat(log_type, crawler)
                wechat_key_dict = cls.get_wechat_key(log_type, crawler)
                # None means get_wechat_key failed; the sentinel means no match.
                if wechat_key_dict is None or wechat_key_dict == cls._KEY_NOT_FOUND:
                    Common.logger(log_type, crawler).info("未找到wechat_key,重新获取")
                    time.sleep(1)
                    continue

                for k, v in wechat_key_dict.items():
                    Common.logger(log_type, crawler).info(f"{k}:{v}")

                # Presumably inserts an empty row at position 2 — verify
                # against the Feishu helper.
                Feishu.insert_columns(log_type, crawler, cls._SHEET_ID, 'ROWS', 1, 2)
                # strftime without a time tuple formats the current local time.
                time_str = time.strftime("%Y-%m-%d %H:%M:%S")
                values = [[time_str, wechat_key_dict['search_key'], wechat_key_dict['openid']]]
                time.sleep(1)
                Feishu.update_values(log_type, crawler, cls._SHEET_ID, 'A2:Z2', values)
                cls.del_wechat_key(log_type, crawler)
                Common.logger(log_type, crawler).info("wechat_key写入飞书成功")
                Common.del_logs(log_type, crawler)
                return
        except Exception as e:
            Common.logger(log_type, crawler).error(f"write_wechat_key:{e}\n")

    @classmethod
    def main(cls, log_type, crawler):
        """Run forever, refreshing the key only between 11:00:00 and 13:59:59.

        Inside the window a key is written and the loop pauses 10 seconds;
        outside it the loop pauses 60 seconds between checks.
        """
        while True:
            if 11 <= datetime.now().hour < 14:
                cls.write_wechat_key(log_type, crawler)
                Common.logger(log_type, crawler).info('休眠10秒\n')
                time.sleep(10)
            else:
                Common.logger(log_type, crawler).info("休眠中,获取 search_key 的时间段为: 11:00:00 - 13:59:59")
                time.sleep(60)


if __name__ == "__main__":
    SearchKey.main("search", "weixinzhishu")