# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/2/20
"""Collect the WeChat Index mini program ``search_key`` and store it in Feishu.

The script drives the macOS WeChat client through ``atomac`` so that the
"微信指数" (WeChat Index) mini program is opened, reads the captured traffic
files from ``./{crawler}/{crawler}_chlsfiles/``, extracts ``search_key`` and
``openid`` from them and writes the pair to a Feishu sheet.
"""
import json
import os
import sys
import time

import atomac

# The project packages are resolved relative to the working directory, so the
# path has to be extended before they are imported.
sys.path.append(os.getcwd())
from common.common import Common
from common.feishu import Feishu


class SearchKey:
    """Obtain the WeChat Index ``search_key``/``openid`` and publish it to Feishu."""

    # Sheet identifier handed to every Feishu helper call made by this class.
    _SHEET_ID = 'sVL74k'
    # Sentinel returned by get_wechat_key() when no capture entry carries a key.
    _NOT_FOUND = "未找到wechat_key"
    # Host and request paths whose request body contains search_key / openid.
    _KEY_HOST = "search.weixin.qq.com"
    _KEY_PATHS = (
        "/cgi-bin/wxaweb/wxawebreport",
        "/cgi-bin/wxaweb/wxindexgetusergroup",
    )

    @classmethod
    def start_wechat(cls, log_type, crawler):
        """Open the WeChat Index mini program via the WeChat UI, then close both.

        Launches WeChat, presses the chat radio button, clicks the first chat
        cell (logged as "文件传输助手"), clicks the "微信指数" text found in the
        window, presses the window's close button and finally kills every
        process whose command line matches ``Program.app``.

        Any exception is logged and swallowed; the method always returns None.

        :param log_type: first argument forwarded to ``Common.logger``
        :param crawler: second argument forwarded to ``Common.logger``
        """
        try:
            # Launch the application and obtain a reference to it.
            bundle_id = "com.tencent.xinWeChat"
            Common.logger(log_type, crawler).info("启动微信")
            atomac.launchAppByBundleId(bundle_id)
            automator = atomac.getAppRefByBundleId(bundle_id)
            time.sleep(1)

            # Work on the first window reported by the application.
            window = automator.windows()[0]
            Common.logger(log_type, crawler).info(f"当前应用window:{window.AXTitle}")

            # Press the chat button so that the chat list is shown.
            Common.logger(log_type, crawler).info("查找聊天按钮")
            chat_btn = window.findFirstR(AXHelp="微信", AXRole="AXRadioButton")
            Common.logger(log_type, crawler).info("点击聊天按钮")
            chat_btn.Press()
            time.sleep(1)

            # Locate the first chat cell and click its centre.
            Common.logger(log_type, crawler).info("查找文件传输助手")
            chat_help = window.findFirstR(AXIdentifier="MMChatsTableCellView_0", AXRole="AXCell")
            Common.logger(log_type, crawler).info(f"文件传输助手:{chat_help}")
            chat_help_position = chat_help.AXPosition
            chat_help_size = chat_help.AXSize
            chat_help_coor = (
                abs(chat_help_position[0]) + chat_help_size[0] / 2,
                abs(chat_help_position[1]) + chat_help_size[1] / 2,
            )
            Common.logger(log_type, crawler).info(f"文件传输助手坐标:{chat_help_coor}")
            Common.logger(log_type, crawler).info("点击文件传输助手")
            # NOTE(review): two clicks, each followed by a pause — confirm that
            # the pause is meant to follow every click rather than only the last.
            for _ in range(2):
                chat_help.clickMouseButtonLeft(chat_help_coor)
                time.sleep(1)

            # Locate the mini program message and click it.
            Common.logger(log_type, crawler).info("查找微信指数小程序消息")
            weixinzhishu = window.findFirstR(AXValue="微信指数", AXRole="AXStaticText")
            Common.logger(log_type, crawler).info(f"微信指数小程序:{weixinzhishu}")
            weixinzhishu_position = weixinzhishu.AXPosition
            weixinzhishu_size = weixinzhishu.AXSize
            # NOTE(review): the y offset is the full element height (not half of
            # it, unlike the chat cell above) — presumably deliberate; confirm.
            weixinzhishu_coord = (
                abs(weixinzhishu_position[0]) + weixinzhishu_size[0] / 2,
                abs(weixinzhishu_position[1]) + weixinzhishu_size[1],
            )
            Common.logger(log_type, crawler).info(f"微信指数小程序坐标:{weixinzhishu_coord}")
            Common.logger(log_type, crawler).info("点击微信指数小程序消息")
            for _ in range(2):
                weixinzhishu.clickMouseButtonLeft(weixinzhishu_coord)
                time.sleep(3)

            # Close the WeChat window.
            close_wechat = window.findFirstR(AXSubrole="AXCloseButton", AXRole="AXButton")
            close_wechat.Press()
            time.sleep(1)
            Common.logger(log_type, crawler).info("关闭微信成功")

            # Kill the mini program processes. The command is a fixed string, so
            # no external input reaches the shell.
            Common.logger(log_type, crawler).info("关闭微信指数小程序")
            cmd = "ps aux | grep Program.app | grep -v grep | awk '{print $2}' | xargs kill -9"
            os.system(cmd)
            Common.logger(log_type, crawler).info("微信指数小程序关闭成功")
            time.sleep(1)
        except Exception as e:
            # Logged at error level, like the failure paths of the other methods.
            Common.logger(log_type, crawler).error(f"start_wechat:{e}\n")

    @classmethod
    def get_wechat_key(cls, log_type, crawler):
        """Extract ``search_key`` and ``openid`` from the newest capture file.

        Waits (re-running :meth:`start_wechat` every 3 seconds) until
        ``./{crawler}/{crawler}_chlsfiles/`` contains at least one file, renames
        the last file in sorted order to a ``.txt`` extension, parses it as
        JSON and scans every entry for a request to ``search.weixin.qq.com`` on
        one of the known paths.

        :param log_type: first argument forwarded to ``Common.logger``
        :param crawler: crawler name; also used to build the capture directory
        :return: ``{"search_key": ..., "openid": ...}`` for the first matching
            entry, the string ``"未找到wechat_key"`` when no entry matches, or
            ``None`` when an exception occurred (it is logged, not raised)
        """
        try:
            chlsfile_path = f"./{crawler}/{crawler}_chlsfiles/"
            while True:
                chlsfiles = sorted(os.listdir(chlsfile_path))
                if not chlsfiles:
                    Common.logger(log_type, crawler).info("chlsfile文件夹空,等待 3 秒")
                    time.sleep(3)
                    cls.start_wechat(log_type, crawler)
                    continue
                Common.logger(log_type, crawler).info(f"chlsfile_list:{chlsfiles}")

                # Take the newest capture (last in sorted order) and give it a
                # .txt extension before reading it.
                chlsfile = chlsfiles[-1]
                stem = os.path.splitext(chlsfile)[0]
                txt_path = os.path.join(chlsfile_path, stem + ".txt")
                os.rename(os.path.join(chlsfile_path, chlsfile), txt_path)

                with open(txt_path, encoding='utf-8-sig', errors='ignore') as f:
                    contents = json.load(f, strict=False)

                # Scan every entry: the key-bearing request is not necessarily
                # the first one in the capture.
                for content in contents:
                    if content.get("host") != cls._KEY_HOST:
                        continue
                    if content.get("path") not in cls._KEY_PATHS:
                        continue
                    payload = json.loads(content['request']['body']['text'])
                    return {
                        "search_key": payload['search_key'],
                        "openid": payload['openid'],
                    }
                return cls._NOT_FOUND
        except Exception as e:
            Common.logger(log_type, crawler).error(f"get_wechat_key:{e}\n")
            return None

    @classmethod
    def remove_chlsfile(cls, log_type, crawler):
        """Delete every file in ``./{crawler}/{crawler}_chlsfiles/``.

        The directory (including missing parents) is created when it does not
        exist yet. Exceptions are logged and swallowed.

        :param log_type: first argument forwarded to ``Common.logger``
        :param crawler: crawler name; also used to build the capture directory
        """
        try:
            all_file_path = f"./{crawler}/{crawler}_chlsfiles/"
            os.makedirs(all_file_path, exist_ok=True)
            for file_name in os.listdir(all_file_path):
                os.remove(os.path.join(all_file_path, file_name))
        except Exception as e:
            Common.logger(log_type, crawler).error(f"remove_file异常:{e}\n")

    @classmethod
    def del_wechat_key(cls, log_type, crawler):
        """Trim the Feishu sheet until it holds at most 51 rows.

        Re-reads the sheet after every change; while it has more than 51 rows,
        ``Feishu.dimension_range`` is called for row 52. A ``None`` sheet is
        retried after one second. Exceptions are logged and swallowed.

        :param log_type: forwarded to ``Common.logger`` and the Feishu helpers
        :param crawler: forwarded to ``Common.logger`` and the Feishu helpers
        """
        try:
            while True:
                sheet = Feishu.get_values_batch(log_type, crawler, cls._SHEET_ID)
                if sheet is None:
                    Common.logger(log_type, crawler).info(f"wechat_key_sheet:{sheet}, 1秒后重新获取")
                    time.sleep(1)
                    continue
                if len(sheet) <= 51:
                    return
                Feishu.dimension_range(log_type, crawler, cls._SHEET_ID, 'ROWS', 52, 52)
        except Exception as e:
            Common.logger(log_type, crawler).error(f"del_wechat_key异常:{e}\n")

    @classmethod
    def write_wechat_key(cls, log_type, crawler):
        """Fetch a fresh ``search_key``/``openid`` pair and write it to Feishu.

        Empties the capture directory, then repeats :meth:`start_wechat` and
        :meth:`get_wechat_key` until a key dictionary is obtained. The pair is
        written, together with the current local time, to range ``A2:Z2`` after
        ``Feishu.insert_columns`` has been called; afterwards the sheet is
        trimmed with :meth:`del_wechat_key` and ``Common.del_logs`` is called.
        Exceptions are logged and swallowed.

        :param log_type: forwarded to ``Common.logger`` and the Feishu helpers
        :param crawler: forwarded to ``Common.logger`` and the Feishu helpers
        """
        try:
            Common.logger(log_type, crawler).info("清空 chlsfiles 文件夹")
            cls.remove_chlsfile(log_type, crawler)
            Common.logger(log_type, crawler).info('获取 wechat_key')
            while True:
                cls.start_wechat(log_type, crawler)
                wechat_key_dict = cls.get_wechat_key(log_type, crawler)
                # get_wechat_key yields None on error and a sentinel string
                # when nothing matched; both mean "try again".
                if wechat_key_dict is None or wechat_key_dict == cls._NOT_FOUND:
                    Common.logger(log_type, crawler).info("未找到wechat_key,重新获取")
                    time.sleep(1)
                    continue

                for k, v in wechat_key_dict.items():
                    Common.logger(log_type, crawler).info(f"{k}:{v}")

                Feishu.insert_columns(log_type, crawler, cls._SHEET_ID, 'ROWS', 1, 2)
                time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time())))
                values = [[time_str, wechat_key_dict['search_key'], wechat_key_dict['openid']]]
                time.sleep(1)
                Feishu.update_values(log_type, crawler, cls._SHEET_ID, 'A2:Z2', values)
                cls.del_wechat_key(log_type, crawler)
                Common.logger(log_type, crawler).info("wechat_key写入飞书成功")
                Common.del_logs(log_type, crawler)
                return
        except Exception as e:
            Common.logger(log_type, crawler).error(f"write_wechat_key:{e}\n")

    @classmethod
    def main(cls, log_type, crawler):
        """Run :meth:`write_wechat_key` forever, pausing 10 seconds between runs.

        :param log_type: forwarded to ``Common.logger`` and ``write_wechat_key``
        :param crawler: forwarded to ``Common.logger`` and ``write_wechat_key``
        """
        while True:
            try:
                cls.write_wechat_key(log_type, crawler)
                # Log through the logger selected by the caller's arguments,
                # like every other message in this class.
                Common.logger(log_type, crawler).info('休眠10秒\n')
                time.sleep(10)
            except Exception as e:
                Common.logger(log_type, crawler).error(f"{e}\n")


if __name__ == "__main__":
    SearchKey.main("search", "weixinzhishu")