"""Author-mode crawler for the Xiaoniangao app API.

For every configured author the crawler pages through the author's public
album list, builds a normalized video record for each entry, runs it through
the project's rule pipeline and, when accepted, publishes it to the ETL
message queue.
"""
import json
import os
import random
import sys
import time
import uuid

import requests

from common.mq import MQ

# Kept in its original position relative to the imports around it: the
# project-local imports below are resolved after the working directory has
# been appended to ``sys.path``.
sys.path.append(os.getcwd())

from common.redis_db import get_data, store_data
from common.common import Common
from common import AliyunLogger, PiaoQuanPipeline
from common.public import get_config_from_mysql, clean_title
from common.limit import AuthorLimit


def tunnel_proxies():
    """Build the ``proxies`` mapping for the tunnel proxy.

    Returns:
        dict: ``{"http": ..., "https": ...}``; both entries point at the same
        ``http://user:password@host:port/`` tunnel URL.
    """
    # NOTE(review): the proxy credentials are hard-coded in source control.
    # They are left untouched here so behaviour does not change, but they
    # should be moved to configuration / a secret store.
    tunnel = "q796.kdltps.com:15818"  # tunnel host:port
    username = "t17772369458618"
    password = "5zqcjkmy"
    proxy_url = "http://%(user)s:%(pwd)s@%(proxy)s/" % {
        "user": username,
        "pwd": password,
        "proxy": tunnel,
    }
    return {"http": proxy_url, "https": proxy_url}


