# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2022/4/8
import json
import os
import random
import sys
import time
import requests
import urllib3
sys.path.append(os.getcwd())
from main.common import Common
from main.feishu_lib import Feishu
from main.weishi_publish import Publish

# Explicitly disable any proxy for the feed request.
proxies = {"http": None, "https": None}


class Recommend:
    """Crawl the Weishi mini-program recommended feed and republish qualifying videos.

    ``get_feeds`` collects candidate videos into the Feishu work sheet ``O7fCzr``;
    ``download_publish`` / ``run_download_publish`` download them, publish them and
    record the result in the Feishu sheet ``caa3fa``.
    """

    # WeChat account configuration, read from the Feishu sheet when the class is defined.
    wechat_sheet = Feishu.get_values_batch('recommend', 'weishi', '9fTK1f')
    Referer = wechat_sheet[2][2]
    wesee_openid = wechat_sheet[3][2]
    wesee_openkey = wechat_sheet[4][2]
    wesee_personid = wechat_sheet[5][2]
    wesee_access_token = wechat_sheet[6][2]
    wesee_thr_appid = wechat_sheet[7][2]

    # Counters for crawled videos (the daily cap that used them is currently not enforced).
    video_count = []
    crawler_count = 50

    # Seconds to wait for the feed API before giving up; without a timeout a
    # stalled connection would block the crawler forever.
    request_timeout = 30

    # Separators used to cut hashtags / mentions off the raw description, applied in order.
    # NOTE(review): the first two separators look identical here; one of them may
    # originally have used a different whitespace character - confirm against the repo.
    _TITLE_SEPARATORS = (" #", " #", "@")

    # Ordered (old, new) replacements applied to the title after stripping.
    # NOTE(review): the two " " entries look identical here as well - confirm.
    _TITLE_REPLACEMENTS = (
        ("\n", ""), ("/", ""), ("快手", ""), (" ", ""), (" ", ""), ("&NBSP", ""),
        ("\r", ""), ("#", ""), (".", "。"), ("\\", ""), (":", ""),
        ("*", ""), ("?", ""), ("?", ""), ('"', ""), ("<", ""),
        (">", ""), ("|", ""), ("微视", ""),
    )

    # Ordered (old, new) replacements applied to the poster nickname after stripping.
    _USER_NAME_REPLACEMENTS = (
        ("\n", ""), ("/", ""), ("快手", ""), (" ", ""), (" ", ""),
        ("&NBSP", ""), ("\r", ""), ("微视", ""),
    )

    @staticmethod
    def _sheet_cells(log_type, sheet_id):
        """Return every non-empty cell of a 'weishi' Feishu sheet as one flat list."""
        rows = Feishu.get_values_batch(log_type, 'weishi', sheet_id)
        return [cell for row in rows for cell in row if cell is not None]

    @staticmethod
    def _hits_sensitive_word(text, words):
        """Return True when ``text`` contains at least one non-empty word from ``words``."""
        # An empty string is a substring of everything, so it must never count as a hit.
        return any(word in text and bool(word) for word in words)

    @staticmethod
    def _apply_replacements(text, replacements):
        """Apply the (old, new) pairs to ``text`` one after another, in the given order."""
        for old, new in replacements:
            text = text.replace(old, new)
        return text

    @classmethod
    def _clean_title(cls, raw_title):
        """Cut hashtags/mentions off ``raw_title``, remove unwanted characters, keep 40 chars."""
        title = raw_title
        for separator in cls._TITLE_SEPARATORS:
            parts = title.split(separator)
            # Keep the text before the separator; if that is empty fall back to the last part.
            title = parts[0] if parts[0] != "" else parts[-1]
        return cls._apply_replacements(title.strip(), cls._TITLE_REPLACEMENTS)[:40]

    @classmethod
    def _parse_feed(cls, feed):
        """Extract the fields this crawler needs from one feed item.

        Missing optional fields are reported as ``0`` so that the caller can reject
        the video as invalid.

        :param feed: one element of ``rsp_body.feeds`` from the feed API response
        :return: dict with the raw title, cleaned title, counters, size, author and URLs
        :raises KeyError, IndexError, TypeError, ValueError, AttributeError:
            when a mandatory part of the feed is missing or malformed
        """
        video = feed["video"]
        ugc_data = feed["ugcData"]
        poster = feed["poster"]
        material = feed["material"]
        images = feed["images"]

        # An empty description gets a placeholder title.
        raw_title = feed["desc"]
        if raw_title == '':
            raw_title = '。。。'

        # The API reports the duration in milliseconds; store whole seconds.
        if "duration" in video:
            duration = int(int(video["duration"]) / 1000)
        else:
            duration = 0

        # Width and height are only trusted when both are present.
        if "width" in video and "height" in video:
            width = video["width"]
            height = video["height"]
        else:
            width = 0
            height = 0

        # Publish time is stored in milliseconds.
        if "createTime" in feed:
            send_time = int(feed["createTime"]) * 1000
        else:
            send_time = 0

        # Prefer the material thumbnail, then the poster avatar.
        if "thumbURL" in material:
            head_url = material["thumbURL"]
        elif "avatar" in poster:
            head_url = poster["avatar"]
        else:
            head_url = 0

        cover_url = images[0]["url"] if len(images) != 0 else 0

        return {
            "raw_title": raw_title,
            "title": cls._clean_title(raw_title),
            "id": video.get("id", 0),
            "play_cnt": ugc_data.get("playNum", 0),
            "like_cnt": ugc_data.get("dingCount", 0),
            "share_cnt": ugc_data.get("shareNum", 0),
            "comment_cnt": ugc_data.get("totalCommentNum", 0),
            "duration": duration,
            "width": width,
            "height": height,
            "resolution": str(width) + "*" + str(height),
            "send_time": send_time,
            "user_name": cls._apply_replacements(poster["nick"].strip(), cls._USER_NAME_REPLACEMENTS),
            "user_id": poster["id"],
            "head_url": head_url,
            "cover_url": cover_url,
            "video_url": video.get("url", 0),
        }

    # Title filter word list
    @classmethod
    def video_title_sensitive_words(cls, log_type):
        """Return the title sensitive words: every non-empty cell of sheet ``2Oxf8C``."""
        return cls._sheet_cells(log_type, "2Oxf8C")

    # User-name filter word list
    @classmethod
    def username_sensitive_words(cls, log_type):
        """Return the user-name sensitive words: every non-empty cell of sheet ``KnVAc2``."""
        return cls._sheet_cells(log_type, "KnVAc2")

    # Basic crawl rule
    @staticmethod
    def download_rule(duration, width, height, like_cnt):
        """
        Basic rule a video must satisfy to be downloaded.

        :param duration: duration in seconds
        :param width: video width
        :param height: video height
        :param like_cnt: number of likes
        :return: True when the duration is at least 60, width or height is at least 720
                 and the like count is at least 1000; otherwise False
        """
        return (int(float(duration)) >= 60
                and (int(width) >= 720 or int(height) >= 720)
                and int(like_cnt) >= 1000)

    # Crawl the feed list
    @classmethod
    def get_feeds(cls, log_type):
        """
        1. Fetch the video list from the Weishi mini-program home recommendation feed.
        2. De-duplicate against https://w42nne6hzg.feishu.cn/sheets/shtcn5YSWg91JfVGzj0SFZIRRPh?sheet=caa3fa
        3. De-duplicate against https://w42nne6hzg.feishu.cn/sheets/shtcn5YSWg91JfVGzj0SFZIRRPh?sheet=O7fCzr
        4. Append the video information to
           https://w42nne6hzg.feishu.cn/sheets/shtcn5YSWg91JfVGzj0SFZIRRPh?sheet=O7fCzr

        A feed that cannot be parsed is logged and skipped; any other error is logged
        and ends the run.

        :param log_type: name passed to ``Common.logger`` and to the Feishu helpers
        """
        try:
            url = "https://api.weishi.qq.com/trpc.weishi.weishi_h5_proxy.weishi_h5_proxy/WxminiGetFeedList"
            headers = {
                "content-type": "application/json",
                "Accept-Encoding": "gzip,compress,br,deflate",
                "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X)"
                              " AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148"
                              " MicroMessenger/8.0.20(0x18001442) NetType/WIFI Language/zh_CN",
                "Referer": str(cls.Referer)
            }
            cookies = {
                "wesee_authtype": "3",
                "wesee_openid": str(cls.wesee_openid),
                "wesee_openkey": str(cls.wesee_openkey),
                "wesee_personid": str(cls.wesee_personid),
                "wesee_refresh_token": "",
                "wesee_access_token": str(cls.wesee_access_token),
                "wesee_thr_appid": str(cls.wesee_thr_appid),
                "wesee_ichid": "8"
            }
            json_data = {
                "req_body": {
                    "requestType": 16,
                    "isrefresh": 1,
                    "isfirst": 1,
                    "attachInfo": "",
                    "scene_id": 22,
                    "requestExt": {
                        "mini_openid": str(cls.wesee_openid),
                        "notLogin-personid": str(cls.wesee_personid)
                    }
                },
                "req_header": {
                    "mapExt": "{\"imageSize\":\"480\",\"adaptScene\":\"PicHDWebpLimitScene\"}"
                }
            }

            # Certificate verification is off for this request, so silence the warning it triggers.
            urllib3.disable_warnings()
            r = requests.post(headers=headers, url=url, cookies=cookies, json=json_data,
                              proxies=proxies, verify=False, timeout=cls.request_timeout)
            response = json.loads(r.content.decode("utf8"))
            feeds = response["rsp_body"]["feeds"]

            for feed in feeds:
                # A single malformed feed must not abort the rest of the batch.
                try:
                    info = cls._parse_feed(feed)
                except (KeyError, IndexError, TypeError, ValueError, AttributeError) as e:
                    Common.logger(log_type).warning("解析视频信息异常,已跳过:{}\n".format(e))
                    continue

                weishi_title = info["raw_title"]
                video_title = info["title"]
                video_id = info["id"]
                video_play_cnt = info["play_cnt"]
                video_like_cnt = info["like_cnt"]
                video_share_cnt = info["share_cnt"]
                video_comment_cnt = info["comment_cnt"]
                video_duration = info["duration"]
                video_width = info["width"]
                video_height = info["height"]
                video_resolution = info["resolution"]
                video_send_time = info["send_time"]
                user_name = info["user_name"]
                user_id = info["user_id"]
                head_url = info["head_url"]
                cover_url = info["cover_url"]
                video_url = info["video_url"]

                Common.logger(log_type).info("video_title:{}".format(video_title))
                Common.logger(log_type).info("video_id:{}".format(video_id))
                Common.logger(log_type).info("video_like_cnt:{}".format(video_like_cnt))
                Common.logger(log_type).info("video_share_cnt:{}".format(video_share_cnt))
                Common.logger(log_type).info("video_comment_cnt:{}".format(video_comment_cnt))
                Common.logger(log_type).info("video_duration:{}秒".format(video_duration))
                Common.logger(log_type).info(
                    "video_send_time:{}".format(time.strftime(
                        "%Y/%m/%d %H:%M:%S", time.localtime(int(video_send_time) / 1000))))
                Common.logger(log_type).info("user_name:{}".format(user_name))
                Common.logger(log_type).info("video_url:{}".format(video_url))

                # Reject videos with missing mandatory information.
                if video_id == 0 or video_duration == 0 or video_send_time == 0 or head_url == 0 \
                        or cover_url == 0 or video_url == 0:
                    Common.logger(log_type).info("无效视频\n")
                # Basic rule check.
                elif cls.download_rule(video_duration, video_width, video_height, video_like_cnt) is False:
                    Common.logger(log_type).info("不满足基础规则\n")
                # Title sensitive-word filter (checked against the raw description).
                elif cls._hits_sensitive_word(weishi_title, cls.video_title_sensitive_words(log_type)):
                    Common.logger(log_type).info("标题已中敏感词:{}\n".format(weishi_title))
                # User-name sensitive-word filter.
                elif cls._hits_sensitive_word(user_name, cls.username_sensitive_words(log_type)):
                    Common.logger(log_type).info("用户名已中敏感词:{}\n".format(user_name))
                # De-duplicate against the already-downloaded sheet.
                elif str(video_id) in [j for m in Feishu.get_values_batch(log_type, 'weishi', "caa3fa") for j in m]:
                    Common.logger(log_type).info("视频已下载:{}\n", video_title)
                # De-duplicate against the feeds work sheet.
                elif str(video_id) in [j for n in Feishu.get_values_batch(log_type, 'weishi', "O7fCzr") for j in n]:
                    Common.logger(log_type).info("视频已存在:{}\n", video_title)
                else:
                    # Insert a new first data row into the feeds work sheet.
                    Feishu.insert_columns(log_type, 'weishi', "O7fCzr", "ROWS", 1, 2)
                    # Current time of the crawl.
                    get_feeds_time = int(time.time())
                    # Row written to the feeds work sheet.
                    values = [[time.strftime("%Y/%m/%d %H:%M:%S", time.localtime(int(get_feeds_time))),
                               "推荐榜",
                               str(video_id),
                               video_title,
                               int(video_play_cnt),
                               int(video_comment_cnt),
                               int(video_like_cnt),
                               int(video_share_cnt),
                               video_duration,
                               video_resolution,
                               time.strftime("%Y/%m/%d %H:%M:%S", time.localtime(int(video_send_time / 1000))),
                               user_name,
                               user_id,
                               head_url,
                               cover_url,
                               video_url]]
                    # Wait 1s so the cloud document is not hit too frequently, which causes errors.
                    time.sleep(1)
                    Feishu.update_values(log_type, 'weishi', "O7fCzr", "A2:T2", values)
                    Common.logger(log_type).info("视频保存至云文档成功\n")
                    time.sleep(random.randint(3, 5))

        except Exception as e:
            Common.logger(log_type).error("get_feeds异常:{}\n".format(e))

    # Download / upload
    @classmethod
    def download_publish(cls, log_type, env):
        """
        Download and publish the first pending video of the feeds work sheet ``O7fCzr``.

        Every branch of the loop returns, so one call handles at most one row; use
        ``run_download_publish`` to drain the sheet. On any error the second sheet
        row is deleted and the error is logged.

        :param log_type: name passed to ``Common.logger`` and to the Feishu helpers
        :param env: environment name forwarded to ``Publish.upload_and_publish``
        """
        try:
            recommend_sheet = Feishu.get_values_batch(log_type, 'weishi', "O7fCzr")
            # Index 0 is skipped; data rows start at index 1 (sheet row 2).
            for i in range(1, len(recommend_sheet)):
                row = recommend_sheet[i]
                download_video_id = row[2]
                download_video_title = row[3]
                download_video_play_cnt = row[4]
                download_video_comment_cnt = row[5]
                download_video_like_cnt = row[6]
                download_video_share_cnt = row[7]
                download_video_duration = row[8]
                download_video_resolution = row[9]
                download_video_send_time = row[10]
                download_user_name = row[11]
                download_user_id = row[12]
                download_head_url = row[13]
                download_cover_url = row[14]
                download_video_url = row[15]

                Common.logger(log_type).info("正在判断第{}行:{}", i+1, download_video_title)
                Common.logger(log_type).info("like_cnt:{}", download_video_like_cnt)
                Common.logger(log_type).info("duration:{}", download_video_duration)
                Common.logger(log_type).info("resolution:{}", download_video_resolution)
                Common.logger(log_type).info("send_time:{}", download_video_send_time)
                Common.logger(log_type).info("video_url:{}", download_video_url)

                # Drop empty rows.
                if download_video_id is None or download_video_title is None:
                    # Delete rows or columns; the dimension may be ROWS or COLUMNS.
                    Feishu.dimension_range(log_type, 'weishi', "O7fCzr", "ROWS", i + 1, i + 1)
                    Common.logger(log_type).warning("空行,已删除\n")
                    return

                # De-duplicate against the already-downloaded sheet.
                elif download_video_id in [j for m in Feishu.get_values_batch(log_type, 'weishi', "caa3fa") for j in m]:
                    # Delete rows or columns; the dimension may be ROWS or COLUMNS.
                    Feishu.dimension_range(log_type, 'weishi', "O7fCzr", "ROWS", i + 1, i + 1)
                    Common.logger(log_type).info("视频已下载:{}\n", download_video_title)
                    return

                else:
                    # Download the cover.
                    Common.download_method(log_type, text="cover", d_name=str(download_video_title),
                                           d_url=str(download_cover_url))
                    # Download the video.
                    Common.download_method(log_type, text="video", d_name=str(download_video_title),
                                           d_url=str(download_video_url))
                    # Save the video information to "./videos/{download_video_title}/info.txt".
                    # str() guards against a title cell that the sheet returned as a number.
                    info_lines = [
                        str(download_video_id),
                        str(download_video_title),
                        str(download_video_duration),
                        str(download_video_play_cnt),
                        str(download_video_comment_cnt),
                        str(download_video_like_cnt),
                        str(download_video_share_cnt),
                        str(download_video_resolution),
                        str(int(time.mktime(time.strptime(download_video_send_time, "%Y/%m/%d %H:%M:%S")))),
                        str(download_user_name),
                        str(download_head_url),
                        str(download_video_url),
                        str(download_cover_url),
                        str(cls.wesee_access_token),
                    ]
                    with open("./videos/" + str(download_video_title) + "/" + "info.txt",
                              "a", encoding="UTF-8") as f_a:
                        f_a.write("\n".join(info_lines))
                    Common.logger(log_type).info("视频信息已保存至info.txt")

                    # Upload the video.
                    Common.logger(log_type).info("开始上传视频:{}".format(download_video_title))
                    our_video_id = Publish.upload_and_publish(log_type, env, "play")
                    our_video_link = "https://admin.piaoquantv.com/cms/post-detail/" + str(our_video_id) + "/info"
                    Common.logger(log_type).info("视频上传完成:{}", download_video_title)

                    # Insert a new first data row into the video-ID sheet.
                    Feishu.insert_columns(log_type, 'weishi', "caa3fa", "ROWS", 1, 2)
                    # Row written to the video-ID sheet.
                    upload_time = int(time.time())
                    values = [[str(time.strftime("%Y/%m/%d %H:%M:%S", time.localtime(upload_time))),
                               "推荐榜",
                               str(download_video_title),
                               str(download_video_id),
                               our_video_link,
                               download_video_play_cnt,
                               download_video_comment_cnt,
                               download_video_like_cnt,
                               download_video_share_cnt,
                               download_video_duration,
                               str(download_video_resolution),
                               str(download_video_send_time),
                               str(download_user_name),
                               str(download_user_id),
                               str(download_head_url),
                               str(download_cover_url),
                               str(download_video_url)]]
                    time.sleep(1)
                    Feishu.update_values(log_type, 'weishi', "caa3fa", "F2:W2", values)
                    Common.logger(log_type).info("视频已保存至云文档:{}", download_video_title)

                    # Delete rows or columns; the dimension may be ROWS or COLUMNS.
                    Feishu.dimension_range(log_type, 'weishi', "O7fCzr", "ROWS", i + 1, i + 1)
                    Common.logger(log_type).info("视频:{},下载/上传成功\n", download_video_title)
                    return
        except Exception as e:
            # Only sheet row 2 is ever processed per call, so that is the row to discard.
            Feishu.dimension_range(log_type, 'weishi', "O7fCzr", "ROWS", 2, 2)
            Common.logger(log_type).error("download_publish异常,已删除该条数据:{}\n", e)

    # Run download / upload
    @classmethod
    def run_download_publish(cls, log_type, env):
        """
        Call ``download_publish`` repeatedly until the feeds work sheet has no data rows left.

        :param log_type: name passed to ``Common.logger`` and to the Feishu helpers
        :param env: environment name forwarded to ``download_publish``
        """
        try:
            while True:
                # "<= 1" rather than "== 1": a sheet that comes back with no rows at all
                # would otherwise never satisfy the exit condition and loop forever.
                if len(Feishu.get_values_batch(log_type, 'weishi', 'O7fCzr')) <= 1:
                    Common.logger(log_type).info("下载/上传完成\n")
                    break
                cls.download_publish(log_type, env)
                time.sleep(random.randint(1, 3))
        except Exception as e:
            Common.logger(log_type).error("run_download_publish异常:{}", e)


if __name__ == "__main__":
    Recommend.download_publish('weishi', 'dev')