import json
import os
import re
import random
import sys
import string
import time
import uuid
import base64
import requests
from lxml import etree
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from fake_useragent import FakeUserAgent
from common.mq import MQ
sys.path.append(os.getcwd())
from common import AliyunLogger, PiaoQuanPipeline, tunnel_proxies
from common.limit import AuthorLimit
def aes_decrypt(data: str, key: str) -> str:
"""
XiGua AES decrypt
:param data:
:param key:
:return:
"""
password = key.encode()
iv = password[:16]
try:
ct = base64.b64decode(data.encode())
cipher = AES.new(password, AES.MODE_CBC, iv)
pt = unpad(cipher.decrypt(ct), AES.block_size)
return base64.b64decode(pt).decode()
except Exception as e:
print("Incorrect decryption {}".format(e))
return None
def extract_video_url(text):
    """
    Extract the playable video URL from the HTML of a Xigua video page.

    The page embeds its state in a ``<script id="SSR_HYDRATED_DATA">`` tag.
    The object literal inside that script is parsed as JSON, the encrypted
    ``main_url`` and its key ``ptk`` are read from the ``dash`` resource, and
    the URL is decrypted with :func:`aes_decrypt`.

    :param text: HTML source of the video page
    :return: the decrypted video URL, or None when decryption fails
    :raises IndexError: when the page has no SSR_HYDRATED_DATA script
    :raises KeyError: when the expected keys are missing from the payload
    """
    html = etree.HTML(text)
    script_text = html.xpath('//script[@id="SSR_HYDRATED_DATA"]/text()')[0]
    raw_json = script_text[script_text.find('{'):script_text.rfind('}') + 1]
    # null/true/false are already valid JSON, so only the JavaScript literal
    # `undefined` needs rewriting. It is replaced only where it stands as a
    # bare value (after ':', '[' or ','), so string values such as the base64
    # encoded main_url / ptk are never altered.
    raw_json = re.sub(r'(?<=[:\[,])\s*undefined\s*(?=[,\]}])', 'null', raw_json)
    dash = json.loads(raw_json)["anyVideo"]["gidInformation"]["packerData"]["video"]["videoResource"]["dash"]
    ptk = dash["ptk"]
    video_url = dash['dynamic_video']['main_url']
    return aes_decrypt(data=video_url, key=ptk)
def extract_info_by_re(text):
    """
    Extract video metadata from the HTML of a Xigua video page with regexes.

    :param text: HTML source of the video page
    :return: dict with the keys ``title``, ``url``, ``video_id``,
        ``like_count``, ``cover_url``, ``play_count``, ``publish_time`` and
        ``duration``; every value except ``url`` is the raw matched text
    :raises AttributeError: when one of the required fields is not found
    """
    # Title: the part of <title> before the first " - " separator.
    title_match = re.search(r'<title[^>]*>(.*?)</title>', text)
    if title_match:
        title_content = title_match.group(1)
        title_content = title_content.split(" - ")[0]
        try:
            # Undo mojibake for pages that were decoded as latin-1 although
            # the bytes are UTF-8.
            title_content = bytes(title_content, "latin1").decode()
        except UnicodeError:
            # The text was already decoded correctly; keep it unchanged.
            pass
    else:
        title_content = ""
    # video_id
    video_id = re.search(r'"vid":"(.*?)"', text).group(1)
    # like_count
    like_count = re.search(r'"video_like_count":(.*?),', text).group(1)
    # cover_url (read from the first "avatar_url" field in the page)
    cover_url = re.search(r'"avatar_url":"(.*?)"', text).group(1)
    # play count
    video_watch_count = re.search(r'"video_watch_count":(.*?),', text).group(1)
    # publish time
    publish_time = re.search(r'"video_publish_time":"(.*?)"', text).group(1)
    # duration: the match runs up to the next quote, so the comma separating
    # it from the following key has to be stripped.
    duration = re.search(r'("video_duration":)(.*?)"', text).group(2).replace(",", "")
    return {
        "title": title_content,
        "url": extract_video_url(text),
        "video_id": video_id,
        "like_count": like_count,
        "cover_url": cover_url,
        "play_count": video_watch_count,
        "publish_time": publish_time,
        "duration": duration
    }
