"""
@author: luojunhui
Xigua Video (ixigua.com) search crawler.
"""
import re
import json
import base64
import requests
import urllib.parse

from lxml import etree
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from fake_useragent import FakeUserAgent

from applications.functions.common import sensitive_flag

# Seconds to wait on any single HTTP call; without a timeout `requests`
# can block forever on a stalled connection.
REQUEST_TIMEOUT = 30

# The hydration payload is a JavaScript object literal.  `undefined` is the
# only value in it that is not valid JSON, so it is the only token rewritten.
# The look-around keeps the match to value positions (after ':', '[' or ','
# and before ',', '}' or ']').
_UNDEFINED_VALUE = re.compile(r'(?<=[:\[,])\s*undefined(?=\s*[,}\]])')

# Captures the text content of the page's <title> element.
_TITLE_TAG = re.compile(r'<title[^>]*>(.*?)</title>')


class XiGuaFunctions(object):
    """
    Helpers for fetching and decoding Xigua video detail pages.
    """

    @classmethod
    def tunnel_proxies(cls):
        """
        Build the `requests` proxies mapping for the KuaiDaiLi tunnel proxy.

        :return: dict with "http" and "https" keys, both pointing at the
            same authenticated tunnel URL.
        """
        # NOTE(review): credentials are hard-coded in source; they should be
        # moved to configuration / environment and rotated.
        # Tunnel host:port
        tunnel = "l901.kdltps.com:15818"
        # Username / password authentication
        username = "t11983523373311"
        password = "mtuhdr2z"
        proxy_url = "http://%(user)s:%(pwd)s@%(proxy)s/" % {
            "user": username,
            "pwd": password,
            "proxy": tunnel,
        }
        return {"http": proxy_url, "https": proxy_url}

    @classmethod
    def byte_dance_cookie(cls, item_id):
        """
        Register with the ByteDance ttwid service and return a cookie value.

        :param item_id: id used only to build the `referer` header.
        :return: the first cookie value set on the register response
            (presumably the `ttwid` cookie -- confirm against the service).
        :raises IndexError: if the register response set no cookies.
        """
        sess = requests.Session()
        sess.headers.update({
            'user-agent': FakeUserAgent().chrome,
            'referer': 'https://www.ixigua.com/home/{}/'.format(item_id),
        })

        # Warm-up request so the session picks up the initial cookies.
        sess.get(
            'https://i.snssdk.com/slardar/sdk.js?bid=xigua_video_web_pc',
            timeout=REQUEST_TIMEOUT,
        )
        data = '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}'
        r = sess.post(
            'https://ttwid.bytedance.com/ttwid/union/register/',
            data=data,
            timeout=REQUEST_TIMEOUT,
        )
        # Parse the body once; a missing key is treated like an empty URL.
        redirect_url = r.json().get('redirect_url')
        if redirect_url:
            requests.get(url=redirect_url, timeout=REQUEST_TIMEOUT)
        return r.cookies.values()[0]

    @classmethod
    def aes_decrypt(cls, data, key):
        """
        Decrypt an AES-CBC encrypted, base64-encoded Xigua media URL.

        The key string is encoded to bytes and used as the AES key; its first
        16 bytes double as the IV.  The decrypted, unpadded plaintext is itself
        base64 and is decoded once more to obtain the final text.

        :param data: base64-encoded ciphertext (str).
        :param key: key string (`ptk` from the page payload).
        :return: decrypted text, or None if any step fails.
        """
        password = key.encode()
        iv = password[:16]
        try:
            ct = base64.b64decode(data.encode())
            cipher = AES.new(password, AES.MODE_CBC, iv)
            pt = unpad(cipher.decrypt(ct), AES.block_size)
            return base64.b64decode(pt).decode()
        except Exception as e:
            # Best-effort: report and let the caller deal with a missing URL.
            print("Incorrect decryption {}".format(e))
            return None

    @classmethod
    def extract_video_url(cls, text):
        """
        Extract video metadata and the decrypted play URL from a detail page.

        :param text: HTML of the video detail page.
        :return: dict with keys video_url, cover_url, video_id, video_title,
            like_cnt, play_cnt, publish_time, duration.
        :raises IndexError: if the SSR_HYDRATED_DATA script tag is missing.
        :raises KeyError: if the payload lacks an expected field.
        """
        html = etree.HTML(text)
        script_text = html.xpath('//script[@id="SSR_HYDRATED_DATA"]/text()')[0]
        # Strip the `window.xxx=` prefix / trailing junk around the object.
        payload = script_text[script_text.find('{'):script_text.rfind('}') + 1]
        # Only `undefined` needs patching; null/true/false are valid JSON and
        # must not be rewritten, otherwise strings that merely contain those
        # substrings (titles, base64 URLs, keys) get corrupted.
        payload = _UNDEFINED_VALUE.sub('null', payload)

        video = json.loads(payload)["anyVideo"]["gidInformation"]["packerData"]["video"]
        duration = video["video_duration"]
        play_cnt = video['video_watch_count']
        # Missing publish time falls back to 0 so the field stays an int.
        publish_time = int(video['video_publish_time'] or 0)
        like_cnt = video['video_like_count']
        video_title = video['title']
        video_id = video['vid']
        video_res = video['videoResource']
        cover_url = video['poster_url'].replace("\\u002F", "/")

        dash = video_res.get('dash')
        if not isinstance(dash, dict):
            # No DASH resource (null / absent): use the last entry of the
            # progressive `video_list`.
            normal = video_res['normal']
            ptk = normal['ptk']
            last_entry = list(normal['video_list'].values())[-1]
            encrypted_url = last_entry['main_url']
        else:
            ptk = dash["ptk"]
            encrypted_url = dash['dynamic_video']['main_url']
        real_video_url = cls.aes_decrypt(data=encrypted_url, key=ptk)

        return {
            "video_url": real_video_url,
            "cover_url": cover_url,
            "video_id": video_id,
            "video_title": video_title,
            "like_cnt": like_cnt,
            "play_cnt": play_cnt,
            "publish_time": publish_time,
            "duration": duration
        }

    @classmethod
    def extract_info_by_re(cls, text):
        """
        Extract video info from the page and take the title from <title>.

        :param text: HTML of the video detail page.
        :return: the dict from `extract_video_url`, with `video_title`
            replaced by the part of the <title> text before " - " (or ""
            when the page has no <title>).
        """
        result = cls.extract_video_url(text)
        # Title
        title_match = _TITLE_TAG.search(text)
        if title_match:
            title_content = title_match.group(1).split(" - ")[0]
            try:
                # Undo mojibake when the page was decoded as latin-1 instead
                # of UTF-8; text that is already correct fails to round-trip
                # and is left untouched.
                title_content = bytes(title_content, "latin1").decode()
            except UnicodeError:
                pass
        else:
            title_content = ""
        result['video_title'] = title_content
        return result

    @classmethod
    def get_video_info(cls, item_id):
        """
        Download a video detail page and extract its information.

        :param item_id: video id used in the detail page URL.
        :return: dict as produced by `extract_info_by_re`.
        """
        url = "https://www.ixigua.com/{}".format(item_id)
        headers = {
            "accept-language": "zh-CN,zh-Hans;q=0.9",
            # NOTE(review): static session cookie captured from a browser; it
            # will eventually expire.  `byte_dance_cookie` can supply a fresh
            # ttwid value if this stops working.
            "cookie": "UIFID=73355a799e41c2edb6d004baa6cda0116425031dff9117e11075ec8bf266082874fe897f43e66be83a0501afe4a08cfc7e1066ab88423af122641493c7af9f0a745eb85c50fddb096de5cc77cd5ff05503312d84d36ab2681c6e6d930bbe68edaebf8fae03b04eb669359965e01c266b;"
                      "__ac_nonce=0666fd1a00053bf535b9f;"
                      "__ac_signature=_02B4Z6wo00f01u8PTiQAAIDBvfBuP-YjUQbvL0qAAN25bWfWXQrzRNCBKvFYKS5wAOYPXg5XV1Ck9JEroeWeWKijH2v3i4lxXM37JogiJJfEtYD.8sbXul2-4v.VRRta4xa07ignRnGj5Voh83;"
                      "ttwid=1%7C9b5sTIuwZxZKt0wFsvE-2t5OoFxH_Q5VIpVNWEREbAo%7C1718605316%7C9dfc9322350e713e6109ed46a7047ed31c0ab5a724e84de0bb766c195043207c",
            "user-agent": FakeUserAgent().chrome,
            "referer": "https://www.ixigua.com/{}/".format(item_id),
        }
        response = requests.get(
            url=url,
            headers=headers,
            timeout=REQUEST_TIMEOUT
        )
        return cls.extract_info_by_re(response.text)


