# -*- coding: utf-8 -*- # @Author: wangkun # @Time: 2023/3/13 import json import os import random import shutil import sys import time from hashlib import md5 import requests import urllib3 from common.mq import MQ sys.path.append(os.getcwd()) from common.common import Common from common.scheduling_db import MysqlHelper from common.publish import Publish from common.feishu import Feishu from common.public import get_config_from_mysql, download_rule proxies = {"http": None, "https": None} class XiaoniangaoAuthorScheduling: platform = "小年糕" @classmethod def repeat_video(cls, log_type, crawler, video_id, env): # sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """ sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """ repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env) return len(repeat_video) # 获取个人主页视频 @classmethod def get_videoList(cls, log_type, crawler, rule_dict, user_dict, env): mq = MQ(topic_name="topic_crawler_etl_" + env) next_t = -1 while True: url = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public" payload = "{\"share_width\":300,\"log\":{\"brand\":\"iPad\",\"net\":\"wifi\",\"resolution\":\"750*1334\",\"uid\":\"2F310D09-5E32-5985-8644-3BCB6920E76F\",\"app_version\":\"1.22.5\",\"channel\":\"ios_app_store\",\"page\":\"\",\"product\":\"xng\",\"os_version\":\"15.7\",\"pf\":\"4\",\"session_id\":\"47D7817B-AAB1-4E70-BA7F-B868FC9AA21F\",\"idfa\":\"\",\"device\":\"iPad Pro (12.9-inch) (3rd generation)\"},\"qs\":\"imageMogr2\\/gravity\\/center\\/rotate\\/$\\/thumbnail\\/!690x385r\\/interlace\\/1\\/format\\/jpg\",\"share_height\":240,\"start_t\":-1,\"token\":\"\",\"visited_mid\":211201301,\"limit\":20}" payload_dic = json.loads(payload) payload_dic['start_t'] = next_t payload_dic['visited_mid'] = int(user_dict['link']) payload_new = json.dumps(payload_dic) headers = { 'Host': 'kapi-xng-app.xiaoniangao.cn', 'content-type': 'application/json; charset=utf-8', 'accept': '*/*', 'authorization': 'hSNQ2s9pvPxvFn4LaQJxKQ6/7Is=', 'verb': 'POST', 'content-md5': 'c7b7f8663984e8800e3bcd9b44465083', 'x-b3-traceid': '2f9da41f960ae077', 'accept-language': 'zh-cn', 'date': 'Mon, 19 Jun 2023 06:41:17 GMT', 'x-token-id': '', 'x-signaturemethod': 'hmac-sha1', 'user-agent': 'xngapp/157 CFNetwork/1335.0.3.1 Darwin/21.6.0' } urllib3.disable_warnings() r = requests.post(url=url, headers=headers, data=payload_new, proxies=proxies, verify=False) if 'data' not in r.text or r.status_code != 200: Common.logger(log_type, crawler).info(f"get_videoList:{r.text}\n") Common.logging(log_type, crawler, env, f"get_videoList:{r.text}\n") return elif 'list' not in r.json()['data']: Common.logger(log_type, crawler).info(f"get_videoList:{r.json()}\n") Common.logging(log_type, crawler, env, f"get_videoList:{r.json()}\n") return elif len(r.json()['data']['list']) == 0: Common.logger(log_type, crawler).info(f"没有更多数据啦~\n") Common.logging(log_type, crawler, env, f"没有更多数据啦~\n") return else: next_t = r.json()["data"]["next_t"] feeds = r.json()["data"]["list"] for i in range(len(feeds)): try: # 标题,表情随机加在片头、片尾,或替代句子中间的标点符号 xiaoniangao_title = feeds[i].get("title", "").strip().replace("\n", "") \ .replace("/", "").replace("\r", "").replace("#", "") \ .replace(".", "。").replace("\\", "").replace("&NBSP", "") \ .replace(":", "").replace("*", "").replace("?", "") \ .replace("?", "").replace('"', "").replace("<", "") \ .replace(">", "").replace("|", "").replace(" ", "") \ .replace('"', '').replace("'", '') # 随机取一个表情/符号 emoji = random.choice(get_config_from_mysql(log_type, crawler, env, "emoji")) # 生成最终标题,标题list[表情+title, title+表情]随机取一个 video_title = random.choice([f"{emoji}{xiaoniangao_title}", f"{xiaoniangao_title}{emoji}"]) # 发布时间 publish_time_stamp = int(int(feeds[i].get("t", 0)) / 1000) publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)) # 用户名 / 头像 user_name = feeds[i].get("user", {}).get("nick", "").strip().replace("\n", "") \ .replace("/", "").replace(" ", "") \ .replace(" ", "").replace("&NBSP", "").replace("\r", "") video_dict = { "video_title": video_title, "video_id": feeds[i].get("vid", ""), "duration": int(feeds[i].get("du", 0) / 1000), "play_cnt": feeds[i].get("play_pv", 0), "like_cnt": feeds[i].get("favor", {}).get("total", 0), "comment_cnt": feeds[i].get("comment_count", 0), "share_cnt": feeds[i].get("share", 0), "user_name": user_name, "publish_time_stamp": publish_time_stamp, "publish_time_str": publish_time_str, "video_width": int(feeds[i].get("w", 0)), "video_height": int(feeds[i].get("h", 0)), "avatar_url": feeds[i].get("user", {}).get("hurl", ""), "profile_id": feeds[i]["id"], "profile_mid": feeds[i].get("user", {}).get("mid", ""), "cover_url": feeds[i].get("url", ""), "video_url": feeds[i].get("v_url", ""), "session": f"xiaoniangao-author-{int(time.time())}" } for k, v in video_dict.items(): Common.logger(log_type, crawler).info(f"{k}:{v}") Common.logging(log_type, crawler, env, f"{video_dict}") if int(time.time()) - publish_time_stamp > 3600 * 24 * int(rule_dict.get('period', {}).get('max', 1000)): Common.logger(log_type, crawler).info(f"发布时间超过{int(rule_dict.get('period', {}).get('max', 1000))}天\n") Common.logging(log_type, crawler, env, f"发布时间超过{int(rule_dict.get('period', {}).get('max', 1000))}天\n") return # 过滤无效视频 if video_title == "" or video_dict["video_id"] == "" or video_dict["video_url"] == "": Common.logger(log_type, crawler).info("无效视频\n") Common.logging(log_type, crawler, env, "无效视频\n") # 抓取基础规则过滤 elif download_rule(log_type=log_type, crawler=crawler, video_dict=video_dict, rule_dict=rule_dict) is False: Common.logger(log_type, crawler).info("不满足抓取规则\n") Common.logging(log_type, crawler, env, "不满足抓取规则\n") elif any(str(word) if str(word) in video_dict["video_title"] else False for word in get_config_from_mysql(log_type=log_type, source=crawler, env=env, text="filter", action="")) is True: Common.logger(log_type, crawler).info('已中过滤词\n') Common.logging(log_type, crawler, env, '已中过滤词\n') elif cls.repeat_video(log_type, crawler, video_dict['video_id'], env) != 0: Common.logger(log_type, crawler).info('视频已下载\n') Common.logging(log_type, crawler, env, '视频已下载\n') else: # cls.download_publish(log_type=log_type, # crawler=crawler, # video_dict=video_dict, # rule_dict=rule_dict, # user_dict=user_dict, # env=env) video_dict["out_user_id"] = video_dict["profile_id"] video_dict["platform"] = crawler video_dict["strategy"] = log_type video_dict["out_video_id"] = video_dict["video_id"] video_dict["width"] = video_dict["video_width"] video_dict["height"] = video_dict["video_height"] video_dict["crawler_rule"] = json.dumps(rule_dict) video_dict["user_id"] = user_dict["uid"] video_dict["publish_time"] = video_dict["publish_time_str"] mq.send_msg(video_dict) except Exception as e: Common.logger(log_type, crawler).error(f"抓取单条视频异常:{e}\n") Common.logging(log_type, crawler, env, f"抓取单条视频异常:{e}\n") # 下载/上传 @classmethod def download_publish(cls, log_type, crawler, video_dict, rule_dict, user_dict, env): # 下载视频 Common.download_method(log_type=log_type, crawler=crawler, text="video", title=video_dict["video_title"], url=video_dict["video_url"]) md_title = md5(video_dict['video_title'].encode('utf8')).hexdigest() try: if os.path.getsize(f"./{crawler}/videos/{md_title}/video.mp4") == 0: # 删除视频文件夹 shutil.rmtree(f"./{crawler}/videos/{md_title}") Common.logger(log_type, crawler).info("视频size=0,删除成功\n") Common.logging(log_type, crawler, env, "视频size=0,删除成功\n") return except FileNotFoundError: # 删除视频文件夹 shutil.rmtree(f"./{crawler}/videos/{md_title}") Common.logger(log_type, crawler).info("视频文件不存在,删除文件夹成功\n") Common.logging(log_type, crawler, env, "视频文件不存在,删除文件夹成功\n") return # 下载封面 Common.download_method(log_type=log_type, crawler=crawler, text="cover", title=video_dict["video_title"], url=video_dict["cover_url"]) # 保存视频信息至 "./videos/{download_video_title}/info.txt" Common.save_video_info(log_type=log_type, crawler=crawler, video_dict=video_dict) # 上传视频 Common.logger(log_type, crawler).info("开始上传视频...") Common.logging(log_type, crawler, env, "开始上传视频...") if env == "dev": oss_endpoint = "out" our_video_id = Publish.upload_and_publish(log_type=log_type, crawler=crawler, strategy="定向抓取策略", our_uid=user_dict["uid"], env=env, oss_endpoint=oss_endpoint) our_video_link = f"https://testadmin.piaoquantv.com/cms/post-detail/{our_video_id}/info" else: oss_endpoint = "inner" our_video_id = Publish.upload_and_publish(log_type=log_type, crawler=crawler, strategy="定向抓取策略", our_uid=user_dict["uid"], env=env, oss_endpoint=oss_endpoint) our_video_link = f"https://admin.piaoquantv.com/cms/post-detail/{our_video_id}/info" if our_video_id is None: try: # 删除视频文件夹 shutil.rmtree(f"./{crawler}/videos/{md_title}") return except FileNotFoundError: return insert_sql = f""" insert into crawler_video(video_id, out_user_id, platform, strategy, out_video_id, video_title, cover_url, video_url, duration, publish_time, play_cnt, crawler_rule, width, height) values({our_video_id}, "{video_dict['profile_id']}", "{cls.platform}", "定向抓取策略", "{video_dict['video_id']}", "{video_dict['video_title']}", "{video_dict['cover_url']}", "{video_dict['video_url']}", {int(video_dict['duration'])}, "{video_dict['publish_time_str']}", {int(video_dict['play_cnt'])}, '{json.dumps(rule_dict)}', {int(video_dict['video_width'])}, {int(video_dict['video_height'])}) """ Common.logger(log_type, crawler).info(f"insert_sql:{insert_sql}") Common.logging(log_type, crawler, env, f"insert_sql:{insert_sql}") MysqlHelper.update_values(log_type, crawler, insert_sql, env) Common.logger(log_type, crawler).info('视频信息些入数据库成功') Common.logging(log_type, crawler, env, '视频信息些入数据库成功') # 视频写入飞书 Feishu.insert_columns(log_type, crawler, "Wu0CeL", "ROWS", 1, 2) # 视频ID工作表,首行写入数据 upload_time = int(time.time()) values = [[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(upload_time)), "用户主页", str(video_dict['video_id']), str(video_dict['video_title']), our_video_link, video_dict['play_cnt'], video_dict['comment_cnt'], video_dict['like_cnt'], video_dict['share_cnt'], video_dict['duration'], f"{video_dict['video_width']}*{video_dict['video_height']}", str(video_dict['publish_time_str']), str(video_dict['user_name']), str(video_dict['profile_id']), str(video_dict['profile_mid']), str(video_dict['avatar_url']), str(video_dict['cover_url']), str(video_dict['video_url'])]] time.sleep(1) Feishu.update_values(log_type, crawler, "Wu0CeL", "F2:Z2", values) Common.logger(log_type, crawler).info('视频信息写入飞书成功\n') Common.logging(log_type, crawler, env, '视频信息写入飞书成功\n') # 获取所有关注列表的用户视频 @classmethod def get_author_videos(cls, log_type, crawler, user_list, rule_dict, env): for user_dict in user_list: try: Common.logger(log_type, crawler).info(f"获取 {user_dict['nick_name']} 主页视频") Common.logging(log_type, crawler, env, f"获取 {user_dict['nick_name']} 主页视频") cls.get_videoList(log_type=log_type, crawler=crawler, rule_dict=rule_dict, user_dict=user_dict, env=env) except Exception as e: Common.logger(log_type, crawler).error(f"抓取{user_dict['nick_name']}主页时异常:{e}\n") Common.logging(log_type, crawler, env, f"抓取{user_dict['nick_name']}主页时异常:{e}\n") if __name__ == "__main__": pass