|
@@ -8,6 +8,9 @@ import time
|
|
|
import uuid
|
|
|
import base64
|
|
|
import requests
|
|
|
+from lxml import etree
|
|
|
+from Crypto.Cipher import AES
|
|
|
+from Crypto.Util.Padding import unpad
|
|
|
from fake_useragent import FakeUserAgent
|
|
|
|
|
|
from common.mq import MQ
|
|
@@ -18,6 +21,48 @@ from common import AliyunLogger, PiaoQuanPipeline, tunnel_proxies
|
|
|
from common.limit import AuthorLimit
|
|
|
|
|
|
|
|
|
def aes_decrypt(data: str, key: str) -> str:
    """
    Decrypt a XiGua AES-CBC payload.

    The ciphertext arrives as base64 text; after decryption and padding
    removal the plaintext is itself base64, so it is decoded a second time.

    :param data: Base64 text of the ciphertext.
    :param key: Text whose encoded bytes are used as the AES key; the first
        16 of those bytes are reused as the IV.
    :return: The decoded plaintext, or None when any step inside the guarded
        section fails (the error is printed instead of being raised).
    """
    key_bytes = key.encode()
    init_vector = key_bytes[:16]
    try:
        ciphertext = base64.b64decode(data.encode())
        padded = AES.new(key_bytes, AES.MODE_CBC, init_vector).decrypt(ciphertext)
        inner_b64 = unpad(padded, AES.block_size)
        plaintext = base64.b64decode(inner_b64).decode()
    except Exception as err:
        # Best-effort: report the failure and let the caller see None.
        print("Incorrect decryption {}".format(err))
        return None
    return plaintext
|
|
|
+
|
|
|
+
|
|
|
def extract_video_url(text):
    """
    Extract and decrypt the video URL embedded in a page's HTML.

    The page carries its state in a ``<script id="SSR_HYDRATED_DATA">``
    element. The outermost ``{...}`` of that script is parsed as JSON, and the
    ``dash`` video resource provides the encrypted ``main_url`` together with
    the ``ptk`` key used to decrypt it.

    :param text: HTML source of the video page.
    :return: Whatever ``aes_decrypt`` returns for the encrypted URL (None
        when decryption fails).
    :raises IndexError: If the SSR_HYDRATED_DATA script element is absent.
    :raises KeyError: If the expected path is missing from the parsed data.
    :raises json.JSONDecodeError: If the script payload is not parseable.
    """
    document = etree.HTML(text)
    script_text = document.xpath('//script[@id="SSR_HYDRATED_DATA"]/text()')[0]
    payload = script_text[script_text.find('{'):script_text.rfind('}') + 1]

    # JSON already defines null/true/false, so json.loads needs no help with
    # them. Only JavaScript's bare `undefined` is rewritten, and only where a
    # value can appear. A blanket textual replacement would also hit string
    # contents; the base64 ciphertext and key can contain "true" or "null"
    # as substrings and would be corrupted before decryption.
    payload = re.sub(r'(?<=[:\[,])\s*undefined(?=\s*[,\]}])', 'null', payload)

    dash = json.loads(payload)["anyVideo"]["gidInformation"]["packerData"]["video"]["videoResource"]["dash"]
    ptk = dash["ptk"]
    encrypted_url = dash['dynamic_video']['main_url']
    return aes_decrypt(data=encrypted_url, key=ptk)
