"""
@author: luojunhui
西瓜视频搜索爬虫
"""
import re
import json
import base64
import requests
import urllib.parse
from lxml import etree
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from fake_useragent import FakeUserAgent
from applications.functions.common import sensitive_flag
class XiGuaFunctions(object):
    """
    Helpers for fetching and parsing a single Xigua Video (ixigua.com) page.

    Every method is a classmethod; the class keeps no instance state.
    """

    # Seconds to wait on each HTTP call. requests applies no timeout by
    # default, so a stalled connection would otherwise block forever.
    REQUEST_TIMEOUT = 10

    # Matches either a complete double-quoted string literal or a bare
    # ``undefined`` token. Matching strings first lets the substitution skip
    # over them, so text inside titles/URLs is never rewritten.
    _UNDEFINED_OR_STRING = re.compile(r'"(?:\\.|[^"\\])*"|\bundefined\b')

    # Captures the contents of the page's <title> element.
    _TITLE_PATTERN = re.compile(r'<title[^>]*>(.*?)</title>')

    @classmethod
    def tunnel_proxies(cls):
        """
        Build the ``proxies`` mapping for the Kuaidaili tunnel proxy.

        :return: dict with "http" and "https" keys, both set to the same
            authenticated ``http://user:pwd@host:port/`` URL.
        """
        # NOTE(review): credentials are hard-coded in source; consider moving
        # them to configuration or environment variables.
        tunnel = "q796.kdltps.com:15818"
        username = "t17772369458618"
        password = "5zqcjkmy"
        proxy_url = "http://{}:{}@{}/".format(username, password, tunnel)
        return {"http": proxy_url, "https": proxy_url}

    @classmethod
    def byte_dance_cookie(cls, item_id):
        """
        Obtain a cookie value for Xigua Video from the ttwid register endpoint.

        :param item_id: id used to build the ``referer`` header.
        :return: value of the first cookie set on the registration response.
        :raises IndexError: if the registration response carries no cookie.
        """
        register_payload = '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}'
        # The context manager closes the session's connection pool on exit.
        with requests.Session() as sess:
            sess.headers.update({
                'user-agent': FakeUserAgent().chrome,
                'referer': 'https://www.ixigua.com/home/{}/'.format(item_id),
            })
            # Initial request made on the session before registering.
            sess.get(
                'https://i.snssdk.com/slardar/sdk.js?bid=xigua_video_web_pc',
                timeout=cls.REQUEST_TIMEOUT,
            )
            register_resp = sess.post(
                'https://ttwid.bytedance.com/ttwid/union/register/',
                data=register_payload,
                timeout=cls.REQUEST_TIMEOUT,
            )
            # Parse the body once; tolerate a response without redirect_url.
            redirect_url = register_resp.json().get('redirect_url')
            if redirect_url:
                requests.get(url=redirect_url, timeout=cls.REQUEST_TIMEOUT)
            return register_resp.cookies.values()[0]

    @classmethod
    def aes_decrypt(cls, data, key):
        """
        Decrypt a Xigua AES-CBC payload.

        :param data: base64 text of the ciphertext.
        :param key: string whose encoded bytes are the AES key; the first 16
            bytes of the same value are used as the IV.
        :return: the decrypted text (the plaintext is itself base64 and is
            decoded once more), or ``None`` if any step fails.
        """
        password = key.encode()
        iv = password[:16]
        try:
            ciphertext = base64.b64decode(data.encode())
            cipher = AES.new(password, AES.MODE_CBC, iv)
            plaintext = unpad(cipher.decrypt(ciphertext), AES.block_size)
            return base64.b64decode(plaintext).decode()
        except Exception as e:
            # Deliberate best-effort: report and signal failure with None.
            print("Incorrect decryption {}".format(e))
            return None

    @classmethod
    def _load_hydrated_data(cls, script_text):
        """
        Parse the object literal embedded in the SSR_HYDRATED_DATA script.

        The text between the first ``{`` and the last ``}`` is taken as the
        literal. Bare ``undefined`` tokens (not valid JSON) become ``null``;
        string contents are left untouched.

        :param script_text: raw text of the script element.
        :return: the decoded object.
        :raises ValueError: if the literal is still not valid JSON.
        """
        literal = script_text[script_text.find('{'):script_text.rfind('}') + 1]
        literal = cls._UNDEFINED_OR_STRING.sub(
            lambda m: 'null' if m.group(0) == 'undefined' else m.group(0),
            literal,
        )
        return json.loads(literal)

    @classmethod
    def extract_video_url(cls, text):
        """
        Extract video metadata and the decrypted play URL from a page's HTML.

        Fields that are ``null``/``undefined`` in the page data come back as
        ``None`` (booleans as ``True``/``False``).

        :param text: HTML of the video page.
        :return: dict with keys ``video_url``, ``cover_url``, ``video_id``,
            ``video_title``, ``like_cnt``, ``play_cnt``, ``publish_time`` and
            ``duration``. ``video_url`` is ``None`` when decryption fails.
        :raises IndexError: if the SSR_HYDRATED_DATA script is not present.
        :raises KeyError: if an expected field is missing from the page data.
        """
        document = etree.HTML(text)
        script_text = document.xpath('//script[@id="SSR_HYDRATED_DATA"]/text()')[0]
        page_data = cls._load_hydrated_data(script_text)
        video = page_data["anyVideo"]["gidInformation"]["packerData"]["video"]

        duration = video["video_duration"]
        play_cnt = video['video_watch_count']
        publish_time = int(video['video_publish_time'])
        like_cnt = video['video_like_count']
        video_title = video['title']
        video_id = video['vid']
        video_res = video['videoResource']
        cover_url = video['poster_url'].replace("\\u002F", "/")

        dash = video_res.get('dash')
        if dash:
            ptk = dash["ptk"]
            encrypted_url = dash['dynamic_video']['main_url']
        else:
            # No dash resource: fall back to the 'normal' resource and use
            # the last entry of its video_list.
            normal = video_res['normal']
            ptk = normal['ptk']
            video_list = normal['video_list']
            last_key = list(video_list.keys())[-1]
            encrypted_url = video_list[last_key]['main_url']
        real_video_url = cls.aes_decrypt(data=encrypted_url, key=ptk)

        return {
            "video_url": real_video_url,
            "cover_url": cover_url,
            "video_id": video_id,
            "video_title": video_title,
            "like_cnt": like_cnt,
            "play_cnt": play_cnt,
            "publish_time": publish_time,
            "duration": duration
        }

    @classmethod
    def extract_info_by_re(cls, text):
        """
        Extract video info and replace its title with the page's <title> text.

        The title is the part of the <title> content before the first
        `` - ``. If no <title> is found, the title is set to ``""``.

        :param text: HTML of the video page.
        :return: the dict from ``extract_video_url`` with ``video_title``
            overwritten.
        """
        result = cls.extract_video_url(text)
        title_match = cls._TITLE_PATTERN.search(text)
        if title_match:
            title_content = title_match.group(1).split(" - ")[0]
            try:
                # Undo mojibake from UTF-8 bytes that were decoded as latin1.
                title_content = bytes(title_content, "latin1").decode()
            except UnicodeError:
                # Not latin1-encodable or not UTF-8: keep the text as it is.
                pass
        else:
            title_content = ""
        result['video_title'] = title_content
        return result

    @classmethod
    def get_video_info(cls, item_id):
        """
        Download a video page and extract its info.

        :param item_id: id appended to ``https://www.ixigua.com/``.
        :return: the dict produced by ``extract_info_by_re``.
        """
        url = "https://www.ixigua.com/{}".format(item_id)
        headers = {
            "accept-encoding": "gzip, deflate",
            "accept-language": "zh-CN,zh-Hans;q=0.9",
            "cookie": "ttwid={}".format(cls.byte_dance_cookie(item_id)),
            "user-agent": FakeUserAgent().random,
            "referer": "https://www.ixigua.com/{}/".format(item_id),
        }
        response = requests.get(
            url=url,
            headers=headers,
            timeout=cls.REQUEST_TIMEOUT,
        )
        return cls.extract_info_by_re(response.text)
