123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- # -*- coding: utf-8 -*-
- # @Author: wangkun
- # @Time: 2023/2/20
- """
- 1. 安装 atomac:
- 1.1.翻墙
- 1.2.pip3 install git+https://github.com/pyatom/pyatom/
- 2. 启动 Charles.exe:
- 2.1 选中 Proxy - Windows Proxy
- 2.2 选中 Tools - Auto Save - Enable Auto Save
- 3. 启动 Python 脚本:
- 3.1 cd ./piaoquan_crawler/
- 3.2 python ./weixinzhishu/weixinzhishu_main/search_key_mac.py
- 4. 每 10 秒获取最新search_key,写入飞书: https://w42nne6hzg.feishu.cn/sheets/shtcnqhMRUGunIfGnGXMOBYiy4K?sheet=sVL74k
- """
- import json
- import os
- import sys
- import time
- import atomac
- sys.path.append(os.getcwd())
- from common.common import Common
- from common.feishu import Feishu
- class SearchKey:
- # 启动微信 / 微信指数小程序 ; 关闭微信 / 微信指数小程序
- @classmethod
- def start_wechat(cls, log_type, crawler):
- try:
- # 启动应用并获取应用信息
- bundle_id = "com.tencent.xinWeChat"
- Common.logger(log_type, crawler).info("启动微信")
- atomac.launchAppByBundleId(bundle_id)
- automator = atomac.getAppRefByBundleId(bundle_id)
- time.sleep(1)
- # 获取当前应用window
- window = automator.windows()[0]
- Common.logger(log_type, crawler).info(f"当前应用window:{window.AXTitle}")
- # # 登录
- # login_btn = window.findFirstR(AXRole="AXButton", AXDescription="进入微信")
- # # print(login_btn.getAttributes())
- # # print(f"AXEnabled:{login_btn.AXEnabled}")
- # # print(f"AXDescription:{login_btn.AXDescription}")
- # # print(f"AXParent:{login_btn.AXParent}")
- # # print(f"AXSize:{login_btn.AXSize}")
- # # print(f"AXFocused:{login_btn.AXFocused}")
- # # print(f"AXRole:{login_btn.AXRole}")
- # # print(f"AXTopLevelUIElement:{login_btn.AXTopLevelUIElement}")
- # # print(f"AXHelp:{login_btn.AXHelp}")
- # # print(f"AXPosition:{login_btn.AXPosition}")
- # # print(f"AXWindow:{login_btn.AXWindow}")
- # # print(f"AXRoleDescription:{login_btn.AXRoleDescription}")
- # # print(f"AXIdentifier:{login_btn.AXIdentifier}")
- # if login_btn:
- # Common.logger(log_type, crawler).info("发现登录按钮,点击登录")
- # login_btn.Press()
- # time.sleep(1)
- # 查找聊天按钮
- Common.logger(log_type, crawler).info("查找聊天按钮")
- chat_btn = window.findFirstR(AXHelp="微信", AXRole="AXRadioButton")
- # 点击聊天按钮,展开聊天列表
- Common.logger(log_type, crawler).info("点击聊天按钮")
- chat_btn.Press()
- time.sleep(1)
- # 查找文件传输助手
- Common.logger(log_type, crawler).info("查找文件传输助手")
- chat_help = window.findFirstR(AXIdentifier="MMChatsTableCellView_0" ,AXRole="AXCell")
- Common.logger(log_type, crawler).info(f"文件传输助手:{chat_help}")
- chat_help_position = chat_help.AXPosition
- chat_help_size = chat_help.AXSize
- chat_help_coor = (abs(chat_help_position[0])+chat_help_size[0]/2, abs(chat_help_position[1])+chat_help_size[1]/2)
- Common.logger(log_type, crawler).info(f"文件传输助手坐标:{chat_help_coor}")
- Common.logger(log_type, crawler).info("点击文件传输助手")
- for i in range(2):
- chat_help.clickMouseButtonLeft(chat_help_coor)
- time.sleep(1)
- # 查找微信指数小程序消息
- Common.logger(log_type, crawler).info("查找微信指数小程序消息")
- weixinzhishu = window.findFirstR(AXValue="微信指数", AXRole="AXStaticText")
- Common.logger(log_type, crawler).info(f"微信指数小程序:{weixinzhishu}")
- weixinzhishu_box = weixinzhishu.AXPosition
- weixinzhishu_position = weixinzhishu.AXSize
- weixinzhishu_coord = (abs(weixinzhishu_box[0])+weixinzhishu_position[0]/2, abs(weixinzhishu_box[1])+weixinzhishu_position[1])
- Common.logger(log_type, crawler).info(f"微信指数小程序坐标:{weixinzhishu_coord}")
- Common.logger(log_type, crawler).info("点击微信指数小程序消息")
- for i in range(2):
- weixinzhishu.clickMouseButtonLeft(weixinzhishu_coord)
- time.sleep(3)
- close_wechat = window.findFirstR(AXSubrole="AXCloseButton", AXRole="AXButton")
- close_wechat.Press()
- time.sleep(1)
- Common.logger(log_type, crawler).info("关闭微信成功")
- Common.logger(log_type, crawler).info("关闭微信指数小程序")
- time.sleep(15)
- cmd = "ps aux | grep Program.app | grep -v grep | awk '{print $2}' | xargs kill -9"
- os.system(cmd)
- Common.logger(log_type, crawler).info("微信指数小程序关闭成功")
- time.sleep(1)
- except Exception as e:
- Common.logger(log_type, crawler).info(f"start_wechat:{e}\n")
- # 获取微信指数小程序 search_key
- @classmethod
- def get_wechat_key(cls, log_type, crawler):
- try:
- while True:
- chlsfile_path = f"./{crawler}/{crawler}_chlsfiles/"
- if len(os.listdir(chlsfile_path)) == 0:
- Common.logger(log_type, crawler).info("chlsfile文件夹空,等待 3 秒")
- time.sleep(3)
- cls.start_wechat(log_type, crawler)
- continue
- Common.logger(log_type, crawler).info(f"chlsfile_list:{sorted(os.listdir(chlsfile_path))}")
- # 获取最新的 chlsfile
- chlsfile = sorted(os.listdir(chlsfile_path))[-1]
- # 分离文件名与扩展名
- new_file = os.path.splitext(chlsfile)
- # 重命名文件后缀
- os.rename(os.path.join(chlsfile_path, chlsfile),
- os.path.join(chlsfile_path, new_file[0] + ".txt"))
- with open(f"{chlsfile_path}{new_file[0]}.txt", encoding='utf-8-sig', errors='ignore') as f:
- contents = json.load(f, strict=False)
- if "search.weixin.qq.com" not in [text['host'] for text in contents]:
- return "未找到wechat_key"
- else:
- for content in contents:
- if content["host"] == "search.weixin.qq.com" and content["path"] == "/cgi-bin/wxaweb/wxawebreport":
- # print(f"content:{content}")
- text = content['request']['body']['text']
- search_key = json.loads(text)['search_key']
- openid = json.loads(text)['openid']
- wechat_key_dict = {
- "search_key": search_key,
- "openid": openid,
- }
- return wechat_key_dict
- elif content["host"] == "search.weixin.qq.com" and content["path"] == "/cgi-bin/wxaweb/wxindexgetusergroup":
- text = content['request']['body']['text']
- search_key = json.loads(text)['search_key']
- openid = json.loads(text)['openid']
- wechat_key_dict = {
- "search_key": search_key,
- "openid": openid,
- }
- return wechat_key_dict
- else:
- return "未找到wechat_key"
- except Exception as e:
- Common.logger(log_type, crawler).error(f"get_wechat_key:{e}\n")
- # 删除 chlsfile 文件
- @classmethod
- def remove_chlsfile(cls, log_type, crawler):
- try:
- all_file_path = f"./{crawler}/{crawler}_chlsfiles/"
- if not os.path.exists(all_file_path):
- os.mkdir(all_file_path)
- all_file = os.listdir(all_file_path)
- for file in all_file:
- os.remove(f"./{crawler}/{crawler}_chlsfiles/{file}")
- except Exception as e:
- Common.logger(log_type, crawler).error(f"remove_file异常:{e}\n")
- # 删除微信指数小程序 search_key
- @classmethod
- def del_wechat_key(cls, log_type, crawler):
- try:
- while True:
- sheet = Feishu.get_values_batch(log_type, crawler, 'sVL74k')
- if sheet is None:
- Common.logger(log_type, crawler).info(f"wechat_key_sheet:{sheet}, 1秒后重新获取")
- time.sleep(1)
- continue
- if len(sheet) <= 51:
- return
- else:
- Feishu.dimension_range(log_type, crawler, 'sVL74k', 'ROWS', 52, 52)
- except Exception as e:
- Common.logger(log_type, crawler).error(f"del_wechat_key异常:{e}\n")
- # 微信指数小程序 search_key 写入飞书
- @classmethod
- def write_wechat_key(cls, log_type, crawler):
- try:
- Common.logger(log_type, crawler).info(f"清空 chlsfiles 文件夹")
- cls.remove_chlsfile(log_type, crawler)
- Common.logger(log_type, crawler).info('获取 wechat_key')
- while True:
- cls.start_wechat(log_type, crawler)
- wechat_key_dict = cls.get_wechat_key(log_type, crawler)
- if wechat_key_dict is None or wechat_key_dict == "未找到wechat_key":
- Common.logger(log_type, crawler).info("未找到wechat_key,重新获取")
- time.sleep(1)
- continue
- for k, v in wechat_key_dict.items():
- Common.logger(log_type, crawler).info(f"{k}:{v}")
- Feishu.insert_columns(log_type, crawler, 'sVL74k', 'ROWS', 1, 2)
- time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time())))
- values = [[time_str, wechat_key_dict['search_key'], wechat_key_dict['openid']]]
- time.sleep(1)
- Feishu.update_values(log_type, crawler, 'sVL74k', 'A2:Z2', values)
- cls.del_wechat_key(log_type, crawler)
- Common.logger(log_type, crawler).info("wechat_key写入飞书成功")
- Common.del_logs(log_type, crawler)
- return
- except Exception as e:
- Common.logger(log_type, crawler).error(f"write_wechat_key:{e}\n")
- @classmethod
- def main(cls, log_type, crawler):
- while True:
- try:
- cls.write_wechat_key(log_type, crawler)
- Common.logger(log_type, crawler).info('休眠10秒\n')
- time.sleep(10)
- except Exception as e:
- Common.logger(log_type, crawler).error(f"{e}\n")
- if __name__ == "__main__":
- # SearchKey.start_wechat("search", "weixinzhishu")
- # SearchKey.del_wechat_key("search", "weixinzhishu")
- # SearchKey.write_wechat_key("search", "weixinzhishu")
- # print(SearchKey.get_wechat_key("search", "weixinzhishu"))
- SearchKey.main("search", "weixinzhishu")
- pass
|