123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- # -*- coding: utf-8 -*-
- # @Author: wangkun
- # @Time: 2023/2/10
- """
- 部署机器: Windows 笔记本
- 获取微信指数小程序请求参数:search_key
- 1. 启动 WinAppDriver.exe
- 2. 启动 Charles.exe:
- 2.1 选中 Proxy - Windows Proxy
- 2.2 选中 Tools - Auto Save - Enable Auto Save
- 3. 启动 Python 脚本:
- 3.1 cd D:\piaoquan_crawler
- 3.2 python .\weixinzhishu\weixinzhishu_main\search_key.py
- 每 10 秒获取最新search_key,写入飞书: https://w42nne6hzg.feishu.cn/sheets/shtcnqhMRUGunIfGnGXMOBYiy4K?sheet=sVL74k
- """
- import json
- import os
- import sys
- import time
- import psutil
- from appium import webdriver
- from selenium.webdriver.common.by import By
- sys.path.append(os.getcwd())
- from common.common import Common
- from common.feishu import Feishu
- class Searchkey:
- @classmethod
- def start_wechat(cls, log_type, crawler):
- try:
- Common.logger(log_type, crawler).info('启动"微信"')
- desired_caps = {'app': r"C:\Program Files (x86)\Tencent\WeChat\WeChat.exe"}
- driver = webdriver.Remote(
- command_executor='http://127.0.0.1:4723',
- desired_capabilities=desired_caps)
- driver.implicitly_wait(10)
- Common.logger(log_type, crawler).info('选择对话人"微信同步助手"')
- driver.find_elements(By.NAME, '微信同步助手')[0].click()
- time.sleep(1)
- Common.logger(log_type, crawler).info('点击"微信指数"')
- driver.find_elements(By.NAME, '消息')[-1].click()
- time.sleep(1)
- Common.logger(log_type, crawler).info('退出微信')
- driver.quit()
- time.sleep(1)
- Common.logger(log_type, crawler).info('关闭微信指数')
- weixinzhishu_driver = cls.close_weixinzhishu(log_type, crawler)
- weixinzhishu_driver.find_elements(By.NAME, '关闭')[-1].click()
- except Exception as e:
- Common.logger(log_type, crawler).error(f'start_wechat异常:{e}\n')
- @classmethod
- def close_weixinzhishu(cls, log_type, crawler, app_name='微信指数'):
- """
- *通过名字找到windowsdriver
- *通过窗口名称,从桌面对象获取webdriver对象
- """
- new_caps = {'app': "Root"}
- try:
- new_driver = webdriver.Remote(command_executor='http://127.0.0.1:4723', desired_capabilities=new_caps)
- windowElement = new_driver.find_elements(By.NAME, app_name)
- if len(windowElement) != 0:
- newWindowHandle = hex(int(windowElement[0].get_attribute("NativeWindowHandle")))
- app_caps = {"appTopLevelWindow": newWindowHandle}
- app_driver = webdriver.Remote(command_executor='http://127.0.0.1:4723',
- desired_capabilities=app_caps)
- return app_driver
- except Exception as e:
- Common.logger(log_type, crawler).error(f"close_weixinzhishu异常:{e}\n")
- @classmethod
- def kill_pid(cls, log_type, crawler):
- try:
- os.system('chcp 65001') # 将cmd的显示字符编码从默认的GBK改为UTF-8
- list_process = list()
- pid_list = psutil.pids()
- for sub_pid in pid_list:
- try:
- process_info = psutil.Process(sub_pid)
- print(process_info)
- if process_info.name() == 'WeChatAppEx.exe' \
- or process_info.name() == 'WeChatOCR.exe' \
- or process_info.name() == 'WeChatPlayer.exe' \
- or process_info.name() == 'WeChatUtility.exe':
- list_process.append(sub_pid)
- except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
- pass
- for pid in list_process:
- os.system('taskkill /f /pid ' + str(pid))
- except Exception as e:
- Common.logger(log_type, crawler).error(f'kill_pid异常:{e}\n')
- @classmethod
- def get_search_key(cls, log_type, crawler):
- try:
- # charles 抓包文件保存目录
- chlsfile_path = f"./{crawler}/{crawler}_chlsfiles/"
- if len(os.listdir(chlsfile_path)) == 0:
- Common.logger(log_type, crawler).info("chlsfile文件夹为空,等待10s")
- cls.start_wechat(log_type, crawler)
- time.sleep(10)
- cls.get_search_key(log_type, crawler)
- else:
- Common.logger(log_type, crawler).info(f"chlsfile_list:{sorted(os.listdir(chlsfile_path))}")
- # 获取最新的 chlsfile
- chlsfile = sorted(os.listdir(chlsfile_path))[-1]
- # 分离文件名与扩展名
- new_file = os.path.splitext(chlsfile)
- # 重命名文件后缀
- os.rename(os.path.join(chlsfile_path, chlsfile),
- os.path.join(chlsfile_path, new_file[0] + ".txt"))
- with open(f"{chlsfile_path}{new_file[0]}.txt", encoding='utf-8-sig', errors='ignore') as f:
- contents = json.load(f, strict=False)
- if "search.weixin.qq.com" not in [text['host'] for text in contents]:
- return "未找到search_key"
- else:
- for content in contents:
- if content["host"] == "search.weixin.qq.com" and content["path"] == "/cgi-bin/wxaweb/wxindexgetusergroup":
- # print(f"content:{content}")
- text = content['request']['body']['text']
- search_key = json.loads(text)['search_key']
- openid = json.loads(text)['openid']
- return search_key, openid
- except Exception as e:
- Common.logger(log_type, crawler).exception(f"get_search_key异常:{e}\n")
- return None
- @classmethod
- def remove_file(cls, log_type, crawler):
- try:
- all_file_path = f"./{crawler}/{crawler}_chlsfiles/"
- if not os.path.exists(all_file_path):
- os.mkdir(all_file_path)
- all_file = os.listdir(f"./{crawler}/{crawler}_chlsfiles/")
- for file in all_file:
- os.remove(f"./{crawler}/{crawler}_chlsfiles/{file}")
- except Exception as e:
- Common.logger(log_type, crawler).error(f"remove_file异常:{e}\n")
- @classmethod
- def del_search_key_from_feishu(cls, log_type, crawler):
- try:
- sheet = Feishu.get_values_batch(log_type, crawler, 'sVL74k')
- if len(sheet) <= 21:
- # print('<=20行')
- return
- else:
- Feishu.dimension_range(log_type, crawler, 'sVL74k', 'ROWS', 22, 22)
- cls.del_search_key_from_feishu(log_type, crawler)
- except Exception as e:
- Common.logger(log_type, crawler).error(f"del_search_key_from_feishu异常:{e}\n")
- @classmethod
- def write_search_key_to_feishu(cls, log_type, crawler):
- Common.logger(log_type, crawler).info('清除 chlsfiles 文件夹')
- cls.remove_file(log_type, crawler)
- Common.logger(log_type, crawler).info('启动微信指数小程序')
- cls.start_wechat(log_type, crawler)
- Common.logger(log_type, crawler).info('获取 search_key')
- while True:
- search_key = cls.get_search_key(log_type, crawler)
- if search_key is None or search_key == "未找到search_key":
- time.sleep(3)
- Common.logger(log_type, crawler).info('未找到search_key,重启打开微信指数,获取 search_key')
- cls.start_wechat(log_type, crawler)
- cls.get_search_key(log_type, crawler)
- else:
- Common.logger(log_type, crawler).info(f'已获取 search_key,openid:{search_key}')
- Feishu.insert_columns(log_type, crawler, 'sVL74k', 'ROWS', 1, 2)
- time.sleep(1)
- time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time())))
- Feishu.update_values(log_type, crawler, 'sVL74k', 'A2:Z2', [[time_str, search_key[0], search_key[-1]]])
- cls.del_search_key_from_feishu(log_type, crawler)
- Common.logger(log_type, crawler).info(f"search_key:{search_key}写入飞书表成功\n")
- return
- if __name__ == '__main__':
- while True:
- Searchkey.write_search_key_to_feishu('searchkey', 'weixinzhishu')
- Common.logger('searchkey', 'weixinzhishu').info('休眠 10 秒')
- time.sleep(10)
- # Searchkey.start_wechat('searchkey', 'weixinzhishu')
|