def xigua_search(keyword, sensitive_words):
    """
    Search ixigua.com via its HTML result page and return the first usable video.

    A result is used when ``sensitive_flag(sensitive_words, title)`` is truthy
    and its listed duration is at most 300 seconds. Errors on a single result
    are printed and that result is skipped.

    :param keyword: search keyword; URL-quoted before being placed in the path.
    :param sensitive_words: passed through to ``sensitive_flag``.
    :return: ``[video_info]`` for the first usable result whose detail lookup
        returns a truthy value, otherwise ``[]``.
    """
    keyword = urllib.parse.quote(keyword)
    # NOTE(review): there is no '?' before ab_name in this path; confirm
    # whether a query string was intended.
    base_url = "https://www.ixigua.com/search/{}/ab_name=search&fss=input".format(
        keyword
    )
    headers = {
        "authority": "www.ixigua.com",
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "accept-language": "zh,en;q=0.9,zh-CN;q=0.8",
        "cache-control": "max-age=0",
        "cookie": "ixigua-a-s=1; support_webp=true; support_avif=true; csrf_session_id=a5355d954d3c63ed1ba35faada452b4d; tt_scid=Ur23fgYD2pMJOvi1BpILyfaobg8wA7IhGwmQx260ULRa8Dvjaxc5ZA63BUIP-6Vi473f; ttwid=1%7CNtTtSp4Iej-v0nWtepdZH3d3Ts6uGNMFzTN20ps1cdo%7C1708236945%7Cc1f301c64aa3bf69cdaa41f28856e2bb7b7eed16583f8c92d50cffa2d9944fc6; msToken=rr418opQf04vm8n9s8FAGdr1AoCUsvAOGKSDPbBEfwVS1sznxxZCvcZTI93qVz5uAXlX9yRwcKlNQZ4wMro2DmlHw5yWHAVeKr_SzgO1KtVVnjUMTUNEux_cq1-EIkI=",
        "upgrade-insecure-requests": "1",
        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
    }
    # A timeout keeps a stalled connection from blocking the caller forever.
    basic_response = requests.get(url=base_url, headers=headers, timeout=10)
    html = etree.HTML(basic_response.text)
    result_list = html.xpath(
        '//div[@class="HorizontalFeedCard searchPageV2__card"]/div[1]/a'
    )
    for item in result_list:
        try:
            href = item.xpath("@href")[0]

            duration_str = str(item.xpath("./span/text()")[0])
            parts = duration_str.split(":")
            if len(parts) == 2:
                duration = int(parts[0]) * 60 + int(parts[1])
            elif len(parts) == 3:
                # Three fields: use a sentinel far above the 300s limit.
                duration = 100000
            else:
                # Unrecognised format: likewise a sentinel above the limit.
                duration = 10000

            raw_title = str(item.xpath("@title")[0])
            try:
                # Undo mojibake from UTF-8 bytes that were decoded as latin1.
                real_title = bytes(raw_title, "latin1").decode()
            except UnicodeError:
                # Title is already proper text; use it rather than dropping
                # the result.
                real_title = raw_title

            if sensitive_flag(sensitive_words, real_title) and duration <= 300:
                # href starts with '/', which is stripped to get the item id.
                res = XiGuaFunctions.get_video_info(href[1:])
                if res:
                    return [res]
        except Exception as e:
            # Best-effort per result: report and move on to the next one.
            print(e)
    return []
