# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/13
import json
import os
import random
import shutil
import sys
import time

import requests
import urllib3

sys.path.append(os.getcwd())
from common.common import Common
from common.scheduling_db import MysqlHelper
from common.publish import Publish
from common.feishu import Feishu

# No proxy for direct requests.
proxies = {"http": None, "https": None}


class XiaoniangaoFollow:
    """Crawler for the "小年糕" (Xiaoniangao) WeChat mini-program.

    Reads a followed-users list from a Feishu sheet, pages through each user's
    profile video list, filters videos by basic thresholds / dedup / sensitive
    words, then downloads and publishes the survivors.
    """
    platform = "小年糕"
    # Pagination cursor for the profile video list; None means "first page".
    next_t = None

    # WeChat credentials were previously loaded from a Feishu sheet; the values
    # are currently hard-coded in get_videoList below.
    # wechat_sheet = Feishu.get_values_batch("follow", "xiaoniangao", "dzcWHw")
    # follow_x_mid = wechat_sheet[2][3]
    # follow_x_token_id = wechat_sheet[3][3]
    # follow_referer = wechat_sheet[4][3]
    # follow_uid = wechat_sheet[5][3]
    # follow_token = wechat_sheet[6][3]

    @classmethod
    def filter_words(cls, log_type):
        """Return the sensitive-word list read from the Feishu sheet "DRAnZh".

        :param log_type: logger channel name
        :return: flat list of non-empty cell values
        """
        lists = Feishu.get_values_batch(log_type, "xiaoniangao", "DRAnZh")
        # Flatten the sheet rows, skipping empty cells.
        return [cell for row in lists for cell in row if cell is not None]

    @staticmethod
    def download_rule(video_dict):
        """
        Basic download threshold for a video.

        :param video_dict: video info dict with 'duration', 'video_width',
            'video_height', 'play_cnt' and 'share_cnt' keys
        :return: True when every threshold is met, otherwise False
        """
        # NOTE(review): the width/height test uses `or` and the share test is
        # `>= 0`, so both are effectively always true for non-negative data;
        # kept as-is to preserve the original rule exactly.
        return (int(float(video_dict['duration'])) >= 40
                and (int(video_dict['video_width']) >= 0
                     or int(video_dict['video_height']) >= 0)
                and int(video_dict['play_cnt']) >= 500
                and int(video_dict['share_cnt']) >= 0)

    @classmethod
    def get_users(cls, log_type, crawler):
        """Read the followed-users list from the Feishu sheet "oNpThi".

        :return: list of {"profile_id", "profile_mid", "user_name"} dicts
            (header row skipped), or None after a logged exception
        """
        try:
            while True:
                follow_sheet = Feishu.get_values_batch(log_type, "xiaoniangao", "oNpThi")
                if follow_sheet is None:
                    time.sleep(1)
                    continue
                if len(follow_sheet) == 1:
                    # Only the header row is present: no accounts configured.
                    # NOTE(review): this branch loops without sleeping, so it
                    # re-polls Feishu in a tight loop — confirm intent.
                    Common.logger(log_type, crawler).info("暂无定向爬取账号")
                else:
                    return [
                        {
                            "profile_id": row[0],
                            "profile_mid": row[1],
                            "user_name": row[2],
                        }
                        for row in follow_sheet[1:]
                    ]
        except Exception as e:
            Common.logger(log_type, crawler).error(f"从云文档获取关注用户列表异常:{e}")

    @classmethod
    def _decorate_title(cls, raw_title, expression_list, char_list):
        """Sanitize a raw feed title and randomly decorate it.

        The decoration is either an expression prepended/appended to the title,
        or a random character appended at the tail.
        """
        before_video_title = raw_title.strip().replace("\n", "") \
            .replace("/", "").replace("\r", "").replace("#", "") \
            .replace(".", "。").replace("\\", "").replace("&NBSP", "") \
            .replace(":", "").replace("*", "").replace("?", "") \
            .replace("?", "").replace('"', "").replace("<", "") \
            .replace(">", "").replace("|", "").replace(" ", "")
        expression = random.choice(expression_list)
        # Expression randomly at the head or the tail of the title.
        title_with_expression = random.choice(
            [expression + before_video_title, before_video_title + expression])
        # Or a random character appended at the tail.
        title_with_char = before_video_title + random.choice(char_list)
        return random.choice([title_with_expression, title_with_char])

    @classmethod
    def _parse_feed(cls, feed, expression_list, char_list):
        """Convert one raw feed item into the crawler's video_dict.

        Missing fields default to 0 (legacy behavior); duration and publish
        time are converted from milliseconds to seconds.
        """
        video_title = cls._decorate_title(feed["title"], expression_list, char_list)
        user_name = feed["album_user"]["nick"].strip().replace("\n", "") \
            .replace("/", "").replace("快手", "").replace(" ", "") \
            .replace(" ", "").replace("&NBSP", "").replace("\r", "")
        video_duration = int(feed["du"] / 1000) if "du" in feed else 0
        publish_time_stamp = int(feed["t"] / 1000) if "t" in feed else 0
        publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S",
                                         time.localtime(publish_time_stamp))
        # BUGFIX: the original tested `"w" in feed or "h" in feed` and then read
        # both keys, raising KeyError when only one was present.
        if "w" in feed and "h" in feed:
            video_width = feed["w"]
            video_height = feed["h"]
        else:
            video_width = 0
            video_height = 0
        return {
            "video_id": feed.get("vid", 0),
            "video_title": video_title,
            "duration": video_duration,
            "play_cnt": feed.get("play_pv", 0),
            "like_cnt": feed["favor"].get("total", 0),
            "comment_cnt": feed.get("comment_count", 0),
            "share_cnt": feed.get("share", 0),
            "user_name": user_name,
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "video_width": video_width,
            "video_height": video_height,
            "avatar_url": feed["album_user"].get("hurl", 0),
            "profile_id": feed.get("id", 0),
            "profile_mid": feed.get("mid", 0),
            "cover_url": feed.get("url", 0),
            "video_url": feed.get("v_url", 0),
            "session": f"xiaoniangao-follow-{int(time.time())}"
        }

    @classmethod
    def get_videoList(cls, log_type, crawler, strategy, p_mid, oss_endpoint, env, machine):
        """Page through one user's profile video list and process each video.

        Stops (resetting cls.next_t) on an error response, an empty page, or
        the first video older than 3 days.

        :param p_mid: the profile mid of the user being crawled
        """
        try:
            # The emoji/char decoration sheet is page-invariant: fetch it once
            # instead of once per video (the original refetched it in the loop).
            char_sheet = Feishu.get_values_batch("hour", "xiaoniangao", "BhlbST")
            expression_list = [row[0] for row in char_sheet if row[0] is not None]
            char_list = [row[1] for row in char_sheet if row[1] is not None]
            while True:
                url = "https://api.xiaoniangao.cn/profile/list_album"
                headers = {
                    # "X-Mid": str(cls.follow_x_mid),
                    "X-Mid": '1fb47aa7a860d9',
                    # "X-Token-Id": str(cls.follow_x_token_id),
                    "X-Token-Id": '9f2cb91f9952c107ecb73642083e1dec-1145266232',
                    "content-type": "application/json",
                    # "uuid": str(cls.follow_uid),
                    "uuid": 'f40c2e7c-3cfb-4804-b513-608c0280268c',
                    "Accept-Encoding": "gzip,compress,br,deflate",
                    "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X)"
                                  " AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 "
                                  "MicroMessenger/8.0.20(0x18001435) NetType/WIFI Language/zh_CN",
                    # "Referer": str(cls.follow_referer)
                    "Referer": 'https://servicewechat.com/wxd7911e4c177690e4/654/page-frame.html'
                }
                json_text = {
                    "visited_mid": p_mid,
                    "start_t": cls.next_t,
                    "qs": "imageMogr2/gravity/center/rotate/$/thumbnail/!690x385r/crop/690x385/interlace/1/format/jpg",
                    "h_qs": "imageMogr2/gravity/center/rotate/$/thumbnail/!120x120r/crop/120x120/interlace/1/format/jpg",
                    "limit": 20,
                    # "token": str(cls.follow_token),
                    "token": '54e4c603f7bf3dc009c86b49ed91be36',
                    # "uid": str(cls.follow_uid),
                    "uid": 'f40c2e7c-3cfb-4804-b513-608c0280268c',
                    "proj": "ma",
                    "wx_ver": "8.0.23",
                    "code_ver": "3.68.0",
                    "log_common_params": {
                        "e": [{
                            "data": {
                                "page": "profilePage",
                                "topic": "public"
                            }
                        }],
                        "ext": {
                            "brand": "iPhone",
                            "device": "iPhone 11",
                            "os": "iOS 14.7.1",
                            "weixinver": "8.0.23",
                            "srcver": "2.24.7",
                            "net": "wifi",
                            "scene": "1089"
                        },
                        "pj": "1",
                        "pf": "2",
                        "session_id": "7468cf52-00ea-432e-8505-6ea3ad7ec164"
                    }
                }
                urllib3.disable_warnings()
                r = requests.post(url=url, headers=headers, json=json_text,
                                  proxies=proxies, verify=False)
                if 'data' not in r.text or r.status_code != 200:
                    Common.logger(log_type, crawler).info(f"get_videoList:{r.text}\n")
                    cls.next_t = None
                    return
                # Parse the body once instead of calling r.json() repeatedly.
                resp = r.json()
                if 'list' not in resp['data']:
                    Common.logger(log_type, crawler).info(f"get_videoList:{resp}\n")
                    cls.next_t = None
                    return
                if len(resp['data']['list']) == 0:
                    Common.logger(log_type, crawler).info(f"没有更多数据啦~\n")
                    cls.next_t = None
                    return
                cls.next_t = resp["data"]["next_t"]
                feeds = resp["data"]["list"]
                for feed in feeds:
                    video_dict = cls._parse_feed(feed, expression_list, char_list)
                    # Skip videos missing any essential field.
                    # BUGFIX: the original compared the (string) title to 0,
                    # which never triggered; empty titles are now filtered too.
                    if video_dict["video_id"] == 0 \
                            or not video_dict["video_title"] \
                            or video_dict["publish_time_stamp"] == 0 \
                            or video_dict["duration"] == 0 \
                            or video_dict["video_url"] == 0:
                        Common.logger(log_type, crawler).info("无效视频\n")
                    elif int(time.time()) - video_dict["publish_time_stamp"] > 3600 * 24 * 3:
                        # Feed is newest-first: everything after this is older,
                        # so stop crawling this user entirely.
                        Common.logger(log_type, crawler).info(
                            f"发布时间超过3天:{video_dict['publish_time_str']}\n")
                        cls.next_t = None
                        return
                    else:
                        for k, v in video_dict.items():
                            Common.logger(log_type, crawler).info(f"{k}:{v}")
                        cls.download_publish(log_type=log_type,
                                             crawler=crawler,
                                             strategy=strategy,
                                             video_dict=video_dict,
                                             oss_endpoint=oss_endpoint,
                                             env=env,
                                             machine=machine)
        except Exception as error:
            Common.logger(log_type, crawler).error(f"获取个人主页视频异常:{error}\n")

    @classmethod
    def repeat_video(cls, log_type, crawler, video_id, env, machine):
        """Return how many rows already exist for this platform/out_video_id."""
        # NOTE(review): SQL is built by string interpolation; video_id comes
        # from the remote feed. Switch to MysqlHelper's parameterized form if
        # it supports one.
        sql = f""" select * from crawler_video where platform="小年糕" and out_video_id="{video_id}"; """
        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env, machine)
        return len(repeat_video)

    @classmethod
    def download_publish(cls, log_type, crawler, strategy, video_dict, oss_endpoint, env, machine):
        """Download a video that passed all filters, upload it, then record it
        in MySQL and the Feishu sheet."""
        try:
            if cls.download_rule(video_dict) is False:
                Common.logger(log_type, crawler).info("不满足基础门槛\n")
            elif cls.repeat_video(log_type, crawler, video_dict['video_id'], env, machine) != 0:
                Common.logger(log_type, crawler).info('视频已下载\n')
            elif any(str(word) in video_dict['video_title']
                     for word in cls.filter_words(log_type)):
                Common.logger(log_type, crawler).info("视频已中过滤词\n")
            else:
                # Download cover image.
                Common.download_method(log_type=log_type, crawler=crawler,
                                       text="cover",
                                       title=video_dict["video_title"],
                                       url=video_dict["cover_url"])
                # Download the video file.
                Common.download_method(log_type=log_type, crawler=crawler,
                                       text="video",
                                       title=video_dict["video_title"],
                                       url=video_dict["video_url"])
                # Save video info to "./videos/{download_video_title}/info.txt".
                Common.save_video_info(log_type=log_type, crawler=crawler,
                                       video_dict=video_dict)

                # Upload the video.
                Common.logger(log_type, crawler).info("开始上传视频...")
                our_video_id = Publish.upload_and_publish(log_type=log_type,
                                                          crawler=crawler,
                                                          strategy=strategy,
                                                          our_uid="follow",
                                                          env=env,
                                                          oss_endpoint=oss_endpoint)
                if env == "dev":
                    our_video_link = f"https://testadmin.piaoquantv.com/cms/post-detail/{our_video_id}/info"
                else:
                    our_video_link = f"https://admin.piaoquantv.com/cms/post-detail/{our_video_id}/info"
                Common.logger(log_type, crawler).info("视频上传完成")

                if our_video_id is None:
                    # Upload failed: remove the local video folder and bail out.
                    shutil.rmtree(f"./{crawler}/videos/{video_dict['video_title']}")
                    return

                # Persist the video record to MySQL.
                rule_dict = {
                    "duration": {"min": 40, "max": 100000000},
                    "play_cnt": {"min": 500}
                }
                insert_sql = f""" insert into crawler_video(video_id, out_user_id, platform, strategy, out_video_id, video_title, cover_url, video_url, duration, publish_time, play_cnt, crawler_rule, width, height) values({our_video_id}, "{video_dict['profile_id']}", "{cls.platform}", "定向爬虫策略", "{video_dict['video_id']}", "{video_dict['video_title']}", "{video_dict['cover_url']}", "{video_dict['video_url']}", {int(video_dict['duration'])}, "{video_dict['publish_time_str']}", {int(video_dict['play_cnt'])}, '{json.dumps(rule_dict)}', {int(video_dict['video_width'])}, {int(video_dict['video_height'])}) """
                Common.logger(log_type, crawler).info(f"insert_sql:{insert_sql}")
                MysqlHelper.update_values(log_type, crawler, insert_sql, env, machine)
                Common.logger(log_type, crawler).info('视频信息插入数据库成功!')

                # Write the video row to Feishu: insert a row, then fill it.
                Feishu.insert_columns(log_type, crawler, "Wu0CeL", "ROWS", 1, 2)
                upload_time = int(time.time())
                values = [[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(upload_time)),
                           "用户主页",
                           str(video_dict['video_id']),
                           str(video_dict['video_title']),
                           our_video_link,
                           video_dict['play_cnt'],
                           video_dict['comment_cnt'],
                           video_dict['like_cnt'],
                           video_dict['share_cnt'],
                           video_dict['duration'],
                           f"{video_dict['video_width']}*{video_dict['video_height']}",
                           str(video_dict['publish_time_str']),
                           str(video_dict['user_name']),
                           str(video_dict['profile_id']),
                           str(video_dict['profile_mid']),
                           str(video_dict['avatar_url']),
                           str(video_dict['cover_url']),
                           str(video_dict['video_url'])]]
                time.sleep(1)
                Feishu.update_values(log_type, crawler, "Wu0CeL", "F2:Z2", values)
                Common.logger(log_type, crawler).info('视频信息写入飞书成功\n')
        except Exception as e:
            Common.logger(log_type, crawler).error(f"下载/上传异常:{e}")

    @classmethod
    def get_follow_videos(cls, log_type, crawler, strategy, oss_endpoint, env, machine):
        """Crawl every user on the follow list, resetting the pagination cursor
        between users."""
        try:
            user_list = cls.get_users(log_type, crawler)
            for user in user_list:
                user_name = user['user_name']
                profile_mid = user['profile_mid']
                Common.logger(log_type, crawler).info(f"获取 {user_name} 主页视频")
                cls.get_videoList(log_type=log_type,
                                  crawler=crawler,
                                  strategy=strategy,
                                  p_mid=profile_mid,
                                  oss_endpoint=oss_endpoint,
                                  env=env,
                                  machine=machine)
                # Reset the cursor so the next user starts from page one.
                cls.next_t = None
                time.sleep(1)
        except Exception as e:
            Common.logger(log_type, crawler).error(f"get_follow_videos:{e}\n")


if __name__ == "__main__":
    # print(XiaoniangaoFollow.repeat_video("follow", "xiaoniangao", "4919087666", "prod", "aliyun"))
    print(XiaoniangaoFollow.repeat_video("follow", "xiaoniangao", "4919087666", "dev", "local"))
    pass