def random_signature():
    """
    Generate a random 26-character ``_signature`` request parameter.

    Layout of the result: ten ``A`` characters, eight random alphanumeric
    characters, one marker character (``w``, ``x`` or ``y``), three random
    alphanumeric characters and the fixed suffix ``AAAB``.

    :return: the signature string
    """
    digits_num = random.randint(1, 6)
    uppercase_num = random.randint(1, 26 - digits_num - 1)
    lowercase_num = 26 - (digits_num + uppercase_num)
    # 26 distinct-per-class characters: some digits, some upper, some lower.
    password = (
        random.sample(string.digits, digits_num)
        + random.sample(string.ascii_uppercase, uppercase_num)
        + random.sample(string.ascii_lowercase, lowercase_num)
    )
    random.shuffle(password)
    candidate = "AAAAAAAAAA" + "".join(password)[10:-4] + "AAAB"
    # The character at index 18 is replaced by a marker. The alphabet holds
    # only digits and letters, so "8" and "9" are the only special cases;
    # everything else maps to "y".
    marker = {"8": "w", "9": "x"}.get(candidate[18], "y")
    return candidate[0:18] + marker + candidate[-7:]
def byte_dance_cookie(item_id):
    """
    Obtain a cookie value for Xigua from the ByteDance ttwid register endpoint.

    :param item_id: id interpolated into the ``referer`` header
    :return: the value of the first cookie set by the register response
    :raises IndexError: when the register response sets no cookie
    :raises requests.RequestException: on network failure or timeout
    """
    # The session is closed on exit so its pooled connections are released.
    with requests.Session() as sess:
        sess.headers.update({
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36',
            'referer': 'https://www.ixigua.com/home/{}/'.format(item_id),
        })
        # Warm-up request made before registering; its response body is not used.
        sess.get('https://i.snssdk.com/slardar/sdk.js?bid=xigua_video_web_pc', timeout=5)
        data = '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}'
        # Same 5 second timeout as the other requests in this module, so a
        # stalled endpoint cannot hang the crawler.
        r = sess.post('https://ttwid.bytedance.com/ttwid/union/register/', data=data, timeout=5)
        return r.cookies.values()[0]
# Resource containers of "videoResource", in the order they are consulted.
_RESOURCE_KEYS = ("dash_120fps", "dash", "normal")
# Entries of a "video_list", in the order they are preferred.
_QUALITY_KEYS = ("video_4", "video_3", "video_2", "video_1")


def _empty_video_urls():
    """Return the result used when no stream can be selected."""
    return {"video_url": "", "audio_url": "", "video_width": 0, "video_height": 0}


def _decode_b64_url(encoded):
    """Decode a base64 URL whose trailing ``=`` padding may be missing."""
    # Base64 text must be a multiple of 4 characters long.
    encoded += "=" * (-len(encoded) % 4)
    return base64.b64decode(encoded).decode("utf8")


def _select_streams(resource):
    """Return the (video_entry, audio_entry) pair for *resource*, or None."""
    if "video_list" in resource:
        video_list = resource["video_list"]
        for quality in _QUALITY_KEYS:
            if quality in video_list:
                # A video_list entry supplies both the video and the audio URL.
                entry = video_list[quality]
                return entry, entry
    if "dynamic_video" in resource:
        dynamic = resource["dynamic_video"]
        videos = dynamic.get("dynamic_video_list")
        audios = dynamic.get("dynamic_audio_list")
        if videos and audios:
            return videos[-1], audios[-1]
    return None


def get_video_url(video_info):
    """
    Pick the video / audio stream URLs and the frame size from *video_info*.

    Only the first container present among ``dash_120fps``, ``dash`` and
    ``normal`` is consulted; if it holds no usable stream the result is empty
    even when a later container has one. Inside the container the entries
    ``video_4`` .. ``video_1`` of ``video_list`` are tried in that order, then
    the last items of ``dynamic_video_list`` / ``dynamic_audio_list``.

    :param video_info: dict that may contain a ``videoResource`` mapping
    :return: dict with ``video_url``, ``audio_url``, ``video_width`` and
        ``video_height``; empty strings and zeros when nothing was selected
    """
    if "videoResource" not in video_info:
        return _empty_video_urls()
    resources = video_info["videoResource"]
    resource_key = next((key for key in _RESOURCE_KEYS if key in resources), None)
    if resource_key is None:
        return _empty_video_urls()
    streams = _select_streams(resources[resource_key])
    if streams is None:
        return _empty_video_urls()
    video_entry, audio_entry = streams
    return {
        "video_url": _decode_b64_url(video_entry["backup_url_1"]),
        "audio_url": _decode_b64_url(audio_entry["backup_url_1"]),
        "video_width": video_entry["vwidth"],
        "video_height": video_entry["vheight"],
    }
