# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/13
import json
import os
import random
import shutil
import sys
import time
from hashlib import md5

import requests
import urllib3

sys.path.append(os.getcwd())
from common.common import Common
from common.scheduling_db import MysqlHelper
from common.publish import Publish
from common.feishu import Feishu
from common.public import get_config_from_mysql, download_rule

proxies = {"http": None, "https": None}


class XiaoniangaoAuthorScheduling:
    platform = "小年糕"

    # Dedup check: how many rows already exist for this out_video_id
    @classmethod
    def repeat_video(cls, log_type, crawler, video_id, env):
        sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """
        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
        return len(repeat_video)

    # Fetch the video list from a user's profile page
    @classmethod
    def get_videoList(cls, log_type, crawler, rule_dict, user_dict, env):
        next_t = None
        while True:
            url = "https://api.xiaoniangao.cn/profile/list_album"
            headers = {
                "X-Mid": '1fb47aa7a860d9',
                "X-Token-Id": '9f2cb91f9952c107ecb73642083e1dec-1145266232',
                "content-type": "application/json",
                "uuid": 'f40c2e7c-3cfb-4804-b513-608c0280268c',
                "Accept-Encoding": "gzip,compress,br,deflate",
                "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X)"
                              " AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 "
                              "MicroMessenger/8.0.20(0x18001435) NetType/WIFI Language/zh_CN",
                "Referer": 'https://servicewechat.com/wxd7911e4c177690e4/654/page-frame.html'
            }
            json_text = {
                "visited_mid": str(user_dict['link']),
                "start_t": next_t,
                "qs": "imageMogr2/gravity/center/rotate/$/thumbnail/!690x385r/crop/690x385/interlace/1/format/jpg",
                "h_qs": "imageMogr2/gravity/center/rotate/$/thumbnail/!120x120r/crop/120x120/interlace/1/format/jpg",
                "limit": 20,
                "token": '54e4c603f7bf3dc009c86b49ed91be36',
                "uid": 'f40c2e7c-3cfb-4804-b513-608c0280268c',
                "proj": "ma",
                "wx_ver": "8.0.23",
                "code_ver": "3.68.0",
                "log_common_params": {
                    "e": [{
                        "data": {
                            "page": "profilePage",
                            "topic": "public"
                        }
                    }],
                    "ext": {
                        "brand": "iPhone",
                        "device": "iPhone 11",
                        "os": "iOS 14.7.1",
                        "weixinver": "8.0.23",
                        "srcver": "2.24.7",
                        "net": "wifi",
                        "scene": "1089"
                    },
                    "pj": "1",
                    "pf": "2",
                    "session_id": "7468cf52-00ea-432e-8505-6ea3ad7ec164"
                }
            }
            urllib3.disable_warnings()
            r = requests.post(url=url, headers=headers, json=json_text, proxies=proxies, verify=False)
            if r.status_code != 200 or 'data' not in r.text:
                Common.logger(log_type, crawler).info(f"get_videoList:{r.text}\n")
                return
            elif 'list' not in r.json()['data']:
                Common.logger(log_type, crawler).info(f"get_videoList:{r.json()}\n")
                return
            elif len(r.json()['data']['list']) == 0:
                Common.logger(log_type, crawler).info("No more data\n")
                return
            else:
                next_t = r.json()["data"]["next_t"]
                feeds = r.json()["data"]["list"]
                for feed in feeds:
                    try:
                        # Clean the title; an emoji is randomly added at the start
                        # or end, or replaces mid-sentence punctuation
                        xiaoniangao_title = feed.get("title", "").strip().replace("\n", "") \
                            .replace("/", "").replace("\r", "").replace("#", "") \
                            .replace(".", "。").replace("\\", "").replace("&NBSP", "") \
                            .replace(":", "").replace("*", "").replace("?", "") \
                            .replace("?", "").replace('"', "").replace("<", "") \
                            .replace(">", "").replace("|", "").replace(" ", "") \
                            .replace('"', '').replace("'", '')
                        # Pick a random emoji/symbol
                        emoji = random.choice(get_config_from_mysql(log_type, crawler, env, "emoji"))
                        # Final title: randomly choose between emoji+title and title+emoji
                        video_title = random.choice([f"{emoji}{xiaoniangao_title}", f"{xiaoniangao_title}{emoji}"])
                        # Publish time
                        publish_time_stamp = int(int(feed.get("t", 0)) / 1000)
                        publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp))
                        # User name / avatar
                        user_name = feed.get("album_user", {}).get("nick", "").strip().replace("\n", "") \
                            .replace("/", "").replace("快手", "").replace(" ", "") \
                            .replace(" ", "").replace("&NBSP", "").replace("\r", "")
                        video_dict = {
                            "video_title": video_title,
                            "video_id": feed.get("vid", ""),
                            "duration": int(feed.get("du", 0) / 1000),
                            "play_cnt": feed.get("play_pv", 0),
                            "like_cnt": feed.get("favor", {}).get("total", 0),
                            "comment_cnt": feed.get("comment_count", 0),
                            "share_cnt": feed.get("share", 0),
                            "user_name": user_name,
                            "publish_time_stamp": publish_time_stamp,
                            "publish_time_str": publish_time_str,
                            "video_width": int(feed.get("w", 0)),
                            "video_height": int(feed.get("h", 0)),
                            "avatar_url": feed.get("album_user", {}).get("hurl", ""),
                            "profile_id": feed["id"],
                            "profile_mid": feed["mid"],
                            "cover_url": feed.get("url", ""),
                            "video_url": feed.get("v_url", ""),
                            "session": f"xiaoniangao-author-{int(time.time())}"
                        }
                        for k, v in video_dict.items():
                            Common.logger(log_type, crawler).info(f"{k}:{v}")
                        # Stop paging once a video is older than the configured period
                        if int(time.time()) - publish_time_stamp > 3600 * 24 * int(rule_dict.get('period', {}).get('max', 1000)):
                            Common.logger(log_type, crawler).info(f"Published more than {int(rule_dict.get('period', {}).get('max', 1000))} days ago\n")
                            return
                        # Filter out invalid videos
                        if video_title == "" or video_dict["video_id"] == "" or video_dict["video_url"] == "":
                            Common.logger(log_type, crawler).info("Invalid video\n")
                        # Filter by the basic crawl rules
                        elif download_rule(log_type=log_type, crawler=crawler, video_dict=video_dict, rule_dict=rule_dict) is False:
                            Common.logger(log_type, crawler).info("Does not satisfy the crawl rules\n")
                        elif any(str(word) in video_dict["video_title"]
                                 for word in get_config_from_mysql(log_type=log_type,
                                                                   source=crawler,
                                                                   env=env,
                                                                   text="filter",
                                                                   action="")):
                            Common.logger(log_type, crawler).info("Title hit a filter word\n")
                        elif cls.repeat_video(log_type, crawler, video_dict['video_id'], env) != 0:
                            Common.logger(log_type, crawler).info("Video already downloaded\n")
                        else:
                            cls.download_publish(log_type=log_type,
                                                 crawler=crawler,
                                                 video_dict=video_dict,
                                                 rule_dict=rule_dict,
                                                 user_dict=user_dict,
                                                 env=env)
                    except Exception as e:
                        Common.logger(log_type, crawler).error(f"Exception while crawling a single video: {e}\n")

    # Download / upload
    @classmethod
    def download_publish(cls, log_type, crawler, video_dict, rule_dict, user_dict, env):
        # Download the video
        Common.download_method(log_type=log_type, crawler=crawler, text="video",
                               title=video_dict["video_title"], url=video_dict["video_url"])
        md_title = md5(video_dict['video_title'].encode('utf8')).hexdigest()
        try:
            if os.path.getsize(f"./{crawler}/videos/{md_title}/video.mp4") == 0:
                # Remove the video folder
                shutil.rmtree(f"./{crawler}/videos/{md_title}")
                Common.logger(log_type, crawler).info("Video size is 0, folder removed\n")
                return
        except FileNotFoundError:
            # Remove the video folder
            shutil.rmtree(f"./{crawler}/videos/{md_title}")
            Common.logger(log_type, crawler).info("Video file not found, folder removed\n")
            return
        # Download the cover image
        Common.download_method(log_type=log_type, crawler=crawler, text="cover",
                               title=video_dict["video_title"], url=video_dict["cover_url"])
        # Save video info to "./videos/{download_video_title}/info.txt"
        Common.save_video_info(log_type=log_type, crawler=crawler, video_dict=video_dict)
        # Upload the video
        Common.logger(log_type, crawler).info("Start uploading the video...")
        oss_endpoint = "out" if env == "dev" else "inner"
        our_video_id = Publish.upload_and_publish(log_type=log_type,
                                                  crawler=crawler,
                                                  strategy="定向抓取策略",
                                                  our_uid=user_dict["uid"],
                                                  env=env,
                                                  oss_endpoint=oss_endpoint)
        admin_host = "testadmin.piaoquantv.com" if env == "dev" else "admin.piaoquantv.com"
        our_video_link = f"https://{admin_host}/cms/post-detail/{our_video_id}/info"
        if our_video_id is None:
            try:
                # Upload failed: remove the video folder
                shutil.rmtree(f"./{crawler}/videos/{md_title}")
                return
            except FileNotFoundError:
                return

        insert_sql = f""" insert into crawler_video(video_id,
                                                    out_user_id,
                                                    platform,
                                                    strategy,
                                                    out_video_id,
                                                    video_title,
                                                    cover_url,
                                                    video_url,
                                                    duration,
                                                    publish_time,
                                                    play_cnt,
                                                    crawler_rule,
                                                    width,
                                                    height)
                                                    values({our_video_id},
                                                    "{video_dict['profile_id']}",
                                                    "{cls.platform}",
                                                    "定向抓取策略",
                                                    "{video_dict['video_id']}",
                                                    "{video_dict['video_title']}",
                                                    "{video_dict['cover_url']}",
                                                    "{video_dict['video_url']}",
                                                    {int(video_dict['duration'])},
                                                    "{video_dict['publish_time_str']}",
                                                    {int(video_dict['play_cnt'])},
                                                    '{json.dumps(rule_dict)}',
                                                    {int(video_dict['video_width'])},
                                                    {int(video_dict['video_height'])}) """
        Common.logger(log_type, crawler).info(f"insert_sql:{insert_sql}")
        MysqlHelper.update_values(log_type, crawler, insert_sql, env)
        Common.logger(log_type, crawler).info("Video info written to the database")

        # Write the video info to Feishu
        Feishu.insert_columns(log_type, crawler, "Wu0CeL", "ROWS", 1, 2)  # insert a row at the top of the video-ID sheet
        upload_time = int(time.time())
        values = [[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(upload_time)),
                   "用户主页",
                   str(video_dict['video_id']),
                   str(video_dict['video_title']),
                   our_video_link,
                   video_dict['play_cnt'],
                   video_dict['comment_cnt'],
                   video_dict['like_cnt'],
                   video_dict['share_cnt'],
                   video_dict['duration'],
                   f"{video_dict['video_width']}*{video_dict['video_height']}",
                   str(video_dict['publish_time_str']),
                   str(video_dict['user_name']),
                   str(video_dict['profile_id']),
                   str(video_dict['profile_mid']),
                   str(video_dict['avatar_url']),
                   str(video_dict['cover_url']),
                   str(video_dict['video_url'])]]
        time.sleep(1)
        Feishu.update_values(log_type, crawler, "Wu0CeL", "F2:Z2", values)
        Common.logger(log_type, crawler).info("Video info written to Feishu\n")

    # Fetch videos for every user in the follow list
    @classmethod
    def get_author_videos(cls, log_type, crawler, user_list, rule_dict, env):
        for user_dict in user_list:
            try:
                Common.logger(log_type, crawler).info(f"Fetching profile videos of {user_dict['nick_name']}")
                cls.get_videoList(log_type=log_type,
                                  crawler=crawler,
                                  rule_dict=rule_dict,
                                  user_dict=user_dict,
                                  env=env)
            except Exception as e:
                Common.logger(log_type, crawler).error(f"Exception while crawling the profile of {user_dict['nick_name']}: {e}\n")


if __name__ == "__main__":
    # print(XiaoniangaoAuthorScheduling.repeat_video("follow", "xiaoniangao", "4919087666", "dev"))
    pass
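    # A minimal invocation sketch for local testing. All literal values below are
    # hypothetical placeholders, not real IDs; in production, user_list and
    # rule_dict come from the scheduling system, and rule_dict may carry more
    # keys than "period" (whatever download_rule consumes):
    # XiaoniangaoAuthorScheduling.get_author_videos(
    #     log_type="author",
    #     crawler="xiaoniangao",
    #     user_list=[{"nick_name": "测试用户", "link": "123456", "uid": 1}],
    #     rule_dict={"period": {"max": 30}},
    #     env="dev",
    # )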