# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/4/7
"""Crawler for the Xigua Video recommendation feed.

Fetches feed pages from www.ixigua.com, filters videos by the download rules,
resolves their media URLs, then downloads, publishes and records them through
the project's ``common`` helpers.
"""
import base64
import json
import os
import random
import shutil
import string
import sys
import time
from datetime import date, timedelta
from hashlib import md5

import requests
import urllib3
from requests.adapters import HTTPAdapter
from selenium import webdriver
from selenium.webdriver import DesiredCapabilities
from selenium.webdriver.chrome.service import Service

sys.path.append(os.getcwd())
from common.publish import Publish
from common.common import Common
from common.feishu import Feishu
from common.scheduling_db import MysqlHelper

# Resource containers are probed in this order; the first key present wins,
# even if it turns out to hold no usable stream.
_RESOURCE_KEYS = ('dash_120fps', 'dash', 'normal')
# Entries of a ``video_list`` are probed in this order.
_QUALITY_KEYS = ('video_4', 'video_3', 'video_2', 'video_1')


def _empty_video_urls():
    """Return the placeholder result used when no playable stream is found."""
    return {"video_url": '', "audio_url": '', "video_width": 0, "video_height": 0}


def _decode_media_url(encoded):
    """Decode a base64-encoded media URL, restoring any stripped padding.

    Base64 text must be a multiple of 4 characters long, so the number of
    missing ``=`` characters is ``-len(encoded) % 4``.
    """
    padded = encoded + '=' * (-len(encoded) % 4)
    return base64.b64decode(padded).decode('utf8')


def _extract_video_urls(video_info):
    """Pick the video/audio URLs and dimensions out of a ``video`` payload.

    :param video_info: the ``video`` dict of the mixVideo/information response.
    :return: dict with keys ``video_url``, ``audio_url``, ``video_width`` and
        ``video_height``; empty strings / zeros when nothing usable is present.
    """
    resources = video_info.get('videoResource') or {}
    resource = next((resources[key] for key in _RESOURCE_KEYS if key in resources), None)
    if not resource:
        return _empty_video_urls()

    # Fixed-quality entries: the same ``backup_url_1`` is used for both the
    # video and the audio download.
    video_list = resource.get('video_list') or {}
    for quality in _QUALITY_KEYS:
        if quality in video_list:
            entry = video_list[quality]
            media_url = _decode_media_url(entry['backup_url_1'])
            return {
                "video_url": media_url,
                "audio_url": media_url,
                "video_width": entry['vwidth'],
                "video_height": entry['vheight'],
            }

    # Separate video and audio streams: take the last entry of each list.
    dynamic = resource.get('dynamic_video') or {}
    dynamic_videos = dynamic.get('dynamic_video_list') or []
    dynamic_audios = dynamic.get('dynamic_audio_list') or []
    if dynamic_videos and dynamic_audios:
        best_video = dynamic_videos[-1]
        return {
            "video_url": _decode_media_url(best_video['backup_url_1']),
            "audio_url": _decode_media_url(dynamic_audios[-1]['backup_url_1']),
            "video_width": best_video['vwidth'],
            "video_height": best_video['vheight'],
        }

    return _empty_video_urls()


