# -*- coding: utf-8 -*- # @Author: wangkun # @Time: 2023/1/31 """ 公共方法,包含:生成log / 删除log / 下载方法 / 删除 weixinzhishu_chlsfiles / 过滤词库 / 保存视频信息至本地 txt / 翻译 / ffmpeg """ from aliyun.log import LogClient, PutLogsRequest, LogItem from datetime import date, timedelta from datetime import datetime from loguru import logger from hashlib import md5 # import datetime import os import json import time import requests import ffmpeg import urllib3 import subprocess proxies = {"http": None, "https": None} class Common: # 统一获取当前时间 2022-04-14 20:13:51.244472 now = datetime.now() # 昨天 2022-04-13 yesterday = (date.today() + timedelta(days=-1)).strftime("%Y-%m-%d") # 今天 2022-04-14 today = date.today() # 明天 2022-04-15 tomorrow = (date.today() + timedelta(days=1)).strftime("%Y-%m-%d") # 使用 logger 模块生成日志 @staticmethod def logger(log_type, crawler): """ 使用 logger 模块生成日志 """ # 日志路径 log_dir = f"./{crawler}/logs/" log_path = os.getcwd() + os.sep + log_dir if not os.path.isdir(log_path): os.makedirs(log_path) # 日志文件名 # log_name = time.strftime("%Y-%m-%d", time.localtime(time.time())) + f'-{crawler}-{log_type}.log' # log_name = datetime.datetime.now().strftime('%Y-%m-%d') + f'-{crawler}-{log_type}.log' # log_name = f"{date.today():%Y-%m-%d}-{crawler}-{log_type}.log" log_name = f"{crawler}-{log_type}-{datetime.now().date().strftime('%Y-%m-%d')}.log" # 日志不打印到控制台 logger.remove(handler_id=None) # rotation="500 MB",实现每 500MB 存储一个文件 # rotation="12:00",实现每天 12:00 创建一个文件 # rotation="1 week",每周创建一个文件 # retention="10 days",每隔10天之后就会清理旧的日志 # 初始化日志 # logger.add(f"{log_dir}{log_name}", level="INFO", rotation="00:00", retention="10 days", enqueue=True) logger.add(os.path.join(log_dir, log_name), level="INFO", rotation="00:00", retention="10 days", enqueue=True) return logger # 写入阿里云日志 @staticmethod def logging(log_type, crawler, env, message): """ 写入阿里云日志 测试库: https://sls.console.aliyun.com/lognext/project/crawler-log-dev/logsearch/crawler-log-dev 正式库: https://sls.console.aliyun.com/lognext/project/crawler-log-prod/logsearch/crawler-log-prod :param log_type: 爬虫策略 :param crawler: 哪款爬虫 :param env: 环境 :param message:日志内容 :return: None """ # 设置阿里云日志服务的访问信息 accessKeyId = 'LTAIWYUujJAm7CbH' accessKey = 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P' if env == "dev": project = 'crawler-log-dev' logstore = 'crawler-log-dev' endpoint = 'cn-hangzhou.log.aliyuncs.com' elif crawler == "xigua" and log_type == "recommend": project = 'crawler-log-prod' logstore = 'crawler-log-prod' endpoint = 'cn-hangzhou.log.aliyuncs.com' elif crawler == "shipinhao"\ or crawler == "kanyikan"\ or crawler == "ganggangdouchuan"\ or crawler == "zhiqingtiantiankan"\ or crawler == "jixiangxingfu"\ or crawler == "zhufuquanzi" \ or crawler == "xiaoniangaoplus" \ or crawler == "zhongmiaoyinxin" \ or crawler == "huanhuanxixizhufudao": project = 'crawler-log-prod' logstore = 'crawler-log-prod' endpoint = 'cn-hangzhou.log.aliyuncs.com' else: project = 'crawler-log-prod' logstore = 'crawler-log-prod' endpoint = 'cn-hangzhou-intranet.log.aliyuncs.com' # 创建 LogClient 实例 client = LogClient(endpoint, accessKeyId, accessKey) if '\r' in message: message = message.replace('\r', ' ') if '\n' in message: message = message.replace('\n', ' ') log_group = [] log_item = LogItem() """ 生成日志消息体格式,例如 crawler:xigua message:不满足抓取规则 mode:search timestamp:1686656143 """ contents = [(f"crawler", str(crawler)), (f"mode", str(log_type)), (f"message", str(message)), ("timestamp", str(int(time.time())))] log_item.set_contents(contents) log_group.append(log_item) # 写入日志 request = PutLogsRequest(project=project, logstore=logstore, topic="", source="", logitems=log_group, compress=False) client.put_logs(request) # 清除日志,保留最近 10 个文件 @classmethod def del_logs(cls, log_type, crawler): """ 清除冗余日志文件 :return: 保留最近 10 个日志 """ log_dir = f"./{crawler}/logs/" all_files = sorted(os.listdir(log_dir)) all_logs = [] for log in all_files: name = os.path.splitext(log)[-1] if name == ".log": all_logs.append(log) if len(all_logs) <= 30: pass else: for file in all_logs[:len(all_logs) - 30]: os.remove(log_dir + file) cls.logger(log_type, crawler).info("清除日志成功\n") @classmethod def get_session(cls, log_type, crawler, env): while True: # charles 抓包文件保存目录 charles_file_dir = f"./{crawler}/chlsfiles/" if int(len(os.listdir(charles_file_dir))) == 1: Common.logger(log_type, crawler).info("未找到chlsfile文件,等待60s") cls.logging(log_type, crawler, env, "未找到chlsfile文件,等待60s") time.sleep(60) continue # 目标文件夹下所有文件 all_file = sorted(os.listdir(charles_file_dir)) # 获取到目标文件 old_file = all_file[-2] # 分离文件名与扩展名 new_file = os.path.splitext(old_file) # 重命名文件后缀 os.rename(os.path.join(charles_file_dir, old_file), os.path.join(charles_file_dir, new_file[0] + ".txt")) with open(charles_file_dir + new_file[0] + ".txt", encoding='utf-8-sig', errors='ignore') as f: contents = json.load(f, strict=False) if "search.weixin.qq.com" in [text['host'] for text in contents]: for text in contents: if text["host"] == "search.weixin.qq.com" \ and text["path"] == "/cgi-bin/recwxa/recwxagetunreadmessagecnt": sessions = text["query"].split("session=")[-1].split("&wxaVersion=")[0] if "&vid" in sessions: session = sessions.split("&vid")[0] return session elif "&offset" in sessions: session = sessions.split("&offset")[0] return session elif "&wxaVersion" in sessions: session = sessions.split("&wxaVersion")[0] return session elif "&limit" in sessions: session = sessions.split("&limit")[0] return session elif "&scene" in sessions: session = sessions.split("&scene")[0] return session elif "&count" in sessions: session = sessions.split("&count")[0] return session elif "&channelid" in sessions: session = sessions.split("&channelid")[0] return session elif "&subscene" in sessions: session = sessions.split("&subscene")[0] return session elif "&clientVersion" in sessions: session = sessions.split("&clientVersion")[0] return session elif "&sharesearchid" in sessions: session = sessions.split("&sharesearchid")[0] return session elif "&nettype" in sessions: session = sessions.split("&nettype")[0] return session elif "&switchprofile" in sessions: session = sessions.split("&switchprofile")[0] return session elif "&switchnewuser" in sessions: session = sessions.split("&switchnewuser")[0] return session else: return sessions else: cls.logger(log_type, crawler).info("未找到 session,10s后重新获取") cls.logging(log_type, crawler, env, "未找到 session,10s后重新获取") time.sleep(10) # 删除 charles 缓存文件,只保留最近的两个文件 @classmethod def del_charles_files(cls, log_type, crawler): # 目标文件夹下所有文件 all_file = sorted(os.listdir(f"./{crawler}/chlsfiles/")) for file in all_file[0:-3]: os.remove(f"./{crawler}/chlsfiles/{file}") cls.logger(log_type, crawler).info("删除 charles 缓存文件成功\n") # 保存视频信息至 "./videos/{video_dict['video_title}/info.txt" @classmethod def save_video_info(cls, log_type, crawler, video_dict): md_title = md5(video_dict['video_title'].encode('utf8')).hexdigest() save_dict = { "video_title": "video_title", "video_id": "video_id", "duration": 0, "play_cnt": 0, "comment_cnt": 0, "like_cnt": 0, "share_cnt": 0, "video_width": 1920, "video_height": 1080, "publish_time_stamp": 946656000, # 2000-01-01 00:00:00 "user_name": "crawler", "avatar_url": "http://weapppiccdn.yishihui.com/resources/images/pic_normal.png", "video_url": "video_url", "cover_url": "cover_url", "session": f"session-{int(time.time())}", } for video_key, video_value in video_dict.items(): for save_key, save_value in save_dict.items(): if save_key == video_key: save_dict[save_key] = video_value with open(f"./{crawler}/videos/{md_title}/info.txt", "w", encoding="UTF-8") as f_w: f_w.write(str(video_dict['video_id']) + "\n" + str(video_dict['video_title']) + "\n" + str(video_dict['duration']) + "\n" + str(video_dict['play_cnt']) + "\n" + str(video_dict['comment_cnt']) + "\n" + str(video_dict['like_cnt']) + "\n" + str(video_dict['share_cnt']) + "\n" + f"{video_dict['video_width']}*{video_dict['video_height']}" + "\n" + str(video_dict['publish_time_stamp']) + "\n" + str(video_dict['user_name']) + "\n" + str(video_dict['avatar_url']) + "\n" + str(video_dict['video_url']) + "\n" + str(video_dict['cover_url']) + "\n" + str(video_dict['session'])) Common.logger(log_type, crawler).info("==========视频信息已保存至info.txt==========") # 封装下载视频或封面的方法 @classmethod def download_method(cls, log_type, crawler, text, title, url): """ 下载封面:text == "cover" ; 下载视频:text == "video" 需要下载的视频标题:d_title 视频封面,或视频播放地址:d_url 下载保存路径:"./files/{d_title}/" """ videos_dir = f"./{crawler}/videos/" if not os.path.exists(videos_dir): os.mkdir(videos_dir) # 首先创建一个保存该视频相关信息的文件夹 md_title = md5(title.encode('utf8')).hexdigest() video_path = f"./{crawler}/videos/{md_title}/" if not os.path.exists(video_path): os.mkdir(video_path) # 下载视频 if text == "video": # 需要下载的视频地址 video_url = str(url).replace('http://', 'https://') # 视频名 video_name = "video.mp4" for i in range(3): try: # 下载视频,最多重试三次 urllib3.disable_warnings() # response = requests.get(video_url, stream=True, proxies=cls.tunnel_proxies(), verify=False) response = requests.get(video_url, stream=True, proxies=proxies, verify=False) with open(video_path + video_name, "wb") as f: for chunk in response.iter_content(chunk_size=10240): f.write(chunk) cls.logger(log_type, crawler).info("==========视频下载完成==========") return True except Exception as e: cls.logger(log_type, crawler).error(f"视频下载失败:{e}\n") time.sleep(1) return False # 下载音频 elif text == "audio": # 需要下载的视频地址 audio_url = str(url).replace('http://', 'https://') # 音频名 audio_name = "audio.mp4" # 下载视频 urllib3.disable_warnings() # response = requests.get(audio_url, stream=True, proxies=cls.tunnel_proxies(), verify=False) response = requests.get(audio_url, stream=True, proxies=proxies, verify=False) try: with open(video_path + audio_name, "wb") as f: for chunk in response.iter_content(chunk_size=10240): f.write(chunk) cls.logger(log_type, crawler).info("==========音频下载完成==========") except Exception as e: cls.logger(log_type, crawler).error(f"音频下载失败:{e}\n") # 下载封面 elif text == "cover": # 需要下载的封面地址 cover_url = str(url) # 封面名 cover_name = "image.jpg" # 下载封面 urllib3.disable_warnings() # response = requests.get(cover_url, proxies=cls.tunnel_proxies(), verify=False) response = requests.get(cover_url, verify=False) try: with open(video_path + cover_name, "wb") as f: f.write(response.content) cls.logger(log_type, crawler).info("==========封面下载完成==========") except Exception as e: cls.logger(log_type, crawler).error(f"封面下载失败:{e}\n") # youtube 视频下载 elif text == "youtube_video": # 需要下载的视频地址 video_url = url # 视频名 video_name = "video.mp4" try: download_cmd = f'yt-dlp -f "bv[height<=720][ext=mp4]+ba[ext=m4a]" --merge-output-format mp4 "{video_url}-U" -o {video_path}{video_name}' Common.logger(log_type, crawler).info(f"download_cmd:{download_cmd}") os.system(download_cmd) # move_cmd = f"mv {video_name} {video_path}" # os.system(move_cmd) cls.logger(log_type, crawler).info("==========视频下载完成==========") except Exception as e: Common.logger(log_type, crawler).error(f"视频下载失败:{e}\n") # 西瓜视频 / 音频下载 elif text == "xigua_video": # 需要下载的视频地址 video_url = str(url).replace('http://', 'https://') # 视频名 video_name = "video1.mp4" # 下载视频 urllib3.disable_warnings() # response = requests.get(video_url, stream=True, proxies=cls.tunnel_proxies(), verify=False) response = requests.get(video_url, stream=True, proxies=proxies, verify=False) try: with open(video_path + video_name, "wb") as f: for chunk in response.iter_content(chunk_size=10240): f.write(chunk) cls.logger(log_type, crawler).info("==========视频下载完成==========") except Exception as e: cls.logger(log_type, crawler).error(f"视频下载失败:{e}\n") elif text == "xigua_audio": # 需要下载的视频地址 audio_url = str(url).replace('http://', 'https://') # 音频名 audio_name = "audio1.mp4" # 下载视频 urllib3.disable_warnings() # response = requests.get(audio_url, stream=True, proxies=cls.tunnel_proxies(), verify=False) response = requests.get(audio_url, stream=True, proxies=proxies, verify=False) try: with open(video_path + audio_name, "wb") as f: for chunk in response.iter_content(chunk_size=10240): f.write(chunk) cls.logger(log_type, crawler).info("==========音频下载完成==========") except Exception as e: cls.logger(log_type, crawler).error(f"音频下载失败:{e}\n") @classmethod def ffmpeg(cls, log_type, crawler, video_path): Common.logger(log_type, crawler).info(f"video_path:{video_path}") video_title = video_path.replace(f"./{crawler}/videos/", "").replace("/video.mp4", "") Common.logger(log_type, crawler).info(f"video_title:{video_title}") md_title = md5(video_title.encode('utf8')).hexdigest() Common.logger(log_type, crawler).info(f"crawler:{crawler}") # if crawler == "zhiqingtiantiankan" \ # or crawler == "ganggangdouchuan"\ # or crawler == "jixiangxingfu"\ # or crawler == "zhongmiaoyinxin": # # video_path = os.path.join("C:\\", "crawler", "piaoquan_crawler", f"{crawler}", "videos", f"{md_title}", "video.mp4") # video_path = os.path.join(".\\", f"{crawler}", "videos", f"{md_title}", "video.mp4") # else: video_path = f"./{crawler}/videos/{md_title}/video.mp4" Common.logger(log_type, crawler).info(f"video_path:{video_path}") if os.path.getsize(video_path) == 0: Common.logger(log_type, crawler).info(f'video_size:{os.path.getsize(video_path)}') return probe = ffmpeg.probe(video_path) video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None) if video_stream is None: Common.logger(log_type, crawler).info('No video Stream found!') return format1 = probe['format'] size = int(int(format1['size']) / 1024 / 1024) width = int(video_stream['width']) height = int(video_stream['height']) duration = int(float(video_stream['duration'])) ffmpeg_dict = { 'width': width, 'height': height, 'duration': duration, 'size': size } return ffmpeg_dict # 合并音视频 @classmethod def video_compose(cls, log_type, crawler, video_dir): video_title = video_dir.replace(f"./{crawler}/videos/", "") md_title = md5(video_title.encode('utf8')).hexdigest() video_dir = f"./{crawler}/videos/{md_title}" try: video_path = f'{video_dir}/video1.mp4' audio_path = f'{video_dir}/audio1.mp4' out_path = f'{video_dir}/video.mp4' cmd = f'ffmpeg -i {video_path} -i {audio_path} -c:v copy -c:a aac -strict experimental -map 0:v:0 -map 1:a:0 {out_path}' # print(cmd) subprocess.call(cmd, shell=True) for file in os.listdir(video_dir): if file.split('.mp4')[0] == 'video1' or file.split('.mp4')[0] == 'audio1': os.remove(f'{video_dir}/{file}') Common.logger(log_type, crawler).info('合成成功\n') except Exception as e: Common.logger(log_type, crawler).error(f'video_compose异常:{e}\n') # 快代理 @classmethod def tunnel_proxies(cls): # 隧道域名:端口号 tunnel = "q796.kdltps.com:15818" # 用户名密码方式 username = "t17772369458618" password = "5zqcjkmy" tunnel_proxies = { "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel}, "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel} } # 白名单方式(需提前设置白名单) # proxies = { # "http": "http://%(proxy)s/" % {"proxy": tunnel}, # "https": "http://%(proxy)s/" % {"proxy": tunnel} # } # 要访问的目标网页 # target_url = "https://www.kuaishou.com/profile/3xk9tkk6kkwkf7g" # target_url = "https://dev.kdlapi.com/testproxy" # # 使用隧道域名发送请求 # response = requests.get(target_url, proxies=proxies) # print(response.text) return tunnel_proxies # {'http': 'http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/', 'https': 'http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/'} if __name__ == "__main__": # print(datetime.time(hour=0, minute=0)) # print(f'{date.today():%Y-%m-%d}') print(datetime.now().date().strftime('%Y-%m-%d')) pass