import json
import os
import random
import sys
import time
import uuid

import requests

# The working directory is appended to the import path *before* any project
# import so that the ``common`` package can be resolved when the crawler is
# launched from the project root.
sys.path.append(os.getcwd())

from common.mq import MQ  # noqa: E402
from common.common import Common  # noqa: E402
from common import AliyunLogger, PiaoQuanPipeline  # noqa: E402
from common.public import get_config_from_mysql, clean_title  # noqa: E402
from common.limit import AuthorLimit  # noqa: E402


def tunnel_proxies():
    """Return a ``proxies`` mapping suitable for ``requests``.

    Both the ``http`` and ``https`` entries point at the same tunnel proxy
    URL, which embeds the username and password.

    Returns:
        dict: ``{"http": <proxy url>, "https": <proxy url>}``.
    """
    # Tunnel host:port.
    tunnel = "q796.kdltps.com:15818"

    # Username / password authentication.
    # NOTE(review): credentials are hard-coded in source control; consider
    # loading them from configuration or the environment instead.
    username = "t17772369458618"
    password = "5zqcjkmy"

    proxy_url = "http://%(user)s:%(pwd)s@%(proxy)s/" % {
        "user": username,
        "pwd": password,
        "proxy": tunnel,
    }
    return {"http": proxy_url, "https": proxy_url}


