# -*- coding: utf-8 -*- # @Author: wangkun # @Time: 2023/3/13 import json import os import random import shutil import sys import time from hashlib import md5 import requests import urllib3 sys.path.append(os.getcwd()) from common.common import Common from common.scheduling_db import MysqlHelper from common.publish import Publish from common.feishu import Feishu from common.public import get_config_from_mysql, download_rule proxies = {"http": None, "https": None} class XiaoniangaoAuthorScheduling: platform = "小年糕" @classmethod def repeat_video(cls, log_type, crawler, video_id, env): sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """ repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env) return len(repeat_video) # 获取个人主页视频 @classmethod def get_videoList(cls, log_type, crawler, rule_dict, user_dict, env): next_t = -1 while True: url = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public" payload = "{\"share_width\":300,\"log\":{\"brand\":\"iPad\",\"net\":\"wifi\",\"resolution\":\"750*1334\",\"uid\":\"2F310D09-5E32-5985-8644-3BCB6920E76F\",\"app_version\":\"1.22.5\",\"channel\":\"ios_app_store\",\"page\":\"\",\"product\":\"xng\",\"os_version\":\"15.7\",\"pf\":\"4\",\"session_id\":\"47D7817B-AAB1-4E70-BA7F-B868FC9AA21F\",\"idfa\":\"\",\"device\":\"iPad Pro (12.9-inch) (3rd generation)\"},\"qs\":\"imageMogr2\\/gravity\\/center\\/rotate\\/$\\/thumbnail\\/!690x385r\\/interlace\\/1\\/format\\/jpg\",\"share_height\":240,\"start_t\":-1,\"token\":\"\",\"visited_mid\":211201301,\"limit\":20}" payload_dic = json.loads(payload) payload_dic['start_t'] = next_t payload_dic['visited_mid'] = int(user_dict['link']) payload_new = json.dumps(payload_dic) headers = { 'Host': 'kapi-xng-app.xiaoniangao.cn', 'content-type': 'application/json; charset=utf-8', 'accept': '*/*', 'authorization': 'hSNQ2s9pvPxvFn4LaQJxKQ6/7Is=', 'verb': 'POST', 'content-md5': 'c7b7f8663984e8800e3bcd9b44465083', 'x-b3-traceid': '2f9da41f960ae077', 'accept-language': 'zh-cn', 'date': 'Mon, 19 Jun 2023 06:41:17 GMT', 'x-token-id': '', 'x-signaturemethod': 'hmac-sha1', 'user-agent': 'xngapp/157 CFNetwork/1335.0.3.1 Darwin/21.6.0' } urllib3.disable_warnings() r = requests.post(url=url, headers=headers, data=payload_new, proxies=proxies, verify=False) if 'data' not in r.text or r.status_code != 200: Common.logger(log_type, crawler).info(f"get_videoList:{r.text}\n") Common.logging(log_type, crawler, env, f"get_videoList:{r.text}\n") return elif 'list' not in r.json()['data']: Common.logger(log_type, crawler).info(f"get_videoList:{r.json()}\n") Common.logging(log_type, crawler, env, f"get_videoList:{r.json()}\n") return elif len(r.json()['data']['list']) == 0: Common.logger(log_type, crawler).info(f"没有更多数据啦~\n") Common.logging(log_type, crawler, env, f"没有更多数据啦~\n") return else: next_t = r.json()["data"]["next_t"] feeds = r.json()["data"]["list"] for i in range(len(feeds)): try: # 标题,表情随机加在片头、片尾,或替代句子中间的标点符号 xiaoniangao_title = feeds[i].get("title", "").strip().replace("\n", "") \ .replace("/", "").replace("\r", "").replace("#", "") \ .replace(".", "。").replace("\\", "").replace("&NBSP", "") \ .replace(":", "").replace("*", "").replace("?", "") \ .replace("?", "").replace('"', "").replace("<", "") \ .replace(">", "").replace("|", "").replace(" ", "") \ .replace('"', '').replace("'", '') # 随机取一个表情/符号 emoji = random.choice(get_config_from_mysql(log_type, crawler, env, "emoji")) # 生成最终标题,标题list[表情+title, title+表情]随机取一个 video_title = random.choice([f"{emoji}{xiaoniangao_title}", f"{xiaoniangao_title}{emoji}"]) # 发布时间 publish_time_stamp = int(int(feeds[i].get("t", 0)) / 1000) publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)) # 用户名 / 头像 user_name = feeds[i].get("user", {}).get("nick", "").strip().replace("\n", "") \ .replace("/", "").replace(" ", "") \ .replace(" ", "").replace("&NBSP", "").replace("\r", "") video_dict = { "video_title": video_title, "video_id": feeds[i].get("vid", ""), "duration": int(feeds[i].get("du", 0) / 1000), "play_cnt": feeds[i].get("play_pv", 0), "like_cnt": feeds[i].get("favor", {}).get("total", 0), "comment_cnt": feeds[i].get("comment_count", 0), "share_cnt": feeds[i].get("share", 0), "user_name": user_name, "publish_time_stamp": publish_time_stamp, "publish_time_str": publish_time_str, "video_width": int(feeds[i].get("w", 0)), "video_height": int(feeds[i].get("h", 0)), "avatar_url": feeds[i].get("user", {}).get("hurl", ""), "profile_id": feeds[i]["id"], "profile_mid": feeds[i].get("user", {}).get("mid", ""), "cover_url": feeds[i].get("url", ""), "video_url": feeds[i].get("v_url", ""), "session": f"xiaoniangao-author-{int(time.time())}" } for k, v in video_dict.items(): Common.logger(log_type, crawler).info(f"{k}:{v}") Common.logging(log_type, crawler, env, f"{video_dict}") if int(time.time()) - publish_time_stamp > 3600 * 24 * int(rule_dict.get('period', {}).get('max', 1000)): Common.logger(log_type, crawler).info(f"发布时间超过{int(rule_dict.get('period', {}).get('max', 1000))}天\n") Common.logging(log_type, crawler, env, f"发布时间超过{int(rule_dict.get('period', {}).get('max', 1000))}天\n") return # 过滤无效视频 if video_title == "" or video_dict["video_id"] == "" or video_dict["video_url"] == "": Common.logger(log_type, crawler).info("无效视频\n") Common.logging(log_type, crawler, env, "无效视频\n") # 抓取基础规则过滤 elif download_rule(log_type=log_type, crawler=crawler, video_dict=video_dict, rule_dict=rule_dict) is False: Common.logger(log_type, crawler).info("不满足抓取规则\n") Common.logging(log_type, crawler, env, "不满足抓取规则\n") elif any(str(word) if str(word) in video_dict["video_title"] else False for word in get_config_from_mysql(log_type=log_type, source=crawler, env=env, text="filter", action="")) is True: Common.logger(log_type, crawler).info('已中过滤词\n') Common.logging(log_type, crawler, env, '已中过滤词\n') elif cls.repeat_video(log_type, crawler, video_dict['video_id'], env) != 0: Common.logger(log_type, crawler).info('视频已下载\n') Common.logging(log_type, crawler, env, '视频已下载\n') else: cls.download_publish(log_type=log_type, crawler=crawler, video_dict=video_dict, rule_dict=rule_dict, user_dict=user_dict, env=env) except Exception as e: Common.logger(log_type, crawler).error(f"抓取单条视频异常:{e}\n") Common.logging(log_type, crawler, env, f"抓取单条视频异常:{e}\n") # 下载/上传 @classmethod def download_publish(cls, log_type, crawler, video_dict, rule_dict, user_dict, env): # 下载视频 Common.download_method(log_type=log_type, crawler=crawler, text="video", title=video_dict["video_title"], url=video_dict["video_url"]) md_title = md5(video_dict['video_title'].encode('utf8')).hexdigest() try: if os.path.getsize(f"./{crawler}/videos/{md_title}/video.mp4") == 0: # 删除视频文件夹 shutil.rmtree(f"./{crawler}/videos/{md_title}") Common.logger(log_type, crawler).info("视频size=0,删除成功\n") Common.logging(log_type, crawler, env, "视频size=0,删除成功\n") return except FileNotFoundError: # 删除视频文件夹 shutil.rmtree(f"./{crawler}/videos/{md_title}") Common.logger(log_type, crawler).info("视频文件不存在,删除文件夹成功\n") Common.logging(log_type, crawler, env, "视频文件不存在,删除文件夹成功\n") return # 下载封面 Common.download_method(log_type=log_type, crawler=crawler, text="cover", title=video_dict["video_title"], url=video_dict["cover_url"]) # 保存视频信息至 "./videos/{download_video_title}/info.txt" Common.save_video_info(log_type=log_type, crawler=crawler, video_dict=video_dict) # 上传视频 Common.logger(log_type, crawler).info("开始上传视频...") Common.logging(log_type, crawler, env, "开始上传视频...") if env == "dev": oss_endpoint = "out" our_video_id = Publish.upload_and_publish(log_type=log_type, crawler=crawler, strategy="定向抓取策略", our_uid=user_dict["uid"], env=env, oss_endpoint=oss_endpoint) our_video_link = f"https://testadmin.piaoquantv.com/cms/post-detail/{our_video_id}/info" else: oss_endpoint = "inner" our_video_id = Publish.upload_and_publish(log_type=log_type, crawler=crawler, strategy="定向抓取策略", our_uid=user_dict["uid"], env=env, oss_endpoint=oss_endpoint) our_video_link = f"https://admin.piaoquantv.com/cms/post-detail/{our_video_id}/info" if our_video_id is None: try: # 删除视频文件夹 shutil.rmtree(f"./{crawler}/videos/{md_title}") return except FileNotFoundError: return insert_sql = f""" insert into crawler_video(video_id, out_user_id, platform, strategy, out_video_id, video_title, cover_url, video_url, duration, publish_time, play_cnt, crawler_rule, width, height) values({our_video_id}, "{video_dict['profile_id']}", "{cls.platform}", "定向抓取策略", "{video_dict['video_id']}", "{video_dict['video_title']}", "{video_dict['cover_url']}", "{video_dict['video_url']}", {int(video_dict['duration'])}, "{video_dict['publish_time_str']}", {int(video_dict['play_cnt'])}, '{json.dumps(rule_dict)}', {int(video_dict['video_width'])}, {int(video_dict['video_height'])}) """ Common.logger(log_type, crawler).info(f"insert_sql:{insert_sql}") Common.logging(log_type, crawler, env, f"insert_sql:{insert_sql}") MysqlHelper.update_values(log_type, crawler, insert_sql, env) Common.logger(log_type, crawler).info('视频信息些入数据库成功') Common.logging(log_type, crawler, env, '视频信息些入数据库成功') # 视频写入飞书 Feishu.insert_columns(log_type, crawler, "Wu0CeL", "ROWS", 1, 2) # 视频ID工作表,首行写入数据 upload_time = int(time.time()) values = [[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(upload_time)), "用户主页", str(video_dict['video_id']), str(video_dict['video_title']), our_video_link, video_dict['play_cnt'], video_dict['comment_cnt'], video_dict['like_cnt'], video_dict['share_cnt'], video_dict['duration'], f"{video_dict['video_width']}*{video_dict['video_height']}", str(video_dict['publish_time_str']), str(video_dict['user_name']), str(video_dict['profile_id']), str(video_dict['profile_mid']), str(video_dict['avatar_url']), str(video_dict['cover_url']), str(video_dict['video_url'])]] time.sleep(1) Feishu.update_values(log_type, crawler, "Wu0CeL", "F2:Z2", values) Common.logger(log_type, crawler).info('视频信息写入飞书成功\n') Common.logging(log_type, crawler, env, '视频信息写入飞书成功\n') # 获取所有关注列表的用户视频 @classmethod def get_author_videos(cls, log_type, crawler, user_list, rule_dict, env): for user_dict in user_list: try: Common.logger(log_type, crawler).info(f"获取 {user_dict['nick_name']} 主页视频") Common.logging(log_type, crawler, env, f"获取 {user_dict['nick_name']} 主页视频") cls.get_videoList(log_type=log_type, crawler=crawler, rule_dict=rule_dict, user_dict=user_dict, env=env) except Exception as e: Common.logger(log_type, crawler).error(f"抓取{user_dict['nick_name']}主页时异常:{e}\n") Common.logging(log_type, crawler, env, f"抓取{user_dict['nick_name']}主页时异常:{e}\n") if __name__ == "__main__": pass