123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- # -*- coding: utf-8 -*-
- # @Author: wangkun
- # @Time: 2023/2/10
- import json
- import os
- import sys
- import time
- import psutil as psutil
- from appium import webdriver
- from selenium.webdriver.common.by import By
- sys.path.append(os.getcwd())
- from common.common import Common
- class Searchkey:
- # @classmethod
- # def kill_pid(cls, log_type):
- # try:
- # os.system('chcp 65001') # 将cmd的显示字符编码从默认的GBK改为UTF-8
- # list_process = list()
- # pid_list = psutil.pids()
- # for sub_pid in pid_list:
- # try:
- # process_info = psutil.Process(sub_pid)
- # if process_info.name() == 'WechatBrowser.exe' or process_info.name() == 'WeChatPlayer.exe':
- # list_process.append(sub_pid)
- # except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
- # pass
- # for pid in list_process:
- # os.system('taskkill /f /pid ' + str(pid))
- # except Exception as e:
- # Common.logger(log_type).error('kill_pid异常:{}', e)
- @classmethod
- def start_wechat(cls, log_type, crawler):
- try:
- # Common.logger(log_type, crawler).info('启动"微信"')
- print('启动"微信"')
- desired_caps = {'app': r"C:\Program Files (x86)\Tencent\WeChat\WeChat.exe"}
- driver = webdriver.Remote(
- command_executor='http://127.0.0.1:4723',
- desired_capabilities=desired_caps)
- driver.implicitly_wait(10)
- # Common.logger(log_type).info('点击"聊天窗口"')
- # driver.find_element(By.NAME, '聊天').click()
- #
- # Common.logger(log_type).info('点击"爬虫群"')
- # driver.find_elements(By.NAME, '爬虫群')[0].click()
- # Common.logger(log_type, crawler).info('点击微信指数')
- print('点击微信指数')
- driver.find_elements(By.NAME, '消息')[-1].click()
- # Common.logger(log_type, crawler).info('休眠 10 秒,退出微信指数')
- # time.sleep(10)
- # cls.kill_pid(log_type)
- print("3秒后退出微信")
- time.sleep(3)
- # Common.logger(log_type, crawler).info('退出微信')
- driver.quit()
- except Exception as e:
- Common.logger(log_type, crawler).error('click_video异常:{}', e)
- # @classmethod
- # def get_url(cls, log_type):
- # try:
- # # charles 抓包文件保存目录
- # charles_file_dir = r"./weixinzhishu_chlsfiles/"
- #
- # if len(os.listdir(charles_file_dir)) == 0:
- # Common.logger(log_type).info("未找到chlsfile文件,等待2s")
- # time.sleep(2)
- # else:
- # # 目标文件夹下所有文件
- # all_file = sorted(os.listdir(charles_file_dir))
- #
- # # 获取到目标文件
- # old_file = all_file[-1]
- #
- # # 分离文件名与扩展名
- # new_file = os.path.splitext(old_file)
- #
- # # 重命名文件后缀
- # os.rename(os.path.join(charles_file_dir, old_file),
- # os.path.join(charles_file_dir, new_file[0] + ".txt"))
- #
- # with open(charles_file_dir + new_file[0] + ".txt", encoding='utf-8-sig', errors='ignore') as f:
- # contents = json.load(f, strict=False)
- #
- # video_url_list = []
- # cover_url_list = []
- #
- # if "finder.video.qq.com" in [text['host'] for text in contents]:
- # for text in contents:
- # if text["host"] == "finder.video.qq.com" and text["path"] == "/251/20302/stodownload":
- # video_url_list.append(text)
- # elif text["host"] == "finder.video.qq.com" and text["path"] == "/251/20304/stodownload":
- # cover_url_list.append(text)
- #
- # video_url = video_url_list[0]['host']+video_url_list[0]['path']+'?'+video_url_list[0]['query']
- # cover_url = cover_url_list[0]['host']+cover_url_list[0]['path']+'?'+cover_url_list[0]['query']
- # head_url = cover_url
- #
- # # print(f'video_url:{video_url}')
- # # print(f'cover_url:{cover_url}')
- # # print(f'head_url:{head_url}')
- #
- # return video_url, cover_url, head_url
- # else:
- # Common.logger(log_type).info("未找到url")
- # return '未找到url'
- #
- # except Exception as e:
- # Common.logger(log_type).exception("get_url异常:{}\n", e)
- # return None
- # @classmethod
- # def write_url(cls, log_type):
- # try:
- # while True:
- # if Feishu.get_values_batch(log_type, 'shipinhao', 'FSDlBy')[1][11] is None:
- # Common.del_charles_files('recommend')
- # cls.start_wechat(log_type)
- # Common.logger(log_type).info('等待 2s')
- # time.sleep(2)
- # Common.logger(log_type).info('获取视频头像/封面/播放地址')
- # urls = cls.get_url(log_type)
- # if urls == '未找到url':
- # time.sleep(1)
- # cls.write_url(log_type)
- # elif urls is None:
- # time.sleep(1)
- # cls.write_url(log_type)
- # else:
- # Feishu.update_values(log_type, 'shipinhao', 'FSDlBy', 'J2:L2',
- # [['https://'+urls[2], 'https://'+urls[1], 'https://'+urls[0]]])
- # Common.logger(log_type).info('视频地址信息写入飞书成功\n')
- # Common.del_charles_files('recommend')
- # break
- # else:
- # Common.logger(log_type).info('视频已有地址信息,休眠 10s')
- # time.sleep(10)
- # break
- # except Exception as e:
- # # Feishu.dimension_range(log_type, 'shipinhao', 'FSDlBy', 'ROWS', 2, 2)
- # Common.logger(log_type).error('write_url异常:{}\n', e)
- # @classmethod
- # def run_get_url(cls, log_type):
- # try:
- # while True:
- # if len(Feishu.get_values_batch(log_type, 'shipinhao', 'FSDlBy')) == 1:
- # Common.logger(log_type).info('暂无需要获取地址的视频信息')
- # time.sleep(30)
- # break
- # else:
- # cls.write_url(log_type)
- #
- # except Exception as e:
- # Common.logger(log_type).error('run_get_url异常:{}\n', e)
- if __name__ == '__main__':
- Searchkey.start_wechat('weixin', 'weixinzhishu')
- # while True:
- # ShipinhaoWindows.run_get_url('recommend')
- # Common.del_logs('recommend')
- # time.sleep(1)
- pass
|