|
|
|
+
|
|
|
+
|
|
|
def extract_info_by_re(text):
|
|
|
"""
|
|
|
通过正则表达式获取文本中的信息
|
|
@@ -32,16 +77,6 @@ def extract_info_by_re(text):
|
|
|
title_content = bytes(title_content, "latin1").decode()
|
|
|
else:
|
|
|
title_content = ""
|
|
|
- # video_url
|
|
|
- main_url = re.search(r'("main_url":")(.*?)"', text)[0]
|
|
|
- main_url = main_url.split(":")[1]
|
|
|
- decoded_data = base64.b64decode(main_url)
|
|
|
- try:
|
|
|
- # 尝试使用utf-8解码
|
|
|
- video_url = decoded_data.decode()
|
|
|
- except UnicodeDecodeError:
|
|
|
- # 如果utf-8解码失败,尝试使用其他编码方式
|
|
|
- video_url = decoded_data.decode('latin-1')
|
|
|
|
|
|
# video_id
|
|
|
video_id = re.search(r'"vid":"(.*?)"', text).group(1)
|
|
@@ -60,9 +95,10 @@ def extract_info_by_re(text):
|
|
|
|
|
|
# video_duration
|
|
|
duration = re.search(r'("video_duration":)(.*?)"', text).group(2).replace(",", "")
|
|
|
+
|
|
|
return {
|
|
|
"title": title_content,
|
|
|
- "url": video_url,
|
|
|
+ "url": extract_video_url(text),
|
|
|
"video_id": video_id,
|
|
|
"like_count": like_count,
|
|
|
"cover_url": cover_url,
|
|
@@ -71,6 +107,7 @@ def extract_info_by_re(text):
|
|
|
"duration": duration
|
|
|
}
|
|
|
|
|
|
+
|
|
|
def random_signature():
|
|
|
"""
|
|
|
随机生成签名
|
|
@@ -102,6 +139,7 @@ def random_signature():
|
|
|
new_password = new_password_start + "y" + new_password_end
|
|
|
return new_password
|
|
|
|
|
|
+
|
|
|
def byte_dance_cookie(item_id):
|
|
|
"""
|
|
|
获取西瓜视频的 cookie
|
|
@@ -120,6 +158,7 @@ def byte_dance_cookie(item_id):
|
|
|
# print(r.text)
|
|
|
return r.cookies.values()[0]
|
|
|
|
|
|
+
|
|
|
def get_video_url(video_info):
|
|
|
"""
|
|
|
获取视频的链接
|
|
@@ -1087,29 +1126,29 @@ class XiGuaAuthor:
|
|
|
|
|
|
video_info = extract_info_by_re(response.text)
|
|
|
video_dict = {
|
|
|
- "video_title": video_info.get("title", ""),
|
|
|
- "video_id": video_info.get("video_id"),
|
|
|
- "gid": str(item_id),
|
|
|
- "play_cnt": int(video_info.get("play_count", 0)),
|
|
|
- "like_cnt": int(video_info.get("like_count", 0)),
|
|
|
- "comment_cnt": 0,
|
|
|
- "share_cnt": 0,
|
|
|
- "favorite_cnt": 0,
|
|
|
- "duration": int(video_info.get("duration", 0)),
|
|
|
- "video_width": 0,
|
|
|
- "video_height": 0,
|
|
|
- "publish_time_stamp": int(video_info.get("publish_time", 0)),
|
|
|
- "publish_time_str": time.strftime(
|
|
|
- "%Y-%m-%d %H:%M:%S",
|
|
|
- time.localtime(int(video_info.get("publish_time", 0))),
|
|
|
- ),
|
|
|
- "avatar_url": str(
|
|
|
- video_info.get("user_info", {}).get("avatar_url", "")
|
|
|
- ),
|
|
|
- "cover_url": video_info.get("cover_url", "").replace("\\u002F", "/"),
|
|
|
- "video_url": video_info.get("url"),
|
|
|
- "session": f"xigua-author-{int(time.time())}",
|
|
|
- }
|
|
|
+ "video_title": video_info.get("title", ""),
|
|
|
+ "video_id": video_info.get("video_id"),
|
|
|
+ "gid": str(item_id),
|
|
|
+ "play_cnt": int(video_info.get("play_count", 0)),
|
|
|
+ "like_cnt": int(video_info.get("like_count", 0)),
|
|
|
+ "comment_cnt": 0,
|
|
|
+ "share_cnt": 0,
|
|
|
+ "favorite_cnt": 0,
|
|
|
+ "duration": int(video_info.get("duration", 0)),
|
|
|
+ "video_width": 0,
|
|
|
+ "video_height": 0,
|
|
|
+ "publish_time_stamp": int(video_info.get("publish_time", 0)),
|
|
|
+ "publish_time_str": time.strftime(
|
|
|
+ "%Y-%m-%d %H:%M:%S",
|
|
|
+ time.localtime(int(video_info.get("publish_time", 0))),
|
|
|
+ ),
|
|
|
+ "avatar_url": str(
|
|
|
+ video_info.get("user_info", {}).get("avatar_url", "")
|
|
|
+ ),
|
|
|
+ "cover_url": video_info.get("cover_url", "").replace("\\u002F", "/"),
|
|
|
+ "video_url": video_info.get("url"),
|
|
|
+ "session": f"xigua-author-{int(time.time())}",
|
|
|
+ }
|
|
|
return video_dict
|
|
|
|
|
|
|