# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/4/7
import base64
import json
import os
import random
import string
import sys
import time

import requests
import urllib3
from requests.adapters import HTTPAdapter
from selenium import webdriver
from selenium.webdriver import DesiredCapabilities
from selenium.webdriver.chrome.service import Service

sys.path.append(os.getcwd())
from common.common import Common
from common.feishu import Feishu


class XiguaRecommend:
    """Crawl the ixigua.com recommendation feed and record videos via Feishu.

    Flow (see ``get_videolist``): obtain a ``_signature`` value from a headless
    Chrome session, request one page of the feed, resolve the stream URLs of
    every valid entry and hand each one to ``download_publish``.
    """

    # Stream containers inside ``videoResource``, in the order they are checked.
    _RESOURCE_KEYS = ('dash_120fps', 'dash', 'normal')
    # Entries of a container's ``video_list``, in the order they are tried.
    _VIDEO_LIST_KEYS = ('video_4', 'video_3', 'video_2', 'video_1')

    @classmethod
    def random_signature(cls):
        """Return a random 26-character signature-like string.

        The result is ``'AAAAAAAAAA'`` + 12 random alphanumeric characters +
        ``'AAAB'``, with the character at index 18 then replaced: ``'8'``
        becomes ``'w'``, ``'9'`` becomes ``'x'``, ``'.'`` becomes ``'z'`` and
        anything else becomes ``'y'``.
        """
        digits_num = random.randint(1, 6)
        uppercase_num = random.randint(1, 26 - digits_num - 1)
        lowercase_num = 26 - (digits_num + uppercase_num)
        # 26 distinct characters in total: digits, upper-case, lower-case.
        password = (random.sample(string.digits, digits_num)
                    + random.sample(string.ascii_uppercase, uppercase_num)
                    + random.sample(string.ascii_lowercase, lowercase_num))
        random.shuffle(password)
        new_password = 'AAAAAAAAAA' + ''.join(password)[10:-4] + 'AAAB'
        replacement = {'8': 'w', '9': 'x', '-': 'y', '.': 'z'}.get(new_password[18], 'y')
        return new_password[0:18] + replacement + new_password[-7:]

    @classmethod
    def get_signature(cls, env):
        """Load ixigua.com in headless Chrome and return a ``_signature`` value.

        The page is opened, scrolled and refreshed with performance logging
        enabled; the browser's performance log is then scanned for the first
        request issued from a ``www.ixigua.com`` document whose URL carries a
        ``_signature`` query parameter.

        :param env: ``"dev"`` uses the hard-coded local chromedriver path;
            any other value lets Selenium locate the driver itself.
        :return: the ``_signature`` value, or ``None`` if no such request
            was logged.
        """
        # Copy first: DesiredCapabilities.CHROME is a shared class-level dict
        # and must not be mutated in place.
        # NOTE(review): ``desired_capabilities=`` is kept as in the original;
        # confirm the installed Selenium version still accepts it.
        ca = DesiredCapabilities.CHROME.copy()
        ca["goog:loggingPrefs"] = {"performance": "ALL"}

        # Run without opening a browser window.
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument("headless")
        chrome_options.add_argument(
            'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36')
        chrome_options.add_argument("--no-sandbox")

        # Driver initialisation.
        if env == "dev":
            driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options,
                                      service=Service('/Users/wangkun/Downloads/chromedriver/chromedriver_v111/chromedriver'))
        else:
            driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options)

        try:
            driver.implicitly_wait(10)
            driver.get('https://www.ixigua.com/')
            time.sleep(1)
            # Scroll the page by 2000 pixels, then reload it.
            driver.execute_script('window.scrollBy(0, 2000)')
            driver.refresh()
            logs = driver.get_log("performance")
        finally:
            # Always release the browser, even if navigation failed.
            driver.quit()

        for line in logs:
            params = json.loads(line['message'])['message'].get('params')
            if not params:
                continue
            if 'www.ixigua.com' not in (params.get('documentURL') or ''):
                continue
            # Not every logged event carries a ``request`` object.
            request_url = (params.get('request') or {}).get('url') or ''
            if '_signature' not in request_url:
                continue
            return request_url.split('_signature=')[-1].split('&')[0]
        return None

    @staticmethod
    def _decode_url(encoded):
        """Base64-decode *encoded* to a UTF-8 string, restoring stripped padding."""
        # Base64 text must be a multiple of 4 characters long.
        padded = encoded + '=' * (-len(encoded) % 4)
        return base64.b64decode(padded).decode('utf8')

    @staticmethod
    def _empty_url_dict():
        """Return the placeholder result used when no playable stream is found."""
        return {"video_url": '', "audio_url": '', "video_width": 0, "video_height": 0}

    @classmethod
    def _select_streams(cls, resource):
        """Pick ``(video_entry, audio_entry)`` from one resource container, or ``None``.

        ``video_list`` entries are tried from ``video_4`` down to ``video_1``;
        for those the same entry serves as both video and audio source. If none
        is present, the last items of ``dynamic_video_list`` and
        ``dynamic_audio_list`` are used when both lists exist and are non-empty.
        """
        if 'video_list' in resource:
            video_list = resource['video_list']
            for key in cls._VIDEO_LIST_KEYS:
                if key in video_list:
                    return video_list[key], video_list[key]
        if 'dynamic_video' in resource:
            dynamic = resource['dynamic_video']
            if 'dynamic_video_list' in dynamic and 'dynamic_audio_list' in dynamic \
                    and len(dynamic['dynamic_video_list']) != 0 \
                    and len(dynamic['dynamic_audio_list']) != 0:
                return dynamic['dynamic_video_list'][-1], dynamic['dynamic_audio_list'][-1]
        return None

    @classmethod
    def _build_url_dict(cls, video_info):
        """Build the ``video_url``/``audio_url``/``video_width``/``video_height`` dict."""
        if 'videoResource' not in video_info:
            return cls._empty_url_dict()
        resources = video_info['videoResource']
        for key in cls._RESOURCE_KEYS:
            if key not in resources:
                continue
            # Only the first container that is present is consulted; there is
            # deliberately no fallback to the next one if it has no stream.
            streams = cls._select_streams(resources[key])
            if streams is None:
                return cls._empty_url_dict()
            video_entry, audio_entry = streams
            return {
                "video_url": cls._decode_url(video_entry['backup_url_1']),
                "audio_url": cls._decode_url(audio_entry['backup_url_1']),
                "video_width": video_entry['vwidth'],
                "video_height": video_entry['vheight'],
            }
        return cls._empty_url_dict()

    # Fetch video details.
    @classmethod
    def get_video_url(cls, log_type, crawler, gid):
        """Resolve the stream URLs and dimensions of the video identified by *gid*.

        :param log_type: passed through to ``Common.logger``.
        :param crawler: passed through to ``Common.logger``.
        :param gid: sent as the ``mixId`` query parameter.
        :return: dict with keys ``video_url``, ``audio_url``, ``video_width``
            and ``video_height`` (empty strings / zeros when no stream is
            available), or ``None`` when the response has no usable ``data``
            or any error occurs (the error is logged, not raised).
        """
        try:
            url = 'https://www.ixigua.com/api/mixVideo/information?'
            headers = {
                "accept-encoding": "gzip, deflate",
                "accept-language": "zh-CN,zh-Hans;q=0.9",
                "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                              "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.5 Safari/605.1.15",
                "referer": "https://www.ixigua.com/7102614741050196520?logTag=0531c88ac04f38ab2c62",
            }
            params = {
                'mixId': gid,
                'msToken': 'IlG0wd0Pylyw9ghcYiB2YseUmTwrsrqqhXrbIcsSaTcLTJyVlbYJzk20zw3UO-CfrfC'
                           'NVVIOBNjIl7vfBoxnVUwO9ZyzAI3umSKsT5-pef_RRfQCJwmA',
                'X-Bogus': 'DFSzswVupYTANCJOSBk0P53WxM-r',
                '_signature': '_02B4Z6wo0000119LvEwAAIDCuktNZ0y5wkdfS7jAALThuOR8D9yWNZ.EmWHKV0WSn6Px'
                              'fPsH9-BldyxVje0f49ryXgmn7Tzk-swEHNb15TiGqa6YF.cX0jW8Eds1TtJOIZyfc9s5emH7gdWN94',
            }
            cookies = {
                'ixigua-a-s': '1',
                'msToken': 'IlG0wd0Pylyw9ghcYiB2YseUmTwrsrqqhXrbIcsSaTcLTJyVlbYJzk20zw3UO-CfrfCNVVIOB'
                           'NjIl7vfBoxnVUwO9ZyzAI3umSKsT5-pef_RRfQCJwmA',
                'ttwid': '1%7C_yXQeHWwLZgCsgHClOwTCdYSOt_MjdOkgnPIkpi-Sr8%7C1661241238%7Cf57d0c5ef3f1d7'
                         '6e049fccdca1ac54887c34d1f8731c8e51a49780ff0ceab9f8',
                'tt_scid': 'QZ4l8KXDG0YAEaMCSbADdcybdKbUfG4BC6S4OBv9lpRS5VyqYLX2bIR8CTeZeGHR9ee3',
                'MONITOR_WEB_ID': '0a49204a-7af5-4e96-95f0-f4bafb7450ad',
                '__ac_nonce': '06304878000964fdad287',
                '__ac_signature': '_02B4Z6wo00f017Rcr3AAAIDCUVxeW1tOKEu0fKvAAI4cvoYzV-wBhq7B6D8k0no7lb'
                                  'FlvYoinmtK6UXjRIYPXnahUlFTvmWVtb77jsMkKAXzAEsLE56m36RlvL7ky.M3Xn52r9t1IEb7IR3ke8',
                'ttcid': 'e56fabf6e85d4adf9e4d91902496a0e882',
                '_tea_utm_cache_1300': 'undefined',
                'support_avif': 'false',
                'support_webp': 'false',
                'xiguavideopcwebid': '7134967546256016900',
                'xiguavideopcwebid.sig': 'xxRww5R1VEMJN_dQepHorEu_eAc',
            }
            urllib3.disable_warnings()
            with requests.session() as s:
                # max_retries=3: retry up to 3 times.
                s.mount('http://', HTTPAdapter(max_retries=3))
                s.mount('https://', HTTPAdapter(max_retries=3))
                response = s.get(url=url, headers=headers, params=params, cookies=cookies, verify=False,
                                 proxies=Common.tunnel_proxies(), timeout=5)
                response.close()
            payload = response.json()
            if 'data' not in payload or payload['data'] == '':
                Common.logger(log_type, crawler).warning('get_video_info: response: {}', response)
                return None
            video_info = payload['data']['gidInformation']['packerData']['video']
            return cls._build_url_dict(video_info)
        except Exception as e:
            Common.logger(log_type, crawler).error(f'get_video_url:{e}\n')
            return None

    @classmethod
    def get_videolist(cls, log_type, crawler, env):
        """Repeatedly fetch the recommendation feed and publish each valid video.

        Every iteration obtains a fresh signature via ``get_signature``,
        requests one feed page and passes each valid entry to
        ``download_publish``. The loop never ends on success; it returns
        only when the feed response is unusable (non-200 status, or missing /
        empty ``data.channelFeed.Data``). Any exception is logged and the loop
        starts over.

        :param log_type: passed through to ``Common.logger`` and helpers.
        :param crawler: passed through to ``Common.logger`` and helpers.
        :param env: forwarded to ``get_signature``.
        """
        while True:
            try:
                signature = cls.get_signature(env)
                if signature is None:
                    Common.logger(log_type, crawler).warning(f"signature:{signature}")
                    continue
                url = "https://www.ixigua.com/api/feedv2/feedById?"
                params = {
                    "channelId": "94349543909",
                    "count": "9",
                    "maxTime": str(int(time.time())),
                    "queryCount": "1",
                    "_signature": signature,
                }
                headers = {
                    'referer': 'https://www.ixigua.com/?is_new_connect=0&is_new_user=0',
                    'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.1661.54',
                }
                urllib3.disable_warnings()
                with requests.session() as s:
                    # max_retries=3: retry up to 3 times. The request must go
                    # through this session for the retry adapters to apply.
                    s.mount('http://', HTTPAdapter(max_retries=3))
                    s.mount('https://', HTTPAdapter(max_retries=3))
                    response = s.get(url=url, headers=headers, params=params,
                                     proxies=Common.tunnel_proxies(), verify=False, timeout=5)
                    response.close()

                if response.status_code != 200 or 'data' not in response.text:
                    Common.logger(log_type, crawler).warning(f"get_videolist_response:{response.text}\n")
                    return
                payload = response.json()
                if 'channelFeed' not in payload['data'] \
                        or 'Data' not in payload['data']['channelFeed'] \
                        or len(payload['data']['channelFeed']['Data']) == 0:
                    Common.logger(log_type, crawler).warning(f"get_videolist_response:{payload}\n")
                    return

                for item in payload['data']['channelFeed']['Data']:
                    if 'data' not in item:
                        continue
                    data = item['data']
                    user_info = data.get('user_info') or {}

                    video_title = data.get('title', '')
                    video_id = data.get('vid', '')
                    play_cnt = int(data.get('playNum', 0))
                    comment_cnt = int(data.get('commentNum', 0))
                    gid = data.get('item_id', 0)
                    # Share and like counts are not read from the feed; they are always 0.
                    share_cnt = 0
                    like_cnt = 0
                    duration = int(data.get('duration', 0))
                    publish_time_stamp = int(data.get('publish_time', 0))
                    publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp))
                    cover_url = data.get('image_url', '')
                    user_name = user_info.get('name', '')
                    user_id = user_info.get('user_id', '')
                    avatar_url = user_info.get('avatar_url', '')

                    if gid == 0 or video_id == '' or cover_url == '':
                        Common.logger(log_type, crawler).info(f'{video_title}:无效视频\n')
                        continue

                    video_url_dict = cls.get_video_url(log_type, crawler, gid)
                    if video_url_dict is None:
                        # get_video_url already logged the cause; skip only this
                        # entry instead of abandoning the rest of the page.
                        Common.logger(log_type, crawler).warning(
                            f'{video_title}:get_video_url returned no data, skipping\n')
                        continue

                    video_dict = {
                        'video_title': video_title,
                        'video_id': video_id,
                        'gid': gid,
                        'play_cnt': play_cnt,
                        'comment_cnt': comment_cnt,
                        'like_cnt': like_cnt,
                        'share_cnt': share_cnt,
                        'video_width': video_url_dict["video_width"],
                        'video_height': video_url_dict["video_height"],
                        'duration': duration,
                        'publish_time_stamp': publish_time_stamp,
                        'publish_time_str': publish_time_str,
                        'user_name': user_name,
                        'user_id': user_id,
                        'avatar_url': avatar_url,
                        'cover_url': cover_url,
                        'audio_url': video_url_dict["audio_url"],
                        'video_url': video_url_dict["video_url"],
                        'session': signature
                    }
                    for k, v in video_dict.items():
                        Common.logger(log_type, crawler).info(f"{k}:{v}")
                    cls.download_publish(log_type, crawler, video_dict)
            except Exception as e:
                Common.logger(log_type, crawler).error(f"get_videolist:{e}\n")

    @classmethod
    def download_publish(cls, log_type, crawler, video_dict):
        """Write *video_dict* as a new row via the Feishu helper unless it already exists.

        The video is treated as already recorded when its ``video_id`` equals
        any cell returned by ``Feishu.get_values_batch`` for ``"1iKGF1"``
        (presumably a sheet id; verify against ``common.feishu``).

        :param log_type: passed through to ``Common.logger`` and ``Feishu``.
        :param crawler: passed through to ``Common.logger`` and ``Feishu``.
        :param video_dict: dict shaped like the one built in ``get_videolist``.
        """
        existing_cells = [cell
                          for row in Feishu.get_values_batch(log_type, crawler, "1iKGF1")
                          for cell in row]
        if video_dict['video_id'] in existing_cells:
            Common.logger(log_type, crawler).info("视频已存在\n")
            return

        Feishu.insert_columns(log_type, crawler, "1iKGF1", "ROWS", 1, 2)
        values = [[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))),
                   "西瓜推荐榜",
                   video_dict['video_title'],
                   video_dict['video_id'],
                   "",
                   video_dict['gid'],
                   video_dict['play_cnt'],
                   video_dict['comment_cnt'],
                   video_dict['like_cnt'],
                   video_dict['share_cnt'],
                   video_dict['duration'],
                   f"{video_dict['video_width']}*{video_dict['video_height']}",
                   video_dict['publish_time_str'],
                   video_dict['user_name'],
                   video_dict['user_id'],
                   video_dict['avatar_url'],
                   video_dict['cover_url'],
                   video_dict['audio_url'],
                   video_dict['video_url']]]
        # Brief pause between inserting the row and filling it in.
        time.sleep(0.5)
        Feishu.update_values(log_type, crawler, "1iKGF1", "F2:Z2", values)
        Common.logger(log_type, crawler).info("写入飞书成功\n")


if __name__ == "__main__":
    XiguaRecommend.get_videolist("recommend", "xigua", "dev")