# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2022/4/25
"""
Download videos that satisfy the filtering rules from the WeChat mini program
"Benshan Zhufu short videos", then upload them.
"""
import json
import os
import random
import shutil
import sys
import time
from urllib import parse

import ffmpeg
import requests
import urllib3

# Project-local imports are resolved relative to the current working directory.
sys.path.append(os.getcwd())
from main.common import Common
from main.bszf_publish import Publish
from main.feishu_lib import Feishu

# Explicitly bypass any system-wide HTTP(S) proxy for outgoing requests.
proxies = {"http": None, "https": None}


class Recommend:
    """Fetch the recommend feed, stage it in a Feishu sheet, then download and publish it.

    Feishu sheets used (all in the "bszf" spreadsheet):
      * "CcHgO7" - recommend_feeds staging sheet (row 1 is treated as a header).
      * "440018" - sheet of already downloaded/uploaded videos.
      * "DjXfqG" - sensitive-word list.
    """

    # Pagination state: ``visitor_key`` is sent back as "ini_id" on the next request.
    # ``page`` is incremented after every successful request, but the request itself
    # currently uses a random page number.
    visitor_key = ""
    page = 1

    # Timestamp format used for every human-readable time written to the sheets.
    _TIME_FORMAT = "%Y/%m/%d %H:%M:%S"

    @classmethod
    def sensitive_words(cls, log_type):
        """Return every non-empty cell of the sensitive-word sheet as a flat list.

        :param log_type: logger name forwarded to the Feishu helper.
        :return: list of cell values read from sheet "DjXfqG", with ``None`` cells dropped.
        """
        rows = Feishu.get_values_batch(log_type, "bszf", "DjXfqG")
        return [cell for row in rows for cell in row if cell is not None]

    @classmethod
    def get_video_info_from_local(cls, video_path):
        """Probe a local video file for its width, height and duration.

        :param video_path: path of the downloaded video file.
        :return: ``(width, height, duration)`` taken from the first video stream,
            or ``None`` when the file contains no video stream.
        """
        probe = ffmpeg.probe(video_path)
        video_stream = next(
            (stream for stream in probe['streams'] if stream['codec_type'] == 'video'),
            None)
        if video_stream is None:
            print('No video stream found!')
            return None
        width = int(video_stream['width'])
        height = int(video_stream['height'])
        duration = float(video_stream['duration'])
        return width, height, duration

    @staticmethod
    def _clean_title(raw_title):
        """Strip the brand name and characters that are unsafe in a folder name from a title."""
        # NOTE(review): replace(" ", "") and replace("?", "") each appear twice; one of
        # each pair presumably targeted a non-breaking space / full-width question mark
        # originally - confirm against the upstream file.
        return (
            raw_title.strip().replace("\n", "")
            .replace("/", "").replace("本山祝福", "").replace(" ", "")
            .replace(" ", "").replace("&NBSP", "").replace("\r", "")
            .replace("#", "").replace(".", "。").replace("\\", "")
            .replace(":", "").replace("*", "").replace("?", "")
            .replace("?", "").replace('"', "").replace("<", "")
            .replace(">", "").replace("|", "")
        )

    @classmethod
    def _sheet_contains(cls, log_type, sheet_id, video_id):
        """Return True when ``str(video_id)`` equals any cell of the given "bszf" sheet."""
        rows = Feishu.get_values_batch(log_type, "bszf", sheet_id)
        return str(video_id) in [cell for row in rows for cell in row]

    @classmethod
    def get_recommend(cls, log_type):
        """
        Fetch one page of the home-page recommend feed and append new videos to:
        https://w42nne6hzg.feishu.cn/sheets/shtcnGh2rrsPYM4iVNEBO7OqWrb?sheet=CcHgO7

        A feed entry is skipped when it lacks an id, a cover or an ``.mp4`` URL, or
        when its id is already present in the downloaded sheet ("440018") or in the
        recommend_feeds sheet ("CcHgO7").

        :param log_type: logger name; also forwarded to the Feishu helper.
        """
        now = int(time.time() * 1000)
        url = "https://bszf.wentingyou.cn/index.php/v111/index/index?parameter="
        header = {
            "Connection": "keep-alive",
            "vision": "1.1.0",
            "content-type": "application/x-www-form-urlencoded",
            "scene": "1008",
            "content-time": str(now),
            "token": "",
            "visitorKey": "165086930003741",
            "chatKey": "wx0fb8149da961d3b0",
            "cache-time": str(now),
            "Accept-Encoding": "gzip,compress,br,deflate",
            "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X) "
                          "AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 "
                          "MicroMessenger/8.0.20(0x1800142d) NetType/WIFI Language/zh_CN",
            "Referer": "https://servicewechat.com/wx0fb8149da961d3b0/2/page-frame.html"
        }
        parameter = {
            "cid": "",
            "page": random.randint(1, 76),
            "is_ads": 1,
            "model": "iPhone 11",
            "mini_version": "8.0.25",
            "origin_channel": "-1",
            "origin_type": "2",
            "origin_level": "0",
            "ini_id": cls.visitor_key
        }
        # The API expects the JSON parameters URL-quoted and appended to the query string.
        url = url + str(parse.quote(json.dumps(parameter)))

        try:
            # NOTE(review): TLS verification is disabled (verify=False) and the request
            # has no timeout - confirm both are intentional.
            urllib3.disable_warnings()
            r = requests.get(headers=header, url=url, proxies=proxies, verify=False)
            response = json.loads(r.content.decode("utf8"))

            # Validate the payload BEFORE touching response["data"]; otherwise a
            # missing key raises and this warning branch can never run.
            if "data" not in response:
                Common.logger(log_type).warning("get_recommend, response:{}".format(response))
                time.sleep(3)
                return

            # Pagination bookkeeping
            cls.visitor_key = response["data"]["visitor_key"]
            cls.page += 1

            for feed in response["data"]["list"]:
                # 0 marks a missing field throughout this loop.
                video_id = feed.get("nid", 0)
                cover_url = feed.get("video_cover", 0)
                video_url = feed.get("video_url", 0)
                if not isinstance(video_url, str) or ".mp4" not in video_url:
                    video_url = 0
                video_comment_cnt = feed.get("commentCount", 0)
                video_send_time = feed.get("update_time", 0)
                video_title = cls._clean_title(feed["title"]) if "title" in feed else 0

                # The feed carries no counters or author data, so fixed values are used.
                like_cnt = "0"
                share_cnt = "0"
                play_cnt = "0"
                user_name = "本山祝福"
                head_url = cover_url
                user_id = "benshanzhufu"

                Common.logger(log_type).info("video_title:{}".format(video_title))
                Common.logger(log_type).info("video_id:{}".format(video_id))
                Common.logger(log_type).info(
                    "video_send_time:{}",
                    time.strftime(cls._TIME_FORMAT, time.localtime(int(video_send_time))))
                Common.logger(log_type).info("video_url:{}".format(video_url))

                # Skip invalid videos
                if video_id == 0 or cover_url == 0 or video_url == 0:
                    Common.logger(log_type).info("无效视频\n")
                # De-duplicate against the downloaded sheet:
                # https://w42nne6hzg.feishu.cn/sheets/shtcnGh2rrsPYM4iVNEBO7OqWrb?sheet=440018
                elif cls._sheet_contains(log_type, "440018", video_id):
                    Common.logger(log_type).info("视频已下载\n")
                # De-duplicate against the recommend_feeds sheet:
                # https://w42nne6hzg.feishu.cn/sheets/shtcnGh2rrsPYM4iVNEBO7OqWrb?sheet=CcHgO7
                elif cls._sheet_contains(log_type, "CcHgO7", video_id):
                    Common.logger(log_type).info("视频已在recommend_feeds表中\n")
                # NOTE: de-duplication against the vertical-video sheet ("dAcOWt") is disabled.
                else:
                    # Insert an empty row 2, then fill it (row 1 is the header).
                    time.sleep(1)
                    Feishu.insert_columns(log_type, "bszf", "CcHgO7", "ROWS", 1, 2)
                    get_feeds_time = int(time.time())
                    values = [[
                        str(time.strftime(cls._TIME_FORMAT, time.localtime(get_feeds_time))),
                        "推荐榜",
                        str(video_id),
                        video_title,
                        int(play_cnt),
                        int(like_cnt),
                        int(share_cnt),
                        int(video_comment_cnt),
                        time.strftime(cls._TIME_FORMAT, time.localtime(int(video_send_time))),
                        user_name,
                        user_id,
                        head_url,
                        cover_url,
                        video_url,
                    ]]
                    time.sleep(1)
                    Feishu.update_values(log_type, "bszf", "CcHgO7", "A2:N2", values)
                    Common.logger(log_type).info("添加至recommend_feeds成功\n")
        except Exception as e:
            Common.logger(log_type).error("get_recommend异常:{}".format(e))

    @classmethod
    def download_publish(cls, log_type, env):
        """
        Process the first non-empty row of the recommend_feeds sheet, then return.

        The row is deleted from the sheet when the title hits a sensitive word, the
        video was already downloaded, no video stream can be probed, or the video is
        shorter than 40 seconds. Otherwise the video and cover are downloaded,
        ``info.txt`` is written, the video is uploaded, and a record is inserted into
        the downloaded sheet ("440018") before the staging row is deleted.

        :param log_type: logger name; also forwarded to the helpers.
        :param env: target environment - "dev" for testing, "prod" for production.
        """
        try:
            recommend_feeds_sheet = Feishu.get_values_batch(log_type, "bszf", "CcHgO7")
            # Row 0 is the header; sheet row numbers are 1-based, hence ``i + 1`` below.
            for i in range(1, len(recommend_feeds_sheet)):
                row = recommend_feeds_sheet[i]
                download_video_id = row[2]
                download_video_title = row[3]
                download_video_play_cnt = row[4]
                download_video_like_cnt = row[5]
                download_video_share_cnt = row[6]
                download_video_comment_cnt = row[7]
                download_video_send_time = row[8]
                download_user_name = row[9]
                download_user_id = row[10]
                download_head_url = row[11]
                download_cover_url = row[12]
                download_video_url = row[13]

                Common.logger(log_type).info("正在判断第{}行", i + 1)
                Common.logger(log_type).info("download_video_title:{}", download_video_title)
                Common.logger(log_type).info("download_video_send_time:{}", download_video_send_time)
                Common.logger(log_type).info("download_video_url:{}", download_video_url)

                # Skip empty rows and keep scanning
                if (download_video_id is None
                        or download_video_title is None
                        or download_video_play_cnt is None):
                    Common.logger(log_type).warning("空行,略过\n")
                    continue

                # The sheet may hand back a non-string title (get_recommend writes 0 when
                # a feed has no title); normalise once so matching and paths cannot raise.
                title = str(download_video_title)
                video_dir = "./videos/" + title + "/"

                # Sensitive-word filter (empty words never match)
                if any(word and str(word) in title for word in cls.sensitive_words(log_type)):
                    Feishu.dimension_range(log_type, "bszf", "CcHgO7", "ROWS", i + 1, i + 1)
                    Common.logger(log_type).info("视频已中敏感词,删除成功\n")
                    return

                # De-duplicate against the downloaded sheet
                if cls._sheet_contains(log_type, "440018", download_video_id):
                    Feishu.dimension_range(log_type, "bszf", "CcHgO7", "ROWS", i + 1, i + 1)
                    Common.logger(log_type).info("该视频已下载,删除成功\n")
                    return

                # Row passed every filter: download the video
                Common.download_method(log_type=log_type, text="video",
                                       d_name=title, d_url=str(download_video_url))

                # Read resolution and duration from the downloaded file
                video_info = cls.get_video_info_from_local(video_dir + "video.mp4")
                if video_info is None:
                    # No video stream: drop the row, otherwise it would be retried forever.
                    shutil.rmtree(video_dir)
                    Feishu.dimension_range(log_type, "bszf", "CcHgO7", "ROWS", i + 1, i + 1)
                    Common.logger(log_type).info("未获取到视频流信息,删除成功\n")
                    return
                download_video_resolution = str(video_info[0]) + "*" + str(video_info[1])
                download_video_duration = video_info[2]

                # Videos shorter than 40s are discarded
                if int(download_video_duration) < 40:
                    # Remove the local video folder
                    shutil.rmtree(video_dir)
                    # Remove the record from the recommend_feeds sheet
                    Feishu.dimension_range(log_type, "bszf", "CcHgO7", "ROWS", i + 1, i + 1)
                    Common.logger(log_type).info("时长:{}<40秒,删除成功\n", int(download_video_duration))
                    return

                # NOTE: the vertical-video branch (width < height, recorded in sheet
                # "dAcOWt") is currently disabled.

                # Download the cover
                Common.download_method(log_type=log_type, text="cover",
                                       d_name=title, d_url=str(download_cover_url))

                # Save the video metadata to "./videos/{title}/info.txt", one field per line
                send_timestamp = int(time.mktime(
                    time.strptime(download_video_send_time, cls._TIME_FORMAT)))
                info_lines = [
                    str(download_video_id),
                    title,
                    str(int(download_video_duration)),
                    str(download_video_play_cnt),
                    str(download_video_comment_cnt),
                    str(download_video_like_cnt),
                    str(download_video_share_cnt),
                    str(download_video_resolution),
                    str(send_timestamp),
                    str(download_user_name),
                    str(download_head_url),
                    str(download_video_url),
                    str(download_cover_url),
                    "benshanzhufu",
                ]
                with open(video_dir + "info.txt", "a", encoding="UTF-8") as f_a:
                    f_a.write("\n".join(info_lines))
                Common.logger(log_type).info("==========视频信息已保存至info.txt==========")

                # Upload the video
                Common.logger(log_type).info("开始上传视频:{}".format(title))
                our_video_id = Publish.upload_and_publish(log_type, env, "play")
                our_video_link = ("https://admin.piaoquantv.com/cms/post-detail/"
                                  + str(our_video_id) + "/info")
                Common.logger(log_type).info("视频上传完成:{}", title)

                # Record the video in the downloaded sheet:
                # https://w42nne6hzg.feishu.cn/sheets/shtcnGh2rrsPYM4iVNEBO7OqWrb?sheet=440018
                Common.logger(log_type).info("保存视频ID至云文档:{}", title)
                # Insert an empty row 2, then fill it
                Feishu.insert_columns(log_type, "bszf", "440018", "ROWS", 1, 2)
                upload_time = int(time.time())
                values = [[
                    time.strftime(cls._TIME_FORMAT, time.localtime(upload_time)),
                    "推荐榜",
                    str(download_video_id),
                    title,
                    our_video_link,
                    int(download_video_play_cnt),
                    int(download_video_comment_cnt),
                    int(download_video_like_cnt),
                    int(download_video_share_cnt),
                    int(download_video_duration),
                    str(download_video_resolution),
                    str(download_video_send_time),
                    str(download_user_name),
                    str(download_user_id),
                    str(download_head_url),
                    str(download_cover_url),
                    str(download_video_url),
                ]]
                time.sleep(1)
                # NOTE(review): 17 values are written into the 18-column range E2:V2 -
                # confirm the range is intended.
                Feishu.update_values(log_type, "bszf", "440018", "E2:V2", values)

                # Remove the processed row from the recommend_feeds sheet
                Feishu.dimension_range(log_type, "bszf", "CcHgO7", "ROWS", i + 1, i + 1)
                Common.logger(log_type).info("视频:{},下载/上传成功\n", title)
                return
        except Exception as e:
            Common.logger(log_type).error("download_publish异常:{}", e)

    @classmethod
    def run_download_publish(cls, log_type, env):
        """Call ``download_publish`` repeatedly until only the header row remains.

        :param log_type: logger name; also forwarded to the helpers.
        :param env: target environment, forwarded to ``download_publish``.
        """
        try:
            while True:
                time.sleep(1)
                recommend_feeds_sheet = Feishu.get_values_batch(log_type, "bszf", "CcHgO7")
                # ``<= 1`` also stops on a completely empty sheet instead of spinning forever.
                if len(recommend_feeds_sheet) <= 1:
                    Common.logger(log_type).info("下载/上传完成\n")
                    break
                cls.download_publish(log_type, env)
                # Random pause between two videos
                time.sleep(random.randint(5, 10))
        except Exception as e:
            Common.logger(log_type).error("run_download_publish异常:{}", e)


if __name__ == "__main__":
    recommend = Recommend()
    recommend.get_recommend("recommend")
    recommend.run_download_publish("recommend", "dev")