# -*- coding: utf-8 -*- # @Time: 2023/11/07 import datetime import os import random import sys import time from datetime import date, timedelta import requests import json import urllib3 from common.feishu import Feishu sys.path.append(os.getcwd()) from common.common import Common from common import AliyunLogger from common.mq import MQ from requests.adapters import HTTPAdapter from common.scheduling_db import MysqlHelper from common.public import random_title, get_config_from_mysql, download_rule from common.limit import AuthorLimit class KuaishouauthorScheduling: platform = "快手" download_cnt = 0 limiter = AuthorLimit(platform="kuaishou", mode="author") @classmethod def videos_cnt(cls, rule_dict): videos_cnt = rule_dict.get("videos_cnt", {}).get("min", 0) if videos_cnt == 0: videos_cnt = 1000 return videos_cnt @classmethod def video_title(cls, log_type, crawler, env, title): title_split1 = title.split(" #") if title_split1[0] != "": title1 = title_split1[0] else: title1 = title_split1[-1] title_split2 = title1.split(" #") if title_split2[0] != "": title2 = title_split2[0] else: title2 = title_split2[-1] title_split3 = title2.split("@") if title_split3[0] != "": title3 = title_split3[0] else: title3 = title_split3[-1] video_title = title3.strip().replace("\n", "") \ .replace("/", "").replace("快手", "").replace(" ", "") \ .replace(" ", "").replace("&NBSP", "").replace("\r", "") \ .replace("#", "").replace(".", "。").replace("\\", "") \ .replace(":", "").replace("*", "").replace("?", "") \ .replace("?", "").replace('"', "").replace("<", "") \ .replace(">", "").replace("|", "").replace("@", "").replace('"', '').replace("'", '')[:40] if video_title.replace(" ", "") == "" or video_title == "。。。" or video_title == "...": return random_title(log_type, crawler, env, text='title') else: return video_title @classmethod def get_cookie(cls, log_type, crawler, env): select_sql = f""" select * from crawler_config where source="{crawler}" """ configs = MysqlHelper.get_values(log_type, crawler, select_sql, env, action="") for config in configs: if "cookie" in config["config"]: cookie_dict = { "cookie_id": config["id"], "title": config["title"].strip(), "cookie": dict(eval(config["config"]))["cookie"].strip(), "update_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(config["update_time"] / 1000))), "operator": config["operator"].strip() } return cookie_dict @classmethod def get_videoList(cls, log_type, crawler, user_dict, rule_dict, env): pcursor = "" mq = MQ(topic_name="topic_crawler_etl_" + env) count = 0 special = 0 for i in range(3): # while True: if count > 5: return time.sleep(random.randint(10, 50)) url = "https://www.kuaishou.com/graphql" if user_dict['link'][0] == "V": link = user_dict["link"][3:] flag = user_dict["link"].split("_")[0] if flag == "V1": rule_dict = { 'period': {"min": 15, "max": 0}, } special = 0.01 elif flag == "V2": rule_dict = { 'period': {"min": 15, "max": 0}, } special = 0.01 elif flag == "V3": rule_dict = { 'period': {"min": 15, "max": 0}, } special = 0.01 else: link = user_dict["link"] payload = json.dumps({ "operationName": "visionProfilePhotoList", "variables": { "userId": str(link.replace("https://www.kuaishou.com/profile/", "")), "pcursor": pcursor, "page": "profile" }, "query": "fragment photoContent on PhotoEntity {\n id\n duration\n caption\n originCaption\n likeCount\n viewCount\n commentCount\n realLikeCount\n coverUrl\n photoUrl\n photoH265Url\n manifest\n manifestH265\n videoResource\n coverUrls {\n url\n __typename\n }\n timestamp\n expTag\n animatedCoverUrl\n distance\n videoRatio\n liked\n stereoType\n profileUserTopPhoto\n musicBlocked\n __typename\n}\n\nfragment feedContent on Feed {\n type\n author {\n id\n name\n headerUrl\n following\n headerUrls {\n url\n __typename\n }\n __typename\n }\n photo {\n ...photoContent\n __typename\n }\n canAddComment\n llsid\n status\n currentPcursor\n tags {\n type\n name\n __typename\n }\n __typename\n}\n\nquery visionProfilePhotoList($pcursor: String, $userId: String, $page: String, $webPageArea: String) {\n visionProfilePhotoList(pcursor: $pcursor, userId: $userId, page: $page, webPageArea: $webPageArea) {\n result\n llsid\n webPageArea\n feeds {\n ...feedContent\n __typename\n }\n hostName\n pcursor\n __typename\n }\n}\n" }) cookie_list = cls.get_cookie(log_type, crawler, env)["cookie"] if ',' in cookie_list: cookies = cookie_list.split(',') else: cookies = [cookie_list] cookie = random.choice(cookies) headers = { 'Accept': '*/*', 'Content-Type': 'application/json', 'Origin': 'https://www.kuaishou.com', 'Cookie': cookie, # 'Cookie': "kpf=PC_WEB; clientid=3; did=web_b7830efe78a1e48daacb126b6f52ad05; userId=1299331643; didv=1711610963000; kuaishou.server.web_st=ChZrdWFpc2hvdS5zZXJ2ZXIud2ViLnN0EqABsqe9eHH9ZkvQs7beNfwe3IobrRFI_3HvvYRx7BGcVwVImonsmZgPXWpHF89uY-Fomry6l9EtimGUwt4EEF0xMeyMauj7ZvMndNTvjG8qro9yB-xtc6_iN0a6-peDQz6zxeUs6gQLkm58NFtCTDGsqWqMDs1ruDWlJiy-1GVqT59GI_AoJgukwXtTEaYFKzDSmUWv7qsJ0Ya1gDlsyPFO0hoSoJCKbxHIWXjzVWap_gGna5KjIiCdD9i3Uy2z2XGYS--wpIFP_h8k_AasD37nnju7rv_ocSgFMAE; kuaishou.server.web_ph=76097deb523bd30b37fa3d9f67200e5e4e72; kpn=KUAISHOU_VISION", 'Content-Length': '1260', 'Accept-Language': 'zh-CN,zh-Hans;q=0.9', 'Host': 'www.kuaishou.com', 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.6.1 Safari/605.1.15', 'Referer': f'https://www.kuaishou.com/profile/{link.replace("https://www.kuaishou.com/profile/", "")}', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive' } urllib3.disable_warnings() s = requests.session() # max_retries=3 重试3次 s.mount('http://', HTTPAdapter(max_retries=3)) s.mount('https://', HTTPAdapter(max_retries=3)) response = s.post(url=url, headers=headers, data=payload, verify=False, timeout=10) response.close() # Common.logger(log_type, crawler).info(f"response:{response.text}\n") if response.status_code != 200: Common.logger(log_type, crawler).warning(f"response:{response.text}\n") AliyunLogger.logging( code="2000", platform=crawler, mode=log_type, env=env, message=f"response:{response.json()}\n" ) return elif "data" not in response.json(): Common.logger(log_type, crawler).warning(f"response:{response.json()}\n") AliyunLogger.logging( code="2000", platform=crawler, mode=log_type, env=env, message=f"response:{response.json()}\n" ) Feishu.bot(log_type, 'kuaishou', f'快手cookie失效,请及时更换~') return elif "visionProfilePhotoList" not in response.json()["data"]: Common.logger(log_type, crawler).warning(f"response:{response.json()}\n") AliyunLogger.logging( code="2000", platform=crawler, mode=log_type, env=env, message=f"response:{response.json()}\n" ) Feishu.bot(log_type, 'kuaishou', f'快手cookie失效,请及时更换~') return elif "feeds" not in response.json()["data"]["visionProfilePhotoList"]: Common.logger(log_type, crawler).warning(f"response:{response.json()}\n") AliyunLogger.logging( code="2000", platform=crawler, mode=log_type, env=env, message=f"response:{response.json()}\n" ) return elif len(response.json()["data"]["visionProfilePhotoList"]["feeds"]) == 0: Common.logger(log_type, crawler).warning(f"没有更多视频啦 ~\n") AliyunLogger.logging( code="2001", platform=crawler, mode=log_type, env=env, message= f"没有更多视频啦 ~\n" ) return pcursor = response.json()['data']['visionProfilePhotoList']['pcursor'] feeds = response.json()['data']['visionProfilePhotoList']['feeds'] for i in range(len(feeds)): try: Common.logger(log_type, crawler).info('扫描到一条视频\n') AliyunLogger.logging( code="1001", platform=crawler, mode=log_type, env=env, message='扫描到一条视频\n' ) if cls.download_cnt >= cls.videos_cnt(rule_dict): Common.logger(log_type, crawler).info(f"已下载视频数:{cls.download_cnt}\n") AliyunLogger.logging( code="2002", platform=crawler, mode=log_type, env=env, message=f"已下载视频数:{cls.download_cnt}\n" ) return user_name = feeds[i].get("author").get("name") video_title = feeds[i].get("photo", {}).get("caption", random_title(log_type, crawler, env, text='title')) video_title = cls.video_title(log_type, crawler, env, video_title) try: video_id = feeds[i].get("photo", {}).get("videoResource").get("h264", {}).get("videoId", "") video_width = feeds[i].get("photo", {}).get("videoResource").get("h264", {}).get("adaptationSet", {})[0].get("representation", {})[0].get("width", 0) video_height = feeds[i].get("photo", {}).get("videoResource").get("h264", {}).get("adaptationSet", {})[0].get("representation", {})[0].get("height", 0) except KeyError: video_id = feeds[i].get("photo", {}).get("videoResource").get("hevc", {}).get("videoId", "") video_width = feeds[i].get("photo", {}).get("videoResource").get("hevc", {}).get("adaptationSet", {})[0].get("representation", {})[0].get("width", 0) video_height = feeds[i].get("photo", {}).get("videoResource").get("hevc", {}).get("adaptationSet", {})[0].get("representation", {})[0].get("height", 0) publish_time_stamp = int(int(feeds[i].get('photo', {}).get('timestamp', 0)) / 1000) publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)) profile_userTop_photo = feeds[i].get('photo', {}).get('profileUserTopPhoto', '') viewCount = int(feeds[i].get('photo', {}).get('viewCount', 0)) realLikeCount = int(feeds[i].get('photo', {}).get('realLikeCount', 0)) # video_percent = '%.2f' % (realLikeCount / viewCount) # if special != 0: # special = float(special) # if float(video_percent) < special: # Common.logger(log_type, crawler).info(f"不符合条件:点赞/播放-{video_percent}\n") # AliyunLogger.logging( # code="2004", # platform=crawler, # mode=log_type, # env=env, # message=f"点赞量:{realLikeCount}\n" # ) # continue video_dict = {'video_title': video_title, 'video_id': video_id, 'play_cnt': int(feeds[i].get('photo', {}).get('viewCount', 0)), 'like_cnt': int(feeds[i].get('photo', {}).get('realLikeCount', 0)), 'comment_cnt': 0, 'share_cnt': 0, 'video_width': video_width, 'video_height': video_height, 'duration': int(int(feeds[i].get('photo', {}).get('duration', 0)) / 1000), 'publish_time_stamp': publish_time_stamp, 'publish_time_str': publish_time_str, 'user_name': feeds[i].get('author', {}).get('name', ""), 'user_id': feeds[i].get('author', {}).get('id', ""), 'avatar_url': feeds[i].get('author', {}).get('headerUrl', ""), 'cover_url': feeds[i].get('photo', {}).get('coverUrl', ""), 'video_url': feeds[i].get('photo', {}).get('photoUrl', ""), 'session': f"kuaishou-{int(time.time())}"} for k, v in video_dict.items(): Common.logger(log_type, crawler).info(f"{k}:{v}") AliyunLogger.logging( code="1000", platform=crawler, mode=log_type, env=env, message=f"{video_dict}\n" ) if profile_userTop_photo != True: if int((int(time.time()) - int(publish_time_stamp)) / (3600*24)) > int(rule_dict.get("period", {}).get("max", 1000)): Common.logger(log_type, crawler).info(f'发布时间超过{int(rule_dict.get("period", {}).get("max", 1000))}天\n') AliyunLogger.logging( code="2004", platform=crawler, mode=log_type, env=env, message=f'发布时间超过{int(rule_dict.get("period", {}).get("max", 1000))}天\n' ) return if video_dict["video_id"] == '' or video_dict["cover_url"] == '' or video_dict["video_url"] == '': Common.logger(log_type, crawler).info('无效视频\n') AliyunLogger.logging( code="2004", platform=crawler, mode=log_type, env=env, message='无效视频\n' ) elif download_rule(log_type=log_type, crawler=crawler, video_dict=video_dict, rule_dict=rule_dict) is False: Common.logger(log_type, crawler).info("不满足抓取规则\n") AliyunLogger.logging( code="2004", platform=crawler, mode=log_type, env=env, message='不满足抓取规则\n' ) elif any(str(word) if str(word) in video_dict["video_title"] else False for word in get_config_from_mysql(log_type=log_type, source=crawler, env=env, text="filter", action="")) is True: Common.logger(log_type, crawler).info('已中过滤词\n') AliyunLogger.logging( code="2004", platform=crawler, mode=log_type, env=env, message='已中过滤词\n' ) elif cls.repeat_video(log_type, crawler, video_dict["video_id"], env) != 0: count += 1 Common.logger(log_type, crawler).info('视频已下载\n') AliyunLogger.logging( code="2002", platform=crawler, mode=log_type, env=env, message='视频已下载\n' ) else: video_dict["out_user_id"] = video_dict["user_id"] video_dict["platform"] = crawler video_dict["strategy"] = log_type video_dict["out_video_id"] = video_dict["video_id"] video_dict["width"] = video_dict["video_width"] video_dict["height"] = video_dict["video_height"] video_dict["crawler_rule"] = json.dumps(rule_dict) video_dict["user_id"] = user_dict["uid"] video_dict["publish_time"] = video_dict["publish_time_str"] video_dict["strategy_type"] = log_type mq.send_msg(video_dict) current_time = datetime.datetime.now() timestamp = current_time.strftime("%Y-%m-%d %H:%M:%S") values = [[ user_name, video_id, video_title, publish_time_str, timestamp, viewCount, realLikeCount, feeds[i].get('photo', {}).get('coverUrl', ""), feeds[i].get('photo', {}).get('photoUrl', "") ]] Feishu.insert_columns('kuaishou', 'kuaishou', "ue0rAF", "ROWS", 1, 2) time.sleep(0.5) Feishu.update_values('kuaishou', 'kuaishou', "ue0rAF", "A2:Z2", values) cls.download_cnt += 1 AliyunLogger.logging(code="1002", message="成功发送至 ETL", data=video_dict) except Exception as e: Common.logger(log_type, crawler).warning(f"抓取单条视频异常:{e}\n") AliyunLogger.logging( code="3000", platform=crawler, mode=log_type, env=env, message=f"抓取单条视频异常:{e}\n" ) @classmethod def repeat_video(cls, log_type, crawler, video_id, env): sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """ repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env) return len(repeat_video) @classmethod def get_author_videos(cls, log_type, crawler, user_list, rule_dict, env): for user_dict in user_list: try: Common.logger(log_type, crawler).info(f"开始抓取 {user_dict['nick_name']} 主页视频") AliyunLogger.logging( code="2000", platform=crawler, mode=log_type, env=env, message=f"开始抓取 {user_dict['nick_name']} 主页视频" ) cls.download_cnt = 0 cls.get_videoList(log_type=log_type, crawler=crawler, user_dict=user_dict, rule_dict=rule_dict, env=env) except Exception as e: Common.logger(log_type, crawler).warning(f"抓取用户{user_dict['nick_name']}主页视频时异常:{e}\n") AliyunLogger.logging( code="3000", platform=crawler, mode=log_type, env=env, message=f"抓取用户{user_dict['nick_name']}主页视频时异常:{e}\n" ) if __name__ == "__main__": print(KuaishouauthorScheduling.get_cookie("author", "kuaishou", "prod")["cookie"]) pass