# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/1/31
"""
Common helpers, including: creating logs / deleting logs / download helpers /
deleting Charles cache files (chlsfiles) / filter word list / saving video info
to a local txt file / translation / ffmpeg probing
"""
import datetime
import json
import os
import time
from datetime import date, timedelta
from urllib import parse, request

import ffmpeg
import requests
import urllib3
from loguru import logger

# Explicitly disable system-level proxies for every requests call in this module
proxies = {"http": None, "https": None}


class Common:
    # Current time, e.g. <class 'datetime.datetime'> 2022-04-14 20:13:51.244472
    now = datetime.datetime.now()
    # Yesterday as a string, e.g. "2022/04/13"
    yesterday = (date.today() + timedelta(days=-1)).strftime("%Y/%m/%d")
    # Today, e.g. <class 'datetime.date'> 2022-04-14
    today = date.today()
    # Tomorrow as a string, e.g. "2022/04/15"
    tomorrow = (date.today() + timedelta(days=1)).strftime("%Y/%m/%d")

    @staticmethod
    def logger(log_type, crawler):
        """
        Create a log file with the loguru module.
        """
        # Log directory, created on first use
        log_dir = f"./{crawler}/logs/"
        log_path = os.getcwd() + os.sep + log_dir
        if not os.path.isdir(log_path):
            os.makedirs(log_path)
        # Log file name, e.g. "2023-01-31-{crawler}-{log_type}.log"
        log_name = time.strftime("%Y-%m-%d", time.localtime(time.time())) + f'-{crawler}-{log_type}.log'
        # Remove the default handler so nothing is printed to the console
        logger.remove(handler_id=None)
        # rotation="500 MB"   starts a new file every 500 MB
        # rotation="12:00"    starts a new file every day at 12:00
        # rotation="1 week"   starts a new file every week
        # retention="10 days" removes log files older than 10 days
        logger.add(log_dir + log_name, level="INFO", rotation='00:00')
        return logger
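
    # A minimal usage sketch; the crawler name "demo" and the log type
    # "recommend" are placeholders, not values defined by this module:
    #     log = Common.logger("recommend", "demo")
    #     log.info("crawler started")
    # Logs then land in "./demo/logs/", one file per day.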

    @classmethod
    def del_logs(cls, log_type, crawler):
        """
        Remove redundant log files, keeping only the 10 most recent ones.
        """
        log_dir = f"./{crawler}/logs/"
        all_files = sorted(os.listdir(log_dir))
        all_logs = [f for f in all_files if os.path.splitext(f)[-1] == ".log"]
        # all_logs[:-10] is empty when there are 10 log files or fewer
        for file in all_logs[:-10]:
            os.remove(log_dir + file)
        cls.logger(log_type, crawler).info("Logs cleaned up")

    @classmethod
    def del_charles_files(cls, log_type, crawler):
        """
        Remove Charles cache files, keeping only the three most recent ones.
        """
        # All files in the target directory, sorted by name
        all_file = sorted(os.listdir(f"./{crawler}/chlsfiles/"))
        for file in all_file[0:-3]:
            os.remove(f"./{crawler}/chlsfiles/{file}")
        cls.logger(log_type, crawler).info("Charles cache files deleted")

    @classmethod
    def save_video_info(cls, log_type, crawler, video_dict):
        """
        Save video info to "./{crawler}/videos/{video_dict['video_title']}/info.txt".
        """
        with open(f"./{crawler}/videos/{video_dict['video_title']}/info.txt",
                  "a", encoding="UTF-8") as f_a:
            f_a.write(str(video_dict['video_id']) + "\n" +
                      str(video_dict['video_title']) + "\n" +
                      str(video_dict['duration']) + "\n" +
                      str(video_dict['play_cnt']) + "\n" +
                      str(video_dict['comment_cnt']) + "\n" +
                      str(video_dict['like_cnt']) + "\n" +
                      str(video_dict['share_cnt']) + "\n" +
                      f"{video_dict['video_width']}*{video_dict['video_height']}" + "\n" +
                      str(video_dict['publish_time']) + "\n" +
                      str(video_dict['user_name']) + "\n" +
                      str(video_dict['avatar_url']) + "\n" +
                      str(video_dict['video_url']) + "\n" +
                      str(video_dict['cover_url']) + "\n" +
                      str(video_dict['session']))
        cls.logger(log_type, crawler).info("========== Video info saved to info.txt ==========")
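
    # A sketch of the video_dict this method expects; every value below is a
    # made-up placeholder, only the key names are required:
    #     video_dict = {
    #         "video_id": "v001", "video_title": "demo", "duration": 60,
    #         "play_cnt": 0, "comment_cnt": 0, "like_cnt": 0, "share_cnt": 0,
    #         "video_width": 720, "video_height": 1280,
    #         "publish_time": "2023-01-31", "user_name": "tester",
    #         "avatar_url": "http://example.com/a.jpg",
    #         "video_url": "http://example.com/v.mp4",
    #         "cover_url": "http://example.com/c.jpg", "session": "session-001",
    #     }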

    @classmethod
    def download_method(cls, log_type, crawler, text, title, url):
        """
        Download a video, audio track, or cover image.
        text == "video": download a video; text == "audio": download an audio track;
        text == "cover": download a cover image.
        title: title of the video the file belongs to
        url: playback address or cover address
        Files are saved to "./{crawler}/videos/{title}/"
        """
        videos_dir = f"./{crawler}/videos/"
        if not os.path.exists(videos_dir):
            os.mkdir(videos_dir)
        # First create a folder that holds everything related to this video
        video_dir = f"./{crawler}/videos/{title}/"
        if not os.path.exists(video_dir):
            os.mkdir(video_dir)
        # Download a video
        if text == "video":
            # Address of the video to download
            video_url = str(url).replace('http://', 'https://')
            # File name of the video
            video_name = "video.mp4"
            # Stream the video to disk in 10 KB chunks
            urllib3.disable_warnings()
            response = requests.get(video_url, stream=True, proxies=proxies, verify=False)
            try:
                with open(video_dir + video_name, "wb") as f:
                    for chunk in response.iter_content(chunk_size=10240):
                        f.write(chunk)
                cls.logger(log_type, crawler).info("========== Video download finished ==========")
            except Exception as e:
                cls.logger(log_type, crawler).error(f"Video download failed: {e}\n")
        # Download an audio track
        elif text == "audio":
            # Address of the audio to download
            audio_url = str(url).replace('http://', 'https://')
            # File name of the audio track
            audio_name = "audio.mp4"
            # Stream the audio to disk in 10 KB chunks
            urllib3.disable_warnings()
            response = requests.get(audio_url, stream=True, proxies=proxies, verify=False)
            try:
                with open(video_dir + audio_name, "wb") as f:
                    for chunk in response.iter_content(chunk_size=10240):
                        f.write(chunk)
                cls.logger(log_type, crawler).info("========== Audio download finished ==========")
            except Exception as e:
                cls.logger(log_type, crawler).error(f"Audio download failed: {e}\n")
        # Download a cover image
        elif text == "cover":
            # Address of the cover to download
            cover_url = str(url)
            # File name of the cover image
            cover_name = "image.jpg"
            # Fetch the cover in a single request
            urllib3.disable_warnings()
            response = requests.get(cover_url, proxies=proxies, verify=False)
            try:
                with open(video_dir + cover_name, "wb") as f:
                    f.write(response.content)
                cls.logger(log_type, crawler).info("========== Cover download finished ==========")
            except Exception as e:
                cls.logger(log_type, crawler).error(f"Cover download failed: {e}\n")
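
    # A minimal usage sketch; "demo", the title, and the URL are placeholders:
    #     Common.download_method("recommend", "demo", "video",
    #                            "some title", "http://example.com/v.mp4")
    # The file then lands at "./demo/videos/some title/video.mp4".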

    # Youdao translation: English → Chinese
    @classmethod
    def fanyi(cls, query):
        req_url = 'http://fanyi.youdao.com/translate'  # API endpoint
        # Form data to submit; salt/sign are hardcoded values for this web endpoint
        form_data = {'i': query,
                     'doctype': 'json',
                     'from': 'AUTO',
                     'to': 'AUTO',
                     # 'to': 'Chinese',
                     'smartresult': 'dict',
                     'client': 'fanyideskweb',
                     'salt': '1526995097962',
                     'sign': '8e4c4765b52229e1f3ad2e633af89c76',
                     'version': '2.1',
                     'keyform': 'fanyi.web',
                     'action': 'FY_BY_REALTIME',
                     'typoResult': 'false'}
        data = parse.urlencode(form_data).encode('utf-8')  # Encode the form data
        response = request.urlopen(req_url, data)  # Submit the request
        html = response.read().decode('utf-8')  # Read the response body
        # The response body is a JSON document
        translate_results = json.loads(html)
        translate_results = translate_results['translateResult'][0][0]['tgt']  # Extract the translation
        return translate_results

    @classmethod
    def ffmpeg(cls, log_type, crawler, video_path):
        """
        Probe a local video file and return its width, height, duration and size.
        """
        probe = ffmpeg.probe(video_path)
        video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None)
        if video_stream is None:
            cls.logger(log_type, crawler).info('No video stream found!')
            return
        video_format = probe['format']
        size = int(video_format['size']) / 1024 / 1024  # File size in MB
        width = int(video_stream['width'])
        height = int(video_stream['height'])
        duration = int(float(video_stream['duration']))  # Seconds
        ffmpeg_dict = {
            'width': width,
            'height': height,
            'duration': duration,
            'size': size
        }
        return ffmpeg_dict
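
# A usage sketch for the ffmpeg probe; the path below is a placeholder for a
# locally downloaded file:
#     info = Common.ffmpeg("recommend", "demo", "./demo/videos/some title/video.mp4")
#     print(info)  # e.g. {'width': 720, 'height': 1280, 'duration': 60, 'size': 1.2}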


if __name__ == "__main__":
    res = Common.fanyi("10 MOST UNIQUE Dance Groups EVER On Britain's Got Talent!")
    print(res)