# -*- coding: utf-8 -*- # @Author: wangkun # @Time: 2023/2/3 """ YouTube 定向榜 1. 发布时间<=1个月 2. 10分钟>=时长>=1分钟 """ import os import re import shutil import sys import time import json import requests sys.path.append(os.getcwd()) from common.common import Common # from common.db import MysqlHelper from common.scheduling_db import MysqlHelper from common.feishu import Feishu from common.getuser import getUser from common.publish import Publish from common.translate import Translate from common.public import get_user_from_mysql, get_config_from_mysql headers = { 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36', } def format_nums(data): data_dict = [{'亿': 100000000}, {'百万': 1000000}, {'万': 10000}, {'k': 1000}, {'w': 10000}, {'m': 1000000}, {'千': 1000}, {'M': 1000000}, {'K': 1000}, {'W': 10000}] data = str(data) for i in data_dict: index = data.find(list(i.keys())[0]) if index > 0: count = int(float(data[:index]) * list(i.values())[0]) return count elif index < 0: continue count = int(float(re.findall(r'\d+', data)[0])) return count class YoutubeAuthorScheduling: # 翻页参数 continuation = '' # 抓取平台 platform = 'youtube' headers = { 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36', } @classmethod def get_out_user_info(cls, log_type, crawler, browse_id, out_user_id): """ 获取站外用户信息 :param log_type: 日志 :param crawler: 哪款爬虫 :param browse_id: browse_id :param out_user_id: 站外用户 UID :return: out_user_dict = {'out_user_name': 站外用户昵称, 'out_avatar_url': 站外用户头像, 'out_fans': 站外用户粉丝量, 'out_play_cnt': 站外用户总播放量, 'out_create_time': 站外用户创建时间} """ try: url = f'https://www.youtube.com/{out_user_id}/about' res = requests.get(url=url, headers=headers) info = re.findall(r'var ytInitialData = (.*?);', res.text, re.S)[0] data = json.loads(info) header = data['header']['c4TabbedHeaderRenderer'] tabs = data['contents']['twoColumnBrowseResultsRenderer']['tabs'] try: subsimpleText = header['subscriberCountText']['simpleText'].replace('位订阅者', '') out_fans = format_nums(subsimpleText) except Exception as e: out_fans = 0 for tab in tabs: if 'tabRenderer' not in tab or 'content' not in tab['tabRenderer']: continue viewCountText = \ tab['tabRenderer']['content']['sectionListRenderer']['contents'][0]['itemSectionRenderer'][ 'contents'][0]['channelAboutFullMetadataRenderer']['viewCountText']['simpleText'] out_create_time = \ tab['tabRenderer']['content']['sectionListRenderer']['contents'][0]['itemSectionRenderer'][ 'contents'][0]['channelAboutFullMetadataRenderer']['joinedDateText']['runs'][1]['text'] break out_user_dict = { 'out_user_name': header['title'], 'out_avatar_url': header['avatar']['thumbnails'][-1]['url'], 'out_fans': out_fans, 'out_play_cnt': int( viewCountText.replace('收看次數:', '').replace('次', '').replace(',', '')) if viewCountText else 0, 'out_create_time': out_create_time.replace('年', '-').replace('月', '-').replace('日', ''), } # print(out_user_dict) return out_user_dict except Exception as e: Common.logger(log_type, crawler).error(f'get_out_user_info异常:{e}\n') @classmethod def get_user_from_feishu(cls, log_type, crawler, sheetid, env): """ 补全飞书用户表信息,并返回 :param log_type: 日志 :param crawler: 哪款爬虫 :param sheetid: 飞书表 :param env: 正式环境:prod,测试环境:dev :param machine: 部署机器,阿里云填写 aliyun,aliyun_hk ,线下分别填写 macpro,macair,local :return: user_list """ try: user_sheet = Feishu.get_values_batch(log_type, crawler, sheetid) user_list = [] for i in range(1, len(user_sheet)): out_uid = user_sheet[i][2] user_name = user_sheet[i][3] browse_id = user_sheet[i][5] our_uid = user_sheet[i][6] uer_url = user_sheet[i][4] if out_uid is not None and user_name is not None: Common.logger(log_type, crawler).info(f"正在更新 {user_name} 用户信息\n") if our_uid is None: sql = f""" select * from crawler_user where platform="{cls.platform}" and out_user_id="{out_uid}" """ our_user_info = MysqlHelper.get_values(log_type, crawler, sql, env) # 数据库中(youtube + out_user_id)返回数量 == 0,则创建站内账号UID,并写入定向账号飞书表。并结合站外用户信息,一并写入爬虫账号数据库 if not our_user_info: # 获取站外账号信息,写入数据库 try: out_user_dict = cls.get_out_user_info(log_type, crawler, browse_id, out_uid) except Exception as e: continue out_avatar_url = out_user_dict['out_avatar_url'] out_create_time = out_user_dict['out_create_time'] out_play_cnt = out_user_dict['out_play_cnt'] out_fans = out_user_dict['out_fans'] tag = 'youtube爬虫,定向爬虫策略' # 创建站内账号 create_user_dict = { 'nickName': user_name, 'avatarUrl': out_avatar_url, 'tagName': tag, } our_uid = getUser.create_uid(log_type, crawler, create_user_dict, env) Common.logger(log_type, crawler).info(f'新创建的站内UID:{our_uid}') if env == 'prod': our_user_link = f'https://admin.piaoquantv.com/ums/user/{our_uid}/post' else: our_user_link = f'https://testadmin.piaoquantv.com/ums/user/{our_uid}/post' Common.logger(log_type, crawler).info(f'站内用户主页链接:{our_user_link}') Feishu.update_values(log_type, crawler, sheetid, f'G{i + 1}:H{i + 1}', [[our_uid, our_user_link]]) Common.logger(log_type, crawler).info(f'站内用户信息写入飞书成功!') sql = f""" insert into crawler_user(user_id, out_user_id, out_user_name, out_avatar_url, out_create_time, out_play_cnt, out_fans, platform, tag) values({our_uid}, "{out_uid}", "{user_name}", "{out_avatar_url}", "{out_create_time}", {out_play_cnt}, {out_fans}, "{cls.platform}", "{tag}") """ Common.logger(log_type, crawler).info(f'sql:{sql}') MysqlHelper.update_values(log_type, crawler, sql, env) Common.logger(log_type, crawler).info('用户信息插入数据库成功!\n') # 数据库中(youtube + out_user_id)返回数量 != 0,则直接把数据库中的站内 UID 写入飞书 else: our_uid = our_user_info[0][1] if 'env' == 'prod': our_user_link = f'https://admin.piaoquantv.com/ums/user/{our_uid}/post' else: our_user_link = f'https://testadmin.piaoquantv.com/ums/user/{our_uid}/post' Common.logger(log_type, crawler).info(f'站内用户主页链接:{our_user_link}') Feishu.update_values(log_type, crawler, sheetid, f'G{i + 1}:H{i + 1}', [[our_uid, our_user_link]]) Common.logger(log_type, crawler).info(f'站内用户信息写入飞书成功!\n') user_dict = { 'out_user_id': out_uid, 'out_user_name': user_name, 'out_browse_id': browse_id, 'our_user_id': our_uid, 'out_user_url': uer_url } user_list.append(user_dict) else: pass return user_list except Exception as e: Common.logger(log_type, crawler).error(f"get_user_from_feishu异常:{e}\n") @classmethod def get_continuation(cls, data): continuation = data['continuationItemRenderer']['continuationEndpoint']['continuationCommand']['token'] return continuation @classmethod def get_feeds(cls, log_type, crawler, browse_id, out_uid): """ 获取个人主页视频列表 :param log_type: 日志 :param crawler: 哪款爬虫 :param browse_id: 每个用户主页的请求参数中唯一值 :param out_uid: 站外用户UID :return: video_list """ url = "https://www.youtube.com/youtubei/v1/browse?key=AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8&prettyPrint=false" payload = json.dumps({ "context": { "client": { "hl": "zh-CN", "gl": "US", "remoteHost": "38.93.247.21", "deviceMake": "Apple", "deviceModel": "", "visitorData": "CgtraDZfVnB4NXdIWSi6mIOfBg%3D%3D", "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36,gzip(gfe)", "clientName": "WEB", "clientVersion": "2.20230201.01.00", "osName": "Macintosh", "osVersion": "10_15_7", "originalUrl": f"https://www.youtube.com/{out_uid}/videos", "platform": "DESKTOP", "clientFormFactor": "UNKNOWN_FORM_FACTOR", "configInfo": { "appInstallData": "CLqYg58GEInorgUQuIuuBRCU-K4FENfkrgUQuNSuBRC2nP4SEPuj_hIQ5_euBRCy9a4FEKLsrgUQt-CuBRDi1K4FEILdrgUQh92uBRDM364FEP7urgUQzPWuBRDZ6a4FEOSg_hIQo_muBRDvo_4SEMnJrgUQlqf-EhCR-PwS" }, "timeZone": "Asia/Shanghai", "browserName": "Chrome", "browserVersion": "109.0.0.0", "acceptHeader": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", "deviceExperimentId": "ChxOekU1TlRReU5qWTBOVFExTVRRNU5qRTBOdz09ELqYg58GGOmU7Z4G", "screenWidthPoints": 944, "screenHeightPoints": 969, "screenPixelDensity": 1, "screenDensityFloat": 1, "utcOffsetMinutes": 480, "userInterfaceTheme": "USER_INTERFACE_THEME_LIGHT", "memoryTotalKbytes": "8000000", "mainAppWebInfo": { "graftUrl": f"/{out_uid}/videos", "pwaInstallabilityStatus": "PWA_INSTALLABILITY_STATUS_CAN_BE_INSTALLED", "webDisplayMode": "WEB_DISPLAY_MODE_FULLSCREEN", "isWebNativeShareAvailable": True } }, "user": { "lockedSafetyMode": False }, "request": { "useSsl": True, "internalExperimentFlags": [], "consistencyTokenJars": [] }, "clickTracking": { "clickTrackingParams": "CBcQ8JMBGAYiEwiNhIXX9IL9AhUFSUwIHWnnDks=" }, "adSignalsInfo": { "params": [ { "key": "dt", "value": "1675676731048" }, { "key": "flash", "value": "0" }, { "key": "frm", "value": "0" }, { "key": "u_tz", "value": "480" }, { "key": "u_his", "value": "4" }, { "key": "u_h", "value": "1080" }, { "key": "u_w", "value": "1920" }, { "key": "u_ah", "value": "1080" }, { "key": "u_aw", "value": "1920" }, { "key": "u_cd", "value": "24" }, { "key": "bc", "value": "31" }, { "key": "bih", "value": "969" }, { "key": "biw", "value": "944" }, { "key": "brdim", "value": "-269,-1080,-269,-1080,1920,-1080,1920,1080,944,969" }, { "key": "vis", "value": "1" }, { "key": "wgl", "value": "true" }, { "key": "ca_type", "value": "image" } ], "bid": "ANyPxKpfiaAf-DBzNeKLgkceMEA9UIeCWFRTRm4AQMCuejhI3PGwDB1jizQIX60YcEYtt_CX7tZWAbYerQ-rWLvV7y_KCLkBww" } }, # "browseId": browse_id, "params": "EgZ2aWRlb3PyBgQKAjoA", "continuation": cls.continuation }) headers = { 'authority': 'www.youtube.com', 'accept': '*/*', 'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6', 'cache-control': 'no-cache', 'content-type': 'application/json', 'cookie': 'VISITOR_INFO1_LIVE=kh6_Vpx5wHY; YSC=UupqFrWvAR0; DEVICE_INFO=ChxOekU1TlRReU5qWTBOVFExTVRRNU5qRTBOdz09EOmU7Z4GGOmU7Z4G; PREF=tz=Asia.Shanghai; ST-1kg1gfd=itct=CBcQ8JMBGAYiEwiNhIXX9IL9AhUFSUwIHWnnDks%3D&csn=MC4zNzI3MDcwMDA1Mjg4NzE5Ng..&endpoint=%7B%22clickTrackingParams%22%3A%22CBcQ8JMBGAYiEwiNhIXX9IL9AhUFSUwIHWnnDks%3D%22%2C%22commandMetadata%22%3A%7B%22webCommandMetadata%22%3A%7B%22url%22%3A%22%2F%40chinatravel5971%2Fvideos%22%2C%22webPageType%22%3A%22WEB_PAGE_TYPE_CHANNEL%22%2C%22rootVe%22%3A3611%2C%22apiUrl%22%3A%22%2Fyoutubei%2Fv1%2Fbrowse%22%7D%7D%2C%22browseEndpoint%22%3A%7B%22browseId%22%3A%22UCpLXnfBCNhj8KLnt54RQMKA%22%2C%22params%22%3A%22EgZ2aWRlb3PyBgQKAjoA%22%2C%22canonicalBaseUrl%22%3A%22%2F%40chinatravel5971%22%7D%7D', 'origin': 'https://www.youtube.com', 'pragma': 'no-cache', 'referer': f'https://www.youtube.com/{out_uid}/featured', 'sec-ch-ua': '"Not_A Brand";v="99", "Chromium";v="109", "Google Chrome";v="109.0.5414.87"', 'sec-ch-ua-arch': '"arm"', 'sec-ch-ua-bitness': '"64"', 'sec-ch-ua-full-version': '"109.0.1518.52"', 'sec-ch-ua-full-version-list': '"Not_A Brand";v="99.0.0.0", "Microsoft Edge";v="109.0.1518.52", "Chromium";v="109.0.5414.87"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-model': '', 'sec-ch-ua-platform': '"macOS"', 'sec-ch-ua-platform-version': '"12.4.0"', 'sec-ch-ua-wow64': '?0', 'sec-fetch-dest': 'empty', 'sec-fetch-mode': 'same-origin', 'sec-fetch-site': 'same-origin', 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36', 'x-goog-visitor-id': 'CgtraDZfVnB4NXdIWSi6mIOfBg%3D%3D', 'x-youtube-bootstrap-logged-in': 'false', 'x-youtube-client-name': '1', 'x-youtube-client-version': '2.20230201.01.00' } try: response = requests.post(url=url, headers=headers, data=payload) # Common.logger(log_type, crawler).info(f"get_feeds_response:{response.json()}\n") cls.continuation = response.json()['trackingParams'] if response.status_code != 200: Common.logger(log_type, crawler).warning(f'get_feeds_response:{response.text}\n') elif 'continuationContents' not in response.text and 'onResponseReceivedActions' not in response.text: Common.logger(log_type, crawler).warning(f'get_feeds_response:{response.text}\n') elif 'continuationContents' in response.json(): # Common.logger(log_type, crawler).info("'continuationContents' in response.json()\n") if 'richGridContinuation' not in response.json()['continuationContents']: Common.logger(log_type, crawler).warning( f'get_feeds_response:{response.json()["continuationContents"]}\n') elif 'contents' not in response.json()['continuationContents']['richGridContinuation']: Common.logger(log_type, crawler).warning( f'get_feeds_response:{response.json()["continuationContents"]["richGridContinuation"]}\n') elif 'contents' in response.json()["continuationContents"]["richGridContinuation"]: feeds = response.json()["continuationContents"]["richGridContinuation"]['contents'] return feeds elif 'onResponseReceivedActions' in response.json(): Common.logger(log_type, crawler).info("'onResponseReceivedActions' in response.json()\n") if len(response.json()['onResponseReceivedActions']) == 0: Common.logger(log_type, crawler).warning( f'get_feeds_response:{response.json()["onResponseReceivedActions"]}\n') elif 'appendContinuationItemsAction' not in response.json()['onResponseReceivedActions'][0]: Common.logger(log_type, crawler).warning( f'get_feeds_response:{response.json()["onResponseReceivedActions"][0]}\n') elif 'continuationItems' not in response.json()['onResponseReceivedActions'][0][ 'appendContinuationItemsAction']: Common.logger(log_type, crawler).warning( f'get_feeds_response:{response.json()["onResponseReceivedActions"][0]["appendContinuationItemsAction"]}\n') elif len(response.json()['onResponseReceivedActions'][0]['appendContinuationItemsAction'][ 'continuationItems']) == 0: Common.logger(log_type, crawler).warning( f'get_feeds_response:{response.json()["onResponseReceivedActions"][0]["appendContinuationItemsAction"]["continuationItems"]}\n') else: feeds = response.json()["onResponseReceivedActions"][0]["appendContinuationItemsAction"][ "continuationItems"] return feeds else: Common.logger(log_type, crawler).info('feeds is None\n') except Exception as e: Common.logger(log_type, crawler).error(f'get_feeds异常:{e}\n') @classmethod def get_first_page(cls, user_url): try: res = requests.get(url=user_url, headers=cls.headers) info = re.findall(r'var ytInitialData = (.*?);', res.text, re.S)[0] ytInitialData = json.loads(info) video_list = \ ytInitialData['contents']['twoColumnBrowseResultsRenderer']['tabs'][1]['tabRenderer']['content'][ 'richGridRenderer']['contents'] except Exception as e: video_list = [] return video_list @classmethod def get_next_page(cls, log_type, crawler, strategy, oss_endpoint, env, out_uid, our_uid, out_user_url, continuation): post_url = "https://www.youtube.com/youtubei/v1/browse?key=AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8&prettyPrint=false" payload = json.dumps({ "context": { "client": { "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36,gzip(gfe)", "clientName": "WEB", "clientVersion": "2.20230221.06.00", "osName": "Macintosh", "osVersion": "10_15_7", "originalUrl": "https://www.youtube.com/@wongkim728/videos", "screenPixelDensity": 2, "platform": "DESKTOP", "clientFormFactor": "UNKNOWN_FORM_FACTOR", "configInfo": { "appInstallData": "CKWy258GEOWg_hIQzN-uBRC4rP4SEOf3rgUQzPWuBRCi7K4FEMiJrwUQieiuBRDshq8FENrprgUQ4tSuBRD-7q4FEKOArwUQgt2uBRC2nP4SEJT4rgUQuIuuBRCH3a4FELjUrgUQjqj-EhCR-PwS" }, "screenDensityFloat": 2, "timeZone": "Asia/Shanghai", "browserName": "Chrome", "browserVersion": "110.0.0.0", "acceptHeader": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "deviceExperimentId": "ChxOekl3TWpVek9UQXpPVE13TnpJd056a3pNZz09EKWy258GGJie0p8G", "screenWidthPoints": 576, "screenHeightPoints": 764, "utcOffsetMinutes": 480, "userInterfaceTheme": "USER_INTERFACE_THEME_LIGHT", "connectionType": "CONN_CELLULAR_4G", "memoryTotalKbytes": "8000000", "mainAppWebInfo": { "graftUrl": out_user_url, "pwaInstallabilityStatus": "PWA_INSTALLABILITY_STATUS_CAN_BE_INSTALLED", "webDisplayMode": "WEB_DISPLAY_MODE_FULLSCREEN", "isWebNativeShareAvailable": False } }, "user": { "lockedSafetyMode": False }, "request": { "useSsl": True, "internalExperimentFlags": [], "consistencyTokenJars": [] }, "clickTracking": { "clickTrackingParams": "" }, "adSignalsInfo": { "params": [], "bid": "ANyPxKo8EXfKNGm3gYLAqhR5HA90FSKMvQf43tk3KV_XUWB5xi_0OxAo2TJTfoVx_516NRxz0qwRg-1x2kD-IVt7LPKrRHkJBA" } }, "continuation": continuation }) headers = { # 'authorization': 'SAPISIDHASH 1677121838_f5055bd4b4c242d18af423b37ac0f556bf1dfc30', 'content-type': 'application/json', 'cookie': 'VISITOR_INFO1_LIVE=HABZsLFdU40; DEVICE_INFO=ChxOekl3TWpVek9UQXpPVE13TnpJd056a3pNZz09EJie0p8GGJie0p8G; PREF=f4=4000000&tz=Asia.Shanghai; HSID=AxFp7ylWWebUZYqrl; SSID=ANHuSQMqvVcV0vVNn; APISID=AkwZgjPvFZ6LZCrE/Aiv0K-2rEUzY1bH1u; SAPISID=8yRrBMHYXAhqkybH/AEFGJvzZ3tPalnTy0; __Secure-1PAPISID=8yRrBMHYXAhqkybH/AEFGJvzZ3tPalnTy0; __Secure-3PAPISID=8yRrBMHYXAhqkybH/AEFGJvzZ3tPalnTy0; SID=TwjWkM4mrKb4o8pRKbyQVqELjNU43ZL0bF8QB2hdTI9z05T4Koo9aQoNQfX1AiGFWeD7WA.; __Secure-1PSID=TwjWkM4mrKb4o8pRKbyQVqELjNU43ZL0bF8QB2hdTI9z05T4bs4qvvXffLLTXq_VYw0XLw.; __Secure-3PSID=TwjWkM4mrKb4o8pRKbyQVqELjNU43ZL0bF8QB2hdTI9z05T4cNwzpudzvCglfQ5A1FJnog.; LOGIN_INFO=AFmmF2swRAIgO4TvR9xxWoHPgrGoGAEVo-P8Slqem__vIdF_oajjRiECIFiq4YtbL_IQGCbkjrHsWkWH6OpzKd8RlgdS6qNurR0Q:QUQ3MjNmejV5WkRVUmZXVlFjbjY0dW1aVGpoZkZQdmxYamIzV01zc0lmT3JiQl9ldVYwc0t4dlNkbWpoVEdJMHVaWjZXVEt3ZERQeUppU3AyNmR6ckFucWltZU5LNmZjQ3lHUEtKTDBzSlo5WXpJQzF3UlNCVlp2Q1ZKVmxtRk05OHRuWFFiWGphcFpPblFOUURWTlVxVGtBazVjcmVtS2pR; YSC=CtX0f3NennA; SIDCC=AFvIBn9aXC4vNCbg5jPzjbC8LMYCBVx_dy8uJO20b-768rmRfP9f5BqQ_xXspPemecVq29qZ7A; __Secure-1PSIDCC=AFvIBn-4TD_lPaKgbmYAGO6hZluLgSgbWgb7XAcaeNG6982LIIpS_Gb9vkqHTBMyCGvb4x7m6jk; __Secure-3PSIDCC=AFvIBn9ypvGX15qq4CsnsuhWTaXa9yMTxWMWbIDXtr6L3XZD81XBUQ0IMUv9ZKh9mf8NEbSvOy0; SIDCC=AFvIBn_DwLbohF2llhq4EQjFDFA3n9-_AK_7ITJsTZtCeYwy43J8KCYUPfY7ghqX9s-Qq5dOIQ; __Secure-1PSIDCC=AFvIBn-7x_HhxbmDkOzXew-sXAEWVuUGpglr8rypU623IyO8Y9OungcqMkuxBZQ2vr6G7x9UcxM; __Secure-3PSIDCC=AFvIBn-7aSYRxZkCKZp7-Mdn9PwbW4CUtXD0ok0nCvPIZXfkFrN9VqN1BHkI1fUaoIo_8YCjwRs', 'origin': 'https://www.youtube.com', 'referer': out_user_url, 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36', } try: res = requests.request("POST", post_url, headers=headers, data=payload).json() video_infos = res['onResponseReceivedActions'][0]['appendContinuationItemsAction']['continuationItems'] for data in video_infos: if 'richItemRenderer' in data: video_id = data["richItemRenderer"]["content"]['videoRenderer']['videoId'] video_dict = cls.get_video_info(log_type, crawler, out_uid, video_id) # 发布时间<=7天 publish_time = int(time.mktime(time.strptime(video_dict['publish_time'], "%Y-%m-%d"))) if int(time.time()) - publish_time <= 3600 * 24 * 7: cls.download_publish(log_type, crawler, video_dict, strategy, our_uid, env, oss_endpoint) else: Common.logger(log_type, crawler).info('发布时间超过7天\n') return else: continuation = cls.get_continuation(data) cls.get_next_page(log_type, crawler, strategy, oss_endpoint, env, out_uid, our_uid,out_user_url, continuation) except: return @classmethod def get_videos(cls, log_type, crawler, strategy, task, oss_endpoint, env, out_uid, our_uid, out_user_url): try: feeds = cls.get_first_page(out_user_url) for data in feeds: if 'richItemRenderer' in data: video_id = data["richItemRenderer"]["content"]['videoRenderer']['videoId'] video_dict = cls.get_video_info(log_type, crawler, out_uid, video_id) # 发布时间<=7天 publish_time = int(time.mktime(time.strptime(video_dict['publish_time'], "%Y-%m-%d"))) if int(time.time()) - publish_time <= 3600 * 24 * 7: cls.download_publish(log_type, crawler, video_dict, strategy, our_uid, env, oss_endpoint) else: Common.logger(log_type, crawler).info('发布时间超过7天\n') return else: continuation = cls.get_continuation(data) cls.get_next_page(log_type, crawler, strategy, oss_endpoint, env, out_uid, our_uid, out_user_url, continuation=continuation) except Exception as e: Common.logger(log_type, crawler).error(f"get_videos异常:{e}\n") @classmethod def filter_emoji(cls, title): # 过滤表情 try: co = re.compile(u'[\U00010000-\U0010ffff]') except re.error: co = re.compile(u'[\uD800-\uDBFF][\uDC00-\uDFFF]') return co.sub("", title) @classmethod def is_contain_chinese(cls, strword): for ch in strword: if u'\u4e00' <= ch <= u'\u9fff': return True return False @classmethod def parse_video(cls, video_dict, log_type, crawler, out_uid, video_id): try: if 'streamingData' not in video_dict: Common.logger(log_type, crawler).warning(f"get_video_info_response:{video_dict}\n") elif 'videoDetails' not in video_dict: Common.logger(log_type, crawler).warning(f"get_video_info_response:{video_dict}\n") elif 'microformat' not in video_dict: Common.logger(log_type, crawler).warning(f"get_video_info_response:{video_dict}\n") else: playerMicroformatRenderer = video_dict['microformat']['playerMicroformatRenderer'] videoDetails = video_dict['videoDetails'] # streamingData = response.json()['streamingData'] # video_title if 'title' not in videoDetails: video_title = '' else: video_title = videoDetails['title'] video_title = cls.filter_emoji(video_title) # if Translate.is_contains_chinese(video_title) is False: if not cls.is_contain_chinese(video_title): video_title = Translate.google_translate(video_title) \ .strip().replace("\\", "").replace(" ", "").replace("\n", "") \ .replace("/", "").replace("\r", "").replace("&NBSP", "").replace("&", "") \ .replace(";", "").replace("amp;", "") # 自动翻译标题为中文 if 'lengthSeconds' not in videoDetails: duration = 0 else: duration = int(videoDetails['lengthSeconds']) # play_cnt if 'viewCount' not in videoDetails: play_cnt = 0 else: play_cnt = int(videoDetails['viewCount']) # publish_time if 'publishDate' not in playerMicroformatRenderer: publish_time = '' else: publish_time = playerMicroformatRenderer['publishDate'] if publish_time == '': publish_time_stamp = 0 elif ':' in publish_time: publish_time_stamp = int(time.mktime(time.strptime(publish_time, "%Y-%m-%d %H:%M:%S"))) else: publish_time_stamp = int(time.mktime(time.strptime(publish_time, "%Y-%m-%d"))) # user_name if 'author' not in videoDetails: user_name = '' else: user_name = videoDetails['author'] # cover_url if 'thumbnail' not in videoDetails: cover_url = '' elif 'thumbnails' not in videoDetails['thumbnail']: cover_url = '' elif len(videoDetails['thumbnail']['thumbnails']) == 0: cover_url = '' elif 'url' not in videoDetails['thumbnail']['thumbnails'][-1]: cover_url = '' else: cover_url = videoDetails['thumbnail']['thumbnails'][-1]['url'] # video_url # if 'formats' not in streamingData: # video_url = '' # elif len(streamingData['formats']) == 0: # video_url = '' # elif 'url' not in streamingData['formats'][-1]: # video_url = '' # else: # video_url = streamingData['formats'][-1]['url'] video_url = f"https://www.youtube.com/watch?v={video_id}" Common.logger(log_type, crawler).info(f'video_title:{video_title}') Common.logger(log_type, crawler).info(f'video_id:{video_id}') Common.logger(log_type, crawler).info(f'play_cnt:{play_cnt}') Common.logger(log_type, crawler).info(f'publish_time:{publish_time}') Common.logger(log_type, crawler).info(f'user_name:{user_name}') Common.logger(log_type, crawler).info(f'cover_url:{cover_url}') Common.logger(log_type, crawler).info(f'video_url:{video_url}') video_dict = { 'video_title': video_title, 'video_id': video_id, 'duration': duration, 'play_cnt': play_cnt, 'publish_time': publish_time, 'publish_time_stamp': publish_time_stamp, 'user_name': user_name, 'out_uid': out_uid, 'cover_url': cover_url, 'video_url': video_url, } return video_dict except Exception as e: Common.logger(log_type, crawler).error(f"get_video_info异常:{e}\n") @classmethod def get_video_info(cls, log_type, crawler, out_uid, video_id): try: url = "https://www.youtube.com/youtubei/v1/player?key=AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8&prettyPrint=false" payload = json.dumps({ "context": { "client": { "hl": "zh-CN", "gl": "US", "remoteHost": "38.93.247.21", "deviceMake": "Apple", "deviceModel": "", "visitorData": "CgtraDZfVnB4NXdIWSjkzoefBg%3D%3D", "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36,gzip(gfe)", "clientName": "WEB", "clientVersion": "2.20230201.01.00", "osName": "Macintosh", "osVersion": "10_15_7", "originalUrl": f"https://www.youtube.com/watch?v={video_id}", "platform": "DESKTOP", "clientFormFactor": "UNKNOWN_FORM_FACTOR", "configInfo": { "appInstallData": "COTOh58GEPuj_hIQ1-SuBRC4i64FEMzfrgUQgt2uBRCi7K4FEOLUrgUQzPWuBRCKgK8FEOSg_hIQtpz-EhDa6a4FEP7urgUQieiuBRDn964FELjUrgUQlPiuBRCH3a4FELfgrgUQ76P-EhDJya4FEJan_hIQkfj8Eg%3D%3D" }, "timeZone": "Asia/Shanghai", "browserName": "Chrome", "browserVersion": "109.0.0.0", "acceptHeader": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", "deviceExperimentId": "ChxOekU1TlRReU5qWTBOVFExTVRRNU5qRTBOdz09EOTOh58GGOmU7Z4G", "screenWidthPoints": 1037, "screenHeightPoints": 969, "screenPixelDensity": 1, "screenDensityFloat": 1, "utcOffsetMinutes": 480, "userInterfaceTheme": "USER_INTERFACE_THEME_LIGHT", "memoryTotalKbytes": "8000000", "clientScreen": "WATCH", "mainAppWebInfo": { "graftUrl": f"/watch?v={video_id}", "pwaInstallabilityStatus": "PWA_INSTALLABILITY_STATUS_CAN_BE_INSTALLED", "webDisplayMode": "WEB_DISPLAY_MODE_FULLSCREEN", "isWebNativeShareAvailable": True } }, "user": { "lockedSafetyMode": False }, "request": { "useSsl": True, "internalExperimentFlags": [], "consistencyTokenJars": [] }, "clickTracking": { "clickTrackingParams": "CIwBEKQwGAYiEwipncqx3IL9AhXs4cQKHbKZDO4yB3JlbGF0ZWRInsS1qbGFtIlUmgEFCAEQ-B0=" }, "adSignalsInfo": { "params": [ { "key": "dt", "value": "1675749222611" }, { "key": "flash", "value": "0" }, { "key": "frm", "value": "0" }, { "key": "u_tz", "value": "480" }, { "key": "u_his", "value": "3" }, { "key": "u_h", "value": "1080" }, { "key": "u_w", "value": "1920" }, { "key": "u_ah", "value": "1080" }, { "key": "u_aw", "value": "1920" }, { "key": "u_cd", "value": "24" }, { "key": "bc", "value": "31" }, { "key": "bih", "value": "969" }, { "key": "biw", "value": "1037" }, { "key": "brdim", "value": "-269,-1080,-269,-1080,1920,-1080,1920,1080,1037,969" }, { "key": "vis", "value": "1" }, { "key": "wgl", "value": "true" }, { "key": "ca_type", "value": "image" } ], "bid": "ANyPxKop8SijebwUCq4ZfKbJwlSjVQa_RTdS6c6a6WPYpCKnxpWCJ33B1SzRuSXjSfH9O2MhURebAs0CngRg6B4nOjBpeJDKgA" } }, "videoId": str(video_id), "playbackContext": { "contentPlaybackContext": { "currentUrl": f"/watch?v={video_id}", "vis": 0, "splay": False, "autoCaptionsDefaultOn": False, "autonavState": "STATE_NONE", "html5Preference": "HTML5_PREF_WANTS", "signatureTimestamp": 19394, "referer": f"https://www.youtube.com/watch?v={video_id}", "lactMilliseconds": "-1", "watchAmbientModeContext": { "watchAmbientModeEnabled": True } } }, "racyCheckOk": False, "contentCheckOk": False }) headers = { 'authority': 'www.youtube.com', 'accept': '*/*', 'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6', 'cache-control': 'no-cache', 'content-type': 'application/json', 'cookie': f'VISITOR_INFO1_LIVE=kh6_Vpx5wHY; YSC=UupqFrWvAR0; DEVICE_INFO=ChxOekU1TlRReU5qWTBOVFExTVRRNU5qRTBOdz09EOmU7Z4GGOmU7Z4G; PREF=tz=Asia.Shanghai; ST-180dxzo=itct=CIwBEKQwGAYiEwipncqx3IL9AhXs4cQKHbKZDO4yB3JlbGF0ZWRInsS1qbGFtIlUmgEFCAEQ-B0%3D&csn=MC41MTQ1NTQzMTE3NTA4MjY0&endpoint=%7B%22clickTrackingParams%22%3A%22CIwBEKQwGAYiEwipncqx3IL9AhXs4cQKHbKZDO4yB3JlbGF0ZWRInsS1qbGFtIlUmgEFCAEQ-B0%3D%22%2C%22commandMetadata%22%3A%7B%22webCommandMetadata%22%3A%7B%22url%22%3A%22%2Fwatch%3Fv%3D{video_id}%22%2C%22webPageType%22%3A%22WEB_PAGE_TYPE_WATCH%22%2C%22rootVe%22%3A3832%7D%7D%2C%22watchEndpoint%22%3A%7B%22videoId%22%3A%22{video_id}%22%2C%22nofollow%22%3Atrue%2C%22watchEndpointSupportedOnesieConfig%22%3A%7B%22html5PlaybackOnesieConfig%22%3A%7B%22commonConfig%22%3A%7B%22url%22%3A%22https%3A%2F%2Frr5---sn-nx5s7n76.googlevideo.com%2Finitplayback%3Fsource%3Dyoutube%26oeis%3D1%26c%3DWEB%26oad%3D3200%26ovd%3D3200%26oaad%3D11000%26oavd%3D11000%26ocs%3D700%26oewis%3D1%26oputc%3D1%26ofpcc%3D1%26msp%3D1%26odepv%3D1%26id%3D38654ad085c12212%26ip%3D38.93.247.21%26initcwndbps%3D11346250%26mt%3D1675748964%26oweuc%3D%26pxtags%3DCg4KAnR4EggyNDQ1MTI4OA%26rxtags%3DCg4KAnR4EggyNDQ1MTI4Ng%252CCg4KAnR4EggyNDQ1MTI4Nw%252CCg4KAnR4EggyNDQ1MTI4OA%252CCg4KAnR4EggyNDQ1MTI4OQ%22%7D%7D%7D%7D%7D', 'origin': 'https://www.youtube.com', 'pragma': 'no-cache', 'referer': f'https://www.youtube.com/watch?v={video_id}', 'sec-ch-ua': '"Not_A Brand";v="99", "Chromium";v="109", "Google Chrome";v="109.0.5414.87"', 'sec-ch-ua-arch': '"arm"', 'sec-ch-ua-bitness': '"64"', 'sec-ch-ua-full-version': '"109.0.1518.52"', 'sec-ch-ua-full-version-list': '"Not_A Brand";v="99.0.0.0", "Microsoft Edge";v="109.0.1518.52", "Chromium";v="109.0.5414.87"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-model': '', 'sec-ch-ua-platform': '"macOS"', 'sec-ch-ua-platform-version': '"12.4.0"', 'sec-ch-ua-wow64': '?0', 'sec-fetch-dest': 'empty', 'sec-fetch-mode': 'same-origin', 'sec-fetch-site': 'same-origin', 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36', 'x-goog-visitor-id': 'CgtraDZfVnB4NXdIWSjkzoefBg%3D%3D', 'x-youtube-bootstrap-logged-in': 'false', 'x-youtube-client-name': '1', 'x-youtube-client-version': '2.20230201.01.00' } response = requests.post(url=url, headers=headers, data=payload) if response.status_code != 200: Common.logger(log_type, crawler).warning(f"get_video_info_response:{response.text}\n") elif 'streamingData' not in response.json(): Common.logger(log_type, crawler).warning(f"get_video_info_response:{response.json()}\n") elif 'videoDetails' not in response.json(): Common.logger(log_type, crawler).warning(f"get_video_info_response:{response.json()}\n") elif 'microformat' not in response.json(): Common.logger(log_type, crawler).warning(f"get_video_info_response:{response.json()}\n") else: playerMicroformatRenderer = response.json()['microformat']['playerMicroformatRenderer'] videoDetails = response.json()['videoDetails'] # streamingData = response.json()['streamingData'] # video_title if 'title' not in videoDetails: video_title = '' else: video_title = videoDetails['title'].replace('"', '').replace("'", '') video_title = cls.filter_emoji(video_title) if not cls.is_contain_chinese(video_title): video_title = Translate.google_translate(video_title) \ .strip().replace("\\", "").replace(" ", "").replace("\n", "") \ .replace("/", "").replace("\r", "").replace("&NBSP", "").replace("&", "") \ .replace(";", "").replace("amp;", "") # 自动翻译标题为中文 if 'lengthSeconds' not in videoDetails: duration = 0 else: duration = int(videoDetails['lengthSeconds']) # play_cnt if 'viewCount' not in videoDetails: play_cnt = 0 else: play_cnt = int(videoDetails['viewCount']) # publish_time if 'publishDate' not in playerMicroformatRenderer: publish_time = '' else: publish_time = playerMicroformatRenderer['publishDate'] if publish_time == '': publish_time_stamp = 0 elif ':' in publish_time: publish_time_stamp = int(time.mktime(time.strptime(publish_time, "%Y-%m-%d %H:%M:%S"))) else: publish_time_stamp = int(time.mktime(time.strptime(publish_time, "%Y-%m-%d"))) # user_name if 'author' not in videoDetails: user_name = '' else: user_name = videoDetails['author'] # cover_url if 'thumbnail' not in videoDetails: cover_url = '' elif 'thumbnails' not in videoDetails['thumbnail']: cover_url = '' elif len(videoDetails['thumbnail']['thumbnails']) == 0: cover_url = '' elif 'url' not in videoDetails['thumbnail']['thumbnails'][-1]: cover_url = '' else: cover_url = videoDetails['thumbnail']['thumbnails'][-1]['url'] # video_url # if 'formats' not in streamingData: # video_url = '' # elif len(streamingData['formats']) == 0: # video_url = '' # elif 'url' not in streamingData['formats'][-1]: # video_url = '' # else: # video_url = streamingData['formats'][-1]['url'] video_url = f"https://www.youtube.com/watch?v={video_id}" Common.logger(log_type, crawler).info(f'video_title:{video_title}') Common.logger(log_type, crawler).info(f'video_id:{video_id}') Common.logger(log_type, crawler).info(f'play_cnt:{play_cnt}') Common.logger(log_type, crawler).info(f'publish_time:{publish_time}') Common.logger(log_type, crawler).info(f'user_name:{user_name}') Common.logger(log_type, crawler).info(f'cover_url:{cover_url}') Common.logger(log_type, crawler).info(f'video_url:{video_url}') video_dict = { 'video_title': video_title, 'video_id': video_id, 'duration': duration, 'play_cnt': play_cnt, 'publish_time': publish_time, 'publish_time_stamp': publish_time_stamp, 'user_name': user_name, 'out_uid': out_uid, 'cover_url': cover_url, 'video_url': video_url, } return video_dict except Exception as e: Common.logger(log_type, crawler).error(f"get_video_info异常:{e}\n") @classmethod def repeat_video(cls, log_type, crawler, video_id, env): sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """ repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env) return len(repeat_video) @classmethod def download_publish(cls, log_type, crawler, video_dict, strategy, our_uid, env, oss_endpoint): try: filter_words = get_config_from_mysql(log_type, crawler, env, text='filter', action='get_author_map') for filter_word in filter_words: if filter_word in video_dict['video_title']: Common.logger(log_type, crawler).info('标题已中过滤词:{}\n', video_dict['video_title']) return if video_dict['video_title'] == '' or video_dict['video_url'] == '': Common.logger(log_type, crawler).info('无效视频\n') elif video_dict['duration'] > 1200 or video_dict['duration'] < 60: Common.logger(log_type, crawler).info(f"时长:{video_dict['duration']}不满足规则\n") # elif repeat_video is not None and len(repeat_video) != 0: elif cls.repeat_video(log_type, crawler, video_dict['video_id'], env) != 0: Common.logger(log_type, crawler).info('视频已下载\n') else: # 下载视频 Common.logger(log_type, crawler).info('开始下载视频...') # Common.download_method(log_type, crawler, 'video', video_dict['video_title'], video_dict['video_url']) Common.download_method(log_type, crawler, 'youtube_video', video_dict['video_title'], video_dict['video_url']) # ffmpeg_dict = Common.ffmpeg(log_type, crawler, f"./{crawler}/videos/{video_dict['video_title']}/video.mp4") # video_width = int(ffmpeg_dict['width']) # video_height = int(ffmpeg_dict['height']) # video_size = int(ffmpeg_dict['size']) video_width = 1280 video_height = 720 duration = int(video_dict['duration']) Common.logger(log_type, crawler).info(f'video_width:{video_width}') Common.logger(log_type, crawler).info(f'video_height:{video_height}') Common.logger(log_type, crawler).info(f'duration:{duration}') # Common.logger(log_type, crawler).info(f'video_size:{video_size}\n') video_dict['video_width'] = video_width video_dict['video_height'] = video_height video_dict['duration'] = duration video_dict['comment_cnt'] = 0 video_dict['like_cnt'] = 0 video_dict['share_cnt'] = 0 video_dict['avatar_url'] = video_dict['cover_url'] video_dict['session'] = f'youtube{int(time.time())}' rule = '1,2' # if duration < 60 or duration > 600: # # 删除视频文件夹 # shutil.rmtree(f"./{crawler}/videos/{video_dict['video_title']}/") # Common.logger(log_type, crawler).info(f"时长:{video_dict['duration']}不满足抓取规则,删除成功\n") # return # if duration == 0 or duration is None: # # 删除视频文件夹 # shutil.rmtree(f"./{crawler}/videos/{video_dict['video_title']}/") # Common.logger(log_type, crawler).info(f"视频下载出错,删除成功\n") # return # else: # 下载封面 Common.download_method(log_type, crawler, 'cover', video_dict['video_title'], video_dict['cover_url']) # 保存视频文本信息 Common.save_video_info(log_type, crawler, video_dict) # 上传视频 Common.logger(log_type, crawler).info(f"开始上传视频") if env == 'dev': our_video_id = Publish.upload_and_publish(log_type, crawler, strategy, our_uid, env, oss_endpoint) our_video_link = f"https://testadmin.piaoquantv.com/cms/post-detail/{our_video_id}/info" else: our_video_id = Publish.upload_and_publish(log_type, crawler, strategy, our_uid, env, oss_endpoint) our_video_link = f"https://admin.piaoquantv.com/cms/post-detail/{our_video_id}/info" Common.logger(log_type, crawler).info("视频上传完成") if our_video_id is None: # 删除视频文件夹 shutil.rmtree(f"./{crawler}/videos/{video_dict['video_title']}/") return # 视频信息保存至飞书 Feishu.insert_columns(log_type, crawler, "GVxlYk", "ROWS", 1, 2) # 视频ID工作表,首行写入数据 upload_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))) values = [[upload_time, "定向榜", video_dict['video_id'], video_dict['video_title'], our_video_link, video_dict['play_cnt'], video_dict['duration'], f'{video_width}*{video_height}', video_dict['publish_time'], video_dict['user_name'], video_dict['cover_url'], video_dict['video_url'] ]] # time.sleep(1) Feishu.update_values(log_type, crawler, "GVxlYk", "F2:Z2", values) Common.logger(log_type, crawler).info('视频信息写入定向_已下载表成功\n') # 视频信息保存数据库 sql = f""" insert into crawler_video(video_id, user_id, out_user_id, platform, strategy, out_video_id, video_title, cover_url, video_url, duration, publish_time, play_cnt, crawler_rule, width, height) values({our_video_id}, "{our_uid}", "{video_dict['out_uid']}", "{cls.platform}", "定向爬虫策略", "{video_dict['video_id']}", "{video_dict['video_title']}", "{video_dict['cover_url']}", "{video_dict['video_url']}", {int(duration)}, "{video_dict['publish_time']}", {int(video_dict['play_cnt'])}, "{rule}", {int(video_width)}, {int(video_height)}) """ MysqlHelper.update_values(log_type, crawler, sql, env) Common.logger(log_type, crawler).info('视频信息插入数据库成功!\n') except Exception as e: Common.logger(log_type, crawler).info(f"download_publish异常:{e}\n") @classmethod def get_follow_videos(cls, log_type, crawler, task, oss_endpoint, env): try: user_list = get_user_from_mysql(log_type, crawler, crawler, env, action='get_author_map') strategy = '定向抓取策略' for user_dict in user_list: out_user_url = user_dict['link'] out_uid = out_user_url.split('/')[3] user_name = user_dict['nick_name'] our_uid = user_dict['uid'] Common.logger(log_type, crawler).info(f'获取 {user_name} 主页视频\n') cls.get_videos( log_type=log_type, crawler=crawler, strategy=strategy, task=task, oss_endpoint=oss_endpoint, env=env, our_uid=our_uid, out_uid=out_uid, out_user_url=out_user_url ) cls.continuation = '' except Exception as e: Common.logger(log_type, crawler).error(f"get_follow_videos异常:{e}\n") if __name__ == "__main__": YoutubeAuthorScheduling.get_follow_videos('author', 'youtube', '', 'outer', 'dev')