import json
import os
import random
import sys
import time
import uuid

import requests

# Make the project root importable *before* any ``common.*`` import. Previously
# the path was appended only after ``from common.mq import MQ`` had already
# executed, so that import could not rely on it.
sys.path.append(os.getcwd())

from common.mq import MQ  # noqa: E402
from common.pipeline import PiaoQuanPipelineTest  # noqa: E402
from common.public import get_config_from_mysql, clean_title  # noqa: E402
from common.scheduling_db import MysqlHelper  # noqa: E402

# Seconds before a request to the Xiaoniangao API is abandoned. Without a
# timeout a stalled proxy tunnel blocks the crawler indefinitely.
REQUEST_TIMEOUT = 30


def tunnel_proxies():
    """Return a ``requests`` ``proxies`` mapping for the authenticated tunnel.

    Both ``http`` and ``https`` traffic are routed through the same HTTP
    tunnel using username/password authentication.
    """
    # NOTE(review): these credentials are committed to source control. They can
    # be overridden via environment variables; the hard-coded fallbacks should
    # be rotated and removed.
    tunnel = os.environ.get("XNG_PROXY_TUNNEL", "q796.kdltps.com:15818")
    username = os.environ.get("XNG_PROXY_USER", "t17772369458618")
    password = os.environ.get("XNG_PROXY_PASSWORD", "5zqcjkmy")
    proxy_url = "http://%(user)s:%(pwd)s@%(proxy)s/" % {
        "user": username,
        "pwd": password,
        "proxy": tunnel,
    }
    return {"http": proxy_url, "https": proxy_url}