class XiguaRecommend:
    """Recommendation-feed crawler for Xigua Video."""

    platform = "西瓜视频"

    @classmethod
    def xigua_config(cls, log_type, crawler, text, env):
        """Load the comma-separated title or filter-word lists from ``crawler_config``.

        :param text: ``"title"`` or ``"filter"``; selects which list is returned.
        :return: list of strings, or ``None`` for any other ``text`` value.
        """
        select_sql = f"""select * from crawler_config where source="xigua" """
        contents = MysqlHelper.get_values(log_type, crawler, select_sql, env, action='')
        title_list = []
        filter_list = []
        for content in contents:
            # NOTE(review): eval() executes whatever is stored in the config
            # column. If the stored value is always a plain dict literal,
            # ast.literal_eval would be a safe replacement - confirm the format.
            config_dict = eval(content['config'])
            for k, v in config_dict.items():
                if k == "title":
                    title_list.extend(v.split(","))
                if k == "filter":
                    filter_list.extend(v.split(","))
        if text == "title":
            return title_list
        elif text == "filter":
            return filter_list

    @classmethod
    def download_rule(cls, video_dict):
        """Return True when a video satisfies the crawl rules.

        Rules: at least 10000 plays, duration between 60 seconds and 30
        minutes inclusive, and published no earlier than local midnight 30
        days ago.
        """
        oldest_day = date.today() + timedelta(days=-30)
        publish_time_stamp_rule = int(time.mktime(oldest_day.timetuple()))
        return (
            int(video_dict['play_cnt']) >= 10000
            and 60 * 30 >= int(video_dict['duration']) >= 60
            and int(video_dict['publish_time_stamp']) >= publish_time_stamp_rule
        )

    @classmethod
    def random_signature(cls):
        """Build a random 26-character signature-like string.

        The result starts with ten ``A`` characters, ends with ``AAAB`` and has
        its character at index 18 replaced according to a fixed mapping.
        """
        digits_num = random.randint(1, 6)
        uppercase_num = random.randint(1, 26 - digits_num - 1)
        lowercase_num = 26 - (digits_num + uppercase_num)
        password = (random.sample(string.digits, digits_num)
                    + random.sample(string.ascii_uppercase, uppercase_num)
                    + random.sample(string.ascii_lowercase, lowercase_num))
        random.shuffle(password)
        new_password = 'AAAAAAAAAA' + ''.join(password)[10:-4] + 'AAAB'
        # Any character without an explicit mapping becomes 'y'.
        replacement = {'8': 'w', '9': 'x', '-': 'y', '.': 'z'}.get(new_password[18], 'y')
        return new_password[0:18] + replacement + new_password[-7:]

    @classmethod
    def get_signature(cls, log_type, crawler, env):
        """Load ixigua.com in headless Chrome and sniff a ``_signature`` value.

        :return: the ``_signature`` query value of the first matching request
            found in the performance log, or ``None`` when none is found or an
            error occurs (the error is logged).
        """
        try:
            # Enable the performance log so outgoing requests can be inspected.
            # Copy the dict: DesiredCapabilities.CHROME is shared class state.
            ca = DesiredCapabilities.CHROME.copy()
            ca["goog:loggingPrefs"] = {"performance": "ALL"}
            # Run without opening a browser window.
            chrome_options = webdriver.ChromeOptions()
            chrome_options.add_argument("headless")
            chrome_options.add_argument(
                'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36')
            chrome_options.add_argument("--no-sandbox")
            # Driver initialisation.
            if env == "dev":
                driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options,
                                          service=Service('/Users/wangkun/Downloads/chromedriver/chromedriver_v111/chromedriver'))
            else:
                driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options)
            try:
                driver.implicitly_wait(10)
                driver.get('https://www.ixigua.com/')
                time.sleep(1)
                # Scroll the page by 2000 pixels, then reload it.
                driver.execute_script('window.scrollBy(0, 2000)')
                driver.refresh()
                logs = driver.get_log("performance")
            finally:
                # Always release the browser, even when navigation fails.
                driver.quit()

            for line in logs:
                message = json.loads(line['message'])['message']
                params = message.get('params') or {}
                if 'www.ixigua.com' not in params.get('documentURL', ''):
                    continue
                request_url = (params.get('request') or {}).get('url', '')
                if '_signature' not in request_url:
                    continue
                return request_url.split('_signature=')[-1].split('&')[0]
            return None
        except Exception as e:
            Common.logger(log_type, crawler).error(f'get_signature异常:{e}\n')
            return None

    # Fetch video details.
    @classmethod
    def get_video_url(cls, log_type, crawler, gid):
        """Resolve the media URLs and dimensions for one video.

        :param gid: value sent as the ``mixId`` query parameter.
        :return: dict with ``video_url``, ``audio_url``, ``video_width`` and
            ``video_height`` (empty strings / zeros when the response has no
            usable stream), or ``None`` when the response carries no data or
            an error occurs (both cases are logged).
        """
        try:
            url = 'https://www.ixigua.com/api/mixVideo/information?'
            headers = {
                "accept-encoding": "gzip, deflate",
                "accept-language": "zh-CN,zh-Hans;q=0.9",
                "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                              "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.5 Safari/605.1.15",
                "referer": "https://www.ixigua.com/7102614741050196520?logTag=0531c88ac04f38ab2c62",
            }
            params = {
                'mixId': gid,
                'msToken': 'IlG0wd0Pylyw9ghcYiB2YseUmTwrsrqqhXrbIcsSaTcLTJyVlbYJzk20zw3UO-CfrfC'
                           'NVVIOBNjIl7vfBoxnVUwO9ZyzAI3umSKsT5-pef_RRfQCJwmA',
                'X-Bogus': 'DFSzswVupYTANCJOSBk0P53WxM-r',
                '_signature': '_02B4Z6wo0000119LvEwAAIDCuktNZ0y5wkdfS7jAALThuOR8D9yWNZ.EmWHKV0WSn6Px'
                              'fPsH9-BldyxVje0f49ryXgmn7Tzk-swEHNb15TiGqa6YF.cX0jW8Eds1TtJOIZyfc9s5emH7gdWN94',
            }
            cookies = {
                'ixigua-a-s': '1',
                'msToken': 'IlG0wd0Pylyw9ghcYiB2YseUmTwrsrqqhXrbIcsSaTcLTJyVlbYJzk20zw3UO-CfrfCNVVIOB'
                           'NjIl7vfBoxnVUwO9ZyzAI3umSKsT5-pef_RRfQCJwmA',
                'ttwid': '1%7C_yXQeHWwLZgCsgHClOwTCdYSOt_MjdOkgnPIkpi-Sr8%7C1661241238%7Cf57d0c5ef3f1d7'
                         '6e049fccdca1ac54887c34d1f8731c8e51a49780ff0ceab9f8',
                'tt_scid': 'QZ4l8KXDG0YAEaMCSbADdcybdKbUfG4BC6S4OBv9lpRS5VyqYLX2bIR8CTeZeGHR9ee3',
                'MONITOR_WEB_ID': '0a49204a-7af5-4e96-95f0-f4bafb7450ad',
                '__ac_nonce': '06304878000964fdad287',
                '__ac_signature': '_02B4Z6wo00f017Rcr3AAAIDCUVxeW1tOKEu0fKvAAI4cvoYzV-wBhq7B6D8k0no7lb'
                                  'FlvYoinmtK6UXjRIYPXnahUlFTvmWVtb77jsMkKAXzAEsLE56m36RlvL7ky.M3Xn52r9t1IEb7IR3ke8',
                'ttcid': 'e56fabf6e85d4adf9e4d91902496a0e882',
                '_tea_utm_cache_1300': 'undefined',
                'support_avif': 'false',
                'support_webp': 'false',
                'xiguavideopcwebid': '7134967546256016900',
                'xiguavideopcwebid.sig': 'xxRww5R1VEMJN_dQepHorEu_eAc',
            }
            urllib3.disable_warnings()
            # The session is closed on exit so its connection pool is released.
            with requests.session() as s:
                # max_retries=3: retry failed connections up to three times.
                s.mount('http://', HTTPAdapter(max_retries=3))
                s.mount('https://', HTTPAdapter(max_retries=3))
                response = s.get(url=url, headers=headers, params=params, cookies=cookies, verify=False,
                                 proxies=Common.tunnel_proxies(), timeout=5)
                response.close()
            payload = response.json()
            if 'data' not in payload or payload['data'] == '':
                Common.logger(log_type, crawler).warning('get_video_info: response: {}', response)
                return None
            video_info = payload['data']['gidInformation']['packerData']['video']
            return _extract_video_urls(video_info)
        except Exception as e:
            Common.logger(log_type, crawler).error(f'get_video_url:{e}\n')
            return None

    @classmethod
    def repeat_video(cls, log_type, crawler, video_id, env):
        """Return how many ``crawler_video`` rows already exist for ``video_id``."""
        # NOTE(review): the SQL is built by string interpolation; video_id
        # comes from the remote feed. Switch to a parameterised query if
        # MysqlHelper offers one.
        sql = f""" select * from crawler_video where platform="西瓜视频" and out_video_id="{video_id}"; """
        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
        return len(repeat_video)

    @classmethod
    def get_videoList(cls, log_type, crawler, oss_endpoint, env):
        """Crawl the recommendation feed page by page and publish eligible videos.

        Loops until a feed request fails or returns no items, then returns
        ``None``. A fresh signature is obtained before every page request.
        """
        queryCount = 1
        while True:
            signature = cls.get_signature(log_type, crawler, env)
            if signature is None:
                Common.logger(log_type, crawler).warning(f"signature:{signature}")
                continue
            url = "https://www.ixigua.com/api/feedv2/feedById?"
            params = {
                "channelId": "94349543909",
                "count": "9",
                "maxTime": str(int(time.time())),
                "queryCount": str(queryCount),
                "_signature": signature,
                "request_from": "701",
                "offset": "0",
                # NOTE(review): the trailing colon in this key looks like a
                # typo, but it is what is currently sent; left unchanged.
                "referrer:": "https://open.weixin.qq.com/",
                "aid": "1768",
                "msToken": "XDpSA6_ZPP-gAkkBV-_WRQvNpG20uUUGPwf3E-S-txhznjBcXNbK2sbOuSpF3U7Jki6R9HwLDPeW4Gj7n6PURPTKrKLEs8J-ieFrwXDvMp2DX94ZoMua",
            }
            headers = {
                'referer': 'https://www.ixigua.com/',
                'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36',
                'authority': 'www.ixigua.com',
                'accept': 'application/json, text/plain, */*',
                'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
                'cache-control': 'no-cache',
                'pragma': 'no-cache',
                'sec-ch-ua': '"Chromium";v="112", "Google Chrome";v="112", "Not:A-Brand";v="99"',
                'sec-ch-ua-mobile': '?0',
                'sec-ch-ua-platform': '"macOS"',
                'sec-fetch-dest': 'empty',
                'sec-fetch-mode': 'cors',
                'sec-fetch-site': 'same-origin',
            }
            urllib3.disable_warnings()
            # Send the request through the session so the retry adapters
            # mounted on it actually apply.
            with requests.session() as s:
                # max_retries=3: retry failed connections up to three times.
                s.mount('http://', HTTPAdapter(max_retries=3))
                s.mount('https://', HTTPAdapter(max_retries=3))
                response = s.get(url=url, headers=headers, params=params,
                                 proxies=Common.tunnel_proxies(), verify=False, timeout=5)
                response.close()
            queryCount += 1
            Common.logger(log_type, crawler).info(f"queryCount:{queryCount}")
            if response.status_code != 200:
                Common.logger(log_type, crawler).warning(f"get_videolist_response:{response.text}\n")
                return
            if 'data' not in response.text:
                Common.logger(log_type, crawler).warning(f"get_videolist_response:{response.text}\n")
                return
            payload = response.json()
            channel_feed = (payload.get('data') or {}).get('channelFeed') or {}
            videoList = channel_feed.get('Data') or []
            if not videoList:
                Common.logger(log_type, crawler).warning(f"get_videolist_response:{payload}\n")
                return

            for item in videoList:
                if 'data' not in item:
                    continue
                data = item['data']
                user_info = data.get('user_info') or {}
                # Quotes are stripped from the title; fall back to a random
                # configured title when the feed provides none.
                video_title = data.get('title', '').replace('"', '').replace("'", '')
                if video_title == '':
                    video_title = random.choice(cls.xigua_config(log_type, crawler, "title", env))
                video_id = data.get('vid', '')
                gid = data.get('item_id', 0)
                cover_url = data.get('image_url', '')
                publish_time_stamp = int(data.get('publish_time', 0))
                video_dict = {
                    'video_title': video_title,
                    'video_id': video_id,
                    'gid': gid,
                    'play_cnt': int(data.get('playNum', 0)),
                    'comment_cnt': int(data.get('commentNum', 0)),
                    # Like and share counts are not read from the feed.
                    'like_cnt': 0,
                    'share_cnt': 0,
                    'duration': int(data.get('duration', 0)),
                    'publish_time_stamp': publish_time_stamp,
                    'publish_time_str': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)),
                    'user_name': user_info.get('name', ''),
                    'user_id': user_info.get('user_id', ''),
                    'avatar_url': user_info.get('avatar_url', ''),
                    'cover_url': cover_url,
                    'session': signature
                }
                for k, v in video_dict.items():
                    Common.logger(log_type, crawler).info(f"{k}:{v}")

                if gid == 0 or video_id == '' or cover_url == '':
                    Common.logger(log_type, crawler).info('无效视频\n')
                elif cls.download_rule(video_dict) is False:
                    Common.logger(log_type, crawler).info('不满足抓取规则\n')
                elif any(str(word) in video_title
                         for word in cls.xigua_config(log_type, crawler, "filter", env)
                         if str(word)):
                    # Empty filter words are ignored, as before.
                    Common.logger(log_type, crawler).info('已中过滤词\n')
                elif cls.repeat_video(log_type, crawler, video_id, env) != 0:
                    Common.logger(log_type, crawler).info('视频已下载\n')
                else:
                    video_url_dict = cls.get_video_url(log_type, crawler, gid)
                    # get_video_url returns None on failure and empty URLs
                    # when no stream exists; neither can be downloaded.
                    if not video_url_dict or not video_url_dict["video_url"]:
                        Common.logger(log_type, crawler).warning(
                            f"get_video_url returned no playable url, gid:{gid}\n")
                        continue
                    video_dict['video_url'] = video_url_dict["video_url"]
                    video_dict["audio_url"] = video_url_dict["audio_url"]
                    video_dict["video_width"] = video_url_dict["video_width"]
                    video_dict["video_height"] = video_url_dict["video_height"]
                    cls.download_publish(log_type, crawler, video_dict, oss_endpoint, env)

    @classmethod
    def download_publish(cls, log_type, crawler, video_dict, oss_endpoint, env):
        """Download, compose, upload and record one video.

        Steps: download video and audio, merge them, download the cover, save
        the metadata, upload/publish, then write a row to the Feishu sheet and
        to the ``crawler_video`` table. Returns ``None`` in every case.
        """
        md_title = md5(video_dict['video_title'].encode('utf8')).hexdigest()
        # NOTE(review): the size check and clean-up below already use the
        # md5-named directory, so the compose step and the failure-path
        # clean-up now use it too instead of the raw title. This assumes
        # Common.download_method stores files under the md5 name - confirm.
        video_dir = f"./{crawler}/videos/{md_title}"

        # Download the video stream.
        Common.download_method(log_type=log_type, crawler=crawler, text='xigua_video',
                               title=video_dict['video_title'], url=video_dict['video_url'])
        # Download the audio stream.
        Common.download_method(log_type=log_type, crawler=crawler, text='xigua_audio',
                               title=video_dict['video_title'], url=video_dict['audio_url'])
        # Merge audio and video.
        Common.video_compose(log_type=log_type, crawler=crawler, video_dir=video_dir)
        if os.path.getsize(f"{video_dir}/video.mp4") == 0:
            # Remove the video folder.
            shutil.rmtree(video_dir)
            Common.logger(log_type, crawler).info("视频size=0,删除成功\n")
            return
        # Download the cover image.
        Common.download_method(log_type=log_type, crawler=crawler, text='cover',
                               title=video_dict['video_title'], url=video_dict['cover_url'])
        # Save the video metadata to a text file.
        Common.save_video_info(log_type=log_type, crawler=crawler, video_dict=video_dict)

        # Upload the video.
        Common.logger(log_type, crawler).info("开始上传视频...")
        our_video_id = Publish.upload_and_publish(log_type=log_type,
                                                  crawler=crawler,
                                                  strategy="推荐榜爬虫策略",
                                                  our_uid="recommend",
                                                  env=env,
                                                  oss_endpoint=oss_endpoint)
        Common.logger(log_type, crawler).info("视频上传完成")

        if our_video_id is None:
            # Remove the video folder.
            shutil.rmtree(video_dir)
            return

        if env == 'dev':
            our_video_link = f"https://testadmin.piaoquantv.com/cms/post-detail/{our_video_id}/info"
        else:
            our_video_link = f"https://admin.piaoquantv.com/cms/post-detail/{our_video_id}/info"

        # Write the video to the Feishu sheet.
        Feishu.insert_columns(log_type, 'xigua', "1iKGF1", "ROWS", 1, 2)
        upload_time = int(time.time())
        values = [[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(upload_time)),
                   "推荐榜爬虫策略",
                   video_dict['video_title'],
                   str(video_dict['video_id']),
                   our_video_link,
                   video_dict['gid'],
                   video_dict['play_cnt'],
                   video_dict['comment_cnt'],
                   video_dict['like_cnt'],
                   video_dict['share_cnt'],
                   video_dict['duration'],
                   str(video_dict['video_width']) + '*' + str(video_dict['video_height']),
                   video_dict['publish_time_str'],
                   video_dict['user_name'],
                   video_dict['user_id'],
                   video_dict['avatar_url'],
                   video_dict['cover_url'],
                   video_dict['audio_url'],
                   video_dict['video_url']]]
        time.sleep(1)
        Feishu.update_values(log_type, 'xigua', "1iKGF1", "F2:Z2", values)
        Common.logger(log_type, crawler).info("视频已保存至云文档\n")

        rule_dict = {
            "play_cnt": {"min": 10000},
            "duration": {"min": 60, "max": 60*30},
            "publish_day": {"min": 30}
        }
        # Save the video record to the database.
        # NOTE(review): values are interpolated straight into the SQL text;
        # use a parameterised insert if MysqlHelper supports one.
        insert_sql = f""" insert into crawler_video(video_id,
                        user_id,
                        out_user_id,
                        platform,
                        strategy,
                        out_video_id,
                        video_title,
                        cover_url,
                        video_url,
                        duration,
                        publish_time,
                        play_cnt,
                        crawler_rule,
                        width,
                        height)
                        values({our_video_id},
                        {int(50322238)},
                        "{video_dict['user_id']}",
                        "{cls.platform}",
                        "推荐榜爬虫策略",
                        "{video_dict['video_id']}",
                        "{video_dict['video_title']}",
                        "{video_dict['cover_url']}",
                        "{video_dict['video_url']}",
                        {int(video_dict['duration'])},
                        "{video_dict['publish_time_str']}",
                        {int(video_dict['play_cnt'])},
                        '{json.dumps(rule_dict)}',
                        {int(video_dict['video_width'])},
                        {int(video_dict['video_height'])}) """
        Common.logger(log_type, crawler).info(f"insert_sql:{insert_sql}")
        MysqlHelper.update_values(log_type, crawler, insert_sql, env, action='')
        Common.logger(log_type, crawler).info('视频信息插入数据库成功!\n')


if __name__ == "__main__":
    # Manual smoke checks:
    # XiguaRecommend.get_signature("recommend", "xigua", "dev")
    # print(XiguaRecommend.get_video_url("recommend", "xigua", "7218171653242094139"))
    print(XiguaRecommend.xigua_config("recommend", "xigua", "title", "dev"))