def xigua_search_v2(keyword, sensitive_words):
    """
    Search Xigua by keyword and return the first acceptable video.

    A candidate is acceptable when it is of type "video", its title passes
    `sensitive_flag(sensitive_words, title)` and its duration is at most
    300 (presumably seconds -- confirm against the API).

    :param keyword: search keyword.
    :param sensitive_words: passed through to `sensitive_flag`.
    :return: a one-element list with the video info dict, or [] when nothing
        matches or the search response cannot be processed.
    """
    # Percent-encode the keyword so characters such as '/', '?' or '#'
    # cannot change the structure of the request path.
    url = "https://www.ixigua.com/api/searchv2/complex/{}/10".format(
        urllib.parse.quote(keyword, safe="")
    )
    headers = {
        'accept': 'application/json, text/plain, */*',
        'accept-language': 'en,zh;q=0.9,zh-CN;q=0.8',
        'cookie': '_tea_utm_cache_2285=undefined;',
        'priority': 'u=1, i',
        'referer': 'https://www.ixigua.com/search/{}'.format(urllib.parse.quote(keyword)),
        'user-agent': FakeUserAgent().chrome
    }
    response = requests.request(
        "GET", url, headers=headers, params={}, timeout=REQUEST_TIMEOUT
    )
    try:
        recall_list = response.json()['data']['data']
        for obj in recall_list or []:
            if obj['type'] != "video":
                continue
            title = obj['data']['title']
            group_id = obj['data']['group_id']
            duration = obj['data']['video_time']
            if not (sensitive_flag(sensitive_words, title) and duration <= 300):
                continue
            try:
                res = XiGuaFunctions.get_video_info(group_id)
            except Exception as e:
                # One broken detail page should not discard the remaining
                # candidates; report it and try the next one.
                print("get_video_info failed for {}: {}".format(group_id, e))
                continue
            if res:
                return [res]
        return []
    except Exception:
        # Malformed / unexpected search response: treat as "no results".
        return []