# -*- coding: utf-8 -*- # @Author: wangkun # @Time: 2023/3/16 import json import os import random import shutil import sys import time import requests import urllib3 sys.path.append(os.getcwd()) from common.common import Common from common.feishu import Feishu from common.publish import Publish from common.scheduling_db import MysqlHelper from common.public import get_config_from_mysql proxies = {"http": None, "https": None} class XiaoniangaoPlayScheduling: platform = "小年糕" words = "abcdefghijklmnopqrstuvwxyz0123456789" uid = f"""{"".join(random.sample(words, 8))}-{"".join(random.sample(words, 4))}-{"".join(random.sample(words, 4))}-{"".join(random.sample(words, 4))}-{"".join(random.sample(words, 12))}""" token = "".join(random.sample(words, 32)) uid_token_dict = { "uid": uid, "token": token } # 生成 uid、token @classmethod def get_uid_token(cls): words = "abcdefghijklmnopqrstuvwxyz0123456789" uid = f"""{"".join(random.sample(words, 8))}-{"".join(random.sample(words, 4))}-{"".join(random.sample(words, 4))}-{"".join(random.sample(words, 4))}-{"".join(random.sample(words, 12))}""" token = "".join(random.sample(words, 32)) uid_token_dict = { "uid": uid, "token": token } return uid_token_dict # 基础门槛规则 @staticmethod def download_rule(log_type, crawler, video_dict, rule_dict): """ 下载视频的基本规则 :param log_type: 日志 :param crawler: 哪款爬虫 :param video_dict: 视频信息,字典格式 :param rule_dict: 规则信息,字典格式 :return: 满足规则,返回 True;反之,返回 False """ rule_play_cnt_min = rule_dict.get('play_cnt', {}).get('min', 0) rule_play_cnt_max = rule_dict.get('play_cnt', {}).get('max', 100000000) if rule_play_cnt_max == 0: rule_play_cnt_max = 100000000 rule_duration_min = rule_dict.get('duration', {}).get('min', 0) rule_duration_max = rule_dict.get('duration', {}).get('max', 100000000) if rule_duration_max == 0: rule_duration_max = 100000000 rule_period_min = rule_dict.get('period', {}).get('min', 0) # rule_period_max = rule_dict.get('period', {}).get('max', 100000000) # if rule_period_max == 0: # rule_period_max = 100000000 rule_fans_cnt_min = rule_dict.get('fans_cnt', {}).get('min', 0) rule_fans_cnt_max = rule_dict.get('fans_cnt', {}).get('max', 100000000) if rule_fans_cnt_max == 0: rule_fans_cnt_max = 100000000 rule_videos_cnt_min = rule_dict.get('videos_cnt', {}).get('min', 0) rule_videos_cnt_max = rule_dict.get('videos_cnt', {}).get('max', 100000000) if rule_videos_cnt_max == 0: rule_videos_cnt_max = 100000000 rule_like_cnt_min = rule_dict.get('like_cnt', {}).get('min', 0) rule_like_cnt_max = rule_dict.get('like_cnt', {}).get('max', 100000000) if rule_like_cnt_max == 0: rule_like_cnt_max = 100000000 rule_width_min = rule_dict.get('width', {}).get('min', 0) rule_width_max = rule_dict.get('width', {}).get('max', 100000000) if rule_width_max == 0: rule_width_max = 100000000 rule_height_min = rule_dict.get('height', {}).get('min', 0) rule_height_max = rule_dict.get('height', {}).get('max', 100000000) if rule_height_max == 0: rule_height_max = 100000000 rule_share_cnt_min = rule_dict.get('share_cnt', {}).get('min', 0) rule_share_cnt_max = rule_dict.get('share_cnt', {}).get('max', 100000000) if rule_share_cnt_max == 0: rule_share_cnt_max = 100000000 rule_comment_cnt_min = rule_dict.get('comment_cnt', {}).get('min', 0) rule_comment_cnt_max = rule_dict.get('comment_cnt', {}).get('max', 100000000) if rule_comment_cnt_max == 0: rule_comment_cnt_max = 100000000 rule_publish_time_min = rule_dict.get('publish_time', {}).get('min', 0) rule_publish_time_max = rule_dict.get('publish_time', {}).get('max', 0) if rule_publish_time_max == 0: rule_publish_time_max = 4102415999000 # 2099-12-31 23:59:59 Common.logger(log_type, crawler).info( f'rule_duration_max:{rule_duration_max} >= duration:{int(float(video_dict["duration"]))} >= rule_duration_min:{int(rule_duration_min)}') Common.logger(log_type, crawler).info( f'rule_play_cnt_max:{int(rule_play_cnt_max)} >= play_cnt:{int(video_dict["play_cnt"])} >= rule_play_cnt_min:{int(rule_play_cnt_min)}') Common.logger(log_type, crawler).info( f'now:{int(time.time())} - publish_time_stamp:{int(video_dict["publish_time_stamp"])} <= {3600 * 24 * int(rule_period_min)}') Common.logger(log_type, crawler).info( f'rule_like_cnt_max:{int(rule_like_cnt_max)} >= like_cnt:{int(video_dict["like_cnt"])} >= rule_like_cnt_min:{int(rule_like_cnt_min)}') Common.logger(log_type, crawler).info( f'rule_comment_cnt_max:{int(rule_comment_cnt_max)} >= comment_cnt:{int(video_dict["comment_cnt"])} >= rule_comment_cnt_min:{int(rule_comment_cnt_min)}') Common.logger(log_type, crawler).info( f'rule_share_cnt_max:{int(rule_share_cnt_max)} >= share_cnt:{int(video_dict["share_cnt"])} >= rule_share_cnt_min:{int(rule_share_cnt_min)}') Common.logger(log_type, crawler).info( f'rule_width_max:{int(rule_width_max)} >= video_width:{int(video_dict["video_width"])} >= rule_width_min:{int(rule_width_min)}') Common.logger(log_type, crawler).info( f'rule_height_max:{int(rule_height_max)} >= video_height:{int(video_dict["video_height"])} >= rule_height_min:{int(rule_height_min)}') Common.logger(log_type, crawler).info( f'rule_publish_time_max:{int(rule_publish_time_max)} >= publish_time_stamp:{int(video_dict["publish_time_stamp"])} >= rule_publish_time_min:{int(rule_publish_time_min)}') if int(rule_duration_max) >= int(float(video_dict["duration"])) >= int(rule_duration_min) \ and int(rule_play_cnt_max) >= int(video_dict['play_cnt']) >= int(rule_play_cnt_min) \ and int(time.time()) - int(video_dict["publish_time_stamp"]) <= 3600 * 24 * int(rule_period_min) \ and int(rule_like_cnt_max) >= int(video_dict['like_cnt']) >= int(rule_like_cnt_min) \ and int(rule_comment_cnt_max) >= int(video_dict['comment_cnt']) >= int(rule_comment_cnt_min) \ and int(rule_share_cnt_max) >= int(video_dict['share_cnt']) >= int(rule_share_cnt_min) \ and int(rule_width_max) >= int(video_dict['video_width']) >= int(rule_width_min) \ and int(rule_height_max) >= int(video_dict['video_height']) >= int(rule_height_min)\ and int(rule_publish_time_max) >= int(video_dict['publish_time_stamp']) >= int(rule_publish_time_min): return True else: return False # 获取列表 @classmethod def get_videoList(cls, log_type, crawler, rule_dict, strategy, oss_endpoint, env): uid_token_dict = cls.uid_token_dict url = "https://kapi.xiaoniangao.cn/trends/get_recommend_trends" headers = { "x-b3-traceid": '1dc0a6d0929a2b', "X-Token-Id": 'ae99a4953804085ebb0ae36fa138031d-1146052582', "uid": uid_token_dict['uid'], "content-type": "application/json", "Accept-Encoding": "gzip,compress,br,deflate", "User-Agent": 'Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X)' ' AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 ' 'MicroMessenger/8.0.20(0x18001432) NetType/WIFI Language/zh_CN', "Referer": 'https://servicewechat.com/wxd7911e4c177690e4/620/page-frame.html' } data = { "log_params": { "page": "discover_rec", "common": { "brand": "iPhone", "device": "iPhone 11", "os": "iOS 14.7.1", "weixinver": "8.0.20", "srcver": "2.24.2", "net": "wifi", "scene": 1089 } }, "qs": "imageMogr2/gravity/center/rotate/$/thumbnail/!750x500r/crop/750x500/interlace/1/format/jpg", "h_qs": "imageMogr2/gravity/center/rotate/$/thumbnail/!80x80r/crop/80x80/interlace/1/format/jpg", "share_width": 625, "share_height": 500, "ext": { "fmid": 0, "items": {} }, "app": "xng", "rec_scene": "discover_rec", "log_common_params": { "e": [{ "data": { "page": "discoverIndexPage", "topic": "recommend" }, "ab": {} }], "ext": { "brand": "iPhone", "device": "iPhone 11", "os": "iOS 14.7.1", "weixinver": "8.0.20", "srcver": "2.24.3", "net": "wifi", "scene": "1089" }, "pj": "1", "pf": "2", "session_id": "7bcce313-b57d-4305-8d14-6ebd9a1bad29" }, "refresh": False, # "token": cls.play_token, "token": uid_token_dict['token'], # "uid": cls.play_uid, "uid": uid_token_dict['uid'], "proj": "ma", "wx_ver": "8.0.20", "code_ver": "3.62.0" } urllib3.disable_warnings() r = requests.post(url=url, headers=headers, json=data, proxies=proxies, verify=False) if "data" not in r.text or r.status_code != 200: Common.logger(log_type, crawler).warning(f"get_videoList:{r.text}") return elif "data" not in r.json(): Common.logger(log_type, crawler).info(f"get_videoList:{r.json()}") return elif "list" not in r.json()["data"]: Common.logger(log_type, crawler).warning(f"get_videoList:{r.json()['data']}") return elif len(r.json()["data"]["list"]) == 0: Common.logger(log_type, crawler).warning(f"get_videoList:{r.json()['data']['list']}") return else: # 视频列表数据 feeds = r.json()["data"]["list"] for i in range(len(feeds)): # 标题,表情随机加在片头、片尾,或替代句子中间的标点符号 xiaoniangao_title = feeds[i].get("title", "").strip().replace("\n", "") \ .replace("/", "").replace("\r", "").replace("#", "") \ .replace(".", "。").replace("\\", "").replace("&NBSP", "") \ .replace(":", "").replace("*", "").replace("?", "") \ .replace("?", "").replace('"', "").replace("<", "") \ .replace(">", "").replace("|", "").replace(" ", "") \ .replace('"', '').replace("'", '') # 随机取一个表情/符号 emoji = random.choice(get_config_from_mysql(log_type, crawler, env, "emoji")) # 生成最终标题,标题list[表情+title, title+表情]随机取一个 video_title = random.choice([f"{emoji}{xiaoniangao_title}", f"{xiaoniangao_title}{emoji}"]) # 视频 ID video_id = feeds[i].get("vid", "") # 播放量 play_cnt = feeds[i].get("play_pv", 0) # 点赞量 like_cnt = feeds[i].get("favor", {}).get("total", 0) # 评论数 comment_cnt = feeds[i].get("comment_count", 0) # 分享量 share_cnt = feeds[i].get("share", 0) # 时长 duration = int(feeds[i].get("du", 0) / 1000) # 宽和高 video_width = int(feeds[i].get("w", 0)) video_height = int(feeds[i].get("h", 0)) # 发布时间 publish_time_stamp = int(int(feeds[i].get("t", 0)) / 1000) publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)) # 用户名 / 头像 user_name = feeds[i].get("user", {}).get("nick", "").strip().replace("\n", "") \ .replace("/", "").replace("快手", "").replace(" ", "") \ .replace(" ", "").replace("&NBSP", "").replace("\r", "") avatar_url = feeds[i].get("user", {}).get("hurl", "") # 用户 ID profile_id = feeds[i]["id"] # 用户 mid profile_mid = feeds[i]["user"]["mid"] # 视频封面 cover_url = feeds[i].get("url", "") # 视频播放地址 video_url = feeds[i].get("v_url", "") video_dict = { "video_title": video_title, "video_id": video_id, "duration": duration, "play_cnt": play_cnt, "like_cnt": like_cnt, "comment_cnt": comment_cnt, "share_cnt": share_cnt, "user_name": user_name, "publish_time_stamp": publish_time_stamp, "publish_time_str": publish_time_str, "video_width": video_width, "video_height": video_height, "avatar_url": avatar_url, "profile_id": profile_id, "profile_mid": profile_mid, "cover_url": cover_url, "video_url": video_url, "session": f"xiaoniangao-play-{int(time.time())}" } for k, v in video_dict.items(): Common.logger(log_type, crawler).info(f"{k}:{v}") # 过滤无效视频 if video_title == "" or video_id == "" or video_url == "": Common.logger(log_type, crawler).warning("无效视频\n") # 抓取基础规则过滤 elif cls.download_rule(log_type, crawler, video_dict, rule_dict) is False: Common.logger(log_type, crawler).info("不满足抓取规则\n") elif cls.repeat_video(log_type, crawler, video_dict['video_id'], env) != 0: Common.logger(log_type, crawler).info('视频已下载\n') # 过滤敏感词 elif any(str(word) if str(word) in video_title else False for word in get_config_from_mysql(log_type, crawler, env, "filter", action="")) is True: Common.logger(log_type, crawler).info("视频已中过滤词\n") else: cls.download_publish(log_type=log_type, crawler=crawler, video_dict=video_dict, rule_dict=rule_dict, strategy=strategy, oss_endpoint=oss_endpoint, env=env) @classmethod def repeat_video(cls, log_type, crawler, video_id, env): sql = f""" select * from crawler_video where platform="小年糕" and out_video_id="{video_id}"; """ repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env) return len(repeat_video) @classmethod def download_publish(cls, log_type, crawler, video_dict, rule_dict, strategy, oss_endpoint, env): # 下载封面 Common.download_method(log_type=log_type, crawler=crawler, text="cover", title=video_dict["video_title"], url=video_dict["cover_url"]) # 下载视频 Common.download_method(log_type=log_type, crawler=crawler, text="video", title=video_dict["video_title"], url=video_dict["video_url"]) # 保存视频信息至 "./videos/{download_video_title}/info.txt" Common.save_video_info(log_type=log_type, crawler=crawler, video_dict=video_dict) # 上传视频 Common.logger(log_type, crawler).info("开始上传视频...") our_video_id = Publish.upload_and_publish(log_type=log_type, crawler=crawler, strategy=strategy, our_uid="play", env=env, oss_endpoint=oss_endpoint) if env == "dev": our_video_link = f"https://testadmin.piaoquantv.com/cms/post-detail/{our_video_id}/info" else: our_video_link = f"https://admin.piaoquantv.com/cms/post-detail/{our_video_id}/info" Common.logger(log_type, crawler).info("视频上传完成") if our_video_id is None: # 删除视频文件夹 shutil.rmtree(f"./{crawler}/videos/{video_dict['video_title']}") return insert_sql = f""" insert into crawler_video(video_id, out_user_id, platform, strategy, out_video_id, video_title, cover_url, video_url, duration, publish_time, play_cnt, crawler_rule, width, height) values({our_video_id}, "{video_dict['profile_id']}", "{cls.platform}", "播放量榜爬虫策略", "{video_dict['video_id']}", "{video_dict['video_title']}", "{video_dict['cover_url']}", "{video_dict['video_url']}", {int(video_dict['duration'])}, "{video_dict['publish_time_str']}", {int(video_dict['play_cnt'])}, '{json.dumps(rule_dict)}', {int(video_dict['video_width'])}, {int(video_dict['video_height'])}) """ Common.logger(log_type, crawler).info(f"insert_sql:{insert_sql}") MysqlHelper.update_values(log_type, crawler, insert_sql, env) Common.logger(log_type, crawler).info('视频信息插入数据库成功!') # 视频写入飞书 Feishu.insert_columns(log_type, crawler, "c85k1C", "ROWS", 1, 2) # 视频ID工作表,首行写入数据 upload_time = int(time.time()) values = [[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(upload_time)), "播放量榜爬虫策略", str(video_dict['video_id']), str(video_dict['video_title']), our_video_link, video_dict['play_cnt'], video_dict['comment_cnt'], video_dict['like_cnt'], video_dict['share_cnt'], video_dict['duration'], f"{video_dict['video_width']}*{video_dict['video_height']}", str(video_dict['publish_time_str']), str(video_dict['user_name']), str(video_dict['profile_id']), str(video_dict['profile_mid']), str(video_dict['avatar_url']), str(video_dict['cover_url']), str(video_dict['video_url'])]] time.sleep(1) Feishu.update_values(log_type, crawler, "c85k1C", "F2:Z2", values) Common.logger(log_type, crawler).info('视频信息写入飞书成功\n') if __name__ == '__main__': pass