# -*- coding: utf-8 -*- # @Author: wangkun # @Time: 2023/2/10 """ 部署机器: Windows 笔记本 获取微信指数小程序请求参数:search_key 1. 启动 WinAppDriver.exe 2. 启动 Charles.exe: 2.1 选中 Proxy - Windows Proxy 2.2 选中 Tools - Auto Save - Enable Auto Save 3. 启动 Python 脚本: 3.1 cd D:\piaoquan_crawler 3.2 python .\weixinzhishu\weixinzhishu_main\search_key.py 每 10 秒获取最新search_key,写入飞书: https://w42nne6hzg.feishu.cn/sheets/shtcnqhMRUGunIfGnGXMOBYiy4K?sheet=sVL74k """ import json import os import sys import time import psutil from appium import webdriver from selenium.webdriver.common.by import By sys.path.append(os.getcwd()) from common.common import Common from common.feishu import Feishu class Searchkey: @classmethod def start_wechat(cls, log_type, crawler): try: Common.logger(log_type, crawler).info('启动"微信"') desired_caps = {'app': r"C:\Program Files (x86)\Tencent\WeChat\WeChat.exe"} driver = webdriver.Remote( command_executor='http://127.0.0.1:4723', desired_capabilities=desired_caps) driver.implicitly_wait(10) Common.logger(log_type, crawler).info('选择对话人"微信同步助手"') driver.find_elements(By.NAME, '微信同步助手')[0].click() time.sleep(1) Common.logger(log_type, crawler).info('点击"微信指数"') driver.find_elements(By.NAME, '消息')[-1].click() time.sleep(1) Common.logger(log_type, crawler).info('退出微信') driver.quit() time.sleep(1) Common.logger(log_type, crawler).info('关闭微信指数') # cls.close_weixinzhishu(log_type, crawler) weixinzhishu_driver = cls.close_weixinzhishu(log_type, crawler) weixinzhishu_driver.find_elements(By.NAME, '关闭')[-1].click() # cls.kill_pid(log_type, crawler) # time.sleep(3) except Exception as e: Common.logger(log_type, crawler).error(f'start_wechat异常:{e}\n') @classmethod def close_weixinzhishu(cls, log_type, crawler, app_name='微信指数'): """ *通过名字找到windowsdriver *通过窗口名称,从桌面对象获取webdriver对象 """ new_caps = {'app': "Root"} try: new_driver = webdriver.Remote(command_executor='http://127.0.0.1:4723', desired_capabilities=new_caps) # new_driver.find_element(By.NAME, "关闭").click() windowElement = new_driver.find_elements(By.NAME, app_name) if len(windowElement) != 0: newWindowHandle = hex(int(windowElement[0].get_attribute("NativeWindowHandle"))) app_caps = {"appTopLevelWindow": newWindowHandle} app_driver = webdriver.Remote(command_executor='http://127.0.0.1:4723', desired_capabilities=app_caps) return app_driver except Exception as e: Common.logger(log_type, crawler).error(f"close_weixinzhishu异常:{e}\n") @classmethod def kill_pid(cls, log_type, crawler): try: os.system('chcp 65001') # 将cmd的显示字符编码从默认的GBK改为UTF-8 list_process = list() pid_list = psutil.pids() for sub_pid in pid_list: try: process_info = psutil.Process(sub_pid) print(process_info) if process_info.name() == 'WeChatAppEx.exe' \ or process_info.name() == 'WeChatOCR.exe' \ or process_info.name() == 'WeChatPlayer.exe' \ or process_info.name() == 'WeChatUtility.exe': list_process.append(sub_pid) except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): pass for pid in list_process: os.system('taskkill /f /pid ' + str(pid)) except Exception as e: Common.logger(log_type, crawler).error(f'kill_pid异常:{e}\n') @classmethod def get_search_key(cls, log_type, crawler): try: # charles 抓包文件保存目录 chlsfile_path = f"./{crawler}/{crawler}_chlsfiles/" if len(os.listdir(chlsfile_path)) == 0: Common.logger(log_type, crawler).info("chlsfile文件夹为空,等待10s") cls.start_wechat(log_type, crawler) time.sleep(10) cls.get_search_key(log_type, crawler) else: Common.logger(log_type, crawler).info(f"chlsfile_list:{sorted(os.listdir(chlsfile_path))}") # 获取最新的 chlsfile chlsfile = sorted(os.listdir(chlsfile_path))[-1] # 分离文件名与扩展名 new_file = os.path.splitext(chlsfile) # 重命名文件后缀 os.rename(os.path.join(chlsfile_path, chlsfile), os.path.join(chlsfile_path, new_file[0] + ".txt")) with open(f"{chlsfile_path}{new_file[0]}.txt", encoding='utf-8-sig', errors='ignore') as f: contents = json.load(f, strict=False) if "search.weixin.qq.com" not in [text['host'] for text in contents]: return "未找到search_key" else: for content in contents: if content["host"] == "search.weixin.qq.com" and content["path"] == "/cgi-bin/wxaweb/wxindexgetusergroup": # print(f"content:{content}") text = content['request']['body']['text'] search_key = json.loads(text)['search_key'] openid = json.loads(text)['openid'] return search_key, openid except Exception as e: Common.logger(log_type, crawler).exception(f"get_search_key异常:{e}\n") return None @classmethod def remove_file(cls, log_type, crawler): try: all_file_path = f"./{crawler}/{crawler}_chlsfiles/" if not os.path.exists(all_file_path): os.mkdir(all_file_path) all_file = os.listdir(f"./{crawler}/{crawler}_chlsfiles/") for file in all_file: os.remove(f"./{crawler}/{crawler}_chlsfiles/{file}") except Exception as e: Common.logger(log_type, crawler).error(f"remove_file异常:{e}\n") @classmethod def del_search_key_from_feishu(cls, log_type, crawler): try: sheet = Feishu.get_values_batch(log_type, crawler, 'sVL74k') if len(sheet) <= 51: # print('<=20行') return else: Feishu.dimension_range(log_type, crawler, 'sVL74k', 'ROWS', 52, 52) cls.del_search_key_from_feishu(log_type, crawler) except Exception as e: Common.logger(log_type, crawler).error(f"del_search_key_from_feishu异常:{e}\n") @classmethod def write_search_key_to_feishu(cls, log_type, crawler): Common.logger(log_type, crawler).info('清除 chlsfiles 文件夹') cls.remove_file(log_type, crawler) Common.logger(log_type, crawler).info('启动微信指数小程序') cls.start_wechat(log_type, crawler) Common.logger(log_type, crawler).info('获取 search_key') while True: search_key = cls.get_search_key(log_type, crawler) if search_key is None or search_key == "未找到search_key": time.sleep(3) Common.logger(log_type, crawler).info('未找到search_key,重启打开微信指数,获取 search_key') cls.start_wechat(log_type, crawler) cls.get_search_key(log_type, crawler) else: while True: try: Common.logger(log_type, crawler).info(f'已获取 search_key,openid:{search_key}') Feishu.insert_columns(log_type, crawler, 'sVL74k', 'ROWS', 1, 2) time.sleep(1) time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))) Feishu.update_values(log_type, crawler, 'sVL74k', 'A2:Z2', [[time_str, search_key[0], search_key[-1]]]) cls.del_search_key_from_feishu(log_type, crawler) Common.logger(log_type, crawler).info(f"search_key:{search_key}写入飞书表成功\n") break except Exception as e: Common.logger(log_type, crawler).error(f"{e}\n") time.sleep(1) return if __name__ == '__main__': while True: Searchkey.write_search_key_to_feishu('searchkey', 'weixinzhishu') Common.logger('searchkey', 'weixinzhishu').info('休眠 10 秒') time.sleep(10) # Searchkey.start_wechat('searchkey', 'weixinzhishu')