class XiaoNianGaoAuthor:
    """Crawl recent public videos from a list of Xiaoniangao author accounts.

    :param platform: platform name; prefixes trace ids and is stored on each video.
    :param mode: crawl strategy name, passed to config lookup and the pipeline.
    :param rule_dict: crawl rules. ``videos_cnt.min`` caps how many videos are
        accepted per round (default 300); ``period.max`` is the maximum video
        age in days (default 1000).
    :param env: environment name; suffixes the MQ topic and config lookups.
    :param user_list: account rows; each is read for ``link`` (the visited
        mid), ``uid`` and ``account_level``.
    """

    def __init__(self, platform, mode, rule_dict, env, user_list):
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.env = env
        self.user_list = user_list
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.download_count = 0
        # Emoji/symbol list from config, loaded on first use (see _get_emoji_list).
        self._emoji_list = None

    def get_author_list(self):
        """Crawl each eligible account until this round's quota is reached.

        Accounts with an empty ``account_level`` or level ``"P3"`` are skipped.
        Sleeps a random 1-15 s after each crawled account.
        """
        # Each round crawls only a fixed number of videos, then exits by itself.
        max_count = int(self.rule_dict.get("videos_cnt", {}).get("min", 300))
        for user_dict in self.user_list:
            print(user_dict)
            account_level = user_dict.get("account_level")
            if not account_level or account_level == "P3":
                continue
            # Stop as soon as the quota is met; the previous ``<=`` test crawled
            # one extra account after reaching it.
            if self.download_count >= max_count:
                message = "本轮已经抓取足够数量的视频,已经自动退出"
                print(message)
                return
            self.get_video_list(user_dict)
            time.sleep(random.randint(1, 15))

    def get_video_list(self, user_dict):
        """Page through one account's public videos and process each one.

        Paging stops on a failed/invalid response, an empty page, a cursor that
        does not advance, or when ``process_video_obj`` returns a falsy value
        (the video is older than ``period.max`` days). Per-video exceptions are
        printed and skipped.
        """
        next_t = -1
        url = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public"
        headers = {
            "Host": "kapi-xng-app.xiaoniangao.cn",
            "content-type": "application/json; charset=utf-8",
            "accept": "*/*",
            "verb": "POST",
            "accept-language": "zh-cn",
            "date": "Wed, 01 Nov 2023 11:53:22 GMT",
            "x-token-id": "",
            "x-signaturemethod": "hmac-sha1",
        }
        while True:
            payload = {
                "token": "",
                "limit": 20,
                "start_t": next_t,
                "visited_mid": int(user_dict["link"]),
                "share_width": 300,
                "share_height": 240,
            }
            try:
                response = requests.request(
                    "POST",
                    url,
                    headers=headers,
                    data=json.dumps(payload),
                    proxies=tunnel_proxies(),
                    timeout=REQUEST_TIMEOUT,
                )
            except requests.RequestException as e:
                # A network/proxy failure ends this account only, not the round.
                print(f"get_videoList:{e}")
                return
            if "data" not in response.text or response.status_code != 200:
                message = f"get_videoList:{response.text}"
                print(message)
                return
            # Parse the body once instead of on every check below.
            try:
                body = response.json()
            except ValueError:
                print(f"get_videoList:{response.text}")
                return
            data = body.get("data") if isinstance(body, dict) else None
            if not isinstance(data, dict) or "list" not in data:
                message = f"get_videoList:{body}"
                print(message)
                return
            feeds = data["list"]
            if not feeds:
                message = "没有更多数据啦~"
                print(message)
                return
            next_page = data.get("next_t")
            for video_obj in feeds:
                try:
                    message = "扫描到一条视频"
                    print(message)
                    flag = self.process_video_obj(video_obj, user_dict)
                    if not flag:
                        return
                except Exception as e:
                    # Best-effort per video: log and move on to the next one.
                    message = "抓取单条视频异常, 报错原因是: {}".format(e)
                    print(message)
            if next_page is None or next_page == next_t:
                # No usable cursor: stop instead of re-requesting the same page.
                return
            next_t = next_page

    def _get_emoji_list(self):
        """Return the configured emoji/symbol list, querying it once per instance."""
        if getattr(self, "_emoji_list", None) is None:
            self._emoji_list = get_config_from_mysql(
                self.mode, self.platform, self.env, "emoji"
            )
        return self._emoji_list

    def process_video_obj(self, video_obj, user_dict):
        """Normalize one feed item and run it through the pipeline.

        :return: False if the video is older than ``period.max`` days (the
            caller stops paging this account); True otherwise, whether or not
            the pipeline accepted the video.
        """
        # Publish time: the feed's ``t`` field is divided by 1000 to get seconds.
        publish_time_stamp = int(int(video_obj.get("t", 0)) / 1000)
        max_days = int(self.rule_dict.get("period", {}).get("max", 1000))
        # Check age first so old videos skip title/config/pipeline work.
        if int(time.time()) - publish_time_stamp > 3600 * 24 * max_days:
            message = "发布时间超过{}天".format(max_days)
            print(message)
            return False
        publish_time_str = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
        )

        trace_id = self.platform + str(uuid.uuid1())
        # Title: a random emoji/symbol is placed either before or after the
        # cleaned title, chosen at random.
        xiaoniangao_title = clean_title(video_obj.get("title", ""))
        emoji = random.choice(self._get_emoji_list())
        video_title = random.choice(
            [f"{emoji}{xiaoniangao_title}", f"{xiaoniangao_title}{emoji}"]
        )

        # Author nickname with whitespace and "&NBSP" entities stripped.
        user_name = (
            video_obj.get("user", {})
            .get("nick", "")
            .strip()
            .replace("\n", "")
            .replace("/", "")
            .replace(" ", "")
            .replace(" ", "")
            .replace("&NBSP", "")
            .replace("\r", "")
        )
        video_dict = {
            "video_title": video_title,
            "video_id": video_obj.get("vid", ""),
            "duration": int(video_obj.get("du", 0) / 1000),
            "play_cnt": video_obj.get("play_pv", 0),
            "like_cnt": video_obj.get("favor", {}).get("total", 0),
            "comment_cnt": video_obj.get("comment_count", 0),
            "share_cnt": video_obj.get("share", 0),
            "user_name": user_name,
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": publish_time_str,
            "update_time_stamp": int(time.time()),
            "video_width": int(video_obj.get("w", 0)),
            "video_height": int(video_obj.get("h", 0)),
            "avatar_url": video_obj.get("user", {}).get("hurl", ""),
            "profile_id": video_obj["id"],
            "profile_mid": video_obj.get("user", {}).get("mid", ""),
            "cover_url": video_obj.get("url", ""),
            "video_url": video_obj.get("v_url", ""),
            "session": f"xiaoniangao-author-{int(time.time())}",
            "out_user_id": video_obj["id"],
            "platform": self.platform,
            "strategy": self.mode,
            "out_video_id": video_obj.get("vid", ""),
        }

        pipeline = PiaoQuanPipelineTest(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=video_dict,
            trace_id=trace_id,
        )
        account_level = user_dict.get("account_level")
        if account_level in ("P0", "P1"):
            # P0/P1 accounts bypass the pipeline's rule checks.
            flag = True
        else:
            flag = pipeline.process_item()
        if flag:
            video_dict["width"] = video_dict["video_width"]
            video_dict["height"] = video_dict["video_height"]
            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
            video_dict["user_id"] = user_dict["uid"]
            video_dict["publish_time"] = video_dict["publish_time_str"]
            print(video_dict)
            # NOTE(review): MQ publishing is disabled in this test variant
            # (PiaoQuanPipelineTest); re-enable with self.mq.send_msg(video_dict).
            # The success message below is printed even though nothing is sent.
            self.download_count += 1
            message = "成功发送 MQ 至 ETL"
            print(message)
        return True


if __name__ == "__main__":
    select_user_sql = """select * from crawler_user_v3 where task_id=21"""
    user_list = MysqlHelper.get_values(
        "author", "xiaoniangao", select_user_sql, "prod", ""
    )
    crawler = XiaoNianGaoAuthor(
        platform="xiaoniangao",
        mode="author",
        rule_dict={},
        env="prod",
        # Guard against a failed query returning no rows object.
        user_list=user_list or [],
    )
    crawler.get_author_list()