# -*- coding: utf-8 -*- # @Author: wangkun # @Time: 2023/2/17 import base64 import json import os import random import shutil import string import sys import time from hashlib import md5 import requests import urllib3 from urllib.parse import quote from requests.adapters import HTTPAdapter sys.path.append(os.getcwd()) from common.db import MysqlHelper from common.getuser import getUser from common.common import Common from common.feishu import Feishu from common.publish import Publish from common.public import get_config_from_mysql from common.userAgent import get_random_user_agent, get_random_header class XiguaSearch: platform = "西瓜视频" tag = "西瓜视频爬虫,搜索爬虫策略" @classmethod def get_rule(cls, log_type, crawler): try: while True: rule_sheet = Feishu.get_values_batch(log_type, crawler, "shxOl7") if rule_sheet is None: Common.logger(log_type, crawler).warning("rule_sheet is None! 10秒后重新获取") time.sleep(10) continue rule_dict = { "play_cnt": int(rule_sheet[1][2]), "min_duration": int(rule_sheet[2][2]), "max_duration": int(rule_sheet[3][2]), "publish_time": int(rule_sheet[4][2]), } return rule_dict except Exception as e: Common.logger(log_type, crawler).error(f"get_rule:{e}\n") # 下载规则 @classmethod def download_rule(cls, video_info_dict, rule_dict): if video_info_dict['play_cnt'] >= rule_dict['play_cnt']: if video_info_dict['comment_cnt'] >= rule_dict['comment_cnt']: if video_info_dict['like_cnt'] >= rule_dict['like_cnt']: if video_info_dict['duration'] >= rule_dict['duration']: if video_info_dict['video_width'] >= rule_dict['video_width'] \ or video_info_dict['video_height'] >= rule_dict['video_height']: return True else: return False else: return False else: return False else: return False else: return False # 过滤词库 @classmethod def filter_words(cls, log_type, crawler): try: while True: filter_words_sheet = Feishu.get_values_batch(log_type, crawler, 'KGB4Hc') if filter_words_sheet is None: Common.logger(log_type, crawler).warning(f"filter_words_sheet:{filter_words_sheet} 
10秒钟后重试") continue filter_words_list = [] for x in filter_words_sheet: for y in x: if y is None: pass else: filter_words_list.append(y) return filter_words_list except Exception as e: Common.logger(log_type, crawler).error(f'filter_words异常:{e}\n') # 获取用户信息(字典格式). 注意:部分 user_id 字符类型是 int / str @classmethod def get_user_list(cls, log_type, crawler, sheetid, env, machine): try: while True: user_sheet = Feishu.get_values_batch(log_type, crawler, sheetid) if user_sheet is None: Common.logger(log_type, crawler).warning(f"user_sheet:{user_sheet} 10秒钟后重试") continue our_user_list = [] for i in range(1, len(user_sheet)): our_uid = user_sheet[i][6] search_word = user_sheet[i][4] tag1 = user_sheet[i][8] tag2 = user_sheet[i][9] tag3 = user_sheet[i][10] tag4 = user_sheet[i][11] tag5 = user_sheet[i][12] tag6 = user_sheet[i][13] tag7 = user_sheet[i][14] Common.logger(log_type, crawler).info(f"正在更新 {search_word} 关键词信息\n") if our_uid is None: default_user = getUser.get_default_user() # 用来创建our_id的信息 user_dict = { 'recommendStatus': -6, 'appRecommendStatus': -6, 'nickName': default_user['nickName'], 'avatarUrl': default_user['avatarUrl'], 'tagName': f'{tag1},{tag2},{tag3},{tag4},{tag5},{tag6},{tag7}', } Common.logger(log_type, crawler).info(f'新创建的站内UID:{our_uid}') our_uid = getUser.create_uid(log_type, crawler, user_dict, env) if env == 'prod': our_user_link = f'https://admin.piaoquantv.com/ums/user/{our_uid}/post' else: our_user_link = f'https://testadmin.piaoquantv.com/ums/user/{our_uid}/post' Feishu.update_values(log_type, crawler, sheetid, f'G{i + 1}:H{i + 1}', [[our_uid, our_user_link]]) Common.logger(log_type, crawler).info(f'站内用户信息写入飞书成功!\n') our_user_dict = { 'out_uid': '', 'search_word': search_word, 'our_uid': our_uid, 'our_user_link': f'https://admin.piaoquantv.com/ums/user/{our_uid}/post', } our_user_list.append(our_user_dict) return our_user_list except Exception as e: Common.logger(log_type, crawler).error(f'get_user_id_from_feishu异常:{e}\n') @classmethod def 
random_signature(cls): src_digits = string.digits # string_数字 src_uppercase = string.ascii_uppercase # string_大写字母 src_lowercase = string.ascii_lowercase # string_小写字母 digits_num = random.randint(1, 6) uppercase_num = random.randint(1, 26 - digits_num - 1) lowercase_num = 26 - (digits_num + uppercase_num) password = random.sample(src_digits, digits_num) + random.sample(src_uppercase, uppercase_num) + random.sample( src_lowercase, lowercase_num) random.shuffle(password) new_password = 'AAAAAAAAAA' + ''.join(password)[10:-4] + 'AAAB' new_password_start = new_password[0:18] new_password_end = new_password[-7:] if new_password[18] == '8': new_password = new_password_start + 'w' + new_password_end elif new_password[18] == '9': new_password = new_password_start + 'x' + new_password_end elif new_password[18] == '-': new_password = new_password_start + 'y' + new_password_end elif new_password[18] == '.': new_password = new_password_start + 'z' + new_password_end else: new_password = new_password_start + 'y' + new_password_end return new_password # 获取视频详情 @classmethod def get_video_url(cls, log_type, crawler, gid): try: url = 'https://www.ixigua.com/api/mixVideo/information?' 
headers = { "accept-encoding": "gzip, deflate", "accept-language": "zh-CN,zh-Hans;q=0.9", "user-agent": get_random_user_agent('pc'), "referer": "https://www.ixigua.com/7102614741050196520?logTag=0531c88ac04f38ab2c62", } params = { 'mixId': gid, 'msToken': 'IlG0wd0Pylyw9ghcYiB2YseUmTwrsrqqhXrbIcsSaTcLTJyVlbYJzk20zw3UO-CfrfC' 'NVVIOBNjIl7vfBoxnVUwO9ZyzAI3umSKsT5-pef_RRfQCJwmA', 'X-Bogus': 'DFSzswVupYTANCJOSBk0P53WxM-r', '_signature': '_02B4Z6wo0000119LvEwAAIDCuktNZ0y5wkdfS7jAALThuOR8D9yWNZ.EmWHKV0WSn6Px' 'fPsH9-BldyxVje0f49ryXgmn7Tzk-swEHNb15TiGqa6YF.cX0jW8Eds1TtJOIZyfc9s5emH7gdWN94', } cookies = { 'ixigua-a-s': '1', 'msToken': 'IlG0wd0Pylyw9ghcYiB2YseUmTwrsrqqhXrbIcsSaTcLTJyVlbYJzk20zw3UO-CfrfCNVVIOB' 'NjIl7vfBoxnVUwO9ZyzAI3umSKsT5-pef_RRfQCJwmA', 'ttwid': '1%7C_yXQeHWwLZgCsgHClOwTCdYSOt_MjdOkgnPIkpi-Sr8%7C1661241238%7Cf57d0c5ef3f1d7' '6e049fccdca1ac54887c34d1f8731c8e51a49780ff0ceab9f8', 'tt_scid': 'QZ4l8KXDG0YAEaMCSbADdcybdKbUfG4BC6S4OBv9lpRS5VyqYLX2bIR8CTeZeGHR9ee3', 'MONITOR_WEB_ID': '0a49204a-7af5-4e96-95f0-f4bafb7450ad', '__ac_nonce': '06304878000964fdad287', '__ac_signature': '_02B4Z6wo00f017Rcr3AAAIDCUVxeW1tOKEu0fKvAAI4cvoYzV-wBhq7B6D8k0no7lb' 'FlvYoinmtK6UXjRIYPXnahUlFTvmWVtb77jsMkKAXzAEsLE56m36RlvL7ky.M3Xn52r9t1IEb7IR3ke8', 'ttcid': 'e56fabf6e85d4adf9e4d91902496a0e882', '_tea_utm_cache_1300': 'undefined', 'support_avif': 'false', 'support_webp': 'false', 'xiguavideopcwebid': '7134967546256016900', 'xiguavideopcwebid.sig': 'xxRww5R1VEMJN_dQepHorEu_eAc', } urllib3.disable_warnings() s = requests.session() # max_retries=3 重试3次 s.mount('http://', HTTPAdapter(max_retries=3)) s.mount('https://', HTTPAdapter(max_retries=3)) response = s.get(url=url, headers=headers, params=params, cookies=cookies, verify=False, proxies=Common.tunnel_proxies(), timeout=5) # response = s.get(url=url, headers=headers, params=params, cookies=cookies, verify=False) response.close() if 'data' not in response.json() or response.json()['data'] == '': Common.logger(log_type, 
crawler).warning('get_video_info: response: {}', response) else: video_info = response.json()['data']['gidInformation']['packerData']['video'] video_url_dict = {} # video_url if 'videoResource' not in video_info: video_url_dict["video_url"] = '' video_url_dict["audio_url"] = '' video_url_dict["video_width"] = 0 video_url_dict["video_height"] = 0 elif 'dash_120fps' in video_info['videoResource']: if "video_list" in video_info['videoResource']['dash_120fps'] and 'video_4' in \ video_info['videoResource']['dash_120fps']['video_list']: video_url = video_info['videoResource']['dash_120fps']['video_list']['video_4']['backup_url_1'] audio_url = video_info['videoResource']['dash_120fps']['video_list']['video_4']['backup_url_1'] if len(video_url) % 3 == 1: video_url += '==' elif len(video_url) % 3 == 2: video_url += '=' elif len(audio_url) % 3 == 1: audio_url += '==' elif len(audio_url) % 3 == 2: audio_url += '=' video_url = base64.b64decode(video_url).decode('utf8') audio_url = base64.b64decode(audio_url).decode('utf8') video_width = video_info['videoResource']['dash_120fps']['video_list']['video_4']['vwidth'] video_height = video_info['videoResource']['dash_120fps']['video_list']['video_4']['vheight'] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif "video_list" in video_info['videoResource']['dash_120fps'] and 'video_3' in \ video_info['videoResource']['dash_120fps']['video_list']: video_url = video_info['videoResource']['dash_120fps']['video_list']['video_3']['backup_url_1'] audio_url = video_info['videoResource']['dash_120fps']['video_list']['video_3']['backup_url_1'] if len(video_url) % 3 == 1: video_url += '==' elif len(video_url) % 3 == 2: video_url += '=' elif len(audio_url) % 3 == 1: audio_url += '==' elif len(audio_url) % 3 == 2: audio_url += '=' video_url = base64.b64decode(video_url).decode('utf8') audio_url = 
base64.b64decode(audio_url).decode('utf8') video_width = video_info['videoResource']['dash_120fps']['video_list']['video_3']['vwidth'] video_height = video_info['videoResource']['dash_120fps']['video_list']['video_3']['vheight'] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif "video_list" in video_info['videoResource']['dash_120fps'] and 'video_2' in \ video_info['videoResource']['dash_120fps']['video_list']: video_url = video_info['videoResource']['dash_120fps']['video_list']['video_2']['backup_url_1'] audio_url = video_info['videoResource']['dash_120fps']['video_list']['video_2']['backup_url_1'] if len(video_url) % 3 == 1: video_url += '==' elif len(video_url) % 3 == 2: video_url += '=' elif len(audio_url) % 3 == 1: audio_url += '==' elif len(audio_url) % 3 == 2: audio_url += '=' video_url = base64.b64decode(video_url).decode('utf8') audio_url = base64.b64decode(audio_url).decode('utf8') video_width = video_info['videoResource']['dash_120fps']['video_list']['video_2']['vwidth'] video_height = video_info['videoResource']['dash_120fps']['video_list']['video_2']['vheight'] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif "video_list" in video_info['videoResource']['dash_120fps'] and 'video_1' in \ video_info['videoResource']['dash_120fps']['video_list']: video_url = video_info['videoResource']['dash_120fps']['video_list']['video_1']['backup_url_1'] audio_url = video_info['videoResource']['dash_120fps']['video_list']['video_1']['backup_url_1'] if len(video_url) % 3 == 1: video_url += '==' elif len(video_url) % 3 == 2: video_url += '=' elif len(audio_url) % 3 == 1: audio_url += '==' elif len(audio_url) % 3 == 2: audio_url += '=' video_url = base64.b64decode(video_url).decode('utf8') audio_url = 
base64.b64decode(audio_url).decode('utf8') video_width = video_info['videoResource']['dash_120fps']['video_list']['video_1']['vwidth'] video_height = video_info['videoResource']['dash_120fps']['video_list']['video_1']['vheight'] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif 'dynamic_video' in video_info['videoResource']['dash_120fps'] \ and 'dynamic_video_list' in video_info['videoResource']['dash_120fps']['dynamic_video'] \ and 'dynamic_audio_list' in video_info['videoResource']['dash_120fps']['dynamic_video'] \ and len( video_info['videoResource']['dash_120fps']['dynamic_video']['dynamic_video_list']) != 0 \ and len( video_info['videoResource']['dash_120fps']['dynamic_video']['dynamic_audio_list']) != 0: video_url = \ video_info['videoResource']['dash_120fps']['dynamic_video']['dynamic_video_list'][-1][ 'backup_url_1'] audio_url = \ video_info['videoResource']['dash_120fps']['dynamic_video']['dynamic_audio_list'][-1][ 'backup_url_1'] if len(video_url) % 3 == 1: video_url += '==' elif len(video_url) % 3 == 2: video_url += '=' elif len(audio_url) % 3 == 1: audio_url += '==' elif len(audio_url) % 3 == 2: audio_url += '=' video_url = base64.b64decode(video_url).decode('utf8') audio_url = base64.b64decode(audio_url).decode('utf8') video_width = \ video_info['videoResource']['dash_120fps']['dynamic_video']['dynamic_video_list'][-1][ 'vwidth'] video_height = \ video_info['videoResource']['dash_120fps']['dynamic_video']['dynamic_video_list'][-1][ 'vheight'] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height else: video_url_dict["video_url"] = '' video_url_dict["audio_url"] = '' video_url_dict["video_width"] = 0 video_url_dict["video_height"] = 0 elif 'dash' in video_info['videoResource']: if "video_list" in 
video_info['videoResource']['dash'] and 'video_4' in \ video_info['videoResource']['dash']['video_list']: video_url = video_info['videoResource']['dash']['video_list']['video_4']['backup_url_1'] audio_url = video_info['videoResource']['dash']['video_list']['video_4']['backup_url_1'] if len(video_url) % 3 == 1: video_url += '==' elif len(video_url) % 3 == 2: video_url += '=' elif len(audio_url) % 3 == 1: audio_url += '==' elif len(audio_url) % 3 == 2: audio_url += '=' video_url = base64.b64decode(video_url).decode('utf8') audio_url = base64.b64decode(audio_url).decode('utf8') video_width = video_info['videoResource']['dash']['video_list']['video_4']['vwidth'] video_height = video_info['videoResource']['dash']['video_list']['video_4']['vheight'] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif "video_list" in video_info['videoResource']['dash'] and 'video_3' in \ video_info['videoResource']['dash']['video_list']: video_url = video_info['videoResource']['dash']['video_list']['video_3']['backup_url_1'] audio_url = video_info['videoResource']['dash']['video_list']['video_3']['backup_url_1'] if len(video_url) % 3 == 1: video_url += '==' elif len(video_url) % 3 == 2: video_url += '=' elif len(audio_url) % 3 == 1: audio_url += '==' elif len(audio_url) % 3 == 2: audio_url += '=' video_url = base64.b64decode(video_url).decode('utf8') audio_url = base64.b64decode(audio_url).decode('utf8') video_width = video_info['videoResource']['dash']['video_list']['video_3']['vwidth'] video_height = video_info['videoResource']['dash']['video_list']['video_3']['vheight'] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif "video_list" in video_info['videoResource']['dash'] and 'video_2' in \ video_info['videoResource']['dash']['video_list']: 
video_url = video_info['videoResource']['dash']['video_list']['video_2']['backup_url_1'] audio_url = video_info['videoResource']['dash']['video_list']['video_2']['backup_url_1'] if len(video_url) % 3 == 1: video_url += '==' elif len(video_url) % 3 == 2: video_url += '=' elif len(audio_url) % 3 == 1: audio_url += '==' elif len(audio_url) % 3 == 2: audio_url += '=' video_url = base64.b64decode(video_url).decode('utf8') audio_url = base64.b64decode(audio_url).decode('utf8') video_width = video_info['videoResource']['dash']['video_list']['video_2']['vwidth'] video_height = video_info['videoResource']['dash']['video_list']['video_2']['vheight'] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif "video_list" in video_info['videoResource']['dash'] and 'video_1' in \ video_info['videoResource']['dash']['video_list']: video_url = video_info['videoResource']['dash']['video_list']['video_1']['backup_url_1'] audio_url = video_info['videoResource']['dash']['video_list']['video_1']['backup_url_1'] if len(video_url) % 3 == 1: video_url += '==' elif len(video_url) % 3 == 2: video_url += '=' elif len(audio_url) % 3 == 1: audio_url += '==' elif len(audio_url) % 3 == 2: audio_url += '=' video_url = base64.b64decode(video_url).decode('utf8') audio_url = base64.b64decode(audio_url).decode('utf8') video_width = video_info['videoResource']['dash']['video_list']['video_1']['vwidth'] video_height = video_info['videoResource']['dash']['video_list']['video_1']['vheight'] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif 'dynamic_video' in video_info['videoResource']['dash'] \ and 'dynamic_video_list' in video_info['videoResource']['dash']['dynamic_video'] \ and 'dynamic_audio_list' in video_info['videoResource']['dash']['dynamic_video'] \ and 
len(video_info['videoResource']['dash']['dynamic_video']['dynamic_video_list']) != 0 \ and len(video_info['videoResource']['dash']['dynamic_video']['dynamic_audio_list']) != 0: video_url = video_info['videoResource']['dash']['dynamic_video']['dynamic_video_list'][-1][ 'backup_url_1'] audio_url = video_info['videoResource']['dash']['dynamic_video']['dynamic_audio_list'][-1][ 'backup_url_1'] if len(video_url) % 3 == 1: video_url += '==' elif len(video_url) % 3 == 2: video_url += '=' elif len(audio_url) % 3 == 1: audio_url += '==' elif len(audio_url) % 3 == 2: audio_url += '=' video_url = base64.b64decode(video_url).decode('utf8') audio_url = base64.b64decode(audio_url).decode('utf8') video_width = video_info['videoResource']['dash']['dynamic_video']['dynamic_video_list'][-1][ 'vwidth'] video_height = video_info['videoResource']['dash']['dynamic_video']['dynamic_video_list'][-1][ 'vheight'] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height else: video_url_dict["video_url"] = '' video_url_dict["audio_url"] = '' video_url_dict["video_width"] = 0 video_url_dict["video_height"] = 0 elif 'normal' in video_info['videoResource']: if "video_list" in video_info['videoResource']['normal'] and 'video_4' in \ video_info['videoResource']['normal']['video_list']: video_url = video_info['videoResource']['normal']['video_list']['video_4']['backup_url_1'] audio_url = video_info['videoResource']['normal']['video_list']['video_4']['backup_url_1'] if len(video_url) % 3 == 1: video_url += '==' elif len(video_url) % 3 == 2: video_url += '=' elif len(audio_url) % 3 == 1: audio_url += '==' elif len(audio_url) % 3 == 2: audio_url += '=' video_url = base64.b64decode(video_url).decode('utf8') audio_url = base64.b64decode(audio_url).decode('utf8') video_width = video_info['videoResource']['normal']['video_list']['video_4']['vwidth'] video_height = 
video_info['videoResource']['normal']['video_list']['video_4']['vheight'] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif "video_list" in video_info['videoResource']['normal'] and 'video_3' in \ video_info['videoResource']['normal']['video_list']: video_url = video_info['videoResource']['normal']['video_list']['video_3']['backup_url_1'] audio_url = video_info['videoResource']['normal']['video_list']['video_3']['backup_url_1'] if len(video_url) % 3 == 1: video_url += '==' elif len(video_url) % 3 == 2: video_url += '=' elif len(audio_url) % 3 == 1: audio_url += '==' elif len(audio_url) % 3 == 2: audio_url += '=' video_url = base64.b64decode(video_url).decode('utf8') audio_url = base64.b64decode(audio_url).decode('utf8') video_width = video_info['videoResource']['normal']['video_list']['video_3']['vwidth'] video_height = video_info['videoResource']['normal']['video_list']['video_3']['vheight'] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif "video_list" in video_info['videoResource']['normal'] and 'video_2' in \ video_info['videoResource']['normal']['video_list']: video_url = video_info['videoResource']['normal']['video_list']['video_2']['backup_url_1'] audio_url = video_info['videoResource']['normal']['video_list']['video_2']['backup_url_1'] if len(video_url) % 3 == 1: video_url += '==' elif len(video_url) % 3 == 2: video_url += '=' elif len(audio_url) % 3 == 1: audio_url += '==' elif len(audio_url) % 3 == 2: audio_url += '=' video_url = base64.b64decode(video_url).decode('utf8') audio_url = base64.b64decode(audio_url).decode('utf8') video_width = video_info['videoResource']['normal']['video_list']['video_2']['vwidth'] video_height = video_info['videoResource']['normal']['video_list']['video_2']['vheight'] 
video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif "video_list" in video_info['videoResource']['normal'] and 'video_1' in \ video_info['videoResource']['normal']['video_list']: video_url = video_info['videoResource']['normal']['video_list']['video_1']['backup_url_1'] audio_url = video_info['videoResource']['normal']['video_list']['video_1']['backup_url_1'] if len(video_url) % 3 == 1: video_url += '==' elif len(video_url) % 3 == 2: video_url += '=' elif len(audio_url) % 3 == 1: audio_url += '==' elif len(audio_url) % 3 == 2: audio_url += '=' video_url = base64.b64decode(video_url).decode('utf8') audio_url = base64.b64decode(audio_url).decode('utf8') video_width = video_info['videoResource']['normal']['video_list']['video_1']['vwidth'] video_height = video_info['videoResource']['normal']['video_list']['video_1']['vheight'] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif 'dynamic_video' in video_info['videoResource']['normal'] \ and 'dynamic_video_list' in video_info['videoResource']['normal']['dynamic_video'] \ and 'dynamic_audio_list' in video_info['videoResource']['normal']['dynamic_video'] \ and len(video_info['videoResource']['normal']['dynamic_video']['dynamic_video_list']) != 0 \ and len(video_info['videoResource']['normal']['dynamic_video']['dynamic_audio_list']) != 0: video_url = video_info['videoResource']['normal']['dynamic_video']['dynamic_video_list'][-1][ 'backup_url_1'] audio_url = video_info['videoResource']['normal']['dynamic_video']['dynamic_audio_list'][-1][ 'backup_url_1'] if len(video_url) % 3 == 1: video_url += '==' elif len(video_url) % 3 == 2: video_url += '=' elif len(audio_url) % 3 == 1: audio_url += '==' elif len(audio_url) % 3 == 2: audio_url += '=' video_url = 
base64.b64decode(video_url).decode('utf8') audio_url = base64.b64decode(audio_url).decode('utf8') video_width = video_info['videoResource']['normal']['dynamic_video']['dynamic_video_list'][-1][ 'vwidth'] video_height = video_info['videoResource']['normal']['dynamic_video']['dynamic_video_list'][-1][ 'vheight'] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height else: video_url_dict["video_url"] = '' video_url_dict["audio_url"] = '' video_url_dict["video_width"] = 0 video_url_dict["video_height"] = 0 else: video_url_dict["video_url"] = '' video_url_dict["audio_url"] = '' video_url_dict["video_width"] = 0 video_url_dict["video_height"] = 0 return video_url_dict except Exception as e: Common.logger(log_type, crawler).error(f'get_video_url:{e}\n') @classmethod def get_video_info(cls, log_type, crawler, item_id): d_url = "http://a6.pstatp.com/article/full/11/1/{video_id}/{video_id}/1/0/?iid=3636030325&device_id=5787057242" \ "&ac=wifi&channel=wandoujia&aid=13&app_name=news_article&version_code=532&version_name=5.3.2&device_platform" \ "=android&ab_client=a1%2Cc2%2Ce1%2Cf2%2Cg2%2Cb3%2Cf4&abflag=3&ssmix=a&device_type=SM705" \ "&device_brand=smartisan&os_api=19&os_version=4.4.2&uuid=864593021012562&openudid=e23a5ff037ef2d1a" \ "&manifest_version_code=532&resolution=1080*1920&dpi=480&update_version_code=5320".format( video_id=item_id) res = requests.get(url=d_url, headers=get_random_header('pc'), proxies=Common.tunnel_proxies()) data = json.loads(res.text)['data'] item_counter = data['h5_extra']['itemCell']['itemCounter'] user_info = data['user_info'] detail_info = data['video_detail_info'] video_dict = {'video_title': data['title'].replace('"', '').replace("'", ''), 'video_id': detail_info['video_id'], 'gid': data['group_id'], 'play_cnt': item_counter['videoWatchCount'], 'comment_cnt': item_counter['commentCount'], 'like_cnt': item_counter['diggCount'], 
'share_cnt': item_counter['shareCount'], 'duration': data['video_duration'], 'publish_time_stamp': data['publish_time'], 'publish_time_str': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(data['publish_time'])), 'user_name': user_info['name'], 'user_id': user_info['user_id'], 'avatar_url': user_info['avatar_url'], 'cover_url': data['large_image']['url'].replace('\u0026', '&'), } return video_dict @classmethod def is_ruled(cls, log_type, crawler, video_dict, rule_dict): old_time = int(time.time()) - (3600 * 24 * rule_dict['publish_time']) if video_dict['publish_time_stamp'] <= old_time: return False elif video_dict['play_cnt'] <= rule_dict['play_cnt']: return False elif video_dict['duration'] < rule_dict['min_duration'] or video_dict['duration'] > rule_dict['max_duration']: return False else: return True @classmethod def get_videolist(cls, log_type, crawler, strategy, our_uid, search_word, oss_endpoint, env, machine): total_count = 1 offset = 0 while True: signature = cls.random_signature() # url = "https://www.ixigua.com/api/searchv2/complex/{}/{}?order_type=publish_time&click_position=new".format( # quote(search_word), offset, signature) url = f'https://www.ixigua.com/api/searchv2/complex/{quote(search_word)}/{offset}?' 
\ f'search_id=202305111126371489381ECEC7FE277E3F&' \ f'aid=1768&' \ f'msToken=lPfIf3aps6EktQAeOl9yRgnL44MtMeGt2WnHjahIR0IysASB_zdhGiY0J9WWxNDpLd7aVdQx_36MpyPI5f2zRUHFYyNNsX5cl-or6GkiVuLLiRsU3ylxj9vt7Upubw==&' \ f'X-Bogus=DFSzswVY4h0ANGD7tC7G/Mm4pIkV&' \ f'_signature={signature}' headers = { 'referer': 'https://www.ixigua.com/search/{}/?logTag=594535e3690f17a88cdb&tab_name=search'.format( quote(search_word)), 'cookie': 'ttcid=5d8f917a525e46759dc886296bf1111b69; MONITOR_WEB_ID=ad1c8360-d4c9-4fa2-a801-d9fd68dfc1b2; s_v_web_id=verify_lh8vaa6v_VI4RQ0ET_nVbq_4PXw_8mfN_7Xp6wdLOZi08; passport_csrf_token=0e7c6992cb6170c9db034c3696191fff; passport_csrf_token_default=0e7c6992cb6170c9db034c3696191fff; support_webp=true; support_avif=true; csrf_session_id=a5355d954d3c63ed1ba35faada452b4d; odin_tt=3072e827705bd5aa707fb8d432524d7f8fad972b02b31a2d3458a3e5209d5492; sid_guard=46a52ce83dacb0b871dae675476a3e42%7C1683773717%7C21600%7CThu%2C+11-May-2023+08%3A55%3A17+GMT; uid_tt=4126f296856e6042f195253e9a01c4cb; uid_tt_ss=4126f296856e6042f195253e9a01c4cb; sid_tt=46a52ce83dacb0b871dae675476a3e42; sessionid=46a52ce83dacb0b871dae675476a3e42; sessionid_ss=46a52ce83dacb0b871dae675476a3e42; sid_ucp_v1=1.0.0-KDMyMzg5NWI3YzAxMGFkN2Y4MjZiMzE5Njc0MGFmMWQ5NGExY2MyYzgKCBCVsvGiBhgNGgJobCIgNDZhNTJjZTgzZGFjYjBiODcxZGFlNjc1NDc2YTNlNDI; ssid_ucp_v1=1.0.0-KDMyMzg5NWI3YzAxMGFkN2Y4MjZiMzE5Njc0MGFmMWQ5NGExY2MyYzgKCBCVsvGiBhgNGgJobCIgNDZhNTJjZTgzZGFjYjBiODcxZGFlNjc1NDc2YTNlNDI; ixigua-a-s=1; tt_scid=sblZQP6nSw2f6A.XS-yHFqB.R3o9UFsRTUCKAoWlHWzNrOf8R01qeIBbu6TDeXtMa3fb; ttwid=1%7C4zaTJmlaHpEa8rAB-KjREdxT3sNBUJWrAzRJnNvqExQ%7C1683775619%7Cf4fc6fa51baf2e302242da412ead6500c3d3f5bfb0be6253cbae00301d5773ae; msToken=lPfIf3aps6EktQAeOl9yRgnL44MtMeGt2WnHjahIR0IysASB_zdhGiY0J9WWxNDpLd7aVdQx_36MpyPI5f2zRUHFYyNNsX5cl-or6GkiVuLLiRsU3ylxj9vt7Upubw==', 'user-agent': get_random_user_agent('pc'), } try: proxies = Common.tunnel_proxies() s = requests.session() # max_retries=3 重试3次 s.mount('http://', HTTPAdapter(max_retries=3)) 
s.mount('https://', HTTPAdapter(max_retries=3)) res = s.request("GET", url, headers=headers, proxies=proxies, timeout=5) # Common.logger(log_type, crawler).info(f"proxies:{proxies}\n") Common.logger(log_type, crawler).info(f"get_videolist:{res.json()}\n") search_list = res.json()['data']['data'] except Exception as e: Common.logger(log_type, crawler).warning(f"get_videolist:{e}\n") continue if not search_list: Common.logger(log_type, crawler).error(f'关键词:{search_word},没有获取到视频列表:offset{offset}') return for video_info in search_list: v_type = video_info['type'] rule_dict = cls.get_rule(log_type, crawler) publish_time = video_info['data']['publish_time'] old_time = int(time.time()) - (3600 * 24 * rule_dict['publish_time']) if publish_time <= old_time: Common.logger(log_type, crawler).error(f'关键词:{search_word},抓取完毕,退出抓取\n') return if v_type == 'video': item_id = video_info['data']['group_id'] if video_info['data']['publish_time'] <= old_time: Common.logger(log_type, crawler).error(f'关键词:{search_word},视频:{item_id},不符合抓取规则\n') continue elif video_info['data']['video_watch_count'] <= rule_dict['play_cnt']: Common.logger(log_type, crawler).error(f'关键词:{search_word},视频:{item_id},不符合抓取规则\n') continue elif video_info['data']['video_time'] < rule_dict['min_duration'] or video_info['data'][ 'video_time'] > rule_dict['max_duration']: Common.logger(log_type, crawler).error(f'关键词:{search_word},视频:{item_id},不符合抓取规则\n') continue try: video_dict = cls.get_video_info(log_type, crawler, item_id) filter_words = get_config_from_mysql(log_type, crawler, env, text='filter') is_filter = False for filter_word in filter_words: if filter_word in video_dict['video_title']: is_filter = True break if is_filter: Common.logger(log_type, crawler).info('标题已中过滤词:{}\n', video_dict['video_title']) continue video_url_dict = cls.get_video_url(log_type, crawler, video_dict['gid']) video_dict['video_width'] = video_url_dict["video_width"] video_dict['video_height'] = video_url_dict["video_height"] 
video_dict['audio_url'] = video_url_dict["audio_url"] video_dict['video_url'] = video_url_dict["video_url"] video_dict['session'] = signature except Exception as e: Common.logger(log_type, crawler).error( f'关键词:{search_word},视频:{item_id},获取详情失败,原因:{e}') continue if cls.repeat_video(log_type, crawler, video_dict['video_id'], env, machine) != 0: Common.logger(log_type, crawler).info( f'关键词:{search_word},gid:{video_dict["gid"]},视频已下载,无需重复下载\n') continue for k, v in video_dict.items(): Common.logger(log_type, crawler).info(f"{k}:{v}") try: # print( # f'search_word:{search_word},title:{video_dict["video_title"]},gid:{video_dict["gid"]},offset:{offset}, total:{total_count}') cls.download_publish( search_word=search_word, log_type=log_type, crawler=crawler, video_dict=video_dict, rule_dict=rule_dict, strategy=strategy, our_uid=our_uid, oss_endpoint=oss_endpoint, env=env, machine=machine ) except Exception as e: Common.logger(log_type, crawler).error(f'关键词:{search_word},视频:{item_id},下载失败,原因:{e}') continue total_count += 1 Common.logger(log_type, crawler).info( f'search_word:{search_word},title:{video_dict["video_title"]},gid:{video_dict["gid"]},offset:{offset}, total:{total_count}') if total_count >= 30: return # elif v_type == 'pseries': # try: # item_id = video_info['data']['group_id'] # p_url = "https://www.ixigua.com/api/videov2/pseries_more_v2?pSeriesId={}&rank=0&tailCount=30&aid=1768&msToken=wHEafKFLx0k3hihOPbhXYNsfMBxWiq2AB0K5R-34kEFixyq3ATi_DuXbL4Q47J9C2uK2zgWItMa1g2yc4FyDxM4dMijmSdwF4c4T8sSmOkoOI0wGzeEcPw==&X-Bogus=DFSzswVOzdUANG3ItaVHYr7TlqCv&_signature=_02B4Z6wo00001vB6l3QAAIDBZKzMeTihTmbwepPAANgh1Ai3JgFFo4e6anoezmBEpHfEMEYlWISGhXI-QKfev4N-2bwgXsHOuNGLnOsGqMbANIjFPh7Yj6OakQWrkbACenlv0P-arswtB6Zn45".format( # item_id) # p_headers = { # 'referer': 'https://www.ixigua.com/{}?series_flow=1&logTag=cfec9d927da968feff89'.format( # item_id), # 'user-agent': get_random_user_agent('pc'), # } # p_res = requests.request("GET", p_url, headers=p_headers, # 
                # NOTE(review): a large commented-out `elif v_type == 'pseries':` branch
                # (series/collection crawling, mirroring the 'video' branch above) lived
                # here; condensed to this marker — recover it from VCS history if the
                # series strategy is ever re-enabled.
            # Advance pagination and loop again (page size is 10 per request).
            offset += 10

    @classmethod
    def repeat_video(cls, log_type, crawler, video_id, env, machine):
        """Return the number of crawler_video rows already recorded for this
        platform / out_video_id pair; 0 means the video has not been downloaded yet.

        NOTE(review): video_id is interpolated directly into the SQL text — safe only
        while out_video_id values are trusted; prefer a parameterized query if
        MysqlHelper supports one.
        """
        sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """
        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env, machine)
        return len(repeat_video)

    # Download / upload
    @classmethod
    def download_publish(cls, log_type, crawler, search_word, strategy, video_dict, rule_dict, our_uid, oss_endpoint, env, machine):
        """Download the video/audio/cover for one item, compose them, publish to the
        platform via Publish.upload_and_publish, then record the result in Feishu and
        in the crawler_video MySQL table.

        No return value; bails out early (after cleaning up the local folder) when the
        composed file is empty or the upload yields no video id.
        """
        # Download the video stream.
        Common.download_method(log_type=log_type, crawler=crawler, text='xigua_video', title=video_dict['video_title'], url=video_dict['video_url'])
        # Download the audio stream.
        Common.download_method(log_type=log_type, crawler=crawler, text='xigua_audio', title=video_dict['video_title'], url=video_dict['audio_url'])
        # Merge audio + video into one file.
        Common.video_compose(log_type=log_type, crawler=crawler, video_dir=f"./{crawler}/videos/{video_dict['video_title']}")
        md_title = md5(video_dict['video_title'].encode('utf8')).hexdigest()
        # NOTE(review): compose above targets the title-named folder, but the size
        # check / cleanup below use the md5-named folder (and the failed-upload
        # cleanup further down uses the title-named folder again) — confirm which
        # layout Common.download_method actually produces; these look inconsistent.
        if os.path.getsize(f"./{crawler}/videos/{md_title}/video.mp4") == 0:
            # Empty file: remove the local video folder and give up on this item.
            shutil.rmtree(f"./{crawler}/videos/{md_title}")
            Common.logger(log_type, crawler).info("视频size=0,删除成功\n")
            return
        # NOTE(review): a commented-out ffmpeg-based validity check (ffmpeg_dict /
        # size==0 -> rmtree) was here; condensed to this marker.
        # Download the cover image.
        Common.download_method(log_type=log_type, crawler=crawler, text='cover', title=video_dict['video_title'], url=video_dict['cover_url'])
        # Persist the video metadata to a local txt file.
        Common.save_video_info(log_type=log_type, crawler=crawler, video_dict=video_dict)
        # Upload the video.
        Common.logger(log_type, crawler).info("开始上传视频...")
        our_video_id = Publish.upload_and_publish(log_type=log_type,
                                                  crawler=crawler,
                                                  strategy=strategy,
                                                  our_uid=our_uid,
                                                  env=env,
                                                  oss_endpoint=oss_endpoint)
        # Admin link differs between the dev and prod consoles.
        if env == 'dev':
            our_video_link = f"https://testadmin.piaoquantv.com/cms/post-detail/{our_video_id}/info"
        else:
            our_video_link = f"https://admin.piaoquantv.com/cms/post-detail/{our_video_id}/info"
        Common.logger(log_type, crawler).info("视频上传完成")
        if our_video_id is None:
            # Upload failed: remove the local video folder.
            shutil.rmtree(f"./{crawler}/videos/{video_dict['video_title']}")
            return
        # Record the video in the Feishu sheet (insert a fresh row, then fill it).
        Feishu.insert_columns(log_type, 'xigua', "BUNvGC", "ROWS", 1, 2)
        upload_time = int(time.time())
        values = [[search_word,
                   time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(upload_time)),
                   "关键词搜索",
                   video_dict['video_title'],
                   str(video_dict['video_id']),
                   our_video_link,
                   video_dict['gid'],
                   video_dict['play_cnt'],
                   video_dict['comment_cnt'],
                   video_dict['like_cnt'],
                   video_dict['share_cnt'],
                   video_dict['duration'],
                   str(video_dict['video_width']) + '*' + str(video_dict['video_height']),
                   video_dict['publish_time_str'],
                   video_dict['user_name'],
                   video_dict['user_id'],
                   video_dict['avatar_url'],
                   video_dict['cover_url'],
                   video_dict['video_url'],
                   video_dict['audio_url']]]
        # Brief pause so the freshly inserted row exists before we write into it.
        time.sleep(1)
        Feishu.update_values(log_type, 'xigua', "BUNvGC", "E2:Z2", values)
        Common.logger(log_type, crawler).info(f"视频已保存至云文档\n")
        # Persist the video record to the database.
        # NOTE(review): values (including the crawled title/URLs) are interpolated
        # straight into the SQL text — a title containing a quote will break or
        # inject; prefer a parameterized insert if MysqlHelper supports one.
        insert_sql = f""" insert into crawler_video(video_id,
        user_id,
        out_user_id,
        platform,
        strategy,
        out_video_id,
        video_title,
        cover_url,
        video_url,
        duration,
        publish_time,
        play_cnt,
        crawler_rule,
        width,
        height)
        values({our_video_id},
        {our_uid},
        "{video_dict['user_id']}",
        "{cls.platform}",
        "搜索爬虫策略",
        "{video_dict['video_id']}",
        "{video_dict['video_title']}",
        "{video_dict['cover_url']}",
        "{video_dict['video_url']}",
        {int(video_dict['duration'])},
        "{video_dict['publish_time_str']}",
        {int(video_dict['play_cnt'])},
        '{json.dumps(rule_dict)}',
        {int(video_dict['video_width'])},
        {int(video_dict['video_height'])}) """
        Common.logger(log_type, crawler).info(f"insert_sql:{insert_sql}")
        MysqlHelper.update_values(log_type, crawler, insert_sql, env, machine)
        Common.logger(log_type, crawler).info('视频信息插入数据库成功!\n')

    @classmethod
    def get_search_videos(cls, log_type, crawler, strategy, oss_endpoint, env, machine):
        """Entry point for the search strategy: read the keyword/user list from the
        Feishu sheet "SSPNPW", then crawl each keyword via get_videolist.

        Each keyword is wrapped in its own try/except so one failing keyword does not
        abort the rest of the run.
        """
        user_list = cls.get_user_list(log_type=log_type, crawler=crawler, sheetid="SSPNPW", env=env, machine=machine)
        for user in user_list:
            try:
                search_word = user["search_word"]
                our_uid = user["our_uid"]
                Common.logger(log_type, crawler).info(f"开始抓取 {search_word} 用户主页视频\n")
                cls.get_videolist(log_type=log_type,
                                  crawler=crawler,
                                  strategy=strategy,
                                  our_uid=our_uid,
                                  search_word=search_word,
                                  oss_endpoint=oss_endpoint,
                                  env=env,
                                  machine=machine)
            except Exception as e:
                Common.logger(log_type, crawler).error(f"get_search_videos:{e}\n")


if __name__ == '__main__':
    # Manual run: log_type='search', crawler='xigua', strategy='xigua_search',
    # oss_endpoint='out', env='dev', machine='aliyun'.
    XiguaSearch.get_search_videos('search', 'xigua', 'xigua_search', 'out', 'dev', 'aliyun')