# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/2/24
import os
import random
import shutil
import sys
import time
from hashlib import md5
import requests
import json
import urllib3
from requests.adapters import HTTPAdapter
sys.path.append(os.getcwd())
from common.common import Common
from common.feishu import Feishu
from common.getuser import getUser
from common.db import MysqlHelper
from common.publish import Publish
from common.public import get_user_from_mysql
from common.userAgent import get_random_user_agent


class KuaiShouFollow:
    platform = "快手"
    tag = "快手爬虫,定向爬虫策略"

    @classmethod
    def get_rule(cls, log_type, crawler, index):
        try:
            while True:
                rule_sheet = Feishu.get_values_batch(log_type, crawler, "3iqG4z")
                if rule_sheet is None:
                    Common.logger(log_type, crawler).warning("rule_sheet is None! 10秒后重新获取")
                    time.sleep(10)
                    continue
                if index == 1:
                    rule_dict = {
                        "play_cnt": f"{rule_sheet[1][1]}{rule_sheet[1][2]}",
                        "video_width": f"{rule_sheet[2][1]}{rule_sheet[2][2]}",
                        "video_height": f"{rule_sheet[3][1]}{rule_sheet[3][2]}",
                        "like_cnt": f"{rule_sheet[4][1]}{rule_sheet[4][2]}",
                        "duration": f"{rule_sheet[5][1]}{rule_sheet[5][2]}",
                        "download_cnt": f"{rule_sheet[6][1]}{rule_sheet[6][2]}",
                        "publish_time": f"{rule_sheet[7][1]}{rule_sheet[7][2]}",
                    }
                    # for k, v in rule_dict.items():
                    #     Common.logger(log_type, crawler).info(f"{k}:{v}")
                    return rule_dict
                elif index == 2:
                    rule_dict = {
                        "play_cnt": f"{rule_sheet[9][1]}{rule_sheet[9][2]}",
                        "video_width": f"{rule_sheet[10][1]}{rule_sheet[10][2]}",
                        "video_height": f"{rule_sheet[11][1]}{rule_sheet[11][2]}",
                        "like_cnt": f"{rule_sheet[12][1]}{rule_sheet[12][2]}",
                        "duration": f"{rule_sheet[13][1]}{rule_sheet[13][2]}",
                        "download_cnt": f"{rule_sheet[14][1]}{rule_sheet[14][2]}",
                        "publish_time": f"{rule_sheet[15][1]}{rule_sheet[15][2]}",
                    }
                    # for k, v in rule_dict.items():
                    #     Common.logger(log_type, crawler).info(f"{k}:{v}")
                    return rule_dict
        except Exception as e:
            Common.logger(log_type, crawler).error(f"get_rule:{e}\n")

    @classmethod
    def download_rule(cls, video_dict, rule_dict):
        if eval(f"{video_dict['play_cnt']}{rule_dict['play_cnt']}") is True \
                and eval(f"{video_dict['video_width']}{rule_dict['video_width']}") is True \
                and eval(f"{video_dict['video_height']}{rule_dict['video_height']}") is True \
                and eval(f"{video_dict['like_cnt']}{rule_dict['like_cnt']}") is True \
                and eval(f"{video_dict['duration']}{rule_dict['duration']}") is True \
                and eval(f"{video_dict['publish_time']}{rule_dict['publish_time']}") is True:
            return True
        else:
            return False
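
    # Illustration of how download_rule combines the Feishu rule strings with video stats
    # (hypothetical values, for readability only):
    #   rule_dict["play_cnt"] might be ">=100" and video_dict["play_cnt"] = 1000 -> eval("1000>=100") -> True
    #   rule_dict["duration"] might be ">=40"  and video_dict["duration"] = 25   -> eval("25>=40")    -> False
    # A video passes only when all six eval checks above return True.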

    # 过滤词库 (filter-word list)
    @classmethod
    def filter_words(cls, log_type, crawler):
        try:
            while True:
                filter_words_sheet = Feishu.get_values_batch(log_type, crawler, 'HIKVvs')
                if filter_words_sheet is None:
                    Common.logger(log_type, crawler).warning(f"filter_words_sheet:{filter_words_sheet} 10秒钟后重试")
                    time.sleep(10)
                    continue
                filter_words_list = []
                for x in filter_words_sheet:
                    for y in x:
                        if y is None:
                            pass
                        else:
                            filter_words_list.append(y)
                return filter_words_list
        except Exception as e:
            Common.logger(log_type, crawler).error(f'filter_words异常:{e}\n')

    # 万能标题 (fallback titles)
    @classmethod
    def random_title(cls, log_type, crawler):
        try:
            while True:
                random_title_sheet = Feishu.get_values_batch(log_type, crawler, '0DiyXe')
                if random_title_sheet is None:
                    Common.logger(log_type, crawler).warning(f"random_title_sheet:{random_title_sheet} 10秒钟后重试")
                    time.sleep(10)
                    continue
                random_title_list = []
                for x in random_title_sheet:
                    for y in x:
                        if y is None:
                            pass
                        else:
                            random_title_list.append(y)
                return random.choice(random_title_list)
        except Exception as e:
            Common.logger(log_type, crawler).error(f'random_title:{e}\n')
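
    # Note on the helper below: it queries Kuaishou's web GraphQL endpoint ("visionProfile")
    # for an out-of-station user's fan count, follow count and avatar. The counts may come
    # back abbreviated, e.g. "1.2万" (hypothetical value), which the parsing converts to 12000.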
    # 获取站外用户信息
    @classmethod
    def get_out_user_info(cls, log_type, crawler, out_uid):
        try:
            url = "https://www.kuaishou.com/graphql"
            payload = json.dumps({
                "operationName": "visionProfile",
                "variables": {
                    "userId": out_uid
                },
                "query": "query visionProfile($userId: String) {\n visionProfile(userId: $userId) {\n result\n hostName\n userProfile {\n ownerCount {\n fan\n photo\n follow\n photo_public\n __typename\n }\n profile {\n gender\n user_name\n user_id\n headurl\n user_text\n user_profile_bg_url\n __typename\n }\n isFollowing\n __typename\n }\n __typename\n }\n}\n"
            })
            headers = {
                'Accept': '*/*',
                'Content-Type': 'application/json',
                'Origin': 'https://www.kuaishou.com',
                'Cookie': 'did=web_c11041a45efb379fa3e11198d58d1dd1; clientid=3; kpf=PC_WEB; kpn=KUAISHOU_VISION',
                'Content-Length': '552',
                'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
                'Host': 'www.kuaishou.com',
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.6.1 Safari/605.1.15',
                'Referer': 'https://www.kuaishou.com/profile/{}'.format(out_uid),
                'Accept-Encoding': 'gzip, deflate, br',
                'Connection': 'keep-alive'
            }
            urllib3.disable_warnings()
            s = requests.session()
            # max_retries=3 重试3次
            s.mount('http://', HTTPAdapter(max_retries=3))
            s.mount('https://', HTTPAdapter(max_retries=3))
            response = s.post(url=url, headers=headers, data=payload, proxies=Common.tunnel_proxies(), verify=False,
                              timeout=5)
            response.close()
            # Common.logger(log_type, crawler).info(f"get_out_user_info_response:{response.text}")
            if response.status_code != 200:
                Common.logger(log_type, crawler).warning(f"get_out_user_info_response:{response.text}\n")
                return
            elif 'data' not in response.json():
                Common.logger(log_type, crawler).warning(f"get_out_user_info_response:{response.json()}\n")
                return
            elif 'visionProfile' not in response.json()['data']:
                Common.logger(log_type, crawler).warning(f"get_out_user_info_response:{response.json()['data']}\n")
                return
            elif 'userProfile' not in response.json()['data']['visionProfile']:
                Common.logger(log_type, crawler).warning(
                    f"get_out_user_info_response:{response.json()['data']['visionProfile']}\n")
                return
            else:
                userProfile = response.json()['data']['visionProfile']['userProfile']
                # Common.logger(log_type, crawler).info(f"userProfile:{userProfile}")
                try:
                    out_fans_str = str(userProfile['ownerCount']['fan'])
                except Exception:
                    out_fans_str = "0"
                try:
                    out_follow_str = str(userProfile['ownerCount']['follow'])
                except Exception:
                    out_follow_str = "0"
                try:
                    out_avatar_url = userProfile['profile']['headurl']
                except Exception:
                    out_avatar_url = ""
                Common.logger(log_type, crawler).info(f"out_fans_str:{out_fans_str}")
                Common.logger(log_type, crawler).info(f"out_follow_str:{out_follow_str}")
                Common.logger(log_type, crawler).info(f"out_avatar_url:{out_avatar_url}")
                if "万" in out_fans_str:
                    out_fans = int(float(out_fans_str.split("万")[0]) * 10000)
                else:
                    out_fans = int(out_fans_str.replace(",", ""))
                if "万" in out_follow_str:
                    out_follow = int(float(out_follow_str.split("万")[0]) * 10000)
                else:
                    out_follow = int(out_follow_str.replace(",", ""))
                out_user_dict = {
                    "out_fans": out_fans,
                    "out_follow": out_follow,
                    "out_avatar_url": out_avatar_url
                }
                Common.logger(log_type, crawler).info(f"out_user_dict:{out_user_dict}")
                return out_user_dict
        except Exception as e:
            Common.logger(log_type, crawler).error(f"get_out_user_info:{e}\n")

    # 获取用户信息列表
    @classmethod
    def get_user_list(cls, log_type, crawler, sheetid, env, machine):
        try:
            while True:
                user_sheet = Feishu.get_values_batch(log_type, crawler, sheetid)
                if user_sheet is None:
                    Common.logger(log_type, crawler).warning(f"user_sheet:{user_sheet} 10秒钟后重试")
                    time.sleep(10)
                    continue
                our_user_list = []
                for i in range(1, len(user_sheet)):
                    # for i in range(1, 2):
                    out_uid = user_sheet[i][2]
                    user_name = user_sheet[i][3]
                    our_uid = user_sheet[i][6]
                    our_user_link = user_sheet[i][7]
                    if out_uid is None or user_name is None:
                        Common.logger(log_type, crawler).info("空行\n")
                    else:
                        Common.logger(log_type, crawler).info(f"正在更新 {user_name} 用户信息\n")
                        if our_uid is None:
                            out_user_info = cls.get_out_user_info(log_type, crawler, out_uid)
                            out_user_dict = {
                                "out_uid": out_uid,
                                "user_name": user_name,
                                "out_avatar_url": out_user_info["out_avatar_url"],
                                "out_create_time": '',
                                "out_tag": '',
                                "out_play_cnt": 0,
                                "out_fans": out_user_info["out_fans"],
                                "out_follow": out_user_info["out_follow"],
                                "out_friend": 0,
                                "out_like": 0,
                                "platform": cls.platform,
                                "tag": cls.tag,
                            }
                            our_user_dict = getUser.create_user(log_type=log_type, crawler=crawler,
                                                                out_user_dict=out_user_dict, env=env, machine=machine)
                            our_uid = our_user_dict['our_uid']
                            our_user_link = our_user_dict['our_user_link']
                            Feishu.update_values(log_type, crawler, sheetid, f'G{i + 1}:H{i + 1}',
                                                 [[our_uid, our_user_link]])
                            Common.logger(log_type, crawler).info(f'站内用户信息写入飞书成功!\n')
                            our_user_list.append(our_user_dict)
                        else:
                            our_user_dict = {
                                'out_uid': out_uid,
                                'user_name': user_name,
                                'our_uid': our_uid,
                                'our_user_link': our_user_link,
                            }
                            our_user_list.append(our_user_dict)
                return our_user_list
        except Exception as e:
            Common.logger(log_type, crawler).error(f'get_user_list:{e}\n')

    # 处理视频标题
    @classmethod
    def video_title(cls, log_type, crawler, title):
        title_split1 = title.split(" #")
        if title_split1[0] != "":
            title1 = title_split1[0]
        else:
            title1 = title_split1[-1]

        title_split2 = title1.split(" #")
        if title_split2[0] != "":
            title2 = title_split2[0]
        else:
            title2 = title_split2[-1]

        title_split3 = title2.split("@")
        if title_split3[0] != "":
            title3 = title_split3[0]
        else:
            title3 = title_split3[-1]

        video_title = title3.strip().replace("\n", "") \
            .replace("/", "").replace("快手", "").replace(" ", "") \
            .replace(" ", "").replace("&NBSP", "").replace("\r", "") \
            .replace("#", "").replace(".", "。").replace("\\", "") \
            .replace(":", "").replace("*", "").replace("?", "") \
            .replace("?", "").replace('"', "").replace("<", "") \
            .replace(">", "").replace("|", "").replace("@", "")[:40]
        if video_title.replace(" ", "") == "" or video_title == "。。。" or video_title == "...":
            return cls.random_title(log_type, crawler)
        else:
            return video_title
    # @classmethod
    # def get_cookie(cls, log_type, crawler, out_uid, machine):
    #     try:
    #         # 打印请求配置
    #         ca = DesiredCapabilities.CHROME
    #         ca["goog:loggingPrefs"] = {"performance": "ALL"}
    #
    #         # 不打开浏览器运行
    #         chrome_options = webdriver.ChromeOptions()
    #         chrome_options.add_argument("headless")
    #         chrome_options.add_argument(
    #             f'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36')
    #         chrome_options.add_argument("--no-sandbox")
    #
    #         # driver初始化
    #         if machine == "aliyun":
    #             driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options)
    #         elif machine == "macpro":
    #             driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options,
    #                                       service=Service('/Users/lieyunye/Downloads/chromedriver_v107/chromedriver'))
    #         elif machine == "macair":
    #             driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options,
    #                                       service=Service('/Users/piaoquan/Downloads/chromedriver_v108/chromedriver'))
    #         else:
    #             driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options, service=Service(
    #                 '/Users/wangkun/Downloads/chromedriver/chromedriver_v109/chromedriver'))
    #
    #         driver.implicitly_wait(10)
    #         # print('打开个人主页')
    #         driver.get(f'https://www.kuaishou.com/profile/{out_uid}')
    #         time.sleep(1)
    #
    #         # print('解析cookies')
    #         logs = driver.get_log("performance")
    #         # Common.logger(log_type, crawler).info('已获取logs:{}\n', logs)
    #         # print('退出浏览器')
    #         driver.quit()
    #         for line in logs:
    #             msg = json.loads(line['message'])
    #             # Common.logger(log_type, crawler).info(f"{msg}\n\n")
    #             if 'message' not in msg:
    #                 pass
    #             elif 'params' not in msg['message']:
    #                 pass
    #             elif 'headers' not in msg['message']['params']:
    #                 pass
    #             elif 'Cookie' not in msg['message']['params']['headers']:
    #                 pass
    #             elif msg['message']['params']['headers']['Host'] != 'www.kuaishou.com':
    #                 pass
    #             else:
    #                 cookie = msg['message']['params']['headers']['Cookie']
    #                 # Common.logger(log_type, crawler).info(f"{cookie}")
    #                 return cookie
    #     except Exception as e:
    #         Common.logger(log_type, crawler).error(f"get_cookie:{e}\n")

    @classmethod
    def get_videoList(cls, log_type, crawler, strategy, our_uid, out_uid, oss_endpoint, env, machine, pcursor=""):
        download_cnt_1, download_cnt_2 = 0, 0
        rule_dict_1 = cls.get_rule(log_type, crawler, 1)
        rule_dict_2 = cls.get_rule(log_type, crawler, 2)
        if rule_dict_1 is None or rule_dict_2 is None:
            Common.logger(log_type, crawler).warning(f"rule_dict is None, 10秒后重试")
            return
        try:
            if download_cnt_1 >= int(rule_dict_1['download_cnt'].replace("=", "").replace("<", "").replace(">", "")) \
                    and download_cnt_2 >= int(rule_dict_2['download_cnt'].replace("=", "").replace("<", "").replace(">", "")):
                Common.logger(log_type, crawler).info(
                    f"规则1已下载{download_cnt_1}条视频,规则2已下载{download_cnt_2}条视频\n")
                return
            url = "https://www.kuaishou.com/graphql"
            payload = json.dumps({
                "operationName": "visionProfilePhotoList",
                "variables": {
                    "userId": out_uid,
                    "pcursor": pcursor,
                    "page": "profile"
                },
                "query": "fragment photoContent on PhotoEntity {\n id\n duration\n caption\n originCaption\n likeCount\n viewCount\n realLikeCount\n coverUrl\n photoUrl\n photoH265Url\n manifest\n manifestH265\n videoResource\n coverUrls {\n url\n __typename\n }\n timestamp\n expTag\n animatedCoverUrl\n distance\n videoRatio\n liked\n stereoType\n profileUserTopPhoto\n musicBlocked\n __typename\n}\n\nfragment feedContent on Feed {\n type\n author {\n id\n name\n headerUrl\n following\n headerUrls {\n url\n __typename\n }\n __typename\n }\n photo {\n ...photoContent\n __typename\n }\n canAddComment\n llsid\n status\n currentPcursor\n tags {\n type\n name\n __typename\n }\n __typename\n}\n\nquery visionProfilePhotoList($pcursor: String, $userId: String, $page: String, $webPageArea: String) {\n visionProfilePhotoList(pcursor: $pcursor, userId: $userId, page: $page, webPageArea: $webPageArea) {\n result\n llsid\n webPageArea\n feeds {\n ...feedContent\n __typename\n }\n hostName\n pcursor\n __typename\n }\n}\n"
            })
            headers = {
                'Accept': '*/*',
                'Content-Type': 'application/json',
                'Origin': 'https://www.kuaishou.com',
                'Cookie': 'kpf=PC_WEB; clientid=3; did=web_3f264336f6a6c191cd36fb15e87ab708; kpn=KUAISHOU_VISION',
                'Content-Length': '1244',
                'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
                'Host': 'www.kuaishou.com',
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',  # get_random_user_agent('pc')
                'Referer': 'https://www.kuaishou.com/profile/{}'.format(out_uid),
                'Accept-Encoding': 'gzip, deflate, br',
                'Connection': 'keep-alive'
            }
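            # Pagination note: the request above asks visionProfilePhotoList for one page of the
            # author's feed. An empty "pcursor" fetches the first page; each response returns the
            # next cursor, and the recursion at the bottom of this method follows it until the
            # endpoint reports "no_more".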
            urllib3.disable_warnings()
            s = requests.session()
            # max_retries=3 重试3次
            s.mount('http://', HTTPAdapter(max_retries=3))
            s.mount('https://', HTTPAdapter(max_retries=3))
            response = s.post(url=url, headers=headers, data=payload, proxies=Common.tunnel_proxies(), verify=False,
                              timeout=10)
            response.close()
            # Common.logger(log_type, crawler).info(f"get_videoList:{response.text}\n")
            if response.status_code != 200:
                Common.logger(log_type, crawler).warning(f"get_videoList_response:{response.text}\n")
                return
            elif 'data' not in response.json():
                Common.logger(log_type, crawler).warning(f"get_videoList_response:{response.json()}\n")
                return
            elif 'visionProfilePhotoList' not in response.json()['data']:
                Common.logger(log_type, crawler).warning(f"get_videoList_response:{response.json()['data']}\n")
                return
            elif 'feeds' not in response.json()['data']['visionProfilePhotoList']:
                Common.logger(log_type, crawler).warning(
                    f"get_videoList_response:{response.json()['data']['visionProfilePhotoList']}\n")
                return
            elif len(response.json()['data']['visionProfilePhotoList']['feeds']) == 0:
                Common.logger(log_type, crawler).info("没有更多视频啦 ~\n")
                return
            else:
                feeds = response.json()['data']['visionProfilePhotoList']['feeds']
                pcursor = response.json()['data']['visionProfilePhotoList']['pcursor']
                # Common.logger(log_type, crawler).info(f"feeds0: {feeds}\n")
                for i in range(len(feeds)):
                    if 'photo' not in feeds[i]:
                        Common.logger(log_type, crawler).warning(f"get_videoList:{feeds[i]}\n")
                        break

                    # video_title
                    if 'caption' not in feeds[i]['photo']:
                        video_title = cls.random_title(log_type, crawler)
                    elif feeds[i]['photo']['caption'].strip() == "":
                        video_title = cls.random_title(log_type, crawler)
                    else:
                        video_title = cls.video_title(log_type, crawler, feeds[i]['photo']['caption'])

                    if 'videoResource' not in feeds[i]['photo'] \
                            and 'manifest' not in feeds[i]['photo'] \
                            and 'manifestH265' not in feeds[i]['photo']:
                        Common.logger(log_type, crawler).warning(f"get_videoList:{feeds[i]['photo']}\n")
                        break
                    videoResource = feeds[i]['photo']['videoResource']

                    if 'h264' not in videoResource and 'hevc' not in videoResource:
                        Common.logger(log_type, crawler).warning(f"get_videoList:{videoResource}\n")
                        break

                    # video_id
                    if 'h264' in videoResource and 'videoId' in videoResource['h264']:
                        video_id = videoResource['h264']['videoId']
                    elif 'hevc' in videoResource and 'videoId' in videoResource['hevc']:
                        video_id = videoResource['hevc']['videoId']
                    else:
                        video_id = ""

                    # play_cnt
                    if 'viewCount' not in feeds[i]['photo']:
                        play_cnt = 0
                    else:
                        play_cnt = int(feeds[i]['photo']['viewCount'])

                    # like_cnt
                    if 'realLikeCount' not in feeds[i]['photo']:
                        like_cnt = 0
                    else:
                        like_cnt = feeds[i]['photo']['realLikeCount']

                    # publish_time
                    if 'timestamp' not in feeds[i]['photo']:
                        publish_time_stamp = 0
                        publish_time_str = ''
                        publish_time = 0
                    else:
                        publish_time_stamp = int(int(feeds[i]['photo']['timestamp']) / 1000)
                        publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp))
                        publish_time = int((int(time.time()) - publish_time_stamp) / (3600 * 24))

                    # duration
                    if 'duration' not in feeds[i]['photo']:
                        duration = 0
                    else:
                        duration = int(int(feeds[i]['photo']['duration']) / 1000)

                    # video_width / video_height / video_url
                    mapping = {}
                    for item in ['width', 'height']:
                        try:
                            val = str(videoResource['h264']['adaptationSet'][0]['representation'][0][item])
                        except Exception:
                            try:
                                val = str(videoResource['hevc']['adaptationSet'][0]['representation'][0][item])
                            except Exception:
                                val = ''
                        mapping[item] = val
                    video_width = int(mapping['width']) if mapping['width'] != '' else 0
                    video_height = int(mapping['height']) if mapping['height'] != '' else 0
                    # cover_url
                    if 'coverUrl' not in feeds[i]['photo']:
                        cover_url = ""
                    else:
                        cover_url = feeds[i]['photo']['coverUrl']

                    # user_name / avatar_url
                    try:
                        user_name = feeds[i]['author']['name']
                        avatar_url = feeds[i]['author']['headerUrl']
                    except Exception:
                        user_name = ''
                        avatar_url = ''

                    video_url = feeds[i]['photo']['photoUrl']

                    video_dict = {'video_title': video_title,
                                  'video_id': video_id,
                                  'play_cnt': play_cnt,
                                  'comment_cnt': 0,
                                  'like_cnt': like_cnt,
                                  'share_cnt': 0,
                                  'video_width': video_width,
                                  'video_height': video_height,
                                  'duration': duration,
                                  'publish_time': publish_time,
                                  'publish_time_stamp': publish_time_stamp,
                                  'publish_time_str': publish_time_str,
                                  'user_name': user_name,
                                  'user_id': out_uid,
                                  'avatar_url': avatar_url,
                                  'cover_url': cover_url,
                                  'video_url': video_url,
                                  'session': f"kuaishou{int(time.time())}"}

                    rule_1 = cls.download_rule(video_dict, rule_dict_1)
                    Common.logger(log_type, crawler).info(f"video_title:{video_title}")
                    Common.logger(log_type, crawler).info(f"video_id:{video_id}\n")
                    Common.logger(log_type, crawler).info(
                        f"play_cnt:{video_dict['play_cnt']}{rule_dict_1['play_cnt']}, {eval(str(video_dict['play_cnt']) + str(rule_dict_1['play_cnt']))}")
                    Common.logger(log_type, crawler).info(
                        f"like_cnt:{video_dict['like_cnt']}{rule_dict_1['like_cnt']}, {eval(str(video_dict['like_cnt']) + str(rule_dict_1['like_cnt']))}")
                    Common.logger(log_type, crawler).info(
                        f"video_width:{video_dict['video_width']}{rule_dict_1['video_width']}, {eval(str(video_dict['video_width']) + str(rule_dict_1['video_width']))}")
                    Common.logger(log_type, crawler).info(
                        f"video_height:{video_dict['video_height']}{rule_dict_1['video_height']}, {eval(str(video_dict['video_height']) + str(rule_dict_1['video_height']))}")
                    Common.logger(log_type, crawler).info(
                        f"duration:{video_dict['duration']}{rule_dict_1['duration']}, {eval(str(video_dict['duration']) + str(rule_dict_1['duration']))}")
                    Common.logger(log_type, crawler).info(
                        f"publish_time:{video_dict['publish_time']}{rule_dict_1['publish_time']}, {eval(str(video_dict['publish_time']) + str(rule_dict_1['publish_time']))}")
                    Common.logger(log_type, crawler).info(f"rule_1:{rule_1}\n")

                    rule_2 = cls.download_rule(video_dict, rule_dict_2)
                    Common.logger(log_type, crawler).info(
                        f"play_cnt:{video_dict['play_cnt']}{rule_dict_2['play_cnt']}, {eval(str(video_dict['play_cnt']) + str(rule_dict_2['play_cnt']))}")
                    Common.logger(log_type, crawler).info(
                        f"like_cnt:{video_dict['like_cnt']}{rule_dict_2['like_cnt']}, {eval(str(video_dict['like_cnt']) + str(rule_dict_2['like_cnt']))}")
                    Common.logger(log_type, crawler).info(
                        f"video_width:{video_dict['video_width']}{rule_dict_2['video_width']}, {eval(str(video_dict['video_width']) + str(rule_dict_2['video_width']))}")
                    Common.logger(log_type, crawler).info(
                        f"video_height:{video_dict['video_height']}{rule_dict_2['video_height']}, {eval(str(video_dict['video_height']) + str(rule_dict_2['video_height']))}")
                    Common.logger(log_type, crawler).info(
                        f"duration:{video_dict['duration']}{rule_dict_2['duration']}, {eval(str(video_dict['duration']) + str(rule_dict_2['duration']))}")
                    Common.logger(log_type, crawler).info(
                        f"publish_time:{video_dict['publish_time']}{rule_dict_2['publish_time']}, {eval(str(video_dict['publish_time']) + str(rule_dict_2['publish_time']))}")
                    Common.logger(log_type, crawler).info(f"rule_2:{rule_2}\n")
                    if video_title == "" or video_url == "":
                        Common.logger(log_type, crawler).info("无效视频\n")
                        continue
                    elif rule_1 is True:
                        if download_cnt_1 < int(rule_dict_1['download_cnt'].replace("=", "").replace("<", "").replace(">", "")):
                            download_finished = cls.download_publish(log_type=log_type,
                                                                     crawler=crawler,
                                                                     strategy=strategy,
                                                                     video_dict=video_dict,
                                                                     rule_dict=rule_dict_1,
                                                                     our_uid=our_uid,
                                                                     oss_endpoint=oss_endpoint,
                                                                     env=env,
                                                                     machine=machine)
                            if download_finished is True:
                                download_cnt_1 += 1
                    elif rule_2 is True:
                        if download_cnt_2 < int(rule_dict_2['download_cnt'].replace("=", "").replace("<", "").replace(">", "")):
                            download_finished = cls.download_publish(log_type=log_type,
                                                                     crawler=crawler,
                                                                     strategy=strategy,
                                                                     video_dict=video_dict,
                                                                     rule_dict=rule_dict_2,
                                                                     our_uid=our_uid,
                                                                     oss_endpoint=oss_endpoint,
                                                                     env=env,
                                                                     machine=machine)
                            if download_finished is True:
                                download_cnt_2 += 1
                    else:
                        Common.logger(log_type, crawler).info("不满足下载规则\n")
                        # Common.logger(log_type, crawler).info(f"feeds: {feeds}\n")

                if pcursor == "no_more":
                    Common.logger(log_type, crawler).info(f"作者,{out_uid},已经到底了,没有更多内容了\n")
                    return
                cls.get_videoList(log_type, crawler, strategy, our_uid, out_uid, oss_endpoint, env, machine,
                                  pcursor=pcursor)
        except Exception as e:
            Common.logger(log_type, crawler).error(f"get_videoList:{e}\n")

    @classmethod
    def repeat_video(cls, log_type, crawler, video_id, video_title, publish_time, env, machine):
        sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}" or (platform="{cls.platform}" and video_title="{video_title}" and publish_time="{publish_time}") """
        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env, machine)
        return len(repeat_video)

    @classmethod
    def download_publish(cls, log_type, crawler, strategy, video_dict, rule_dict, our_uid, oss_endpoint, env, machine):
        try:
            download_finished = False
            if cls.repeat_video(log_type, crawler, video_dict['video_id'], video_dict['video_title'],
                                video_dict['publish_time_str'], env, machine) != 0:
                Common.logger(log_type, crawler).info('视频已下载\n')
            # elif video_dict['video_id'] in [x for y in Feishu.get_values_batch(log_type, crawler, "3cd128") for x in y]:
            #     Common.logger(log_type, crawler).info('视频已下载\n')
            elif any(word if word in video_dict['video_title'] else False
                     for word in cls.filter_words(log_type, crawler)) is True:
                Common.logger(log_type, crawler).info('标题已中过滤词\n')
            else:
                # 下载视频
                Common.download_method(log_type=log_type, crawler=crawler, text='video',
                                       title=video_dict['video_title'], url=video_dict['video_url'])
                md_title = md5(video_dict['video_title'].encode('utf8')).hexdigest()
                if os.path.getsize(f"./{crawler}/videos/{md_title}/video.mp4") == 0:
                    # 删除视频文件夹
                    shutil.rmtree(f"./{crawler}/videos/{md_title}")
                    Common.logger(log_type, crawler).info("视频size=0,删除成功\n")
                    return
                # ffmpeg_dict = Common.ffmpeg(log_type, crawler,
                #                             f"./{crawler}/videos/{video_dict['video_title']}/video.mp4")
                # if ffmpeg_dict is None or ffmpeg_dict['size'] == 0:
                #     Common.logger(log_type, crawler).warning(f"下载的视频无效,已删除\n")
                #     # 删除视频文件夹
                #     shutil.rmtree(f"./{crawler}/videos/{video_dict['video_title']}")
                #     return download_finished
                # 下载封面
                Common.download_method(log_type=log_type, crawler=crawler, text='cover',
                                       title=video_dict['video_title'], url=video_dict['cover_url'])
                # 保存视频信息至txt
                Common.save_video_info(log_type=log_type, crawler=crawler, video_dict=video_dict)
                # 上传视频
                Common.logger(log_type, crawler).info("开始上传视频...")
                our_video_id = Publish.upload_and_publish(log_type=log_type,
                                                          crawler=crawler,
                                                          strategy=strategy,
                                                          our_uid=our_uid,
                                                          env=env,
                                                          oss_endpoint=oss_endpoint)
f"https://testadmin.piaoquantv.com/cms/post-detail/{our_video_id}/info" else: our_video_link = f"https://admin.piaoquantv.com/cms/post-detail/{our_video_id}/info" Common.logger(log_type, crawler).info("视频上传完成") if our_video_id is None: Common.logger(log_type, crawler).warning(f"our_video_id:{our_video_id} 删除该视频文件夹") # 删除视频文件夹 shutil.rmtree(f"./{crawler}/videos/{video_dict['video_title']}") return download_finished # 视频信息保存数据库 insert_sql = f""" insert into crawler_video(video_id, user_id, out_user_id, platform, strategy, out_video_id, video_title, cover_url, video_url, duration, publish_time, play_cnt, crawler_rule, width, height) values({our_video_id}, {our_uid}, "{video_dict['user_id']}", "{cls.platform}", "定向爬虫策略", "{video_dict['video_id']}", "{video_dict['video_title']}", "{video_dict['cover_url']}", "{video_dict['video_url']}", {int(video_dict['duration'])}, "{video_dict['publish_time_str']}", {int(video_dict['play_cnt'])}, '{json.dumps(rule_dict)}', {int(video_dict['video_width'])}, {int(video_dict['video_height'])}) """ Common.logger(log_type, crawler).info(f"insert_sql:{insert_sql}") MysqlHelper.update_values(log_type, crawler, insert_sql, env, machine) Common.logger(log_type, crawler).info('视频信息插入数据库成功!\n') # 视频写入飞书 Feishu.insert_columns(log_type, 'kuaishou', "fYdA8F", "ROWS", 1, 2) upload_time = int(time.time()) values = [[our_video_id, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(upload_time)), "定向榜", str(video_dict['video_id']), video_dict['video_title'], our_video_link, video_dict['play_cnt'], video_dict['comment_cnt'], video_dict['like_cnt'], video_dict['share_cnt'], video_dict['duration'], f"{video_dict['video_width']}*{video_dict['video_height']}", video_dict['publish_time_str'], video_dict['user_name'], video_dict['user_id'], video_dict['avatar_url'], video_dict['cover_url'], video_dict['video_url']]] time.sleep(1) Feishu.update_values(log_type, 'kuaishou', "fYdA8F", "E2:Z2", values) Common.logger(log_type, crawler).info(f"视频已保存至云文档\n") download_finished = True return download_finished except Exception as e: Common.logger(log_type, crawler).error(f"download_publish:{e}\n") @classmethod def get_follow_videos(cls, log_type, crawler, strategy, oss_endpoint, env, machine): # user_list = cls.get_user_list(log_type=log_type, crawler=crawler, sheetid="bTSzxW", env=env, machine=machine) user_list = get_user_from_mysql(log_type, crawler, crawler, env) for user in user_list: spider_link = user["spider_link"] out_uid = spider_link.split('/')[-1] user_name = user["nick_name"] our_uid = user["media_id"] Common.logger(log_type, crawler).info(f"开始抓取 {user_name} 用户主页视频\n") try: cls.get_videoList(log_type=log_type, crawler=crawler, strategy=strategy, our_uid=our_uid, out_uid=out_uid, oss_endpoint=oss_endpoint, env=env, machine=machine) except Exception as e: Common.logger(log_type, crawler).info(f"用户:{user_name}, 抓取异常:{e}\n") continue # sleep_time = 120 # Common.logger(log_type, crawler).info(f"休眠{sleep_time}秒\n") # time.sleep(sleep_time) if __name__ == "__main__": KuaiShouFollow.get_videoList(log_type="follow", crawler="kuaishou", strategy="定向爬虫策略", our_uid="54719554", out_uid="3xnk3wbm3vfiha6", oss_endpoint="out", env="dev", machine="local") # print(KuaiShouFollow.get_out_user_info("follow", "kuaishou", "3xnk3wbm3vfiha6")) # print(Follow.get_out_user_info("follow", "kuaishou", "3x5wgjhfc7tx8ue"))