""" @author: luojunhui 西瓜视频搜索爬虫 """ import re import json import base64 import requests import urllib.parse from lxml import etree from Crypto.Cipher import AES from Crypto.Util.Padding import unpad from fake_useragent import FakeUserAgent from applications.functions.common import sensitive_flag class XiGuaFunctions(object): """ XiGuaSearch Class """ @classmethod def tunnel_proxies(cls): """ 快代理方法 :return: """ tunnel = "q796.kdltps.com:15818" username = "t17772369458618" password = "5zqcjkmy" proxies = { "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel}, "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel} } return proxies @classmethod def byte_dance_cookie(cls, item_id): """ 获取西瓜视频的 cookie :param item_id: """ sess = requests.Session() sess.headers.update({ 'user-agent': FakeUserAgent().chrome, 'referer': 'https://www.ixigua.com/home/{}/'.format(item_id), }) # 获取 cookies sess.get('https://i.snssdk.com/slardar/sdk.js?bid=xigua_video_web_pc') data = '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}' r = sess.post('https://ttwid.bytedance.com/ttwid/union/register/', data=data) if r.json()['redirect_url']: requests.get( url=r.json()['redirect_url'] ) return r.cookies.values()[0] @classmethod def aes_decrypt(cls, data, key): """ XiGua AES decrypt :param data: :param key: :return: """ password = key.encode() iv = password[:16] try: ct = base64.b64decode(data.encode()) cipher = AES.new(password, AES.MODE_CBC, iv) pt = unpad(cipher.decrypt(ct), AES.block_size) return base64.b64decode(pt).decode() except Exception as e: print("Incorrect decryption {}".format(e)) return None @classmethod def extract_video_url(cls, text): """ 获取视频 video_url :param text: :return: """ HTML = etree.HTML(text) str_2 = HTML.xpath('//script[@id="SSR_HYDRATED_DATA"]/text()')[0] json_2 = str_2[str_2.find('{'):str_2.rfind('}') + 1] Irregulars = ['null', 'undefined', '=false', '=true', 'false', 'true'] # python中不规则的定义 for I in Irregulars: if I in ['=false', '=true']: json_2 = json_2.replace(I, '=' + I[1:].capitalize()) else: json_2 = json_2.replace(I, '12') dict_2 = json.loads(json_2)["anyVideo"]["gidInformation"]["packerData"]["video"] duration = dict_2["video_duration"] play_cnt = dict_2['video_watch_count'] publish_time = int(dict_2['video_publish_time']) like_cnt = dict_2['video_like_count'] video_title = dict_2['title'] video_id = dict_2['vid'] video_res = dict_2['videoResource'] cover_url = dict_2['poster_url'].replace("\\u002F", "/") if video_res['dash'] == 12: obj = video_res['normal'] ptk = obj['ptk'] video_list = obj['video_list'] keys = list(video_list.keys()) main_url = video_list[keys[-1]]['main_url'] real_video_url = cls.aes_decrypt(data=main_url, key=ptk) else: obj = video_res['dash'] ptk = obj["ptk"] video_url = obj['dynamic_video']['main_url'] real_video_url = cls.aes_decrypt(data=video_url, key=ptk) return { "video_url": real_video_url, "cover_url": cover_url, "video_id": video_id, "video_title": video_title, "like_cnt": like_cnt, "play_cnt": play_cnt, "publish_time": publish_time, "duration": duration } @classmethod def extract_info_by_re(cls, text): """ 通过正则表达式获取文本中的信息 :param text: :return: """ result = cls.extract_video_url(text) # 标题 title_match = re.search(r']*>(.*?)', text) if title_match: title_content = title_match.group(1) title_content = title_content.split(" - ")[0] try: title_content = bytes(title_content, "latin1").decode() except: title_content = title_content else: title_content = "" result['video_title'] = title_content return result @classmethod def get_video_info(cls, item_id): """ 获取视频信息 """ url = "https://www.ixigua.com/{}".format(item_id) headers = { "accept-encoding": "gzip, deflate", "accept-language": "zh-CN,zh-Hans;q=0.9", "cookie": "ttwid={}".format(cls.byte_dance_cookie(item_id)), "user-agent": FakeUserAgent().random, "referer": "https://www.ixigua.com/{}/".format(item_id), } response = requests.get( url=url, headers=headers ) video_info = cls.extract_info_by_re(response.text) return video_info def xigua_search(keyword, sensitive_words): """ 搜索 """ keyword = urllib.parse.quote(keyword) base_url = "https://www.ixigua.com/search/{}/ab_name=search&fss=input".format( keyword ) headers = { "authority": "www.ixigua.com", "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "accept-language": "zh,en;q=0.9,zh-CN;q=0.8", "cache-control": "max-age=0", "cookie": "ixigua-a-s=1; support_webp=true; support_avif=true; csrf_session_id=a5355d954d3c63ed1ba35faada452b4d; tt_scid=Ur23fgYD2pMJOvi1BpILyfaobg8wA7IhGwmQx260ULRa8Dvjaxc5ZA63BUIP-6Vi473f; ttwid=1%7CNtTtSp4Iej-v0nWtepdZH3d3Ts6uGNMFzTN20ps1cdo%7C1708236945%7Cc1f301c64aa3bf69cdaa41f28856e2bb7b7eed16583f8c92d50cffa2d9944fc6; msToken=rr418opQf04vm8n9s8FAGdr1AoCUsvAOGKSDPbBEfwVS1sznxxZCvcZTI93qVz5uAXlX9yRwcKlNQZ4wMro2DmlHw5yWHAVeKr_SzgO1KtVVnjUMTUNEux_cq1-EIkI=", "upgrade-insecure-requests": "1", "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", } basic_response = requests.get(url=base_url, headers=headers) html = etree.HTML(basic_response.text) result_list = html.xpath( '//div[@class="HorizontalFeedCard searchPageV2__card"]/div[1]/a' ) if result_list: for item in result_list: try: url = item.xpath("@href")[0] duration_str = str(item.xpath("./span/text()")[0]) duration_obj = duration_str.split(":") if len(duration_obj) == 3: duration = 100000 elif len(duration_obj) == 2: duration = int(duration_str.split(":")[0]) * 60 + int(duration_str.split(":")[1]) else: duration = 10000 title = item.xpath("@title")[0] real_title = bytes(str(title), "latin1").decode() if sensitive_flag(sensitive_words, real_title) and duration <= 300: try: res = XiGuaFunctions().get_video_info(url[1:]) if res: return [res] else: continue except Exception as e: print(e) except Exception as e: print(e) return [] else: return []