|
@@ -4,183 +4,188 @@
|
|
"""
|
|
"""
|
|
import re
|
|
import re
|
|
import json
|
|
import json
|
|
-import time
|
|
|
|
-import random
|
|
|
|
import base64
|
|
import base64
|
|
|
|
+import requests
|
|
import urllib.parse
|
|
import urllib.parse
|
|
|
|
|
|
-import requests
|
|
|
|
from lxml import etree
|
|
from lxml import etree
|
|
from Crypto.Cipher import AES
|
|
from Crypto.Cipher import AES
|
|
from Crypto.Util.Padding import unpad
|
|
from Crypto.Util.Padding import unpad
|
|
from fake_useragent import FakeUserAgent
|
|
from fake_useragent import FakeUserAgent
|
|
|
|
|
|
-
|
|
|
|
-def byte_dance_cookie(item_id):
|
|
|
|
- """
|
|
|
|
- 获取西瓜视频的 cookie
|
|
|
|
- :param item_id:
|
|
|
|
- """
|
|
|
|
- sess = requests.Session()
|
|
|
|
- sess.headers.update({
|
|
|
|
- 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36',
|
|
|
|
- 'referer': 'https://www.ixigua.com/home/{}/'.format(item_id),
|
|
|
|
- })
|
|
|
|
-
|
|
|
|
- # 获取 cookies
|
|
|
|
- sess.get('https://i.snssdk.com/slardar/sdk.js?bid=xigua_video_web_pc')
|
|
|
|
- data = '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}'
|
|
|
|
- r = sess.post('https://ttwid.bytedance.com/ttwid/union/register/', data=data)
|
|
|
|
- # print(r.text)
|
|
|
|
- return r.cookies.values()[0]
|
|
|
|
|
|
+from applications.functions.common import MySQLServer
|
|
|
|
|
|
|
|
|
|
-def aes_decrypt(data: str, key: str) -> str:
|
|
|
|
|
|
class XiGuaFunctions(object):
    """
    Helpers for scraping Xigua (ixigua.com) video pages.

    Every method is a classmethod, so no instance state is needed.
    """

    # Seconds to wait on each HTTP call; without a timeout ``requests``
    # blocks indefinitely when the remote side stalls.
    REQUEST_TIMEOUT = 10

    # A bare JavaScript ``undefined`` in value position (after ':', ',' or
    # '[' and before ',', '}' or ']'). It is the only token in the hydrated
    # page data that ``json.loads`` cannot parse.
    _UNDEFINED_VALUE = re.compile(r'([:,\[]\s*)undefined(?=\s*[,}\]])')

    @classmethod
    def tunnel_proxies(cls):
        """
        Build the ``proxies`` mapping for the Kuaidaili tunnel proxy.

        :return: dict with ``http`` and ``https`` keys, both pointing at the
            same authenticated proxy URL
        """
        # NOTE(review): credentials are committed in source; they are kept
        # here so existing callers keep working, but should be moved to
        # configuration and rotated.
        tunnel = "q796.kdltps.com:15818"
        username = "t17772369458618"
        password = "5zqcjkmy"
        proxy_url = "http://%(user)s:%(pwd)s@%(proxy)s/" % {
            "user": username,
            "pwd": password,
            "proxy": tunnel,
        }
        return {"http": proxy_url, "https": proxy_url}

    @classmethod
    def byte_dance_cookie(cls, item_id):
        """
        Register with the ByteDance ttwid service and return the cookie value.

        :param item_id: id interpolated into the ``referer`` header
        :return: value of the first cookie set by the register response
        :raises IndexError: if the register response sets no cookie
        """
        headers = {
            'user-agent': FakeUserAgent().chrome,
            'referer': 'https://www.ixigua.com/home/{}/'.format(item_id),
        }
        data = '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}'
        # The context manager closes the session's pooled connections even
        # when one of the requests raises.
        with requests.Session() as sess:
            sess.headers.update(headers)
            # Warm-up request so the session picks up the initial cookies.
            sess.get(
                'https://i.snssdk.com/slardar/sdk.js?bid=xigua_video_web_pc',
                timeout=cls.REQUEST_TIMEOUT,
            )
            r = sess.post(
                'https://ttwid.bytedance.com/ttwid/union/register/',
                data=data,
                timeout=cls.REQUEST_TIMEOUT,
            )
            # Parse the body once; a missing key simply means there is no
            # redirect to follow.
            redirect_url = r.json().get('redirect_url')
            if redirect_url:
                requests.get(url=redirect_url, timeout=cls.REQUEST_TIMEOUT)
            return r.cookies.values()[0]

    @classmethod
    def aes_decrypt(cls, data: str, key: str):
        """
        Decrypt a Xigua AES-CBC payload.

        The key's first 16 bytes are used as the IV. The ciphertext is
        base64-decoded, decrypted, unpadded, and the resulting plaintext is
        base64-decoded once more.

        :param data: base64-encoded ciphertext
        :param key: AES key as text
        :return: decrypted text, or None when any decryption step fails
        """
        password = key.encode()
        iv = password[:16]
        try:
            ct = base64.b64decode(data.encode())
            cipher = AES.new(password, AES.MODE_CBC, iv)
            pt = unpad(cipher.decrypt(ct), AES.block_size)
            return base64.b64decode(pt).decode()
        except Exception as e:
            # Best effort: report the failure and let the caller see None.
            print("Incorrect decryption {}".format(e))
            return None

    @classmethod
    def _load_hydrated_json(cls, raw):
        """
        Parse the JavaScript object literal of the hydrated page data.

        :param raw: text from the first ``{`` to the last ``}`` of the script
        :return: the decoded object
        :raises ValueError: if the text is still not valid JSON
        """
        try:
            # Touch only bare ``undefined`` tokens so string contents
            # (titles, URLs, base64 payloads) are left intact.
            return json.loads(cls._UNDEFINED_VALUE.sub(r'\1null', raw))
        except ValueError:
            # Fallback for an ``undefined`` in a position the pattern
            # above does not cover.
            return json.loads(raw.replace('undefined', 'null'))

    @classmethod
    def extract_video_url(cls, text):
        """
        Extract the video metadata and decrypted play URL from a page.

        :param text: HTML of a video page containing the
            ``SSR_HYDRATED_DATA`` script
        :return: dict with keys ``video_url``, ``cover_url``, ``video_id``,
            ``video_title``, ``like_cnt``, ``play_cnt``, ``publish_time``
            and ``duration``; fields the page reports as null/undefined
            come back as None (``publish_time`` as 0)
        """
        html = etree.HTML(text)
        script = html.xpath('//script[@id="SSR_HYDRATED_DATA"]/text()')[0]
        raw = script[script.find('{'):script.rfind('}') + 1]
        video = cls._load_hydrated_json(raw)["anyVideo"]["gidInformation"]["packerData"]["video"]

        duration = video["video_duration"]
        play_cnt = video['video_watch_count']
        # A missing publish time becomes 0 instead of raising.
        publish_time = int(video['video_publish_time'] or 0)
        like_cnt = video['video_like_count']
        video_title = video['title']
        video_id = video['vid']
        video_res = video['videoResource']
        cover_url = video['poster_url'].replace("\\u002F", "/")

        dash = video_res.get('dash')
        if isinstance(dash, dict):
            ptk = dash["ptk"]
            encrypted_url = dash['dynamic_video']['main_url']
        else:
            # No dash stream on the page: use the last entry of the
            # normal video list.
            normal = video_res['normal']
            ptk = normal['ptk']
            video_list = normal['video_list']
            encrypted_url = video_list[list(video_list)[-1]]['main_url']
        real_video_url = cls.aes_decrypt(data=encrypted_url, key=ptk)

        return {
            "video_url": real_video_url,
            "cover_url": cover_url,
            "video_id": video_id,
            "video_title": video_title,
            "like_cnt": like_cnt,
            "play_cnt": play_cnt,
            "publish_time": publish_time,
            "duration": duration
        }

    @classmethod
    def extract_info_by_re(cls, text):
        """
        Extract video info and override its title with the page ``<title>``.

        :param text: HTML of a video page
        :return: the dict from ``extract_video_url`` with ``video_title``
            replaced by the part of ``<title>`` before the first " - ",
            or by an empty string when the page has no ``<title>``
        """
        result = cls.extract_video_url(text)
        title_match = re.search(r'<title[^>]*>(.*?)</title>', text)
        if title_match:
            title_content = title_match.group(1).split(" - ")[0]
            try:
                # Undo mojibake: re-read latin-1 decoded text as UTF-8.
                title_content = bytes(title_content, "latin1").decode()
            except UnicodeError:
                # Not a latin-1 round trip; keep the title as extracted.
                pass
        else:
            title_content = ""
        result['video_title'] = title_content
        return result

    @classmethod
    def get_video_info(cls, item_id):
        """
        Download a video page and extract its information.

        :param item_id: id of the video, used in the page URL and referer
        :return: dict produced by ``extract_info_by_re``
        """
        url = "https://www.ixigua.com/{}".format(item_id)
        headers = {
            "accept-encoding": "gzip, deflate",
            "accept-language": "zh-CN,zh-Hans;q=0.9",
            "cookie": "ttwid={}".format(cls.byte_dance_cookie(item_id)),
            "user-agent": FakeUserAgent().random,
            "referer": "https://www.ixigua.com/{}/".format(item_id),
        }
        response = requests.get(
            url=url,
            headers=headers,
            timeout=cls.REQUEST_TIMEOUT,
        )
        return cls.extract_info_by_re(response.text)
