# -*- coding: utf-8 -*- # @Author: wangkun # @Time: 2022/9/1 import json import os import sys import time import psutil as psutil # import atomacos from appium import webdriver from selenium.webdriver.common.by import By sys.path.append(os.getcwd()) # from crawler_shipinhao.main.common import Common # from crawler_shipinhao.main.feishu_lib import Feishu from main.feishu_lib import Feishu from main.common import Common class GetUrl: @classmethod def kill_pid(cls): os.system('chcp 65001') # 将cmd的显示字符编码从默认的GBK改为UTF-8 list_process = list() pid_list = psutil.pids() for sub_pid in pid_list: try: process_info = psutil.Process(sub_pid) if process_info.name() == 'WechatBrowser.exe' or process_info.name() == 'WeChatPlayer.exe': list_process.append(sub_pid) except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): pass for pid in list_process: os.system('taskkill /f /pid ' + str(pid)) @classmethod def click_video(cls, log_type): try: Common.logger(log_type).info('启动"微信"') desired_caps = {'app': r"C:\Program Files (x86)\Tencent\WeChat\WeChat.exe"} driver = webdriver.Remote( command_executor='http://127.0.0.1:4723', desired_capabilities=desired_caps) driver.implicitly_wait(10) Common.logger(log_type).info('点击"聊天窗口"') driver.find_element(By.NAME, '聊天').click() Common.logger(log_type).info('点击"爬虫群"') driver.find_element(By.NAME, '爬虫群').click() Common.logger(log_type).info('点击视频') # driver.find_elements(By.ID, '42.131546.3.3981')[-1].click() driver.find_elements(By.NAME, '消息')[-1].click() Common.logger(log_type).info('退出视频号') time.sleep(5) cls.kill_pid() Common.logger(log_type).info('退出微信') driver.quit() except Exception as e: Common.logger(log_type).error('click_video异常:{}', e) @classmethod def get_url(cls, log_type): try: # charles 抓包文件保存目录 charles_file_dir = r"./chlsfiles/" if int(len(os.listdir(charles_file_dir))) == 0: Common.logger(log_type).info("未找到chlsfile文件,等待2s") time.sleep(2) else: # 目标文件夹下所有文件 all_file = sorted(os.listdir(charles_file_dir)) # 获取到目标文件 old_file = all_file[-1] # 分离文件名与扩展名 new_file = os.path.splitext(old_file) # 重命名文件后缀 os.rename(os.path.join(charles_file_dir, old_file), os.path.join(charles_file_dir, new_file[0] + ".txt")) with open(charles_file_dir + new_file[0] + ".txt", encoding='utf-8-sig', errors='ignore') as f: contents = json.load(f, strict=False) video_url_list = [] cover_url_list = [] if "finder.video.qq.com" in [text['host'] for text in contents]: for text in contents: if text["host"] == "finder.video.qq.com" and text["path"] == "/251/20302/stodownload": video_url_list.append(text) elif text["host"] == "finder.video.qq.com" and text["path"] == "/251/20304/stodownload": cover_url_list.append(text) video_url = video_url_list[0]['host']+video_url_list[0]['path']+'?'+video_url_list[0]['query'] cover_url = cover_url_list[0]['host']+cover_url_list[0]['path']+'?'+cover_url_list[0]['query'] head_url = cover_url # print(f'video_url:{video_url}') # print(f'cover_url:{cover_url}') # print(f'head_url:{head_url}') return video_url, cover_url, head_url else: Common.logger(log_type).info("未找到url") return '未找到url' except Exception as e: Common.logger(log_type).exception("get_url异常:{}", e) return None @classmethod def write_url(cls, log_type): # try: while True: if Feishu.get_values_batch(log_type, 'shipinhao', 'FSDlBy')[1][11] is None: Common.del_charles_files('recommend') cls.click_video(log_type) Common.logger(log_type).info('等待 2s') time.sleep(2) Common.logger(log_type).info('获取视频头像/封面/播放地址') # print('获取视频头像/封面/播放地址') urls = cls.get_url(log_type) if urls == '未找到url': cls.write_url(log_type) else: Feishu.update_values(log_type, 'shipinhao', 'FSDlBy', 'J2:L2', [['https://'+urls[2], 'https://'+urls[1], 'https://'+urls[0]]]) Common.logger(log_type).info('视频地址信息写入飞书成功\n') Common.del_charles_files('recommend') break else: Common.logger(log_type).info('视频已有地址信息,休眠 10s') time.sleep(10) break # except Exception as e: # # Feishu.dimension_range(log_type, 'shipinhao', 'FSDlBy', 'ROWS', 2, 2) # Common.logger(log_type).error('write_url异常:{}', e) @classmethod def run_get_url(cls, log_type): # try: while True: if len(Feishu.get_values_batch(log_type, 'shipinhao', 'FSDlBy')) == 1: Common.logger(log_type).info('暂无需要获取地址的视频信息') time.sleep(30) break else: cls.write_url(log_type) # except Exception as e: # Common.logger(log_type).error('run_get_url异常:{}', e) if __name__ == '__main__': # GetUrl.write_url('recommend') # print(len(Feishu.get_values_batch('recommend', 'shipinhao', 'FSDlBy'))) while True: GetUrl.run_get_url('recommend') Common.del_logs('recommend') time.sleep(1)