- # -*- coding: utf-8 -*-
- # @Author: wangkun
- # @Time: 2022/6/27
- """
- 公共方法,包含:生成log / 删除log / 获取session / 下载方法 / 读取文件 / 统计下载数
- """
- import json
- from datetime import date, timedelta
- import datetime
- import os
- import time
- import requests
- import urllib3
- from loguru import logger
- proxies = {"http": None, "https": None}
class Common:
    """
    Shared crawler helpers: loguru logger setup, log cleanup, Charles
    capture-file cleanup, media (video/cover) download, and session
    extraction from Charles capture files.
    """
    # Current time at import, e.g. datetime.datetime(2022, 4, 14, 20, 13, 51)
    now = datetime.datetime.now()
    # Yesterday as a "YYYY-MM-DD" string, e.g. "2022-04-13"
    yesterday = (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")
    # Today as a datetime.date, e.g. 2022-04-14
    today = date.today()
    # Tomorrow as a "YYYY-MM-DD" string, e.g. "2022-04-15"
    tomorrow = (date.today() + timedelta(days=1)).strftime("%Y-%m-%d")

    @staticmethod
    def logger(log_type):
        """
        Return a loguru logger writing INFO+ records to a per-day log file.

        :param log_type: "kanyikan" / "xiaoniangao" select a crawler-specific
                         file name; any other value gets a plain date name.
        :return: the (module-level) loguru logger, reconfigured.
        """
        # Log directory, relative to the current working directory.
        log_dir = "./crawler_monitor/logs/"
        # exist_ok avoids the check-then-create race of isdir()+makedirs().
        os.makedirs(log_dir, exist_ok=True)
        # Per-day file-name prefix, e.g. "2022-06-27".
        day = time.strftime("%Y-%m-%d", time.localtime(time.time()))
        if log_type == "kanyikan":
            log_name = day + '-monitor-kanyikan.log'
        elif log_type == "xiaoniangao":
            log_name = day + '-monitor-xiaoniangao.log'
        else:
            log_name = day + '.log'
        # Remove every existing sink so nothing prints to the console and
        # repeated calls do not accumulate duplicate file sinks.
        logger.remove(handler_id=None)
        # rotation='00:00' starts a new file at midnight. Other options:
        # rotation="500 MB" / "12:00" / "1 week"; retention="10 days"
        # would purge logs older than ten days.
        logger.add(log_dir + log_name, level="INFO", rotation='00:00')
        return logger

    @classmethod
    def del_logs(cls, log_type):
        """
        Delete redundant log files, keeping only the 6 most recent.

        :param log_type: crawler name, forwarded to cls.logger().
        """
        log_dir = "./crawler_monitor/logs/"
        # File names start with YYYY-MM-DD, so a lexicographic sort is also
        # a chronological sort.
        all_logs = sorted(
            name for name in os.listdir(log_dir)
            if os.path.splitext(name)[-1] == ".log"
        )
        # Drop everything older than the newest 6. ([:-6] is empty when
        # there are 6 files or fewer, so nothing is removed then.)
        for stale in all_logs[:-6]:
            os.remove(log_dir + stale)
        cls.logger(log_type).info("清除冗余日志成功")

    @classmethod
    def del_charles_files(cls):
        """Delete Charles capture files, keeping only the two most recent."""
        chls_dir = "./crawler-kanyikan-recommend/chlsfiles/"
        for stale in sorted(os.listdir(chls_dir))[:-2]:
            os.remove(chls_dir + stale)
        cls.logger("kanyikan").info("删除 charles 缓存文件成功")

    @classmethod
    def download_method(cls, log_type, text, d_name, d_url):
        """
        Download a video or a cover image into "./videos/<d_name>/".

        :param log_type: crawler name, used for logging.
        :param text: "video" saves d_url as video.mp4 (streamed);
                     "cover" saves d_url as image.jpg; anything else no-ops.
        :param d_name: video title, used as the per-video directory name.
        :param d_url: media URL to fetch.
        """
        video_dir = "./videos/" + d_name + "/"
        # makedirs also creates the "./videos" parent when it is missing,
        # where a plain os.mkdir would raise FileNotFoundError.
        os.makedirs(video_dir, exist_ok=True)
        # Certificate verification is disabled below, so mute the
        # InsecureRequestWarning noise from urllib3.
        urllib3.disable_warnings()
        if text == "video":
            try:
                # Stream the body so large videos are not held in memory.
                response = requests.get(d_url, stream=True,
                                        proxies=proxies, verify=False)
                with open(video_dir + "video.mp4", "wb") as f:
                    for chunk in response.iter_content(chunk_size=10240):
                        f.write(chunk)
                cls.logger(log_type).info("==========视频下载完成==========")
            except Exception as e:
                # Best-effort: log and swallow so one failed download does
                # not kill the crawler loop.
                cls.logger(log_type).exception("视频下载失败:{}", e)
        elif text == "cover":
            try:
                response = requests.get(d_url, proxies=proxies, verify=False)
                with open(video_dir + "image.jpg", "wb") as f:
                    f.write(response.content)
                cls.logger(log_type).info("==========封面下载完成==========")
            except Exception as e:
                cls.logger(log_type).exception("封面下载失败:{}", e)

    @classmethod
    def get_session(cls):
        """
        Extract the kanyikan session token from a Charles capture file.

        Renames the target capture file to .txt, parses it as JSON, and
        pulls the "session=" value from the recwxagetunreadmessagecnt
        request. Sleeps and retries when no session is found, returning
        the retried result.

        :return: the session string, or None while waiting for captures
                 or after an unrecoverable parse step.
        """
        # Directory Charles writes its capture files into.
        charles_file_dir = "./crawler-kanyikan-recommend/chlsfiles/"
        if len(os.listdir(charles_file_dir)) == 1:
            cls.logger("kanyikan").info("未找到chlsfile文件,等待60s")
            time.sleep(60)
            return None
        try:
            all_file = sorted(os.listdir(charles_file_dir))
            # Pick the third-newest entry; presumably the newest files may
            # still be being written by Charles — TODO confirm.
            old_file = all_file[-3]
            base = os.path.splitext(old_file)[0]
            txt_path = os.path.join(charles_file_dir, base + ".txt")
            # Rename the capture so it can be opened as plain text.
            os.rename(os.path.join(charles_file_dir, old_file), txt_path)
            with open(txt_path, encoding='utf-8-sig', errors='ignore') as f:
                contents = json.load(f, strict=False)
            # Query parameters that may be glued onto the session value,
            # checked in this exact order (mirrors the original chain).
            trailers = ("&vid", "&offset", "&wxaVersion", "&limit",
                        "&scene", "&count", "&channelid", "&subscene",
                        "&clientVersion", "&sharesearchid", "&nettype",
                        "&switchprofile", "&switchnewuser")
            for request_item in contents:
                if request_item["host"] == "search.weixin.qq.com" \
                        and request_item["path"] == "/cgi-bin/recwxa/recwxagetunreadmessagecnt":
                    sessions = request_item["query"].split("session=")[-1].split("&wxaVersion=")[0]
                    for sep in trailers:
                        if sep in sessions:
                            return sessions.split(sep)[0]
                    return sessions
            # No matching request found in this capture: wait, then retry
            # and propagate the result (it was previously dropped).
            cls.logger("kanyikan").info("未找到 session,10s后重新获取")
            time.sleep(10)
            return cls.get_session()
        except Exception as e:
            cls.logger("kanyikan").exception("获取 session 异常,30s后重试:{}", e)
            time.sleep(30)
            return cls.get_session()
if __name__ == "__main__":
    # Manual smoke test: instantiating the helper class exercises only the
    # class-attribute date computations, no I/O.
    common = Common()