def get_comment_cnt(item_id):
    """
    Fetch the number of comments of a video.

    :param item_id: video id, sent as both ``group_id`` and ``item_id``
    :return: the ``total_number`` field of the response, or 0 when the status
        is not 200, the body is not a JSON object, or the field is missing
    :raises requests.RequestException: on network failure or timeout
    """
    url = "https://www.ixigua.com/tlb/comment/article/v5/tab_comments/?"
    params = {
        "tab_index": "0",
        "count": "10",
        "offset": "10",
        "group_id": str(item_id),
        "item_id": str(item_id),
        "aid": "1768",
        "msToken": "50-JJObWB07HfHs-BMJWT1eIDX3G-6lPSF_i-QwxBIXE9VVa-iN0jbEXR5pG2DKjXBmP299n6ZTuXzY-GAy968CCvouSAYIS4GzvGQT3pNlKNejr5G4-1g==",
        "X-Bogus": "DFSzswVOyGtANVeWtCLMqR/F6q9U",
        "_signature": random_signature(),
    }
    # NOTE(review): the cookie and tokens below are hard-coded session
    # credentials; consider loading them from configuration instead.
    headers = {
        "authority": "www.ixigua.com",
        "accept": "application/json, text/plain, */*",
        "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
        "cache-control": "no-cache",
        "cookie": "MONITOR_WEB_ID=67cb5099-a022-4ec3-bb8e-c4de6ba51dd0; passport_csrf_token=72b2574f3c99f8ba670e42df430218fd; passport_csrf_token_default=72b2574f3c99f8ba670e42df430218fd; sid_guard=c7472b508ea631823ba765a60cf8757f%7C1680867422%7C3024002%7CFri%2C+12-May-2023+11%3A37%3A04+GMT; uid_tt=c13f47d51767f616befe32fb3e9f485a; uid_tt_ss=c13f47d51767f616befe32fb3e9f485a; sid_tt=c7472b508ea631823ba765a60cf8757f; sessionid=c7472b508ea631823ba765a60cf8757f; sessionid_ss=c7472b508ea631823ba765a60cf8757f; sid_ucp_v1=1.0.0-KGUzNWYxNmRkZGJiZjgxY2MzZWNkMTEzMTkwYjY1Yjg5OTY5NzVlNmMKFQiu3d-eqQIQ3oDAoQYYGCAMOAhACxoCaGwiIGM3NDcyYjUwOGVhNjMxODIzYmE3NjVhNjBjZjg3NTdm; ssid_ucp_v1=1.0.0-KGUzNWYxNmRkZGJiZjgxY2MzZWNkMTEzMTkwYjY1Yjg5OTY5NzVlNmMKFQiu3d-eqQIQ3oDAoQYYGCAMOAhACxoCaGwiIGM3NDcyYjUwOGVhNjMxODIzYmE3NjVhNjBjZjg3NTdm; odin_tt=b893608d4dde2e1e8df8cd5d97a0e2fbeafc4ca762ac72ebef6e6c97e2ed19859bb01d46b4190ddd6dd17d7f9678e1de; SEARCH_CARD_MODE=7168304743566296612_0; support_webp=true; support_avif=false; csrf_session_id=a5355d954d3c63ed1ba35faada452b4d; tt_scid=7Pux7s634-z8DYvCM20y7KigwH5u7Rh6D9C-RROpnT.aGMEcz6Vsxp.oai47wJqa4f86; ttwid=1%7CHHtv2QqpSGuSu8r-zXF1QoWsvjmNi1SJrqOrZzg-UCY%7C1683858689%7Ca5223fe1500578e01e138a0d71d6444692018296c4c24f5885af174a65873c95; ixigua-a-s=3; msToken=50-JJObWB07HfHs-BMJWT1eIDX3G-6lPSF_i-QwxBIXE9VVa-iN0jbEXR5pG2DKjXBmP299n6ZTuXzY-GAy968CCvouSAYIS4GzvGQT3pNlKNejr5G4-1g==; __ac_nonce=0645dcbf0005064517440; __ac_signature=_02B4Z6wo00f01FEGmAwAAIDBKchzCGqn-MBRJpyAAHAjieFC5GEg6gGiwz.I4PRrJl7f0GcixFrExKmgt6QI1i1S-dQyofPEj2ugWTCnmKUdJQv-wYuDofeKNe8VtMtZq2aKewyUGeKU-5Ud21; ixigua-a-s=3",
        "pragma": "no-cache",
        "referer": f"https://www.ixigua.com/{item_id}?logTag=3c5aa86a8600b9ab8540",
        "sec-ch-ua": '"Microsoft Edge";v="113", "Chromium";v="113", "Not-A.Brand";v="24"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"macOS"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
        "tt-anti-token": "cBITBHvmYjEygzv-f9c78c1297722cf1f559c74b084e4525ce4900bdcf9e8588f20cc7c2e3234422",
        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 Edg/113.0.1774.35",
        "x-secsdk-csrf-token": "000100000001f8e733cf37f0cd255a51aea9a81ff7bc0c09490cfe41ad827c3c5c18ec809279175e4d9f5553d8a5",
    }
    response = requests.get(
        url=url, headers=headers, params=params, proxies=tunnel_proxies(), timeout=5
    )
    response.close()
    if response.status_code != 200:
        return 0
    try:
        # Parse the body once; a non-JSON body counts as "no comments".
        payload = response.json()
    except ValueError:
        return 0
    if not isinstance(payload, dict):
        return 0
    return payload.get("total_number", 0)
