123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573 |
- # -*- coding: utf-8 -*-
- # @Author: wangkun
- # @Time: 2023/1/31
- """
- 公共方法,包含:生成log / 删除log / 下载方法 / 删除 weixinzhishu_chlsfiles / 过滤词库 / 保存视频信息至本地 txt / 翻译 / ffmpeg
- """
- from aliyun.log import LogClient, PutLogsRequest, LogItem
- from datetime import date, timedelta
- from datetime import datetime
- from loguru import logger
- from hashlib import md5
- # import datetime
- import os
- import json
- import time
- import requests
- import ffmpeg
- import urllib3
- import subprocess
- proxies = {"http": None, "https": None}
- class Common:
- # 统一获取当前时间 <class 'datetime.datetime'> 2022-04-14 20:13:51.244472
- now = datetime.now()
- # 昨天 <class 'str'> 2022-04-13
- yesterday = (date.today() + timedelta(days=-1)).strftime("%Y-%m-%d")
- # 今天 <class 'datetime.date'> 2022-04-14
- today = date.today()
- # 明天 <class 'str'> 2022-04-15
- tomorrow = (date.today() + timedelta(days=1)).strftime("%Y-%m-%d")
- # 使用 logger 模块生成日志
- @staticmethod
- def logger(log_type, crawler):
- """
- 使用 logger 模块生成日志
- """
- # 日志路径
- log_dir = f"./{crawler}/logs/"
- log_path = os.getcwd() + os.sep + log_dir
- if not os.path.isdir(log_path):
- os.makedirs(log_path)
- # 日志文件名
- # log_name = time.strftime("%Y-%m-%d", time.localtime(time.time())) + f'-{crawler}-{log_type}.log'
- # log_name = datetime.datetime.now().strftime('%Y-%m-%d') + f'-{crawler}-{log_type}.log'
- # log_name = f"{date.today():%Y-%m-%d}-{crawler}-{log_type}.log"
- log_name = (
- f"{crawler}-{log_type}-{datetime.now().date().strftime('%Y-%m-%d')}.log"
- )
- # 日志不打印到控制台
- logger.remove(handler_id=None)
- # rotation="500 MB",实现每 500MB 存储一个文件
- # rotation="12:00",实现每天 12:00 创建一个文件
- # rotation="1 week",每周创建一个文件
- # retention="10 days",每隔10天之后就会清理旧的日志
- # 初始化日志
- # logger.add(f"{log_dir}{log_name}", level="INFO", rotation="00:00", retention="10 days", enqueue=True)
- logger.add(
- os.path.join(log_dir, log_name),
- level="INFO",
- rotation="00:00",
- retention="10 days",
- enqueue=True,
- )
- return logger
- # 写入阿里云日志
- @staticmethod
- def logging(log_type, crawler, env, message):
- """
- 写入阿里云日志
- 测试库: https://sls.console.aliyun.com/lognext/project/crawler-log-dev/logsearch/crawler-log-dev
- 正式库: https://sls.console.aliyun.com/lognext/project/crawler-log-prod/logsearch/crawler-log-prod
- :param log_type: 爬虫策略
- :param crawler: 哪款爬虫
- :param env: 环境
- :param message:日志内容
- :return: None
- """
- # 设置阿里云日志服务的访问信息
- accessKeyId = "LTAIWYUujJAm7CbH"
- accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
- if env == "dev":
- project = "crawler-log-dev"
- logstore = "crawler-log-dev"
- endpoint = "cn-hangzhou.log.aliyuncs.com"
- elif crawler == "xigua" and log_type == "recommend":
- project = "crawler-log-prod"
- logstore = "crawler-log-prod"
- endpoint = "cn-hangzhou.log.aliyuncs.com"
- elif (
- crawler == "shipinhao"
- or crawler == "kanyikan"
- or crawler == "ganggangdouchuan"
- or crawler == "zhiqingtiantiankan"
- or crawler == "jixiangxingfu"
- or crawler == "zhongmiaoyinxin"
- ):
- project = "crawler-log-prod"
- logstore = "crawler-log-prod"
- endpoint = "cn-hangzhou.log.aliyuncs.com"
- else:
- project = "crawler-log-prod"
- logstore = "crawler-log-prod"
- endpoint = "cn-hangzhou-intranet.log.aliyuncs.com"
- # 创建 LogClient 实例
- client = LogClient(endpoint, accessKeyId, accessKey)
- if "\r" in message:
- message = message.replace("\r", " ")
- if "\n" in message:
- message = message.replace("\n", " ")
- log_group = []
- log_item = LogItem()
- """
- 生成日志消息体格式,例如
- crawler:xigua
- message:不满足抓取规则
- mode:search
- timestamp:1686656143
- """
- contents = [
- (f"crawler", str(crawler)),
- (f"mode", str(log_type)),
- (f"message", str(message)),
- ("timestamp", str(int(time.time()))),
- ]
- log_item.set_contents(contents)
- log_group.append(log_item)
- # 写入日志
- request = PutLogsRequest(
- project=project,
- logstore=logstore,
- topic="",
- source="",
- logitems=log_group,
- compress=False,
- )
- client.put_logs(request)
- # 清除日志,保留最近 10 个文件
- @classmethod
- def del_logs(cls, log_type, crawler):
- """
- 清除冗余日志文件
- :return: 保留最近 10 个日志
- """
- log_dir = f"./{crawler}/logs/"
- all_files = sorted(os.listdir(log_dir))
- all_logs = []
- for log in all_files:
- name = os.path.splitext(log)[-1]
- if name == ".log":
- all_logs.append(log)
- if len(all_logs) <= 30:
- pass
- else:
- for file in all_logs[: len(all_logs) - 30]:
- os.remove(log_dir + file)
- cls.logger(log_type, crawler).info("清除日志成功\n")
- @classmethod
- def get_session(cls, log_type, crawler, env):
- while True:
- # charles 抓包文件保存目录
- charles_file_dir = f"./{crawler}/chlsfiles/"
- if int(len(os.listdir(charles_file_dir))) == 1:
- Common.logger(log_type, crawler).info("未找到chlsfile文件,等待60s")
- cls.logging(log_type, crawler, env, "未找到chlsfile文件,等待60s")
- time.sleep(60)
- continue
- # 目标文件夹下所有文件
- all_file = sorted(os.listdir(charles_file_dir))
- # 获取到目标文件
- old_file = all_file[-2]
- # 分离文件名与扩展名
- new_file = os.path.splitext(old_file)
- # 重命名文件后缀
- os.rename(
- os.path.join(charles_file_dir, old_file),
- os.path.join(charles_file_dir, new_file[0] + ".txt"),
- )
- with open(
- charles_file_dir + new_file[0] + ".txt",
- encoding="utf-8-sig",
- errors="ignore",
- ) as f:
- contents = json.load(f, strict=False)
- if "search.weixin.qq.com" in [text["host"] for text in contents]:
- for text in contents:
- if (
- text["host"] == "search.weixin.qq.com"
- and text["path"] == "/cgi-bin/recwxa/recwxagetunreadmessagecnt"
- ):
- sessions = (
- text["query"].split("session=")[-1].split("&wxaVersion=")[0]
- )
- if "&vid" in sessions:
- session = sessions.split("&vid")[0]
- return session
- elif "&offset" in sessions:
- session = sessions.split("&offset")[0]
- return session
- elif "&wxaVersion" in sessions:
- session = sessions.split("&wxaVersion")[0]
- return session
- elif "&limit" in sessions:
- session = sessions.split("&limit")[0]
- return session
- elif "&scene" in sessions:
- session = sessions.split("&scene")[0]
- return session
- elif "&count" in sessions:
- session = sessions.split("&count")[0]
- return session
- elif "&channelid" in sessions:
- session = sessions.split("&channelid")[0]
- return session
- elif "&subscene" in sessions:
- session = sessions.split("&subscene")[0]
- return session
- elif "&clientVersion" in sessions:
- session = sessions.split("&clientVersion")[0]
- return session
- elif "&sharesearchid" in sessions:
- session = sessions.split("&sharesearchid")[0]
- return session
- elif "&nettype" in sessions:
- session = sessions.split("&nettype")[0]
- return session
- elif "&switchprofile" in sessions:
- session = sessions.split("&switchprofile")[0]
- return session
- elif "&switchnewuser" in sessions:
- session = sessions.split("&switchnewuser")[0]
- return session
- else:
- return sessions
- else:
- cls.logger(log_type, crawler).info("未找到 session,10s后重新获取")
- cls.logging(log_type, crawler, env, "未找到 session,10s后重新获取")
- time.sleep(10)
- # 删除 charles 缓存文件,只保留最近的两个文件
- @classmethod
- def del_charles_files(cls, log_type, crawler):
- # 目标文件夹下所有文件
- all_file = sorted(os.listdir(f"./{crawler}/chlsfiles/"))
- for file in all_file[0:-3]:
- os.remove(f"./{crawler}/chlsfiles/{file}")
- cls.logger(log_type, crawler).info("删除 charles 缓存文件成功\n")
- # 保存视频信息至 "./videos/{video_dict['video_title}/info.txt"
- @classmethod
- def save_video_info(cls, log_type, crawler, video_dict):
- md_title = md5(video_dict["video_title"].encode("utf8")).hexdigest()
- save_dict = {
- "video_title": "video_title",
- "video_id": "video_id",
- "duration": 0,
- "play_cnt": 0,
- "comment_cnt": 0,
- "like_cnt": 0,
- "share_cnt": 0,
- "video_width": 1920,
- "video_height": 1080,
- "publish_time_stamp": 946656000, # 2000-01-01 00:00:00
- "user_name": "crawler",
- "avatar_url": "http://weapppiccdn.yishihui.com/resources/images/pic_normal.png",
- "video_url": "video_url",
- "cover_url": "cover_url",
- "session": f"session-{int(time.time())}",
- }
- for video_key, video_value in video_dict.items():
- for save_key, save_value in save_dict.items():
- if save_key == video_key:
- save_dict[save_key] = video_value
- with open(
- f"./{crawler}/videos/{md_title}/info.txt", "w", encoding="UTF-8"
- ) as f_w:
- f_w.write(
- str(video_dict["video_id"])
- + "\n"
- + str(video_dict["video_title"])
- + "\n"
- + str(video_dict["duration"])
- + "\n"
- + str(video_dict["play_cnt"])
- + "\n"
- + str(video_dict["comment_cnt"])
- + "\n"
- + str(video_dict["like_cnt"])
- + "\n"
- + str(video_dict["share_cnt"])
- + "\n"
- + f"{video_dict['video_width']}*{video_dict['video_height']}"
- + "\n"
- + str(video_dict["publish_time_stamp"])
- + "\n"
- + str(video_dict["user_name"])
- + "\n"
- + str(video_dict["avatar_url"])
- + "\n"
- + str(video_dict["video_url"])
- + "\n"
- + str(video_dict["cover_url"])
- + "\n"
- + str(video_dict["session"])
- )
- Common.logger(log_type, crawler).info("==========视频信息已保存至info.txt==========")
- # 封装下载视频或封面的方法
- @classmethod
- def download_method(cls, log_type, crawler, text, title, url):
- """
- 下载封面:text == "cover" ; 下载视频:text == "video"
- 需要下载的视频标题:d_title
- 视频封面,或视频播放地址:d_url
- 下载保存路径:"./files/{d_title}/"
- """
- videos_dir = f"./{crawler}/videos/"
- if not os.path.exists(videos_dir):
- os.mkdir(videos_dir)
- # 首先创建一个保存该视频相关信息的文件夹
- md_title = md5(title.encode("utf8")).hexdigest()
- video_path = f"./{crawler}/videos/{md_title}/"
- if not os.path.exists(video_path):
- os.mkdir(video_path)
- # 下载视频
- if text == "video":
- # 需要下载的视频地址
- video_url = str(url).replace("http://", "https://")
- # 视频名
- video_name = "video.mp4"
- for i in range(3):
- try:
- # 下载视频,最多重试三次
- urllib3.disable_warnings()
- # response = requests.get(video_url, stream=True, proxies=cls.tunnel_proxies(), verify=False)
- response = requests.get(
- video_url, stream=True, proxies=proxies, verify=False
- )
- with open(video_path + video_name, "wb") as f:
- for chunk in response.iter_content(chunk_size=10240):
- f.write(chunk)
- cls.logger(log_type, crawler).info("==========视频下载完成==========")
- return True
- except Exception as e:
- cls.logger(log_type, crawler).error(f"视频下载失败:{e}\n")
- time.sleep(1)
- return False
- # 下载音频
- elif text == "audio":
- # 需要下载的视频地址
- audio_url = str(url).replace("http://", "https://")
- # 音频名
- audio_name = "audio.mp4"
- # 下载视频
- urllib3.disable_warnings()
- # response = requests.get(audio_url, stream=True, proxies=cls.tunnel_proxies(), verify=False)
- response = requests.get(
- audio_url, stream=True, proxies=proxies, verify=False
- )
- try:
- with open(video_path + audio_name, "wb") as f:
- for chunk in response.iter_content(chunk_size=10240):
- f.write(chunk)
- cls.logger(log_type, crawler).info("==========音频下载完成==========")
- except Exception as e:
- cls.logger(log_type, crawler).error(f"音频下载失败:{e}\n")
- # 下载封面
- elif text == "cover":
- # 需要下载的封面地址
- cover_url = str(url)
- # 封面名
- cover_name = "image.jpg"
- # 下载封面
- urllib3.disable_warnings()
- # response = requests.get(cover_url, proxies=cls.tunnel_proxies(), verify=False)
- response = requests.get(cover_url, verify=False)
- try:
- with open(video_path + cover_name, "wb") as f:
- f.write(response.content)
- cls.logger(log_type, crawler).info("==========封面下载完成==========")
- except Exception as e:
- cls.logger(log_type, crawler).error(f"封面下载失败:{e}\n")
- # youtube 视频下载
- elif text == "youtube_video":
- # 需要下载的视频地址
- video_url = url
- # 视频名
- video_name = "video.mp4"
- try:
- download_cmd = f'yt-dlp -f "bv[height<=720][ext=mp4]+ba[ext=m4a]" --merge-output-format mp4 "{video_url}-U" -o {video_path}{video_name}'
- Common.logger(log_type, crawler).info(f"download_cmd:{download_cmd}")
- os.system(download_cmd)
- # move_cmd = f"mv {video_name} {video_path}"
- # os.system(move_cmd)
- cls.logger(log_type, crawler).info("==========视频下载完成==========")
- except Exception as e:
- Common.logger(log_type, crawler).error(f"视频下载失败:{e}\n")
- # 西瓜视频 / 音频下载
- elif text == "xigua_video":
- # 需要下载的视频地址
- video_url = str(url).replace("http://", "https://")
- # 视频名
- video_name = "video1.mp4"
- # 下载视频
- urllib3.disable_warnings()
- # response = requests.get(video_url, stream=True, proxies=cls.tunnel_proxies(), verify=False)
- response = requests.get(
- video_url, stream=True, proxies=proxies, verify=False
- )
- try:
- with open(video_path + video_name, "wb") as f:
- for chunk in response.iter_content(chunk_size=10240):
- f.write(chunk)
- cls.logger(log_type, crawler).info("==========视频下载完成==========")
- except Exception as e:
- cls.logger(log_type, crawler).error(f"视频下载失败:{e}\n")
- elif text == "xigua_audio":
- # 需要下载的视频地址
- audio_url = str(url).replace("http://", "https://")
- # 音频名
- audio_name = "audio1.mp4"
- # 下载视频
- urllib3.disable_warnings()
- # response = requests.get(audio_url, stream=True, proxies=cls.tunnel_proxies(), verify=False)
- response = requests.get(
- audio_url, stream=True, proxies=proxies, verify=False
- )
- try:
- with open(video_path + audio_name, "wb") as f:
- for chunk in response.iter_content(chunk_size=10240):
- f.write(chunk)
- cls.logger(log_type, crawler).info("==========音频下载完成==========")
- except Exception as e:
- cls.logger(log_type, crawler).error(f"音频下载失败:{e}\n")
- @classmethod
- def ffmpeg(cls, log_type, crawler, video_path):
- Common.logger(log_type, crawler).info(f"video_path:{video_path}")
- video_title = video_path.replace(f"./{crawler}/videos/", "").replace(
- "/video.mp4", ""
- )
- Common.logger(log_type, crawler).info(f"video_title:{video_title}")
- md_title = md5(video_title.encode("utf8")).hexdigest()
- Common.logger(log_type, crawler).info(f"crawler:{crawler}")
- if (
- crawler == "zhiqingtiantiankan"
- or crawler == "ganggangdouchuan"
- or crawler == "jixiangxingfu"
- or crawler == "zhongmiaoyinxin"
- ):
- # video_path = os.path.join("C:\\", "crawler", "piaoquan_crawler", f"{crawler}", "videos", f"{md_title}", "video.mp4")
- video_path = os.path.join(
- ".\\", f"{crawler}", "videos", f"{md_title}", "video.mp4"
- )
- else:
- video_path = f"./{crawler}/videos/{md_title}/video.mp4"
- Common.logger(log_type, crawler).info(f"video_path:{video_path}")
- if os.path.getsize(video_path) == 0:
- Common.logger(log_type, crawler).info(
- f"video_size:{os.path.getsize(video_path)}"
- )
- return
- probe = ffmpeg.probe(video_path)
- video_stream = next(
- (stream for stream in probe["streams"] if stream["codec_type"] == "video"),
- None,
- )
- if video_stream is None:
- Common.logger(log_type, crawler).info("No video Stream found!")
- return
- format1 = probe["format"]
- size = int(int(format1["size"]) / 1024 / 1024)
- width = int(video_stream["width"])
- height = int(video_stream["height"])
- duration = int(float(video_stream["duration"]))
- ffmpeg_dict = {
- "width": width,
- "height": height,
- "duration": duration,
- "size": size,
- }
- return ffmpeg_dict
- # 合并音视频
- @classmethod
- def video_compose(cls, log_type, crawler, video_dir):
- video_title = video_dir.replace(f"./{crawler}/videos/", "")
- md_title = md5(video_title.encode("utf8")).hexdigest()
- video_dir = f"./{crawler}/videos/{md_title}"
- try:
- video_path = f"{video_dir}/video1.mp4"
- audio_path = f"{video_dir}/audio1.mp4"
- out_path = f"{video_dir}/video.mp4"
- cmd = f"ffmpeg -i {video_path} -i {audio_path} -c:v copy -c:a aac -strict experimental -map 0:v:0 -map 1:a:0 {out_path}"
- # print(cmd)
- subprocess.call(cmd, shell=True)
- for file in os.listdir(video_dir):
- if (
- file.split(".mp4")[0] == "video1"
- or file.split(".mp4")[0] == "audio1"
- ):
- os.remove(f"{video_dir}/{file}")
- Common.logger(log_type, crawler).info("合成成功\n")
- except Exception as e:
- Common.logger(log_type, crawler).error(f"video_compose异常:{e}\n")
- # 快代理
- @classmethod
- def tunnel_proxies(cls):
- # 隧道域名:端口号
- tunnel = "q796.kdltps.com:15818"
- # 用户名密码方式
- username = "t17772369458618"
- password = "5zqcjkmy"
- tunnel_proxies = {
- "http": "http://%(user)s:%(pwd)s@%(proxy)s/"
- % {"user": username, "pwd": password, "proxy": tunnel},
- "https": "http://%(user)s:%(pwd)s@%(proxy)s/"
- % {"user": username, "pwd": password, "proxy": tunnel},
- }
- # 白名单方式(需提前设置白名单)
- # proxies = {
- # "http": "http://%(proxy)s/" % {"proxy": tunnel},
- # "https": "http://%(proxy)s/" % {"proxy": tunnel}
- # }
- # 要访问的目标网页
- # target_url = "https://www.kuaishou.com/profile/3xk9tkk6kkwkf7g"
- # target_url = "https://dev.kdlapi.com/testproxy"
- # # 使用隧道域名发送请求
- # response = requests.get(target_url, proxies=proxies)
- # print(response.text)
- return tunnel_proxies # {'http': 'http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/', 'https': 'http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/'}
- if __name__ == "__main__":
- # print(datetime.time(hour=0, minute=0))
- # print(f'{date.today():%Y-%m-%d}')
- print(datetime.now().date().strftime("%Y-%m-%d"))
- pass
|