def xigua_search_v2(keyword, sensitive_words):
    """
    Search ixigua.com through the ``searchv2/complex`` JSON API.

    Returns the first result of type ``"video"`` for which
    ``sensitive_flag(sensitive_words, title)`` is truthy, ``video_time`` is at
    most 300, and the detail lookup returns a truthy value. Errors on a single
    result are printed and that result is skipped.

    :param keyword: search keyword (unquoted; it is URL-quoted here).
    :param sensitive_words: passed through to ``sensitive_flag``.
    :return: ``[video_info]`` or ``[]``.
    """
    # Quote once and use it for both the path and the referer, so characters
    # such as '?' or '#' in the keyword cannot alter the request URL.
    quoted_keyword = urllib.parse.quote(keyword)
    url = "https://www.ixigua.com/api/searchv2/complex/{}/10".format(quoted_keyword)
    headers = {
        'accept': 'application/json, text/plain, */*',
        'accept-language': 'en,zh;q=0.9,zh-CN;q=0.8',
        'cookie': '_tea_utm_cache_2285=undefined;',
        'priority': 'u=1, i',
        'referer': 'https://www.ixigua.com/search/{}'.format(quoted_keyword),
        'user-agent': FakeUserAgent().chrome
    }
    # A timeout keeps a stalled connection from blocking the caller forever.
    response = requests.get(url, headers=headers, timeout=10)
    try:
        recall_list = response.json()['data']['data']
    except Exception as e:
        # Unexpected body (not JSON, or missing keys): report, return nothing.
        print(e)
        return []
    if not isinstance(recall_list, list) or not recall_list:
        return []

    for obj in recall_list:
        try:
            if obj['type'] != "video":
                continue
            video = obj['data']
            title = video['title']
            group_id = video['group_id']
            duration = video['video_time']
            if sensitive_flag(sensitive_words, title) and duration <= 300:
                res = XiGuaFunctions.get_video_info(group_id)
                if res:
                    return [res]
        except Exception as e:
            # One malformed entry should not discard the remaining results.
            print(e)
    return []