class XiaoNianGaoAuthor:
    """Crawl the videos published by a list of Xiaoniangao authors.

    Attributes set by ``__init__``:
        platform, mode, env: identifiers passed through to logging, the rule
            pipeline, Redis helpers and the author limiter.
        rule_dict: crawl rules; this class reads ``videos_cnt.min`` and
            ``period.max`` from it and passes the whole dict to the pipeline.
        user_list: iterable of author dicts; ``link`` and ``uid`` are read.
        mq: producer for the ``topic_crawler_etl_<env>`` topic.
        download_count: number of videos sent to the queue by this instance.
        test_account: author ids that use the 1-day / repeat-only rule path.
        limiter: per-author limiter consulted before a video is sent.
    """

    # Seconds to wait for the album API before giving up. Without a timeout
    # ``requests`` waits indefinitely, so one stalled connection would hang
    # the whole crawl round.
    REQUEST_TIMEOUT = 30

    def __init__(self, platform, mode, rule_dict, env, user_list):
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.env = env
        self.user_list = user_list
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.download_count = 0
        self.test_account = [
            58528285,
            58527674,
            58528085,
            58527582,
            58527601,
            58527612,
            58528281,
            58528095,
            58527323,
            58528071,
            58527278,
        ]
        self.limiter = AuthorLimit(platform=self.platform, mode=self.mode)

    def get_author_list(self):
        """Crawl each author in ``user_list`` until the round's quota is hit.

        The quota is ``rule_dict["videos_cnt"]["min"]`` (default 140). Once
        ``download_count`` exceeds it, a message is logged and the method
        returns without visiting the remaining authors.
        """
        # Each round only fetches a fixed amount of data and then stops.
        max_count = int(self.rule_dict.get("videos_cnt", {}).get("min", 140))
        for user_dict in self.user_list:
            if self.download_count > max_count:
                AliyunLogger.logging(
                    code="2000",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    message="本轮已经抓取足够数量的视频,已经自动退出",
                )
                Common.logging(
                    log_type=self.mode,
                    crawler=self.platform,
                    env=self.env,
                    message="本轮已经抓取足够数量的视频,已经自动退出",
                )
                return
            self.get_video_list(user_dict)
            # Fixed 1-second pause between authors.
            time.sleep(1)

    def _log_pagination_stop(self, message, aliyun_message=None):
        """Log why paging stopped to the file logger, Common and Aliyun.

        Args:
            message: text sent to ``Common.logger(...).info`` and
                ``Common.logging``.
            aliyun_message: text sent to ``AliyunLogger``; defaults to
                ``message`` when omitted.
        """
        Common.logger(self.mode, self.platform).info(message)
        Common.logging(
            log_type=self.mode,
            crawler=self.platform,
            env=self.env,
            message=message,
        )
        AliyunLogger.logging(
            code="2000",
            platform=self.platform,
            mode=self.mode,
            env=self.env,
            message=message if aliyun_message is None else aliyun_message,
        )

    def get_video_list(self, user_dict):
        """Page through one author's public album list and process each video.

        Paging stops when the response is unusable, when the list is empty,
        or when ``process_video_obj`` returns a falsy value (the video is
        older than the allowed publish window).

        Args:
            user_dict: author record; ``user_dict["link"]`` must be
                convertible to ``int`` and is sent as ``visited_mid``.

        Raises:
            requests.RequestException: on connection failure or timeout.
        """
        next_t = -1
        # Only newer videos are wanted; paging stops as soon as an
        # out-of-window video is reached.
        url = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public"
        # NOTE(review): the authorization / content-md5 / date values are
        # static; presumably captured from a real client request — confirm
        # the server does not validate their freshness.
        headers = {
            'Host': 'kapi-xng-app.xiaoniangao.cn',
            'content-type': 'application/json; charset=utf-8',
            'accept': '*/*',
            'authorization': 'hSNQ2s9pvPxvFn4LaQJxKQ6/7Is=',
            'verb': 'POST',
            'content-md5': 'c7b7f8663984e8800e3bcd9b44465083',
            'x-b3-traceid': '2f9da41f960ae077',
            'accept-language': 'zh-cn',
            'date': 'Mon, 19 Jun 2023 06:41:17 GMT',
            'x-token-id': '',
            'x-signaturemethod': 'hmac-sha1',
            'user-agent': 'xngapp/157 CFNetwork/1335.0.3.1 Darwin/21.6.0'
        }
        visited_mid = int(user_dict["link"])
        while True:
            payload = {
                "token": "",
                "limit": 20,
                "start_t": next_t,
                "visited_mid": visited_mid,
                "share_width": 300,
                "share_height": 240,
            }
            # The tunnel proxy (see tunnel_proxies) is not used here.
            response = requests.request(
                "POST",
                url,
                headers=headers,
                data=json.dumps(payload),
                timeout=self.REQUEST_TIMEOUT,
            )
            if "data" not in response.text or response.status_code != 200:
                self._log_pagination_stop(f"get_videoList:{response.text}\n")
                return

            # Parse the body once instead of re-decoding it for every access.
            body = response.json()
            data = body["data"]
            if "list" not in data:
                self._log_pagination_stop(
                    f"get_videoList:{body}\n",
                    aliyun_message=f"get_videoList:{response.text}\n",
                )
                return

            feeds = data["list"]
            # Truthiness check so that an explicit ``"list": null`` is treated
            # like an empty page instead of raising TypeError.
            if not feeds:
                self._log_pagination_stop("没有更多数据啦~\n")
                return

            next_t = data["next_t"]
            for video_obj in feeds:
                try:
                    AliyunLogger.logging(
                        code="1001",
                        platform=self.platform,
                        mode=self.mode,
                        env=self.env,
                        message="扫描到一条视频",
                    )
                    Common.logging(
                        log_type=self.mode,
                        crawler=self.platform,
                        env=self.env,
                        message="扫描到一条视频",
                    )
                    date_flag = self.process_video_obj(video_obj, user_dict)
                    if not date_flag:
                        return
                except Exception as e:
                    # Best-effort per video: log the failure and continue
                    # with the next entry of the page.
                    AliyunLogger.logging(
                        code="3000",
                        platform=self.platform,
                        mode=self.mode,
                        env=self.env,
                        data=video_obj,
                        message=f"抓取单条视频异常, 报错原因是: {e}",
                    )
                    Common.logging(
                        log_type=self.mode,
                        crawler=self.platform,
                        env=self.env,
                        message=f"抓取单条视频异常, 报错原因是: {e}",
                    )

    def _is_test_account(self, link):
        """Return True when ``link`` names one of ``self.test_account``.

        ``link`` is compared as an integer because ``test_account`` holds
        ints while the same value has to be cast with ``int`` before it is
        sent to the API; a raw string would never match. Values that cannot
        be converted are simply not test accounts.
        """
        try:
            return int(link) in self.test_account
        except (TypeError, ValueError):
            return False

    def process_video_obj(self, video_obj, user_dict):
        """Normalize one API video entry and forward it when rules allow.

        Args:
            video_obj: one element of the API's ``data.list``; ``id`` is
                required, the other fields fall back to defaults.
            user_dict: author record; ``link`` and ``uid`` are read.

        Returns:
            bool: ``False`` when the video is older than the allowed publish
            window (the caller stops paging), ``True`` otherwise.
        """
        trace_id = self.platform + str(uuid.uuid1())
        # Title: a random emoji/symbol is put either before or after it.
        xiaoniangao_title = clean_title(video_obj.get("title", ""))
        emoji = random.choice(
            get_config_from_mysql(self.mode, self.platform, self.env, "emoji")
        )
        video_title = random.choice(
            [f"{emoji}{xiaoniangao_title}", f"{xiaoniangao_title}{emoji}"]
        )
        # Publish time: "t" is divided by 1000 before being used as a
        # seconds-based timestamp.
        publish_time_stamp = int(int(video_obj.get("t", 0)) / 1000)
        publish_time_str = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
        )
        # Author nickname with whitespace and a few unwanted tokens removed.
        user_name = (
            video_obj.get("user", {})
            .get("nick", "")
            .strip()
            .replace("\n", "")
            .replace("/", "")
            .replace(" ", "")
            .replace(" ", "")
            .replace("&NBSP", "")
            .replace("\r", "")
        )
        video_dict = {
            "video_title": video_title,
            "video_id": video_obj.get("vid", ""),
            "duration": int(video_obj.get("du", 0) / 1000),
            "play_cnt": video_obj.get("play_pv", 0),
            "like_cnt": video_obj.get("favor", {}).get("total", 0),
            "comment_cnt": video_obj.get("comment_count", 0),
            "share_cnt": video_obj.get("share", 0),
            "user_name": user_name,
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "update_time_stamp": int(time.time()),
            "video_width": int(video_obj.get("w", 0)),
            "video_height": int(video_obj.get("h", 0)),
            "avatar_url": video_obj.get("user", {}).get("hurl", ""),
            "profile_id": video_obj["id"],
            "profile_mid": video_obj.get("user", {}).get("mid", ""),
            "cover_url": video_obj.get("url", ""),
            "video_url": video_obj.get("v_url", ""),
            "session": f"xiaoniangao-author-{int(time.time())}",
            "out_user_id": video_obj["id"],
            "platform": self.platform,
            "strategy": self.mode,
            "out_video_id": video_obj.get("vid", ""),
        }
        value = get_data(self.platform, video_obj.get("vid", ""))
        # NOTE(review): a Redis hit is only logged; processing continues and
        # the pipeline below makes the actual accept/reject decision. Confirm
        # whether an early return was intended here.
        if int(value) == 1:
            AliyunLogger.logging(
                code="2004",
                trace_id=trace_id,
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                data=video_dict,
                message="redis重复视频",
            )
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video_dict,
            trace_id=trace_id,
        )
        video_age = int(time.time()) - publish_time_stamp
        if self._is_test_account(user_dict['link']):
            # Test accounts: fixed 1-day window, duplicate check only.
            if video_age > 3600 * 24:
                AliyunLogger.logging(
                    code="2004",
                    trace_id=trace_id,
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=video_dict,
                    message="发布时间超过1天",
                )
                return False
            flag = pipeline.repeat_video()
        else:
            # Regular accounts: window in days comes from rule_dict.
            max_days = int(self.rule_dict.get("period", {}).get("max", 1000))
            if video_age > 3600 * 24 * max_days:
                AliyunLogger.logging(
                    code="2004",
                    trace_id=trace_id,
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=video_dict,
                    message=f"发布时间超过{max_days}天",
                )
                return False
            flag = pipeline.process_item()

        if flag:
            video_dict["width"] = video_dict["video_width"]
            video_dict["height"] = video_dict["video_height"]
            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
            video_dict["user_id"] = user_dict["uid"]
            video_dict["publish_time"] = video_dict["publish_time_str"]
            limit_flag = self.limiter.author_limitation(
                user_id=video_dict['user_id']
            )
            if limit_flag:
                self.mq.send_msg(video_dict)
                store_data(self.platform, video_obj.get("vid", ""))
                Common.logging(
                    log_type=self.mode,
                    crawler=self.platform,
                    env=self.env,
                    message="写入 redis 成功",
                )
                self.download_count += 1
                AliyunLogger.logging(
                    code="1002",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=video_dict,
                    trace_id=trace_id,
                    message="成功发送 MQ 至 ETL",
                )
                Common.logging(
                    log_type=self.mode,
                    crawler=self.platform,
                    env=self.env,
                    message="成功发送 MQ 至 ETL",
                )
            else:
                AliyunLogger.logging(
                    code="8808",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    trace_id=trace_id,
                    account=video_dict['user_id'],
                    message="监测到个人账号数量超过 300,停止抓取该账号",
                )
        return True