# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2022/3/30
"""
Common helpers: create logs / delete logs / download files / read files / count downloads
"""
from datetime import date, timedelta
import datetime
import logging
import os
import time
import requests
import urllib3


class Common:
    # Current time, e.g. 2022-04-14 20:13:51.244472
    now = datetime.datetime.now()
    # Yesterday, e.g. 2022-04-13
    yesterday = (date.today() + timedelta(days=-1)).strftime("%Y-%m-%d")
    # Today, e.g. 2022-04-14
    today = date.today()
    # Tomorrow, e.g. 2022-04-15
    tomorrow = (date.today() + timedelta(days=1)).strftime("%Y-%m-%d")

    @staticmethod
    def crawler_log():
        """
        Create the crawler logger, writing to ./logs/<date>.log.
        """
        # Log directory
        log_dir = "./logs/"
        log_path = os.getcwd() + os.sep + log_dir
        if not os.path.isdir(log_path):
            os.makedirs(log_path)

        # Log format
        log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        date_format = "%Y-%m-%d %p %H:%M:%S"
        log_name = time.strftime("%Y-%m-%d", time.localtime(time.time())) + ".log"

        # Initialize logging; note that basicConfig only takes effect on the first call
        logging.basicConfig(filename=log_path + log_name,
                            level=logging.INFO,
                            format=log_format,
                            datefmt=date_format)
        crawler_logger = logging.getLogger("crawler-log")
        return crawler_logger

    @classmethod
    def del_logs(cls):
        """
        Remove stale log files.
        :return: keeps only the 7 most recent logs
        """
        log_dir = "./logs/"
        all_files = sorted(os.listdir(log_dir))
        all_logs = [log for log in all_files if os.path.splitext(log)[-1] == ".log"]

        if len(all_logs) > 7:
            for file in all_logs[:len(all_logs) - 7]:
                os.remove(log_dir + file)
        cls.crawler_log().info("Stale logs removed")

    @classmethod
    def download_method(cls, text, d_name, d_url):
        """
        Download a cover (text == "cover") or a video (text == "video").
        :param text: "cover" or "video"
        :param d_name: title of the video, used as the folder name
        :param d_url: URL of the cover image or the video stream
        Files are saved under "./videos/{d_name}/".
        """
        # Create a folder that holds all files for this video
        video_dir = "./videos/" + d_name + "/"
        if not os.path.exists(video_dir):
            os.makedirs(video_dir)

        # Download the video
        if text == "video":
            video_url = d_url
            video_name = "video.mp4"

            urllib3.disable_warnings()
            response = requests.get(video_url, stream=True, verify=False)
            try:
                with open(video_dir + video_name, "wb") as f:
                    for chunk in response.iter_content(chunk_size=10240):
                        f.write(chunk)
                cls.crawler_log().info("========== video download finished ==========")
            except Exception as e:
                cls.crawler_log().info("video download failed: {}".format(e))

        # Download the cover
        elif text == "cover":
            cover_url = d_url
            cover_name = "image.jpg"

            urllib3.disable_warnings()
            response = requests.get(cover_url, verify=False)
            try:
                with open(video_dir + cover_name, "wb") as f:
                    f.write(response.content)
                cls.crawler_log().info("========== cover download finished ==========")
            except Exception as e:
                cls.crawler_log().info("cover download failed: {}".format(e))

    @staticmethod
    def read_txt(t_name):
        """
        Read a txt file.
        :param t_name: file name
        :return: list of lines
        """
        with open("./txt/" + t_name, "r", encoding="utf8") as f:
            return f.readlines()

    @classmethod
    def kuaishou_download_count(cls):
        videoid_path = "./txt/kuaishou_videoid.txt"
        with open(videoid_path, "rb") as f:
            count = len(f.readlines())
        cls.crawler_log().info("Total videos downloaded: {}\n".format(count))

    @classmethod
    def weishi_download_count(cls):
        videoid_path = "./txt/weishi_videoid.txt"
        with open(videoid_path, "rb") as f:
            count = len(f.readlines())
        cls.crawler_log().info("Total videos downloaded: {}\n".format(count))

    @classmethod
    def kuaishou_today_download_count(cls):
        """
        Count today's downloads for the Kuaishou channel.
        :return: number of videos downloaded today
        """
        videoid_path = "./txt/" + str(cls.today) + "_kuaishou_videoid.txt"
        # Create an empty file if it does not exist yet
        with open(videoid_path, "a") as f:
            f.write("")
        with open(videoid_path, "rb") as f:
            count = len(f.readlines())
        return count

    @classmethod
    def del_yesterday_kuaishou_videoid_txt(cls):
        """
        Delete yesterday's Kuaishou download-count txt file.
        :return:
        """
        yesterday_kuaishou_videoid_txt_dir = "./txt/"
        all_files = sorted(os.listdir(yesterday_kuaishou_videoid_txt_dir))
        for file in all_files:
            name = os.path.splitext(file)[0]
            if name == cls.yesterday + "_kuaishou_videoid":
                os.remove(yesterday_kuaishou_videoid_txt_dir + file)
        cls.crawler_log().info("Deleted yesterday's Kuaishou download-count file")


if __name__ == "__main__":
    common = Common()
    common.del_yesterday_kuaishou_videoid_txt()
    print(common.kuaishou_today_download_count())