# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2022/9/1
- import json
- import os
- import sys
- import time
- import psutil as psutil
- from appium import webdriver
- from selenium.webdriver.common.by import By
- sys.path.append(os.getcwd())
- from main.feishu_lib import Feishu
- from main.common import Common
class ShipinhaoWindows:
    """Collect video/cover URLs through the Windows WeChat client.

    The workflow visible in this class is:

    1. ``click_video`` launches WeChat through a WebDriver server listening on
       ``http://127.0.0.1:4723`` and clicks the last element named ``消息``.
    2. ``get_url`` reads the newest file in ``./chlsfiles/`` (a Charles capture
       saved as JSON) and extracts the video and cover request URLs.
    3. ``write_url`` writes those URLs to the Feishu sheet ``FSDlBy``.

    All public methods are classmethods and swallow their own exceptions after
    logging them, so callers never see an exception from this class.
    """

    # Host and request paths that identify the entries of interest inside the
    # capture file: the first path is collected as the video URL, the second
    # as the cover URL.
    _FINDER_HOST = "finder.video.qq.com"
    _VIDEO_PATH = "/251/20302/stodownload"
    _COVER_PATH = "/251/20304/stodownload"

    # Sentinel returned by ``get_url`` when the capture holds no usable URLs.
    _NOT_FOUND = '未找到url'

    @classmethod
    def kill_pid(cls, log_type):
        """Force-kill every ``WechatBrowser.exe`` / ``WeChatPlayer.exe`` process.

        :param log_type: logger selector passed to ``Common.logger``.
        :return: None. Any failure is logged, never raised.
        """
        try:
            # Switch the cmd code page from the default GBK to UTF-8.
            os.system('chcp 65001')
            target_names = {'WechatBrowser.exe', 'WeChatPlayer.exe'}
            list_process = []
            for sub_pid in psutil.pids():
                try:
                    # Query the name once per process instead of once per comparison.
                    if psutil.Process(sub_pid).name() in target_names:
                        list_process.append(sub_pid)
                except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                    # Best effort: the process vanished or cannot be inspected.
                    continue
            for pid in list_process:
                os.system('taskkill /f /pid ' + str(pid))
        except Exception as e:
            Common.logger(log_type).error('kill_pid异常:{}', e)

    @classmethod
    def click_video(cls, log_type):
        """Start WeChat, click the last ``消息`` element, wait, then clean up.

        The WebDriver session is always closed, even when one of the UI steps
        fails; previously a failure after the session was created left it open.

        NOTE(review): the original contained disabled steps that clicked the
        ``聊天`` tab and the ``爬虫群`` chat first, so this presumably expects
        that chat to be open already - confirm.

        :param log_type: logger selector passed to ``Common.logger``.
        :return: None. Any failure is logged, never raised.
        """
        driver = None
        try:
            Common.logger(log_type).info('启动"微信"')
            desired_caps = {'app': r"C:\Program Files (x86)\Tencent\WeChat\WeChat.exe"}
            driver = webdriver.Remote(
                command_executor='http://127.0.0.1:4723',
                desired_capabilities=desired_caps)
            driver.implicitly_wait(10)

            Common.logger(log_type).info('点击视频')
            driver.find_elements(By.NAME, '消息')[-1].click()

            Common.logger(log_type).info('休眠 10 秒,退出视频号')
            time.sleep(10)
            cls.kill_pid(log_type)
        except Exception as e:
            Common.logger(log_type).error('click_video异常:{}', e)
        finally:
            if driver is not None:
                try:
                    Common.logger(log_type).info('退出微信')
                    driver.quit()
                except Exception as e:
                    Common.logger(log_type).error('click_video异常:{}', e)

    @staticmethod
    def _extract_urls(contents):
        """Pick the first video and first cover request out of capture entries.

        :param contents: iterable of dicts with ``host``, ``path`` and
            ``query`` keys, as loaded from the capture file.
        :return: ``(video_url, cover_url, head_url)`` where every URL is
            ``host + path + '?' + query`` without a scheme and ``head_url``
            equals ``cover_url``; ``None`` when either the video or the cover
            entry is missing.
        """
        video_entry = None
        cover_entry = None
        for entry in contents:
            if entry.get("host") != ShipinhaoWindows._FINDER_HOST:
                continue
            path = entry.get("path")
            if path == ShipinhaoWindows._VIDEO_PATH:
                if video_entry is None:
                    video_entry = entry
            elif path == ShipinhaoWindows._COVER_PATH:
                if cover_entry is None:
                    cover_entry = entry
        if video_entry is None or cover_entry is None:
            return None
        video_url = video_entry['host'] + video_entry['path'] + '?' + video_entry['query']
        cover_url = cover_entry['host'] + cover_entry['path'] + '?' + cover_entry['query']
        head_url = cover_url
        return video_url, cover_url, head_url

    @classmethod
    def get_url(cls, log_type):
        """Read the newest capture file and return the URLs found in it.

        The newest file (last in sorted name order) in ``./chlsfiles/`` is
        renamed to a ``.txt`` extension and parsed as JSON.

        :param log_type: logger selector passed to ``Common.logger``.
        :return: ``(video_url, cover_url, head_url)`` on success; the string
            ``'未找到url'`` when the capture has no matching video and cover
            entries; ``None`` when the directory is empty or any error occurs.
        """
        try:
            # Directory where Charles saves its capture files.
            charles_file_dir = r"./chlsfiles/"
            # List once so the emptiness check and the selection see the same files.
            all_file = sorted(os.listdir(charles_file_dir))
            if not all_file:
                Common.logger(log_type).info("未找到chlsfile文件,等待2s")
                time.sleep(2)
                return None

            old_file = all_file[-1]
            # Swap the extension for ".txt" before reading.
            txt_path = os.path.join(charles_file_dir, os.path.splitext(old_file)[0] + ".txt")
            os.rename(os.path.join(charles_file_dir, old_file), txt_path)

            with open(txt_path, encoding='utf-8-sig', errors='ignore') as f:
                contents = json.load(f, strict=False)

            urls = cls._extract_urls(contents)
            if urls is None:
                Common.logger(log_type).info("未找到url")
                return cls._NOT_FOUND
            return urls
        except Exception as e:
            Common.logger(log_type).exception("get_url异常:{}\n", e)
            return None

    @classmethod
    def write_url(cls, log_type):
        """Capture URLs for the sheet's second row and write them to Feishu.

        While the value at ``[1][11]`` of the sheet data is ``None`` the method
        clears old capture files, triggers playback, reads the URLs and retries
        until it gets them. Retries loop in place rather than recursing, so
        repeated failures do not grow the call stack and a success does not
        trigger an extra poll-and-sleep per earlier attempt.

        NOTE(review): ``Common.del_charles_files`` is called with the literal
        ``'recommend'`` rather than ``log_type`` - confirm that is intended.

        :param log_type: logger selector passed to ``Common.logger`` / ``Feishu``.
        :return: None. Any failure is logged, never raised.
        """
        try:
            while True:
                if Feishu.get_values_batch(log_type, 'shipinhao', 'FSDlBy')[1][11] is not None:
                    Common.logger(log_type).info('视频已有地址信息,休眠 10s')
                    time.sleep(10)
                    break

                Common.del_charles_files('recommend')
                cls.click_video(log_type)
                Common.logger(log_type).info('等待 2s')
                time.sleep(2)
                Common.logger(log_type).info('获取视频头像/封面/播放地址')
                urls = cls.get_url(log_type)
                if urls is None or urls == cls._NOT_FOUND:
                    # Nothing usable captured yet: retry from the top of the loop.
                    time.sleep(1)
                    continue

                # Written in the order head, cover, video into J2:L2.
                Feishu.update_values(log_type, 'shipinhao', 'FSDlBy', 'J2:L2',
                                     [['https://' + urls[2], 'https://' + urls[1], 'https://' + urls[0]]])
                Common.logger(log_type).info('视频地址信息写入飞书成功\n')
                Common.del_charles_files('recommend')
                break
        except Exception as e:
            Common.logger(log_type).error('write_url异常:{}\n', e)

    @classmethod
    def run_get_url(cls, log_type):
        """Call ``write_url`` repeatedly until the sheet data has a single row.

        When ``Feishu.get_values_batch`` returns exactly one row the method
        logs that nothing is pending, sleeps 30 seconds and returns.

        :param log_type: logger selector passed to ``Common.logger`` / ``Feishu``.
        :return: None. Any failure is logged, never raised.
        """
        try:
            while True:
                if len(Feishu.get_values_batch(log_type, 'shipinhao', 'FSDlBy')) == 1:
                    Common.logger(log_type).info('暂无需要获取地址的视频信息')
                    time.sleep(30)
                    break
                cls.write_url(log_type)
        except Exception as e:
            Common.logger(log_type).error('run_get_url异常:{}\n', e)
if __name__ == '__main__':
    # Script entry point: poll forever, pruning logs after every pass and
    # pausing one second between passes.
    log_type = 'recommend'
    while True:
        ShipinhaoWindows.run_get_url(log_type)
        Common.del_logs(log_type)
        time.sleep(1)
|