class XiGuaAuthor:
    """
    Xigua (ixigua.com) account crawler.

    Walks a list of accounts, pages through each account's videos, filters
    them with the crawl rules and sends accepted videos to the ETL message
    queue.
    """

    # Version tag found in an account link -> (minimum play count,
    # minimum like/play ratio) of the rule applied to that account.
    _TAGGED_RULES = {
        "V1": (100000, 0.02),
        "V2": (10000, 0.01),
        "V3": (5000, 0.01),
    }

    def __init__(self, platform, mode, rule_dict, env, user_list):
        """
        :param platform: platform name, used in logs, trace ids and MQ items
        :param mode: crawl strategy name
        :param rule_dict: default crawl rule for accounts without a version tag
        :param env: environment name; also the suffix of the MQ topic
        :param user_list: list of account dicts (at least ``uid`` and ``link``)
        """
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.env = env
        self.user_list = user_list
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.download_count = 0
        self.limiter = AuthorLimit(platform=self.platform, mode=self.mode)

    def rule_maker(self, account):
        """
        Build the crawl rule for one account.

        The link, without its query string, is split on ``_``. A link without
        ``_`` uses the default ``rule_dict``. Otherwise the second-to-last
        segment is the version tag (``V1`` / ``V2`` / ``V3``) that selects a
        preset; an unrecognised tag falls back to the default rule.

        :param account: account dict with a ``link`` entry
        :return: rule dict, e.g.
            {'play_cnt': {'min': 100000, 'max': 0}, 'period': {'min': 90, 'max': 90}, 'special': 0.02}
        """
        temp = account['link'].split("?")[0].split("_")
        if len(temp) == 1:
            return self.rule_dict
        preset = self._TAGGED_RULES.get(temp[-2])
        if preset is None:
            # Unknown tag: use the default rule instead of returning None,
            # which would break every later `new_rule.get(...)` call.
            return self.rule_dict
        min_play_cnt, special = preset
        # A fresh dict per call, so callers may mutate the result freely.
        return {
            "play_cnt": {"min": min_play_cnt, "max": 0},
            'period': {"min": 90, "max": 90},
            'special': special,
        }

    def get_author_list(self):
        """
        Crawl every account in ``user_list``.

        The first character of the account link selects what is crawled:
        ``V``, ``h`` and ``D`` crawl regular videos, ``X`` crawls tiny videos
        and ``B`` crawls both. Links starting with any other character are
        skipped. A failure in one account is logged and the next account is
        processed. The per-round download cap is currently not enforced.
        """
        handlers = {
            "V": (self.get_video_list,),
            "X": (self.get_tiny_video_list,),
            "h": (self.get_video_list,),
            "D": (self.get_video_list,),
            "B": (self.get_video_list, self.get_tiny_video_list),
        }
        for user_dict in self.user_list:
            try:
                flag = user_dict["link"][0]
                for handler in handlers.get(flag, ()):
                    handler(user_dict)
            except Exception as e:
                AliyunLogger.logging(
                    code="3001",
                    account=user_dict["uid"],
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    message="扫描账号时出现bug, 报错是 {}".format(e)
                )

    def get_video_list(self, user_dict):
        """
        Page through the regular videos of one account, 30 per request.

        Stops when a request fails, when a page is empty, or when
        ``process_video_obj`` returns a falsy value. An exception raised
        while handling a single video is logged and the next one is processed.

        :param user_dict: account dict with ``uid`` and ``link``
        """
        offset = 0
        signature = random_signature()
        link = user_dict['link'].split("?")[0].split("_")[-1]
        to_user_id = str(link.replace("https://www.ixigua.com/home/", ""))
        url = "https://www.ixigua.com/api/videov2/author/new_video_list?"
        headers = {
            "referer": f'https://www.ixigua.com/home/{to_user_id}/video/?preActiveKey=hotsoon&list_entrance=userdetail',
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.41",
        }
        while True:
            params = {
                "to_user_id": to_user_id,
                "offset": str(offset),
                "limit": "30",
                "maxBehotTime": "0",
                "order": "new",
                "isHome": "0",
                "_signature": signature,
            }
            response = requests.get(
                url=url,
                headers=headers,
                params=params,
                proxies=tunnel_proxies(),
                timeout=5,
            )
            offset += 30
            if "data" not in response.text or response.status_code != 200:
                AliyunLogger.logging(
                    code="3000",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    message=f"get_videoList:{response.text}\n",
                )
                return
            # Parse the body once and reuse it for the check, the log and the loop.
            payload = response.json()
            feeds = payload["data"]["videoList"]
            if not feeds:
                AliyunLogger.logging(
                    account=link,
                    code="3000",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=payload,
                    message="没有更多数据啦~\n",
                )
                return
            for video_obj in feeds:
                try:
                    AliyunLogger.logging(
                        code="1001",
                        account=user_dict['uid'],
                        platform=self.platform,
                        mode=self.mode,
                        env=self.env,
                        data=video_obj,
                        message="扫描到一条视频",
                    )
                    date_flag = self.process_video_obj(video_obj, user_dict, "l")
                    if not date_flag:
                        return
                except Exception as e:
                    AliyunLogger.logging(
                        code="3000",
                        platform=self.platform,
                        mode=self.mode,
                        env=self.env,
                        data=video_obj,
                        message="抓取单条视频异常, 报错原因是: {}".format(e),
                    )

    def get_tiny_video_list(self, user_dict):
        """
        Page through the tiny (short) videos of one account.

        Paging uses the ``max_behot_time`` of the last item of the previous
        page. Stop conditions and per-video error handling are the same as in
        ``get_video_list``.

        :param user_dict: account dict with ``uid`` and ``link``
        """
        url = "https://www.ixigua.com/api/videov2/hotsoon/video"
        max_behot_time = "0"
        link = user_dict['link'].split("?")[0].split("_")[-1]
        to_user_id = str(link.replace("https://www.ixigua.com/home/", ""))
        headers = {
            "referer": "https://www.ixigua.com/{}?&".format(to_user_id),
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.41",
        }
        while True:
            params = {
                "to_user_id": to_user_id,
                "max_behot_time": max_behot_time,
                # A fresh signature is generated for every page.
                "_signature": random_signature()
            }
            response = requests.get(
                url=url,
                headers=headers,
                params=params,
                proxies=tunnel_proxies(),
                timeout=5,
            )
            if "data" not in response.text or response.status_code != 200:
                AliyunLogger.logging(
                    code="2000",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    message=f"get_videoList:{response.text}\n",
                )
                return
            # Parse the body once and reuse it for the check, the log and the loop.
            payload = response.json()
            video_list = payload["data"]["data"]
            if not video_list:
                AliyunLogger.logging(
                    account=link,
                    code="2000",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=payload,
                    message="没有更多数据啦~\n",
                )
                return
            max_behot_time = video_list[-1]["max_behot_time"]
            for video_obj in video_list:
                try:
                    AliyunLogger.logging(
                        code="1001",
                        account=user_dict['uid'],
                        platform=self.platform,
                        mode=self.mode,
                        env=self.env,
                        data=video_obj,
                        message="扫描到一条小视频",
                    )
                    date_flag = self.process_video_obj(video_obj, user_dict, "s")
                    if not date_flag:
                        return
                except Exception as e:
                    AliyunLogger.logging(
                        code="3000",
                        platform=self.platform,
                        mode=self.mode,
                        env=self.env,
                        data=video_obj,
                        message="抓取单条视频异常, 报错原因是: {}".format(e),
                    )

    def _send_to_etl(self, video_dict, user_dict, trace_id):
        """Send one accepted video to the MQ, count it and log the success."""
        self.mq.send_msg(video_dict)
        self.download_count += 1
        AliyunLogger.logging(
            code="1002",
            account=user_dict['uid'],
            platform=self.platform,
            mode=self.mode,
            env=self.env,
            data=video_dict,
            trace_id=trace_id,
            message="成功发送 MQ 至 ETL",
        )

    def process_video_obj(self, video_obj, user_dict, f):
        """
        Fetch the details of one listed video, apply the rules and, when it
        qualifies, send it to the ETL queue.

        :param video_obj: one item of a video list response
        :param user_dict: account dict with ``uid`` and ``link``
        :param f: ``"s"`` for tiny videos (id read from ``id_str``); any other
            value reads the id from ``item_id``
        :return: False when a non-pinned video is older than the rule's
            ``period.max`` days, which tells the caller to stop scanning this
            account; True in every other case
        """
        new_rule = self.rule_maker(user_dict)
        trace_id = self.platform + str(uuid.uuid1())
        if f == "s":
            item_id = video_obj.get("id_str", "")
        else:
            item_id = video_obj.get("item_id", "")
        if not item_id:
            AliyunLogger.logging(
                code="2005",
                account=user_dict['uid'],
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message="无效视频",
                data=video_obj,
                trace_id=trace_id,
            )
            # Skip only this video. A falsy return would make the caller
            # abandon the rest of the account's list.
            return True
        # Fetch the video details.
        video_dict = self.get_video_info(item_id=item_id)
        video_dict["platform"] = self.platform
        video_dict["strategy"] = self.mode
        video_dict["out_video_id"] = video_dict["video_id"]
        video_dict["width"] = video_dict["video_width"]
        video_dict["height"] = video_dict["video_height"]
        video_dict["crawler_rule"] = json.dumps(new_rule)
        video_dict["user_id"] = user_dict["uid"]
        video_dict["publish_time"] = video_dict["publish_time_str"]
        video_dict["strategy_type"] = self.mode
        video_dict["update_time_stamp"] = int(time.time())
        max_period_days = int(new_rule.get("period", {}).get("max", 1000))
        if int(time.time()) - video_dict['publish_time_stamp'] > 3600 * 24 * max_period_days:
            # Only a non-pinned video that is too old ends the scan; a pinned
            # one may sit above newer videos. A missing "is_top" is treated as
            # not pinned.
            if not video_obj.get('is_top'):
                AliyunLogger.logging(
                    code="2004",
                    account=user_dict['uid'],
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=video_dict,
                    message="发布时间超过{}天".format(max_period_days),
                )
                return False
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=new_rule,
            env=self.env,
            item=video_dict,
            trace_id=trace_id,
        )
        if not self.limiter.author_limitation(user_id=video_dict['user_id']):
            return True
        # Both checks are always evaluated, even if the first one fails.
        title_flag = pipeline.title_flag()
        repeat_flag = pipeline.repeat_video()
        if not (title_flag and repeat_flag):
            return True
        play_cnt = int(video_dict['play_cnt'])
        min_play_cnt = int(new_rule.get("play_cnt", {}).get("min", 100000))
        if new_rule.get("special"):
            if play_cnt >= min_play_cnt:
                # Guard the ratio against a zero play count.
                like_ratio = float(video_dict['like_cnt']) / play_cnt if play_cnt else 0.0
                if like_ratio >= new_rule['special']:
                    self._send_to_etl(video_dict, user_dict, trace_id)
                else:
                    AliyunLogger.logging(
                        code="2008",
                        account=user_dict['uid'],
                        platform=self.platform,
                        mode=self.mode,
                        env=self.env,
                        message="不满足特殊规则, 点赞量/播放量",
                        data=video_dict
                    )
        elif play_cnt >= min_play_cnt:
            self._send_to_etl(video_dict, user_dict, trace_id)
        else:
            AliyunLogger.logging(
                code="2008",
                account=user_dict['uid'],
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message="不满足特殊规则, 播放量",
                data=video_dict
            )
        return True

    def get_video_info(self, item_id):
        """
        Download the page of one video and build its metadata dict.

        :param item_id: video id used to build the page URL
        :return: dict with the title, ids, counters, duration, publish time
            (timestamp and formatted local time), cover and video URLs;
            ``comment_cnt``, ``share_cnt``, ``favorite_cnt``, ``video_width``
            and ``video_height`` are always 0
        """
        url = "https://www.ixigua.com/{}".format(item_id)
        headers = {
            "accept-encoding": "gzip, deflate",
            "accept-language": "zh-CN,zh-Hans;q=0.9",
            "cookie": "ttwid={}".format(byte_dance_cookie(item_id)),
            "user-agent": FakeUserAgent().random,
            "referer": "https://www.ixigua.com/{}/".format(item_id),
        }
        response = requests.get(
            url=url,
            headers=headers,
            proxies=tunnel_proxies(),
            timeout=5,
        )
        video_info = extract_info_by_re(response.text)
        publish_time_stamp = int(video_info.get("publish_time", 0))
        video_dict = {
            "video_title": video_info.get("title", ""),
            "video_id": video_info.get("video_id"),
            "gid": str(item_id),
            "play_cnt": int(video_info.get("play_count", 0)),
            "like_cnt": int(video_info.get("like_count", 0)),
            "comment_cnt": 0,
            "share_cnt": 0,
            "favorite_cnt": 0,
            "duration": int(video_info.get("duration", 0)),
            "video_width": 0,
            "video_height": 0,
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": time.strftime(
                "%Y-%m-%d %H:%M:%S",
                time.localtime(publish_time_stamp),
            ),
            "avatar_url": str(
                video_info.get("user_info", {}).get("avatar_url", "")
            ),
            # The page escapes "/" as \u002F inside its embedded data.
            "cover_url": video_info.get("cover_url", "").replace("\\u002F", "/"),
            "video_url": video_info.get("url"),
            "session": f"xigua-author-{int(time.time())}",
        }
        return video_dict
if __name__ == "__main__":
    # Manual run against a few sample accounts: (link, nick_name) pairs that
    # all share the same uid.
    accounts = (
        ("https://www.ixigua.com/home/2779177225827568", "秋晴爱音乐"),
        ("https://www.ixigua.com/home/2885546124776780", "朗诵放歌的老山羊"),
        ("https://www.ixigua.com/home/5880938217", "天原声疗"),
    )
    user_list = [
        {
            "uid": 6267140,
            "source": "xigua",
            "link": link,
            "nick_name": nick_name,
            "avatar_url": "",
            "mode": "author",
        }
        for link, nick_name in accounts
    ]
    rule = {
        'period': {'min': 30, 'max': 30},
        'duration': {'min': 20, 'max': 0},
        'play_cnt': {'min': 100000, 'max': 0},
    }
    XGA = XiGuaAuthor(
        platform="xigua",
        mode="author",
        rule_dict=rule,
        env="prod",
        user_list=user_list,
    )
    XGA.get_author_list()