# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/13
import json
import os
import random
import shutil
import sys
import time

import requests
import urllib3

# Add the current working directory to the import path so that the `common`
# package resolves when the script is launched from the directory containing it.
sys.path.append(os.getcwd())
from common.common import Common
from common.scheduling_db import MysqlHelper
from common.publish import Publish
from common.feishu import Feishu
from common.public import get_config_from_mysql

# NOTE(review): presumably meant to bypass any environment-configured proxy.
proxies = {"http": None, "https": None}


def _sql_escape(value):
    """Escape *value* for embedding inside a quoted MySQL string literal.

    Backslashes and both quote characters are backslash-escaped so that values
    coming from the remote API cannot terminate the literal early.

    NOTE(review): assumes the server runs without NO_BACKSLASH_ESCAPES (the
    MySQL default). Prefer real parameter binding if MysqlHelper supports it.
    """
    return str(value).replace("\\", "\\\\").replace('"', '\\"').replace("'", "\\'")


class XiaoniangaoAuthorScheduling:
    platform = "小年糕"
    # Pagination cursor for the mini-program profile video list.
    next_t = None
    # Seconds to wait for the list API before giving up on a request.
    request_timeout = 30

    # Ordered (old, new) replacements applied to raw video titles. The order
    # and the repeated entries are kept exactly as the original chain had them.
    _TITLE_REPLACEMENTS = (
        ("\n", ""), ("/", ""), ("\r", ""), ("#", ""),
        (".", "。"), ("\\", ""), ("&NBSP", ""),
        (":", ""), ("*", ""), ("?", ""),
        ("?", ""), ('"', ""), ("<", ""),
        (">", ""), ("|", ""), (" ", ""),
        ('"', ''), ("'", ''),
    )
    # Substrings removed from author nicknames, in the original order.
    _USER_NAME_REMOVALS = ("\n", "/", "快手", " ", " ", "&NBSP", "\r")

    # Basic threshold rules
    @staticmethod
    def download_rule(log_type, crawler, video_dict, rule_dict):
        """
        Decide whether a video satisfies the basic download rules.

        Each rule in ``rule_dict`` is a dict with optional ``min`` / ``max``
        keys; a missing ``max`` or a ``max`` equal to 0 means "no upper bound".
        The ``period`` rule's ``min`` is used as the maximum video age in days.

        :param log_type: log type passed to ``Common.logger``
        :param crawler: crawler name passed to ``Common.logger``
        :param video_dict: video info dict (duration, play_cnt, like_cnt, ...)
        :param rule_dict: rule info dict
        :return: True when every rule is satisfied, otherwise False
        """
        unbounded = 100000000

        def bounds(key):
            # Return (min, max) for one rule, mapping a max of 0 to "unbounded".
            rule = rule_dict.get(key, {})
            upper = rule.get('max', unbounded)
            if upper == 0:
                upper = unbounded
            return rule.get('min', 0), upper

        play_min, play_max = bounds('playCnt')
        duration_min, duration_max = bounds('duration')
        # NOTE(review): with no `period` rule this is 0 days, so no video can
        # pass the age check below -- confirm that is the intended default.
        period_min = rule_dict.get('period', {}).get('min', 0)
        # The period max, fans and videos rules are not enforced here.
        like_min, like_max = bounds('like')
        width_min, width_max = bounds('videoWidth')
        height_min, height_max = bounds('videoHeight')
        share_min, share_max = bounds('shareCnt')
        comment_min, comment_max = bounds('commentCnt')

        duration = int(float(video_dict["duration"]))
        play_cnt = int(video_dict["play_cnt"])
        publish_time_stamp = int(video_dict["publish_time_stamp"])
        like_cnt = int(video_dict["like_cnt"])
        comment_cnt = int(video_dict["comment_cnt"])
        share_cnt = int(video_dict["share_cnt"])
        video_width = int(video_dict["video_width"])
        video_height = int(video_dict["video_height"])
        # Sample the clock once so the logged and the evaluated values agree.
        now = int(time.time())
        max_age = 3600 * 24 * int(period_min)

        Common.logger(log_type, crawler).info(
            f'rule_duration_max:{duration_max} >= duration:{duration} >= rule_duration_min:{int(duration_min)}')
        Common.logger(log_type, crawler).info(
            f'rule_playCnt_max:{int(play_max)} >= play_cnt:{play_cnt} >= rule_playCnt_min:{int(play_min)}')
        Common.logger(log_type, crawler).info(
            f'now:{now} - publish_time_stamp:{publish_time_stamp} <= {max_age}')
        Common.logger(log_type, crawler).info(
            f'rule_like_max:{int(like_max)} >= like_cnt:{like_cnt} >= rule_like_min:{int(like_min)}')
        Common.logger(log_type, crawler).info(
            f'rule_commentCnt_max:{int(comment_max)} >= comment_cnt:{comment_cnt} >= rule_commentCnt_min:{int(comment_min)}')
        Common.logger(log_type, crawler).info(
            f'rule_shareCnt_max:{int(share_max)} >= share_cnt:{share_cnt} >= rule_shareCnt_min:{int(share_min)}')
        Common.logger(log_type, crawler).info(
            f'rule_videoWidth_max:{int(width_max)} >= video_width:{video_width} >= rule_videoWidth_min:{int(width_min)}')
        Common.logger(log_type, crawler).info(
            f'rule_videoHeight_max:{int(height_max)} >= video_height:{video_height} >= rule_videoHeight_min:{int(height_min)}')

        return (
            int(duration_max) >= duration >= int(duration_min)
            and int(play_max) >= play_cnt >= int(play_min)
            and now - publish_time_stamp <= max_age
            and int(like_max) >= like_cnt >= int(like_min)
            and int(comment_max) >= comment_cnt >= int(comment_min)
            and int(share_max) >= share_cnt >= int(share_min)
            and int(width_max) >= video_width >= int(width_min)
            and int(height_max) >= video_height >= int(height_min)
        )

    @classmethod
    def repeat_video(cls, log_type, crawler, video_id, env):
        """
        Count rows in ``crawler_video`` already stored for this source video.

        :param video_id: the platform-side video id (``out_video_id`` column)
        :return: number of matching rows; 0 means the video is new
        """
        # video_id comes from the remote API, so escape it before embedding.
        sql = f""" select * from crawler_video where platform="小年糕" and out_video_id="{_sql_escape(video_id)}"; """
        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
        return len(repeat_video)

    @classmethod
    def _request_page(cls, log_type, crawler, p_mid):
        """
        Request one page of an author's video list, starting at ``cls.next_t``.

        On success ``cls.next_t`` is updated to the cursor returned by the API.

        :param p_mid: the author's mid, sent as ``visited_mid``
        :return: the non-empty list of feed dicts, or None when pagination
                 should stop (bad response, missing list, or empty list)
        """
        url = "https://api.xiaoniangao.cn/profile/list_album"
        # NOTE(review): the token / uuid / session values below are hard-coded;
        # they look like captured session credentials -- consider moving them
        # to configuration.
        headers = {
            "X-Mid": '1fb47aa7a860d9',
            "X-Token-Id": '9f2cb91f9952c107ecb73642083e1dec-1145266232',
            "content-type": "application/json",
            "uuid": 'f40c2e7c-3cfb-4804-b513-608c0280268c',
            "Accept-Encoding": "gzip,compress,br,deflate",
            "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X)"
                          " AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 "
                          "MicroMessenger/8.0.20(0x18001435) NetType/WIFI Language/zh_CN",
            "Referer": 'https://servicewechat.com/wxd7911e4c177690e4/654/page-frame.html'
        }
        json_text = {
            "visited_mid": str(p_mid),
            "start_t": cls.next_t,
            "qs": "imageMogr2/gravity/center/rotate/$/thumbnail/!690x385r/crop/690x385/interlace/1/format/jpg",
            "h_qs": "imageMogr2/gravity/center/rotate/$/thumbnail/!120x120r/crop/120x120/interlace/1/format/jpg",
            "limit": 20,
            "token": '54e4c603f7bf3dc009c86b49ed91be36',
            "uid": 'f40c2e7c-3cfb-4804-b513-608c0280268c',
            "proj": "ma",
            "wx_ver": "8.0.23",
            "code_ver": "3.68.0",
            "log_common_params": {
                "e": [{
                    "data": {
                        "page": "profilePage",
                        "topic": "public"
                    }
                }],
                "ext": {
                    "brand": "iPhone",
                    "device": "iPhone 11",
                    "os": "iOS 14.7.1",
                    "weixinver": "8.0.23",
                    "srcver": "2.24.7",
                    "net": "wifi",
                    "scene": "1089"
                },
                "pj": "1",
                "pf": "2",
                "session_id": "7468cf52-00ea-432e-8505-6ea3ad7ec164"
            }
        }
        # verify=False below would otherwise emit an InsecureRequestWarning.
        urllib3.disable_warnings()
        # The timeout keeps a stalled connection from hanging the crawl loop.
        r = requests.post(url=url, headers=headers, json=json_text, proxies=proxies,
                          verify=False, timeout=cls.request_timeout)

        # Parse the body once; a non-JSON body is treated like a bad response.
        body = None
        if r.status_code == 200:
            try:
                body = r.json()
            except ValueError:
                body = None
        data = body.get("data") if isinstance(body, dict) else None

        if not isinstance(data, dict):
            Common.logger(log_type, crawler).info(f"get_videoList:{r.text}\n")
            return None
        feeds = data.get("list")
        if feeds is None:
            Common.logger(log_type, crawler).info(f"get_videoList:{body}\n")
            return None
        if len(feeds) == 0:
            Common.logger(log_type, crawler).info(f"没有更多数据啦~\n")
            return None

        cls.next_t = data.get("next_t")
        return feeds

    @classmethod
    def _build_video_dict(cls, feed, emoji):
        """
        Convert one raw feed item from the list API into the video info dict.

        :param feed: one element of the API's ``data.list``
        :param emoji: emoji / symbol randomly placed before or after the title
        :return: dict with the video's id, title, counters, size, author info,
                 cover / play URLs and a session tag
        :raises KeyError: when the feed has no ``id`` or ``mid``
        """
        # Title: strip characters that are unsafe in file names / SQL.
        xiaoniangao_title = feed.get("title", "").strip()
        for old, new in cls._TITLE_REPLACEMENTS:
            xiaoniangao_title = xiaoniangao_title.replace(old, new)
        # Final title: randomly pick "emoji + title" or "title + emoji".
        video_title = random.choice([f"{emoji}{xiaoniangao_title}", f"{xiaoniangao_title}{emoji}"])

        # The API reports duration and publish time in milliseconds.
        duration = int(feed.get("du", 0) / 1000)
        publish_time_stamp = int(int(feed.get("t", 0)) / 1000)
        publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp))

        # Author name / avatar
        author = feed.get("album_user", {})
        user_name = author.get("nick", "").strip()
        for fragment in cls._USER_NAME_REMOVALS:
            user_name = user_name.replace(fragment, "")

        return {
            "video_id": feed.get("vid", ""),
            "video_title": video_title,
            "duration": duration,
            "play_cnt": feed.get("play_pv", 0),
            "like_cnt": feed.get("favor", {}).get("total", 0),
            "comment_cnt": feed.get("comment_count", 0),
            "share_cnt": feed.get("share", 0),
            "user_name": user_name,
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "video_width": int(feed.get("w", 0)),
            "video_height": int(feed.get("h", 0)),
            "avatar_url": author.get("hurl", ""),
            "profile_id": feed["id"],
            "profile_mid": feed["mid"],
            "cover_url": feed.get("url", ""),
            "video_url": feed.get("v_url", ""),
            "session": f"xiaoniangao-author-{int(time.time())}"
        }

    # Fetch the videos on an author's profile page
    @classmethod
    def get_videoList(cls, log_type, crawler, strategy, p_mid, uid, rule_dict, oss_endpoint, env):
        """
        Crawl an author's profile page by page and download qualifying videos.

        Paging stops when the API returns a bad or empty page, when no further
        cursor is returned, or at the first video older than the ``period``
        rule (in days). ``cls.next_t`` is always reset on exit, including when
        an exception propagates, so the next author starts from the first page.

        :param p_mid: the author's mid on the platform
        :param uid: our own user id the videos are published under
        :param rule_dict: rule info dict, see ``download_rule``
        """
        period_days = int(rule_dict.get('period', {}).get('min', 0))
        # Config lists are fetched lazily, once per call instead of per video.
        emoji_list = None
        filter_words = None
        try:
            while True:
                feeds = cls._request_page(log_type, crawler, p_mid)
                if feeds is None:
                    return
                if emoji_list is None:
                    emoji_list = get_config_from_mysql(log_type, crawler, env, "emoji")

                for feed in feeds:
                    # Pick a random emoji / symbol; fall back to none if the
                    # configured list is empty.
                    emoji = random.choice(emoji_list) if emoji_list else ""
                    video_dict = cls._build_video_dict(feed, emoji)
                    for k, v in video_dict.items():
                        Common.logger(log_type, crawler).info(f"{k}:{v}")

                    publish_time_stamp = video_dict["publish_time_stamp"]
                    if int(time.time()) - publish_time_stamp > 3600 * 24 * period_days:
                        # Stop paging for this author at the first video that
                        # is older than the configured period.
                        Common.logger(log_type, crawler).info(
                            f"发布时间超过{period_days}天:{video_dict['publish_time_str']}\n")
                        return

                    # Skip invalid videos.
                    # NOTE(review): the title already contains the emoji, so the
                    # title test only fires when both are empty -- confirm intent.
                    if video_dict["video_title"] == "" or video_dict["video_id"] == "" \
                            or video_dict["video_url"] == "":
                        Common.logger(log_type, crawler).info("无效视频\n")
                        continue
                    # Basic crawl rules.
                    if not cls.download_rule(log_type, crawler, video_dict, rule_dict):
                        Common.logger(log_type, crawler).info("不满足抓取规则\n")
                        continue
                    if cls.repeat_video(log_type, crawler, video_dict['video_id'], env) != 0:
                        Common.logger(log_type, crawler).info('视频已下载\n')
                        continue
                    # Filter words: any non-empty configured word in the title.
                    if filter_words is None:
                        filter_words = [str(word) for word in
                                        get_config_from_mysql(log_type, crawler, env, "filter", action="")]
                    if any(word and word in video_dict["video_title"] for word in filter_words):
                        Common.logger(log_type, crawler).info("视频已中过滤词\n")
                        continue

                    cls.download_publish(log_type=log_type,
                                         crawler=crawler,
                                         strategy=strategy,
                                         video_dict=video_dict,
                                         rule_dict=rule_dict,
                                         uid=uid,
                                         oss_endpoint=oss_endpoint,
                                         env=env)

                if cls.next_t is None:
                    # No cursor for a further page: requesting again would
                    # restart from the first page, so stop here.
                    Common.logger(log_type, crawler).info(f"没有更多数据啦~\n")
                    return
        finally:
            cls.next_t = None

    # Download / upload
    @classmethod
    def download_publish(cls, log_type, crawler, strategy, video_dict, rule_dict, uid, oss_endpoint, env):
        """
        Download one video, publish it, then record it in MySQL and Feishu.

        When publishing returns no id, the local video folder is removed and
        nothing is recorded.

        :param video_dict: video info dict built by ``get_videoList``
        :param rule_dict: rule info dict, stored as JSON in ``crawler_rule``
        :param uid: our own user id the video is published under
        """
        # Download the cover
        Common.download_method(log_type=log_type, crawler=crawler, text="cover",
                               title=video_dict["video_title"], url=video_dict["cover_url"])
        # Download the video
        Common.download_method(log_type=log_type, crawler=crawler, text="video",
                               title=video_dict["video_title"], url=video_dict["video_url"])
        # Save the video info to "./videos/{download_video_title}/info.txt"
        Common.save_video_info(log_type=log_type, crawler=crawler, video_dict=video_dict)

        # Upload the video
        Common.logger(log_type, crawler).info("开始上传视频...")
        our_video_id = Publish.upload_and_publish(log_type=log_type,
                                                  crawler=crawler,
                                                  strategy=strategy,
                                                  our_uid=uid,
                                                  env=env,
                                                  oss_endpoint=oss_endpoint)
        if our_video_id is None:
            Common.logger(log_type, crawler).warning("视频上传失败\n")
            # Remove the video folder; best effort, it may already be gone.
            shutil.rmtree(f"./{crawler}/videos/{video_dict['video_title']}", ignore_errors=True)
            return
        Common.logger(log_type, crawler).info("视频上传完成")

        if env == "dev":
            our_video_link = f"https://testadmin.piaoquantv.com/cms/post-detail/{our_video_id}/info"
        else:
            our_video_link = f"https://admin.piaoquantv.com/cms/post-detail/{our_video_id}/info"

        # Text values originate from the remote API / config, so escape them
        # before embedding them in the statement.
        out_user_id = _sql_escape(video_dict['profile_id'])
        out_video_id = _sql_escape(video_dict['video_id'])
        video_title = _sql_escape(video_dict['video_title'])
        cover_url = _sql_escape(video_dict['cover_url'])
        video_url = _sql_escape(video_dict['video_url'])
        publish_time = _sql_escape(video_dict['publish_time_str'])
        crawler_rule = _sql_escape(json.dumps(rule_dict))
        insert_sql = f""" insert into crawler_video(video_id,
                        out_user_id,
                        platform,
                        strategy,
                        out_video_id,
                        video_title,
                        cover_url,
                        video_url,
                        duration,
                        publish_time,
                        play_cnt,
                        crawler_rule,
                        width,
                        height)
                        values({our_video_id},
                        "{out_user_id}",
                        "{cls.platform}",
                        "定向爬虫策略",
                        "{out_video_id}",
                        "{video_title}",
                        "{cover_url}",
                        "{video_url}",
                        {int(video_dict['duration'])},
                        "{publish_time}",
                        {int(video_dict['play_cnt'])},
                        '{crawler_rule}',
                        {int(video_dict['video_width'])},
                        {int(video_dict['video_height'])}) """
        Common.logger(log_type, crawler).info(f"insert_sql:{insert_sql}")
        MysqlHelper.update_values(log_type, crawler, insert_sql, env)
        Common.logger(log_type, crawler).info('视频信息插入数据库成功!')

        # Write the video to Feishu: insert a row, then fill it in.
        Feishu.insert_columns(log_type, crawler, "Wu0CeL", "ROWS", 1, 2)
        upload_time = int(time.time())
        values = [[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(upload_time)),
                   "用户主页",
                   str(video_dict['video_id']),
                   str(video_dict['video_title']),
                   our_video_link,
                   video_dict['play_cnt'],
                   video_dict['comment_cnt'],
                   video_dict['like_cnt'],
                   video_dict['share_cnt'],
                   video_dict['duration'],
                   f"{video_dict['video_width']}*{video_dict['video_height']}",
                   str(video_dict['publish_time_str']),
                   str(video_dict['user_name']),
                   str(video_dict['profile_id']),
                   str(video_dict['profile_mid']),
                   str(video_dict['avatar_url']),
                   str(video_dict['cover_url']),
                   str(video_dict['video_url'])]]
        time.sleep(1)
        Feishu.update_values(log_type, crawler, "Wu0CeL", "F2:Z2", values)
        Common.logger(log_type, crawler).info('视频信息写入飞书成功\n')

    # Fetch the videos of every followed user
    @classmethod
    def get_follow_videos(cls, log_type, crawler, user_list, rule_dict, strategy, oss_endpoint, env):
        """
        Crawl the profile page of every user in ``user_list``.

        A failure while crawling one user is logged and does not stop the
        remaining users.

        :param user_list: dicts with ``nick_name``, ``link`` (the author's mid)
                          and ``uid`` (our own user id) keys
        """
        if not user_list:
            Common.logger(log_type, crawler).warning("抓取用户列表为空\n")
            return
        for user in user_list:
            try:
                user_name = user['nick_name']
                profile_mid = user['link']
                uid = user['uid']
                Common.logger(log_type, crawler).info(f"获取 {user_name} 主页视频")
                cls.get_videoList(log_type=log_type,
                                  crawler=crawler,
                                  strategy=strategy,
                                  p_mid=profile_mid,
                                  rule_dict=rule_dict,
                                  uid=uid,
                                  oss_endpoint=oss_endpoint,
                                  env=env)
                time.sleep(1)
            except Exception as e:
                Common.logger(log_type, crawler).error(f"get_follow_videos:{e}\n")
            finally:
                # Never let one user's pagination cursor leak into the next
                # user's first request, even after a failure.
                cls.next_t = None


if __name__ == "__main__":
    # Manual debugging example:
    # print(XiaoniangaoAuthorScheduling.repeat_video("follow", "xiaoniangao", "4919087666", "dev"))
    pass