class XiaoNianGaoAuthor:
    """Crawl the public video lists of a set of XiaoNianGao authors.

    For every author in ``user_list`` the public album endpoint is paged
    through; each video is run through ``PiaoQuanPipeline`` and, when it
    passes, sent to the ETL message queue.
    """

    # Seconds to wait for the album API before giving up. Without a timeout
    # ``requests`` waits indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self, platform, mode, rule_dict, env, user_list):
        """Store the crawl configuration and create the MQ / limiter helpers.

        Args:
            platform: Platform identifier, used in logs and trace ids.
            mode: Crawl strategy name, used in logs and stored as "strategy".
            rule_dict: Crawl rules; ``videos_cnt.min`` and ``period.max`` are
                read by this class.
            env: Environment name; also the suffix of the MQ topic.
            user_list: Iterable of author dicts; the ``"link"`` and ``"uid"``
                keys are read by this class.
        """
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.env = env
        self.user_list = user_list
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        # Number of videos sent to the MQ during this round.
        self.download_count = 0
        # Author ids that use the one-day window and ``repeat_video`` check.
        self.test_account = [
            58528285,
            58527674,
            58528085,
            58527582,
            58527601,
            58527612,
            58528281,
            58528095,
            58527323,
            58528071,
            58527278,
        ]
        self.limiter = AuthorLimit(platform=self.platform, mode=self.mode)

    def get_author_list(self):
        """Crawl each author in turn until the per-round quota is exceeded.

        The quota is ``rule_dict["videos_cnt"]["min"]`` (default 140). Once
        ``download_count`` is greater than the quota, a message is logged and
        the method returns without visiting the remaining authors.
        """
        # Each round only crawls a fixed amount of data and then stops itself.
        max_count = int(self.rule_dict.get("videos_cnt", {}).get("min", 140))
        for user_dict in self.user_list:
            # NOTE(review): ``<=`` lets one more author be crawled when the
            # count equals the quota exactly — confirm this is intended.
            if self.download_count <= max_count:
                self.get_video_list(user_dict)
                time.sleep(1)
                continue

            AliyunLogger.logging(
                code="2000",
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message="本轮已经抓取足够数量的视频,已经自动退出",
            )
            Common.logging(
                log_type=self.mode,
                crawler=self.platform,
                env=self.env,
                message="本轮已经抓取足够数量的视频,已经自动退出",
            )
            return

    def _log_list_stop(self, message, aliyun_message=None):
        """Log why paging of an author's video list is stopping.

        Args:
            message: Text written through ``Common.logger`` and
                ``Common.logging``.
            aliyun_message: Text written through ``AliyunLogger``; defaults
                to ``message``.
        """
        if aliyun_message is None:
            aliyun_message = message
        Common.logger(self.mode, self.platform).info(message)
        Common.logging(
            log_type=self.mode,
            crawler=self.platform,
            env=self.env,
            message=message,
        )
        AliyunLogger.logging(
            code="2000",
            platform=self.platform,
            mode=self.mode,
            env=self.env,
            message=aliyun_message,
        )

    def get_video_list(self, user_dict):
        """Page through one author's public videos and process each of them.

        Paging stops when the API returns an unusable or empty response, or
        when ``process_video_obj`` returns a falsy value (the video is older
        than the allowed window). Exceptions raised while handling a single
        video are logged and do not stop the page.

        Args:
            user_dict: Author record; ``user_dict["link"]`` must be
                convertible to ``int`` and is sent as ``visited_mid``.

        Raises:
            requests.RequestException: If the HTTP request fails or exceeds
                ``REQUEST_TIMEOUT``.
        """
        next_t = -1
        # Only newly published videos are crawled; stop as soon as an
        # out-of-window video is reached.
        url = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public"
        headers = {
            'Host': 'kapi-xng-app.xiaoniangao.cn',
            'content-type': 'application/json; charset=utf-8',
            'accept': '*/*',
            'authorization': 'hSNQ2s9pvPxvFn4LaQJxKQ6/7Is=',
            'verb': 'POST',
            'content-md5': 'c7b7f8663984e8800e3bcd9b44465083',
            'x-b3-traceid': '2f9da41f960ae077',
            'accept-language': 'zh-cn',
            'date': 'Mon, 19 Jun 2023 06:41:17 GMT',
            'x-token-id': '',
            'x-signaturemethod': 'hmac-sha1',
            'user-agent': 'xngapp/157 CFNetwork/1335.0.3.1 Darwin/21.6.0'
        }
        visited_mid = int(user_dict["link"])

        while True:
            payload = {
                "token": "",
                "limit": 20,
                "start_t": next_t,
                "visited_mid": visited_mid,
                "share_width": 300,
                "share_height": 240,
            }
            response = requests.request(
                "POST",
                url,
                headers=headers,
                data=json.dumps(payload),
                timeout=self.REQUEST_TIMEOUT,
                # proxies=tunnel_proxies(),
            )

            if "data" not in response.text or response.status_code != 200:
                self._log_list_stop(f"get_videoList:{response.text}\n")
                return

            # Parse the body once; a body that merely contains the substring
            # "data" but is not valid JSON is handled like any bad response.
            try:
                body = response.json()
            except ValueError:
                self._log_list_stop(f"get_videoList:{response.text}\n")
                return

            data = body.get("data") if isinstance(body, dict) else None
            if not isinstance(data, dict) or "list" not in data:
                self._log_list_stop(
                    f"get_videoList:{body}\n",
                    aliyun_message=f"get_videoList:{response.text}\n",
                )
                return

            if not data["list"]:
                self._log_list_stop("没有更多数据啦~\n")
                return

            next_t = data["next_t"]
            feeds = data["list"]
            for video_obj in feeds:
                try:
                    AliyunLogger.logging(
                        code="1001",
                        platform=self.platform,
                        mode=self.mode,
                        env=self.env,
                        message="扫描到一条视频",
                    )
                    Common.logging(
                        log_type=self.mode,
                        crawler=self.platform,
                        env=self.env,
                        message="扫描到一条视频",
                    )
                    date_flag = self.process_video_obj(video_obj, user_dict)
                    if not date_flag:
                        return
                except Exception as e:
                    # Per-video boundary: log the failure and keep going with
                    # the next video instead of aborting the whole author.
                    AliyunLogger.logging(
                        code="3000",
                        platform=self.platform,
                        mode=self.mode,
                        env=self.env,
                        data=video_obj,
                        message=f"抓取单条视频异常, 报错原因是: {e}",
                    )
                    Common.logging(
                        log_type=self.mode,
                        crawler=self.platform,
                        env=self.env,
                        message=f"抓取单条视频异常, 报错原因是: {e}",
                    )

    def process_video_obj(self, video_obj, user_dict):
        """Normalise one video, apply the rules and send it to the MQ.

        Args:
            video_obj: One entry of the API's ``data.list``; ``"id"`` is
                required, all other fields fall back to defaults.
            user_dict: Author record; ``"link"`` and ``"uid"`` are read.

        Returns:
            bool: ``False`` when the video is older than the allowed window
            (the caller stops paging this author), ``True`` otherwise —
            regardless of whether the video was actually sent.
        """
        trace_id = self.platform + str(uuid.uuid1())

        # Title: a random emoji/symbol is put either before or after it.
        xiaoniangao_title = clean_title(video_obj.get("title", ""))
        emoji = random.choice(
            get_config_from_mysql(self.mode, self.platform, self.env, "emoji")
        )
        video_title = random.choice(
            [f"{emoji}{xiaoniangao_title}", f"{xiaoniangao_title}{emoji}"]
        )

        # Publish time: the API value "t" is divided by 1000 to get seconds.
        publish_time_stamp = int(int(video_obj.get("t", 0)) / 1000)
        publish_time_str = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
        )

        # User nickname with whitespace-like and path-unfriendly characters
        # removed.
        user_info = video_obj.get("user", {})
        user_name = (
            user_info.get("nick", "")
            .strip()
            .replace("\n", "")
            .replace("/", "")
            .replace(" ", "")
            .replace(" ", "")
            .replace("&NBSP", "")
            .replace("\r", "")
        )

        video_dict = {
            "video_title": video_title,
            "video_id": video_obj.get("vid", ""),
            "duration": int(video_obj.get("du", 0) / 1000),
            "play_cnt": video_obj.get("play_pv", 0),
            "like_cnt": video_obj.get("favor", {}).get("total", 0),
            "comment_cnt": video_obj.get("comment_count", 0),
            "share_cnt": video_obj.get("share", 0),
            "user_name": user_name,
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "update_time_stamp": int(time.time()),
            "video_width": int(video_obj.get("w", 0)),
            "video_height": int(video_obj.get("h", 0)),
            "avatar_url": user_info.get("hurl", ""),
            "profile_id": video_obj["id"],
            "profile_mid": user_info.get("mid", ""),
            "cover_url": video_obj.get("url", ""),
            "video_url": video_obj.get("v_url", ""),
            "session": f"xiaoniangao-author-{int(time.time())}",
            "out_user_id": video_obj["id"],
            "platform": self.platform,
            "strategy": self.mode,
            "out_video_id": video_obj.get("vid", ""),
        }
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video_dict,
            trace_id=trace_id,
        )

        age_seconds = int(time.time()) - publish_time_stamp

        # ``test_account`` holds ints, so the link is normalised with int()
        # (the same conversion get_video_list applies) before the membership
        # test; comparing a string link against ints would never match.
        if int(user_dict['link']) in self.test_account:
            if age_seconds > 3600 * 24:
                AliyunLogger.logging(
                    code="2004",
                    trace_id=trace_id,
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=video_dict,
                    message="发布时间超过1天"
                )
                return False
            flag = pipeline.repeat_video()
        else:
            max_days = int(self.rule_dict.get("period", {}).get("max", 1000))
            if age_seconds > 3600 * 24 * max_days:
                AliyunLogger.logging(
                    code="2004",
                    trace_id=trace_id,
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=video_dict,
                    message=f"发布时间超过{max_days}天",
                )
                return False
            flag = pipeline.process_item()

        if flag:
            # Extra fields added to the message sent to the ETL queue.
            video_dict["width"] = video_dict["video_width"]
            video_dict["height"] = video_dict["video_height"]
            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
            video_dict["user_id"] = user_dict["uid"]
            video_dict["publish_time"] = video_dict["publish_time_str"]

            limit_flag = self.limiter.author_limitation(
                user_id=video_dict['user_id']
            )
            if limit_flag:
                self.mq.send_msg(video_dict)
                self.download_count += 1
                AliyunLogger.logging(
                    code="1002",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=video_dict,
                    trace_id=trace_id,
                    message="成功发送 MQ 至 ETL",
                )
                Common.logging(
                    log_type=self.mode,
                    crawler=self.platform,
                    env=self.env,
                    message="成功发送 MQ 至 ETL",
                )
            else:
                AliyunLogger.logging(
                    code="8808",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    trace_id=trace_id,
                    account=video_dict['user_id'],
                    message="监测到个人账号数量超过 300,停止抓取该账号"
                )
        return True