123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- """
- @author: luojunhui
- 西瓜视频搜索爬虫
- """
- import re
- import json
- import base64
- import requests
- import urllib.parse
- from lxml import etree
- from Crypto.Cipher import AES
- from Crypto.Util.Padding import unpad
- from fake_useragent import FakeUserAgent
- from applications.functions.common import sensitive_flag
- class XiGuaFunctions(object):
- """
- XiGuaSearch Class
- """
- @classmethod
- def tunnel_proxies(cls):
- """
- 快代理方法
- :return:
- """
- tunnel = "q796.kdltps.com:15818"
- username = "t17772369458618"
- password = "5zqcjkmy"
- proxies = {
- "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel},
- "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel}
- }
- return proxies
- @classmethod
- def byte_dance_cookie(cls, item_id):
- """
- 获取西瓜视频的 cookie
- :param item_id:
- """
- sess = requests.Session()
- sess.headers.update({
- 'user-agent': FakeUserAgent().chrome,
- 'referer': 'https://www.ixigua.com/home/{}/'.format(item_id),
- })
- # 获取 cookies
- sess.get('https://i.snssdk.com/slardar/sdk.js?bid=xigua_video_web_pc')
- data = '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}'
- r = sess.post('https://ttwid.bytedance.com/ttwid/union/register/', data=data)
- if r.json()['redirect_url']:
- requests.get(
- url=r.json()['redirect_url']
- )
- return r.cookies.values()[0]
- @classmethod
- def aes_decrypt(cls, data, key):
- """
- XiGua AES decrypt
- :param data:
- :param key:
- :return:
- """
- password = key.encode()
- iv = password[:16]
- try:
- ct = base64.b64decode(data.encode())
- cipher = AES.new(password, AES.MODE_CBC, iv)
- pt = unpad(cipher.decrypt(ct), AES.block_size)
- return base64.b64decode(pt).decode()
- except Exception as e:
- print("Incorrect decryption {}".format(e))
- return None
- @classmethod
- def extract_video_url(cls, text):
- """
- 获取视频 video_url
- :param text:
- :return:
- """
- HTML = etree.HTML(text)
- str_2 = HTML.xpath('//script[@id="SSR_HYDRATED_DATA"]/text()')[0]
- json_2 = str_2[str_2.find('{'):str_2.rfind('}') + 1]
- Irregulars = ['null', 'undefined', '=false', '=true', 'false', 'true']
- # python中不规则的定义
- for I in Irregulars:
- if I in ['=false', '=true']:
- json_2 = json_2.replace(I, '=' + I[1:].capitalize())
- else:
- json_2 = json_2.replace(I, '12')
- dict_2 = json.loads(json_2)["anyVideo"]["gidInformation"]["packerData"]["video"]
- duration = dict_2["video_duration"]
- play_cnt = dict_2['video_watch_count']
- publish_time = int(dict_2['video_publish_time'])
- like_cnt = dict_2['video_like_count']
- video_title = dict_2['title']
- video_id = dict_2['vid']
- video_res = dict_2['videoResource']
- cover_url = dict_2['poster_url'].replace("\\u002F", "/")
- if video_res['dash'] == 12:
- obj = video_res['normal']
- ptk = obj['ptk']
- video_list = obj['video_list']
- keys = list(video_list.keys())
- main_url = video_list[keys[-1]]['main_url']
- real_video_url = cls.aes_decrypt(data=main_url, key=ptk)
- else:
- obj = video_res['dash']
- ptk = obj["ptk"]
- video_url = obj['dynamic_video']['main_url']
- real_video_url = cls.aes_decrypt(data=video_url, key=ptk)
- return {
- "video_url": real_video_url,
- "cover_url": cover_url,
- "video_id": video_id,
- "video_title": video_title,
- "like_cnt": like_cnt,
- "play_cnt": play_cnt,
- "publish_time": publish_time,
- "duration": duration
- }
- @classmethod
- def extract_info_by_re(cls, text):
- """
- 通过正则表达式获取文本中的信息
- :param text:
- :return:
- """
- result = cls.extract_video_url(text)
- # 标题
- title_match = re.search(r'<title[^>]*>(.*?)</title>', text)
- if title_match:
- title_content = title_match.group(1)
- title_content = title_content.split(" - ")[0]
- try:
- title_content = bytes(title_content, "latin1").decode()
- except:
- title_content = title_content
- else:
- title_content = ""
- result['video_title'] = title_content
- return result
- @classmethod
- def get_video_info(cls, item_id):
- """
- 获取视频信息
- """
- url = "https://www.ixigua.com/{}".format(item_id)
- headers = {
- "accept-encoding": "gzip, deflate",
- "accept-language": "zh-CN,zh-Hans;q=0.9",
- "cookie": "ttwid={}".format(cls.byte_dance_cookie(item_id)),
- "user-agent": FakeUserAgent().random,
- "referer": "https://www.ixigua.com/{}/".format(item_id),
- }
- response = requests.get(
- url=url,
- headers=headers
- )
- video_info = cls.extract_info_by_re(response.text)
- return video_info
- def xigua_search(keyword, sensitive_words):
- """
- 搜索
- """
- keyword = urllib.parse.quote(keyword)
- base_url = "https://www.ixigua.com/search/{}/ab_name=search&fss=input".format(
- keyword
- )
- headers = {
- "authority": "www.ixigua.com",
- "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
- "accept-language": "zh,en;q=0.9,zh-CN;q=0.8",
- "cache-control": "max-age=0",
- "cookie": "ixigua-a-s=1; support_webp=true; support_avif=true; csrf_session_id=a5355d954d3c63ed1ba35faada452b4d; tt_scid=Ur23fgYD2pMJOvi1BpILyfaobg8wA7IhGwmQx260ULRa8Dvjaxc5ZA63BUIP-6Vi473f; ttwid=1%7CNtTtSp4Iej-v0nWtepdZH3d3Ts6uGNMFzTN20ps1cdo%7C1708236945%7Cc1f301c64aa3bf69cdaa41f28856e2bb7b7eed16583f8c92d50cffa2d9944fc6; msToken=rr418opQf04vm8n9s8FAGdr1AoCUsvAOGKSDPbBEfwVS1sznxxZCvcZTI93qVz5uAXlX9yRwcKlNQZ4wMro2DmlHw5yWHAVeKr_SzgO1KtVVnjUMTUNEux_cq1-EIkI=",
- "upgrade-insecure-requests": "1",
- "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
- }
- basic_response = requests.get(url=base_url, headers=headers)
- html = etree.HTML(basic_response.text)
- result_list = html.xpath(
- '//div[@class="HorizontalFeedCard searchPageV2__card"]/div[1]/a'
- )
- if result_list:
- for item in result_list:
- try:
- url = item.xpath("@href")[0]
- duration_str = str(item.xpath("./span/text()")[0])
- duration_obj = duration_str.split(":")
- if len(duration_obj) == 3:
- duration = 100000
- elif len(duration_obj) == 2:
- duration = int(duration_str.split(":")[0]) * 60 + int(duration_str.split(":")[1])
- else:
- duration = 10000
- title = item.xpath("@title")[0]
- real_title = bytes(str(title), "latin1").decode()
- if sensitive_flag(sensitive_words, real_title) and duration <= 300:
- try:
- res = XiGuaFunctions().get_video_info(url[1:])
- if res:
- return [res]
- else:
- continue
- except Exception as e:
- print(e)
- except Exception as e:
- print(e)
- return []
- else:
- return []
|