# -*- coding: utf-8 -*- # @Author: wangkun # @Time: 2022/6/27 """ 公共方法,包含:生成log / 删除log / 获取session / 下载方法 / 读取文件 / 统计下载数 """ import json from datetime import date, timedelta import datetime import os import time import requests import urllib3 from loguru import logger proxies = {"http": None, "https": None} class Common: # 统一获取当前时间 2022-04-14 20:13:51.244472 now = datetime.datetime.now() # 昨天 2022-04-13 yesterday = (date.today() + timedelta(days=-1)).strftime("%Y-%m-%d") # 今天 2022-04-14 today = date.today() # 明天 2022-04-15 tomorrow = (date.today() + timedelta(days=1)).strftime("%Y-%m-%d") # 使用 logger 模块生成日志 @staticmethod def logger(log_type): """ 使用 logger 模块生成日志 """ # 日志路径 log_dir = "./crawler_monitor/logs/" log_path = os.getcwd() + os.sep + log_dir if not os.path.isdir(log_path): os.makedirs(log_path) # 日志文件名 if log_type == "kanyikan": log_name = time.strftime("%Y-%m-%d", time.localtime(time.time())) + '-monitor-kanyikan.log' elif log_type == "xiaoniangao": log_name = time.strftime("%Y-%m-%d", time.localtime(time.time())) + '-monitor-xiaoniangao.log' else: log_name = time.strftime("%Y-%m-%d", time.localtime(time.time())) + '.log' # 日志不打印到控制台 logger.remove(handler_id=None) # rotation="500 MB",实现每 500MB 存储一个文件 # rotation="12:00",实现每天 12:00 创建一个文件 # rotation="1 week",每周创建一个文件 # retention="10 days",每隔10天之后就会清理旧的日志 # 初始化日志 logger.add(log_dir + log_name, level="INFO", rotation='00:00') return logger @classmethod def del_logs(cls, log_type): """ 清除冗余日志文件 :return: 保留最近 6 个日志 """ log_dir = "./crawler_monitor/logs/" all_files = sorted(os.listdir(log_dir)) all_logs = [] for log in all_files: name = os.path.splitext(log)[-1] if name == ".log": all_logs.append(log) if len(all_logs) <= 6: pass else: for file in all_logs[:len(all_logs) - 7]: os.remove(log_dir + file) cls.logger(log_type).info("清除冗余日志成功") # 删除 charles 缓存文件,只保留最近的两个文件 @classmethod def del_charles_files(cls): # 目标文件夹下所有文件 all_file = sorted(os.listdir("./crawler-kanyikan-recommend/chlsfiles/")) for file in all_file[0:-2]: os.remove("./crawler-kanyikan-recommend/chlsfiles/" + file) cls.logger("kanyikan").info("删除 charles 缓存文件成功") @classmethod def download_method(cls, log_type, text, d_name, d_url): """ 下载封面:text == "cover" ; 下载视频:text == "video" 需要下载的视频标题:d_title 视频封面,或视频播放地址:d_url 下载保存路径:"./videos/{d_title}/" """ # 首先创建一个保存该视频相关信息的文件夹 video_dir = "./videos/" + d_name + "/" if not os.path.exists(video_dir): os.mkdir(video_dir) # 下载视频 if text == "video": # 需要下载的视频地址 video_url = d_url # 视频名 video_name = "video.mp4" # 下载视频 urllib3.disable_warnings() response = requests.get(video_url, stream=True, proxies=proxies, verify=False) try: with open(video_dir + video_name, "wb") as f: for chunk in response.iter_content(chunk_size=10240): f.write(chunk) cls.logger(log_type).info("==========视频下载完成==========") except Exception as e: cls.logger(log_type).exception("视频下载失败:{}", e) # 下载封面 elif text == "cover": # 需要下载的封面地址 cover_url = d_url # 封面名 cover_name = "image.jpg" # 下载封面 urllib3.disable_warnings() response = requests.get(cover_url, proxies=proxies, verify=False) try: with open(video_dir + cover_name, "wb") as f: f.write(response.content) cls.logger(log_type).info("==========封面下载完成==========") except Exception as e: cls.logger(log_type).exception("封面下载失败:{}", e) @classmethod def get_session(cls): # charles 抓包文件保存目录 charles_file_dir = "./crawler-kanyikan-recommend/chlsfiles/" if int(len(os.listdir(charles_file_dir))) == 1: Common.logger("kanyikan").info("未找到chlsfile文件,等待60s") time.sleep(60) else: try: # 目标文件夹下所有文件 all_file = sorted(os.listdir(charles_file_dir)) # 获取到目标文件 old_file = all_file[-3] # 分离文件名与扩展名 new_file = os.path.splitext(old_file) # 重命名文件后缀 os.rename(os.path.join(charles_file_dir, old_file), os.path.join(charles_file_dir, new_file[0] + ".txt")) with open(charles_file_dir + new_file[0] + ".txt", encoding='utf-8-sig', errors='ignore') as f: contents = json.load(f, strict=False) if "search.weixin.qq.com" in [text['host'] for text in contents]: for text in contents: if text["host"] == "search.weixin.qq.com" \ and text["path"] == "/cgi-bin/recwxa/recwxagetunreadmessagecnt": sessions = text["query"].split("session=")[-1].split("&wxaVersion=")[0] if "&vid" in sessions: session = sessions.split("&vid")[0] return session elif "&offset" in sessions: session = sessions.split("&offset")[0] return session elif "&wxaVersion" in sessions: session = sessions.split("&wxaVersion")[0] return session elif "&limit" in sessions: session = sessions.split("&limit")[0] return session elif "&scene" in sessions: session = sessions.split("&scene")[0] return session elif "&count" in sessions: session = sessions.split("&count")[0] return session elif "&channelid" in sessions: session = sessions.split("&channelid")[0] return session elif "&subscene" in sessions: session = sessions.split("&subscene")[0] return session elif "&clientVersion" in sessions: session = sessions.split("&clientVersion")[0] return session elif "&sharesearchid" in sessions: session = sessions.split("&sharesearchid")[0] return session elif "&nettype" in sessions: session = sessions.split("&nettype")[0] return session elif "&switchprofile" in sessions: session = sessions.split("&switchprofile")[0] return session elif "&switchnewuser" in sessions: session = sessions.split("&switchnewuser")[0] return session else: return sessions else: cls.logger("kanyikan").info("未找到 session,10s后重新获取") time.sleep(10) cls.get_session() except Exception as e: cls.logger("kanyikan").exception("获取 session 异常,30s后重试:{}", e) time.sleep(30) cls.get_session() if __name__ == "__main__": common = Common()