|
|
|
|
|
|
|
|
|
|
def xigua_search(keyword):
|
|
def xigua_search(keyword):
|
|
"""
|
|
"""
|
|
搜索
|
|
搜索
|
|
"""
|
|
"""
|
|
|
|
+ sensitive_words = MySQLServer().select_sensitive_words()
|
|
|
|
+
|
|
|
|
+ def sensitive_flag(s_words, ori_title):
|
|
|
|
+ """
|
|
|
|
+ :param ori_title:
|
|
|
|
+ :param s_words:
|
|
|
|
+ :return:
|
|
|
|
+ """
|
|
|
|
+ for word in s_words:
|
|
|
|
+ if word in ori_title:
|
|
|
|
+ return False
|
|
|
|
+ return True
|
|
|
|
+
|
|
keyword = urllib.parse.quote(keyword)
|
|
keyword = urllib.parse.quote(keyword)
|
|
base_url = "https://www.ixigua.com/search/{}/ab_name=search&fss=input".format(
|
|
base_url = "https://www.ixigua.com/search/{}/ab_name=search&fss=input".format(
|
|
keyword
|
|
keyword
|
|
@@ -191,13 +196,6 @@ def xigua_search(keyword):
|
|
"accept-language": "zh,en;q=0.9,zh-CN;q=0.8",
|
|
"accept-language": "zh,en;q=0.9,zh-CN;q=0.8",
|
|
"cache-control": "max-age=0",
|
|
"cache-control": "max-age=0",
|
|
"cookie": "ixigua-a-s=1; support_webp=true; support_avif=true; csrf_session_id=a5355d954d3c63ed1ba35faada452b4d; tt_scid=Ur23fgYD2pMJOvi1BpILyfaobg8wA7IhGwmQx260ULRa8Dvjaxc5ZA63BUIP-6Vi473f; ttwid=1%7CNtTtSp4Iej-v0nWtepdZH3d3Ts6uGNMFzTN20ps1cdo%7C1708236945%7Cc1f301c64aa3bf69cdaa41f28856e2bb7b7eed16583f8c92d50cffa2d9944fc6; msToken=rr418opQf04vm8n9s8FAGdr1AoCUsvAOGKSDPbBEfwVS1sznxxZCvcZTI93qVz5uAXlX9yRwcKlNQZ4wMro2DmlHw5yWHAVeKr_SzgO1KtVVnjUMTUNEux_cq1-EIkI=",
|
|
"cookie": "ixigua-a-s=1; support_webp=true; support_avif=true; csrf_session_id=a5355d954d3c63ed1ba35faada452b4d; tt_scid=Ur23fgYD2pMJOvi1BpILyfaobg8wA7IhGwmQx260ULRa8Dvjaxc5ZA63BUIP-6Vi473f; ttwid=1%7CNtTtSp4Iej-v0nWtepdZH3d3Ts6uGNMFzTN20ps1cdo%7C1708236945%7Cc1f301c64aa3bf69cdaa41f28856e2bb7b7eed16583f8c92d50cffa2d9944fc6; msToken=rr418opQf04vm8n9s8FAGdr1AoCUsvAOGKSDPbBEfwVS1sznxxZCvcZTI93qVz5uAXlX9yRwcKlNQZ4wMro2DmlHw5yWHAVeKr_SzgO1KtVVnjUMTUNEux_cq1-EIkI=",
|
|
- "sec-ch-ua": '"Not A(Brand";v="99", "Google Chrome";v="121", "Chromium";v="121"',
|
|
|
|
- "sec-ch-ua-mobile": "?0",
|
|
|
|
- "sec-ch-ua-platform": '"macOS"',
|
|
|
|
- "sec-fetch-dest": "document",
|
|
|
|
- "sec-fetch-mode": "navigate",
|
|
|
|
- "sec-fetch-site": "none",
|
|
|
|
- "sec-fetch-user": "?1",
|
|
|
|
"upgrade-insecure-requests": "1",
|
|
"upgrade-insecure-requests": "1",
|
|
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
|
|
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
|
|
}
|
|
}
|
|
@@ -206,14 +204,20 @@ def xigua_search(keyword):
|
|
result = html.xpath(
|
|
result = html.xpath(
|
|
'//a[@class="HorizontalFeedCard__coverWrapper disableZoomAnimation"]/@href'
|
|
'//a[@class="HorizontalFeedCard__coverWrapper disableZoomAnimation"]/@href'
|
|
)
|
|
)
|
|
- res_list = []
|
|
|
|
- for page_id in result[:5]:
|
|
|
|
- doc_id = page_id[1:].split("?")[0]
|
|
|
|
- try:
|
|
|
|
- res = get_video_info(doc_id)
|
|
|
|
- temp = ["xigua", res['video_title'], res['video_url'], "https://www.ixigua.com/{}".format(doc_id)]
|
|
|
|
- res_list.append(temp)
|
|
|
|
- except:
|
|
|
|
- pass
|
|
|
|
- return res_list
|
|
|
|
-
|
|
|
|
|
|
+ if result:
|
|
|
|
+ L = []
|
|
|
|
+ doc_id_list = [page_id[1:] for page_id in result]
|
|
|
|
+ for doc_id in doc_id_list:
|
|
|
|
+ try:
|
|
|
|
+ video_d = XiGuaFunctions().get_video_info(doc_id)
|
|
|
|
+ video_title = video_d['video_title']
|
|
|
|
+ if sensitive_flag(sensitive_words, video_title) and int(video_d['duration']) <= 300:
|
|
|
|
+ L.append(video_d)
|
|
|
|
+ else:
|
|
|
|
+ continue
|
|
|
|
+ except Exception as e:
|
|
|
|
+ print(e)
|
|
|
|
+ continue
|
|
|
|
+ return L
|
|
|
|
+ else:
|
|
|
|
+ return []
|