# -*- coding: utf-8 -*- # @Author: wangkun # @Time: 2023/1/31 """ 公共方法,包含:生成log / 删除log / 下载方法 / 删除 weixinzhishu_chlsfiles / 过滤词库 / 保存视频信息至本地 txt / 翻译 / ffmpeg """ from datetime import date, timedelta from loguru import logger import datetime import os import time import requests import ffmpeg import urllib3 proxies = {"http": None, "https": None} class Common: # 统一获取当前时间 2022-04-14 20:13:51.244472 now = datetime.datetime.now() # 昨天 2022-04-13 yesterday = (date.today() + timedelta(days=-1)).strftime("%Y/%m/%d") # 今天 2022-04-14 today = date.today() # 明天 2022-04-15 tomorrow = (date.today() + timedelta(days=1)).strftime("%Y/%m/%d") # 使用 logger 模块生成日志 @staticmethod def logger(log_type, crawler): """ 使用 logger 模块生成日志 """ # 日志路径 log_dir = f"./{crawler}/logs/" log_path = os.getcwd() + os.sep + log_dir if not os.path.isdir(log_path): os.makedirs(log_path) # 日志文件名 log_name = time.strftime("%Y-%m-%d", time.localtime(time.time())) + f'-{crawler}-{log_type}.log' # 日志不打印到控制台 logger.remove(handler_id=None) # rotation="500 MB",实现每 500MB 存储一个文件 # rotation="12:00",实现每天 12:00 创建一个文件 # rotation="1 week",每周创建一个文件 # retention="10 days",每隔10天之后就会清理旧的日志 # 初始化日志 logger.add(log_dir + log_name, level="INFO", rotation='00:00') return logger # 清除日志,保留最近 10 个文件 @classmethod def del_logs(cls, log_type, crawler): """ 清除冗余日志文件 :return: 保留最近 10 个日志 """ log_dir = f"./{crawler}/logs/" all_files = sorted(os.listdir(log_dir)) all_logs = [] for log in all_files: name = os.path.splitext(log)[-1] if name == ".log": all_logs.append(log) if len(all_logs) <= 10: pass else: for file in all_logs[:len(all_logs) - 10]: os.remove(log_dir + file) cls.logger(log_type, crawler).info("清除日志成功\n") # 删除 charles 缓存文件,只保留最近的两个文件 @classmethod def del_charles_files(cls, log_type, crawler): # 目标文件夹下所有文件 all_file = sorted(os.listdir(f"./{crawler}/{crawler}_chlsfiles/")) for file in all_file[0:-3]: os.remove(f"./{crawler}/{crawler}_chlsfiles/{file}") cls.logger(log_type, crawler).info("删除 charles 缓存文件成功\n") # 保存视频信息至 "./videos/{video_dict['video_title}/info.txt" @classmethod def save_video_info(cls, log_type, crawler, video_dict): with open(f"./{crawler}/videos/{video_dict['video_title']}/info.txt", "a", encoding="UTF-8") as f_a: f_a.write(str(video_dict['video_id']) + "\n" + str(video_dict['video_title']) + "\n" + str(video_dict['duration']) + "\n" + str(video_dict['play_cnt']) + "\n" + str(video_dict['comment_cnt']) + "\n" + str(video_dict['like_cnt']) + "\n" + str(video_dict['share_cnt']) + "\n" + f"{video_dict['video_width']}*{video_dict['video_height']}" + "\n" + str(video_dict['publish_time_stamp']) + "\n" + str(video_dict['user_name']) + "\n" + str(video_dict['avatar_url']) + "\n" + str(video_dict['video_url']) + "\n" + str(video_dict['cover_url']) + "\n" + str(video_dict['session'])) Common.logger(log_type, crawler).info("==========视频信息已保存至info.txt==========") # 封装下载视频或封面的方法 @classmethod def download_method(cls, log_type, crawler, text, title, url): """ 下载封面:text == "cover" ; 下载视频:text == "video" 需要下载的视频标题:d_title 视频封面,或视频播放地址:d_url 下载保存路径:"./files/{d_title}/" """ videos_dir = f"./{crawler}/videos/" if not os.path.exists(videos_dir): os.mkdir(videos_dir) # 首先创建一个保存该视频相关信息的文件夹 video_dir = f"./{crawler}/videos/{title}/" if not os.path.exists(video_dir): os.mkdir(video_dir) # 下载视频 if text == "video": # 需要下载的视频地址 video_url = str(url).replace('http://', 'https://') # 视频名 video_name = "video.mp4" # 下载视频 urllib3.disable_warnings() response = requests.get(video_url, stream=True, proxies=proxies, verify=False) try: with open(video_dir + video_name, "wb") as f: for chunk in response.iter_content(chunk_size=10240): f.write(chunk) cls.logger(log_type, crawler).info("==========视频下载完成==========") except Exception as e: cls.logger(log_type, crawler).error(f"视频下载失败:{e}\n") elif text == "youtube_video": # 需要下载的视频地址 video_url = url # 视频名 video_name = "video.mp4" try: download_cmd = f'yt-dlp -f "bv[height=720][ext=mp4]+ba[ext=m4a]" --merge-output-format mp4 {video_url}-U -o {video_name}' # 'yt-dlp -f "bv[height=720][ext=mp4]+ba[ext=m4a]" --merge-output-format mp4 https://www.youtube.com/watch?v=Q4MtXQY0aHM-U -o video.mp4' Common.logger(log_type, crawler).info(f"download_cmd:{download_cmd}") os.system(download_cmd) move_cmd = f"mv {video_name} {video_dir}" os.system(move_cmd) cls.logger(log_type, crawler).info("==========视频下载完成==========") except Exception as e: Common.logger(log_type, crawler).error(f"视频下载失败:{e}\n") # 下载音频 elif text == "audio": # 需要下载的视频地址 audio_url = str(url).replace('http://', 'https://') # 音频名 audio_name = "audio.mp4" # 下载视频 urllib3.disable_warnings() response = requests.get(audio_url, stream=True, proxies=proxies, verify=False) try: with open(video_dir + audio_name, "wb") as f: for chunk in response.iter_content(chunk_size=10240): f.write(chunk) cls.logger(log_type, crawler).info("==========音频下载完成==========") except Exception as e: cls.logger(log_type, crawler).error(f"音频下载失败:{e}\n") # 下载封面 elif text == "cover": # 需要下载的封面地址 cover_url = str(url) # 封面名 cover_name = "image.jpg" # 下载封面 urllib3.disable_warnings() response = requests.get(cover_url, proxies=proxies, verify=False) try: with open(video_dir + cover_name, "wb") as f: f.write(response.content) cls.logger(log_type, crawler).info("==========封面下载完成==========") except Exception as e: cls.logger(log_type, crawler).error(f"封面下载失败:{e}\n") @classmethod def ffmpeg(cls, log_type, crawler, video_path): probe = ffmpeg.probe(video_path) video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None) if video_stream is None: Common.logger(log_type, crawler).info('No video Stream found!') return format1 = probe['format'] size = int(int(format1['size']) / 1024 / 1024) width = int(video_stream['width']) height = int(video_stream['height']) duration = int(float(video_stream['duration'])) ffmpeg_dict = { 'width': width, 'height': height, 'duration': duration, 'size': size } return ffmpeg_dict if __name__ == "__main__": pass