123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- """
- 公共方法,包含:生成log / 删除log / 下载方法 / 读取文件 / 统计下载数
- """
- from datetime import date, timedelta
- from loguru import logger
- import datetime
- import os
- import time
- import requests
- import urllib3
# Proxy mapping passed to every ``requests.get`` call in this module.
# NOTE(review): both schemes are set to None, presumably so that downloads
# bypass any system/environment proxy (e.g. a local capture proxy) — confirm.
proxies = {
    "http": None,
    "https": None,
}
class Common:
    """Shared crawler helpers: logger setup, log/cache pruning and media downloads.

    The date attributes below are evaluated once, when the class body runs
    (i.e. at import time). They are not refreshed afterwards, so a
    long-running process keeps seeing the start-up values.
    """

    # Timestamp captured when the class is created.
    now = datetime.datetime.now()

    # Yesterday's date formatted as "YYYY/MM/DD".
    yesterday = (date.today() + timedelta(days=-1)).strftime("%Y/%m/%d")

    # Today's date as a ``datetime.date`` object (not a string).
    today = date.today()

    # Tomorrow's date formatted as "YYYY/MM/DD".
    tomorrow = (date.today() + timedelta(days=1)).strftime("%Y/%m/%d")

    # Supported ``text`` values of ``download_method`` mapped to
    # (output file name, label used in the log messages, stream the body?).
    _DOWNLOAD_KINDS = {
        "video": ("video.mp4", "视频", True),
        "audio": ("audio1.mp4", "音频", True),
        "cover": ("image.jpg", "封面", False),
    }

    @staticmethod
    def logger(log_type):
        """Point loguru at today's log file and return the logger.

        Each call removes every handler currently registered on the loguru
        logger and adds a single file sink
        ``./logs/<YYYY-MM-DD>-kdjsfq-<log_type>.log`` (level INFO, rotated at
        00:00). The ``./logs/`` directory is created when missing.

        :param log_type: value embedded (via ``str``) in the log file name.
        :return: the configured loguru ``logger`` object.
        """
        log_dir = r"./logs/"
        log_path = os.getcwd() + os.sep + log_dir
        # exist_ok avoids the check-then-create race of isdir() + makedirs().
        os.makedirs(log_path, exist_ok=True)

        log_name = (
            time.strftime("%Y-%m-%d", time.localtime(time.time()))
            + '-kdjsfq-' + str(log_type) + '.log'
        )

        # Drop all existing sinks so repeated calls do not duplicate output.
        logger.remove(handler_id=None)
        logger.add(log_dir + log_name, level="INFO", rotation='00:00')
        return logger

    @classmethod
    def del_logs(cls, log_type):
        """Prune ``./logs/`` so that at most 10 ``.log`` files remain.

        Files are ordered by name and the 10 that sort last are kept. A
        missing ``./logs/`` directory is treated as "nothing to delete"
        instead of raising.

        :param log_type: forwarded to ``logger`` for the confirmation message.
        """
        log_dir = "./logs/"
        try:
            entries = sorted(os.listdir(log_dir))
        except FileNotFoundError:
            entries = []

        all_logs = [
            entry for entry in entries
            if os.path.splitext(entry)[-1] == ".log"
        ]
        # [:-10] is empty whenever there are 10 files or fewer.
        for stale in all_logs[:-10]:
            os.remove(log_dir + stale)
        cls.logger(log_type).info("清除日志成功")

    @classmethod
    def del_charles_files(cls, log_type):
        """Prune ``./chlsfiles/`` down to the 3 entries that sort last by name.

        A missing ``./chlsfiles/`` directory is treated as "nothing to
        delete" instead of raising.

        :param log_type: forwarded to ``logger`` for the confirmation message.
        """
        chls_dir = r"./chlsfiles/"
        try:
            all_file = sorted(os.listdir(chls_dir))
        except FileNotFoundError:
            all_file = []

        for stale in all_file[:-3]:
            os.remove(chls_dir + stale)
        cls.logger(log_type).info("删除 charles 缓存文件成功")

    @classmethod
    def download_method(cls, log_type, text, d_name, d_url):
        """Download a video, audio track or cover image into ``./videos/<d_name>/``.

        The target directory is always created. The file written depends on
        ``text``: "video" -> ``video.mp4``, "audio" -> ``audio1.mp4`` (the URL
        is upgraded from http:// to https://), "cover" -> ``image.jpg``. Any
        other ``text`` value is ignored and nothing is downloaded.

        Failures (connection errors, HTTP error statuses, file-system errors)
        are logged through ``logger(log_type)`` and not re-raised.

        :param log_type: forwarded to ``logger`` for the result messages.
        :param text: kind of resource: "video", "audio" or "cover".
        :param d_name: name of the sub-directory under ``./videos/``.
        :param d_url: URL of the resource to fetch.
        :return: None
        """
        video_dir = "./videos/" + str(d_name) + "/"
        # Creates ./videos/ and the per-item directory in one race-free step.
        os.makedirs(video_dir, exist_ok=True)

        kind = cls._DOWNLOAD_KINDS.get(text)
        if kind is None:
            return
        file_name, label, stream = kind

        url = str(d_url)
        if text == "audio":
            url = url.replace('http://', 'https://')

        # NOTE(review): verify=False disables TLS certificate verification and
        # the warning about it is silenced here; kept as in the original.
        # NOTE(review): no timeout is passed, so a stalled server can block
        # this call indefinitely — consider adding one.
        urllib3.disable_warnings()
        try:
            # The request sits inside the try so network errors are logged
            # like any other download failure; the context manager releases
            # the connection even when the body is streamed.
            with requests.get(url, stream=stream, proxies=proxies, verify=False) as response:
                # Do not save an HTTP error page as if it were the media file.
                response.raise_for_status()
                with open(video_dir + file_name, "wb") as f:
                    for chunk in response.iter_content(chunk_size=10240):
                        f.write(chunk)
            cls.logger(log_type).info(f"=========={label}下载完成==========")
        except Exception as e:
            # Deliberate best-effort boundary: log and carry on.
            cls.logger(log_type).error(f"{label}下载失败:{e}\n")
if __name__ == "__main__":
    # Running the module directly only instantiates the helper class.
    common = Common()
|