import json
import os
import re
import random
import sys
import string
import time
import uuid
import base64
import requests
from fake_useragent import FakeUserAgent
from common.mq import MQ
sys.path.append(os.getcwd())
from common import AliyunLogger, PiaoQuanPipeline, tunnel_proxies
from common.limit import AuthorLimit
def _search_required(pattern, text, field, group=1):
    """Return capture ``group`` of ``pattern`` in ``text``; raise ValueError naming ``field`` if absent."""
    found = re.search(pattern, text)
    if found is None:
        raise ValueError(f"field {field!r} not found in page text")
    return found.group(group)


def extract_info_by_re(text):
    """
    Extract video metadata from raw page text with regular expressions.

    :param text: page source (HTML with embedded JSON) as a string
    :return: dict with the keys ``title``, ``url``, ``video_id``, ``like_count``,
        ``cover_url``, ``play_count``, ``publish_time`` and ``duration``;
        every value is a string exactly as captured (no numeric conversion)
    :raises ValueError: when a required field (anything but the title) is missing
    """
    # Title: text of the <title> element, up to the first " - " separator.
    title_match = re.search(r'<title[^>]*>(.*?)</title>', text)
    if title_match:
        title_content = title_match.group(1).split(" - ")[0]
        try:
            # Repair UTF-8 text that was mis-decoded as latin1.
            title_content = title_content.encode("latin1").decode()
        except (UnicodeEncodeError, UnicodeDecodeError):
            # Already proper Unicode (or not UTF-8 underneath): keep it as captured.
            pass
    else:
        title_content = ""

    # Video URL: stored base64-encoded under "main_url".
    encoded_url = _search_required(r'("main_url":")(.*?)"', text, "main_url", group=2)
    # NOTE(review): the cover URL from this page is un-escaped from \u002F by the
    # caller, so the base64 payload may carry the same escape for "/" — a literal
    # backslash is never valid base64, so undoing it here is harmless. Confirm.
    encoded_url = encoded_url.replace("\\u002F", "/")
    # Restore '=' padding in case it was stripped.
    encoded_url += "=" * (-len(encoded_url) % 4)
    decoded_data = base64.b64decode(encoded_url)
    try:
        # Try UTF-8 first.
        video_url = decoded_data.decode()
    except UnicodeDecodeError:
        # Fall back to latin-1, which can decode any byte sequence.
        video_url = decoded_data.decode('latin-1')

    video_id = _search_required(r'"vid":"(.*?)"', text, "vid")
    like_count = _search_required(r'"video_like_count":(.*?),', text, "video_like_count")
    # NOTE(review): the cover URL is read from the "avatar_url" field — confirm
    # this is the intended source.
    cover_url = _search_required(r'"avatar_url":"(.*?)"', text, "avatar_url")
    video_watch_count = _search_required(
        r'"video_watch_count":(.*?),', text, "video_watch_count"
    )
    publish_time = _search_required(
        r'"video_publish_time":"(.*?)"', text, "video_publish_time"
    )
    # The capture runs up to the next double quote, so it includes the trailing
    # comma of an unquoted number; strip it.
    duration = _search_required(
        r'("video_duration":)(.*?)"', text, "video_duration", group=2
    ).replace(",", "")

    return {
        "title": title_content,
        "url": video_url,
        "video_id": video_id,
        "like_count": like_count,
        "cover_url": cover_url,
        "play_count": video_watch_count,
        "publish_time": publish_time,
        "duration": duration
    }
def random_signature():
    """
    Generate a random 26-character signature string.

    Between 1 and 6 digits plus a random split of upper- and lower-case
    letters (26 characters in total, no repeats within a class) are drawn and
    shuffled. The result keeps the middle 12 shuffled characters between a
    fixed ``"AAAAAAAAAA"`` prefix and ``"AAAB"`` suffix, and the character at
    index 18 is then replaced by a marker letter.

    :return: the 26-character signature
    """
    digit_count = random.randint(1, 6)
    upper_count = random.randint(1, 26 - digit_count - 1)
    lower_count = 26 - (digit_count + upper_count)

    # Draw order (digits, upper, lower) and the final shuffle match the
    # sequence of calls on the random generator.
    pool = random.sample(string.digits, digit_count)
    pool += random.sample(string.ascii_uppercase, upper_count)
    pool += random.sample(string.ascii_lowercase, lower_count)
    random.shuffle(pool)

    shuffled = "".join(pool)
    candidate = "AAAAAAAAAA" + shuffled[10:-4] + "AAAB"

    # Marker letter chosen from the character currently at index 18;
    # anything not listed maps to "y".
    marker_by_char = {"8": "w", "9": "x", "-": "y", ".": "z"}
    marker = marker_by_char.get(candidate[18], "y")
    return candidate[:18] + marker + candidate[-7:]
def byte_dance_cookie(item_id):
    """
    Obtain a cookie value for Xigua Video from the ttwid register endpoint.

    A session first requests the slardar SDK script (to collect cookies), then
    posts a registration payload to the ttwid endpoint.

    :param item_id: id interpolated into the ``referer`` header
    :return: value of the first cookie set by the registration response
    :raises IndexError: when the registration response sets no cookie
    :raises requests.RequestException: on network failure or timeout
    """
    # The context manager closes the session's pooled connections on exit.
    with requests.Session() as sess:
        sess.headers.update({
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36',
            'referer': 'https://www.ixigua.com/home/{}/'.format(item_id),
        })
        # Collect cookies. Timeouts match the 5 s used by the other requests
        # in this file so a stalled endpoint cannot hang the crawler.
        sess.get('https://i.snssdk.com/slardar/sdk.js?bid=xigua_video_web_pc', timeout=5)
        data = '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}'
        r = sess.post('https://ttwid.bytedance.com/ttwid/union/register/', data=data, timeout=5)
        cookie_values = r.cookies.values()
        if not cookie_values:
            raise IndexError("ttwid register response set no cookies")
        return cookie_values[0]
def _decode_b64_url(encoded):
    """Decode a base64-encoded URL, restoring any stripped ``=`` padding first."""
    # Base64 text must be a multiple of 4 characters long.
    padded = encoded + "=" * (-len(encoded) % 4)
    return base64.b64decode(padded).decode("utf8")


def _empty_video_url_dict():
    """Return the result used when no playable stream is found."""
    return {
        "video_url": "",
        "audio_url": "",
        "video_width": 0,
        "video_height": 0,
    }


def get_video_url(video_info):
    """
    Resolve the video/audio URLs and frame size from a video-info mapping.

    Selection order:

    1. Only the first stream family present in ``video_info["videoResource"]``
       is consulted, checked in the order ``dash_120fps``, ``dash``,
       ``normal``. If that family yields nothing, the empty result is
       returned without trying the next family.
    2. Within the family, ``video_list`` entries are tried from ``video_4``
       down to ``video_1``.
    3. Otherwise the last entries of ``dynamic_video.dynamic_video_list`` and
       ``dynamic_video.dynamic_audio_list`` are used, when both are non-empty.

    :param video_info: mapping that may contain a ``videoResource`` entry
    :return: dict with ``video_url``, ``audio_url``, ``video_width`` and
        ``video_height``; empty strings and zeros when nothing is found
    :raises KeyError: when a selected entry lacks ``backup_url_1``,
        ``vwidth`` or ``vheight``
    :raises binascii.Error: when a URL is not valid base64
    """
    if "videoResource" not in video_info:
        return _empty_video_url_dict()
    video_resource = video_info["videoResource"]

    for family in ("dash_120fps", "dash", "normal"):
        if family in video_resource:
            resource = video_resource[family]
            break
    else:
        return _empty_video_url_dict()

    if "video_list" in resource:
        video_list = resource["video_list"]
        for definition in ("video_4", "video_3", "video_2", "video_1"):
            if definition in video_list:
                entry = video_list[definition]
                # For video_list entries the audio URL is read from the same
                # "backup_url_1" field as the video URL.
                stream_url = _decode_b64_url(entry["backup_url_1"])
                return {
                    "video_url": stream_url,
                    "audio_url": stream_url,
                    "video_width": entry["vwidth"],
                    "video_height": entry["vheight"],
                }

    if "dynamic_video" in resource:
        dynamic = resource["dynamic_video"]
        if "dynamic_video_list" in dynamic and "dynamic_audio_list" in dynamic:
            video_streams = dynamic["dynamic_video_list"]
            audio_streams = dynamic["dynamic_audio_list"]
            if video_streams and audio_streams:
                # The last entry of each list is the one used.
                best_video = video_streams[-1]
                return {
                    "video_url": _decode_b64_url(best_video["backup_url_1"]),
                    "audio_url": _decode_b64_url(audio_streams[-1]["backup_url_1"]),
                    "video_width": best_video["vwidth"],
                    "video_height": best_video["vheight"],
                }

    return _empty_video_url_dict()
def get_comment_cnt(item_id):
    """
    Fetch the number of comments on a video.

    :param item_id: video id, sent as both ``group_id`` and ``item_id``
    :return: the ``total_number`` field of the response, or 0 when the status
        is not 200, the body is not a JSON object, or the field is absent
    :raises requests.RequestException: on network failure or timeout
    """
    url = "https://www.ixigua.com/tlb/comment/article/v5/tab_comments/?"
    params = {
        "tab_index": "0",
        "count": "10",
        "offset": "10",
        "group_id": str(item_id),
        "item_id": str(item_id),
        "aid": "1768",
        "msToken": "50-JJObWB07HfHs-BMJWT1eIDX3G-6lPSF_i-QwxBIXE9VVa-iN0jbEXR5pG2DKjXBmP299n6ZTuXzY-GAy968CCvouSAYIS4GzvGQT3pNlKNejr5G4-1g==",
        "X-Bogus": "DFSzswVOyGtANVeWtCLMqR/F6q9U",
        "_signature": random_signature(),
    }
    # NOTE(review): the cookie and tokens below are hard-coded session values
    # (the cookie text carries a May 2023 expiry); consider loading them from
    # configuration instead of source.
    headers = {
        "authority": "www.ixigua.com",
        "accept": "application/json, text/plain, */*",
        "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
        "cache-control": "no-cache",
        "cookie": "MONITOR_WEB_ID=67cb5099-a022-4ec3-bb8e-c4de6ba51dd0; passport_csrf_token=72b2574f3c99f8ba670e42df430218fd; passport_csrf_token_default=72b2574f3c99f8ba670e42df430218fd; sid_guard=c7472b508ea631823ba765a60cf8757f%7C1680867422%7C3024002%7CFri%2C+12-May-2023+11%3A37%3A04+GMT; uid_tt=c13f47d51767f616befe32fb3e9f485a; uid_tt_ss=c13f47d51767f616befe32fb3e9f485a; sid_tt=c7472b508ea631823ba765a60cf8757f; sessionid=c7472b508ea631823ba765a60cf8757f; sessionid_ss=c7472b508ea631823ba765a60cf8757f; sid_ucp_v1=1.0.0-KGUzNWYxNmRkZGJiZjgxY2MzZWNkMTEzMTkwYjY1Yjg5OTY5NzVlNmMKFQiu3d-eqQIQ3oDAoQYYGCAMOAhACxoCaGwiIGM3NDcyYjUwOGVhNjMxODIzYmE3NjVhNjBjZjg3NTdm; ssid_ucp_v1=1.0.0-KGUzNWYxNmRkZGJiZjgxY2MzZWNkMTEzMTkwYjY1Yjg5OTY5NzVlNmMKFQiu3d-eqQIQ3oDAoQYYGCAMOAhACxoCaGwiIGM3NDcyYjUwOGVhNjMxODIzYmE3NjVhNjBjZjg3NTdm; odin_tt=b893608d4dde2e1e8df8cd5d97a0e2fbeafc4ca762ac72ebef6e6c97e2ed19859bb01d46b4190ddd6dd17d7f9678e1de; SEARCH_CARD_MODE=7168304743566296612_0; support_webp=true; support_avif=false; csrf_session_id=a5355d954d3c63ed1ba35faada452b4d; tt_scid=7Pux7s634-z8DYvCM20y7KigwH5u7Rh6D9C-RROpnT.aGMEcz6Vsxp.oai47wJqa4f86; ttwid=1%7CHHtv2QqpSGuSu8r-zXF1QoWsvjmNi1SJrqOrZzg-UCY%7C1683858689%7Ca5223fe1500578e01e138a0d71d6444692018296c4c24f5885af174a65873c95; ixigua-a-s=3; msToken=50-JJObWB07HfHs-BMJWT1eIDX3G-6lPSF_i-QwxBIXE9VVa-iN0jbEXR5pG2DKjXBmP299n6ZTuXzY-GAy968CCvouSAYIS4GzvGQT3pNlKNejr5G4-1g==; __ac_nonce=0645dcbf0005064517440; __ac_signature=_02B4Z6wo00f01FEGmAwAAIDBKchzCGqn-MBRJpyAAHAjieFC5GEg6gGiwz.I4PRrJl7f0GcixFrExKmgt6QI1i1S-dQyofPEj2ugWTCnmKUdJQv-wYuDofeKNe8VtMtZq2aKewyUGeKU-5Ud21; ixigua-a-s=3",
        "pragma": "no-cache",
        "referer": f"https://www.ixigua.com/{item_id}?logTag=3c5aa86a8600b9ab8540",
        "sec-ch-ua": '"Microsoft Edge";v="113", "Chromium";v="113", "Not-A.Brand";v="24"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"macOS"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
        "tt-anti-token": "cBITBHvmYjEygzv-f9c78c1297722cf1f559c74b084e4525ce4900bdcf9e8588f20cc7c2e3234422",
        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 Edg/113.0.1774.35",
        "x-secsdk-csrf-token": "000100000001f8e733cf37f0cd255a51aea9a81ff7bc0c09490cfe41ad827c3c5c18ec809279175e4d9f5553d8a5",
    }
    response = requests.get(
        url=url, headers=headers, params=params, proxies=tunnel_proxies(), timeout=5
    )
    try:
        if response.status_code != 200:
            return 0
        try:
            # Parse the body once instead of once per check.
            payload = response.json()
        except ValueError:
            # Not JSON (e.g. an HTML block page): treat as "no count available".
            return 0
    finally:
        response.close()
    if not isinstance(payload, dict):
        return 0
    return payload.get("total_number", 0)
class XiGuaAuthor:
"""
西瓜账号爬虫
"""
def __init__(self, platform, mode, rule_dict, env, user_list):
self.platform = platform
self.mode = mode
self.rule_dict = rule_dict
self.env = env
self.user_list = user_list
self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
self.download_count = 0
self.limiter = AuthorLimit(platform=self.platform, mode=self.mode)
def rule_maker(self, account):
"""
通过不同的账号生成不同的规则
:param account: 输入的账号信息
{'play_cnt': {'min': 100000, 'max': 0}, 'period': {'min': 5, 'max': 5}}
"""
temp = account['link'].split("?")[0].split("_")
if len(temp) == 1:
return self.rule_dict
else:
flag = temp[-2]
match flag:
case "V1":
rule_dict = {
"play_cnt": {"min": 100000, "max": 0},
'period': {"min": 90, "max": 90},
'special': 0.02
}
return rule_dict
case "V2":
rule_dict = {
"play_cnt": {"min": 10000, "max": 0},
'period': {"min": 90, "max": 90},
'special': 0.01
}
return rule_dict
case "V3":
rule_dict = {
"play_cnt": {"min": 5000, "max": 0},
'period': {"min": 90, "max": 90},
'special': 0.01
}
return rule_dict
def get_author_list(self):
"""
每轮只抓取定量的数据,到达数量后自己退出
获取账号列表以及账号信息
"""
# max_count = int(self.rule_dict.get("videos_cnt", {}).get("min", 300))
for user_dict in self.user_list:
# if self.download_count <= max_count:
try:
flag = user_dict["link"][0]
match flag:
case "V":
self.get_video_list(user_dict)
case "X":
self.get_tiny_video_list(user_dict)
case "h":
self.get_video_list(user_dict)
case "D":
self.get_video_list(user_dict)
case "B":
self.get_video_list(user_dict)
self.get_tiny_video_list(user_dict)
except Exception as e:
AliyunLogger.logging(
code="3001",
account=user_dict["uid"],
platform=self.platform,
mode=self.mode,
env=self.env,
message="扫描账号时出现bug, 报错是 {}".format(e)
)
# time.sleep(random.randint(1, 15))
# else:
# AliyunLogger.logging(
# code="2000",
# platform=self.platform,
# mode=self.mode,
# env=self.env,
# message="本轮已经抓取足够数量的视频,已经自动退出",
# )
# return
def get_video_list(self, user_dict):
"""
获取某个账号的视频列表
账号分为 3 类
"""
offset = 0
signature = random_signature()
link = user_dict['link'].split("?")[0].split("_")[-1]
url = "https://www.ixigua.com/api/videov2/author/new_video_list?"
while True:
to_user_id = str(link.replace("https://www.ixigua.com/home/", ""))
params = {
"to_user_id": to_user_id,
"offset": str(offset),
"limit": "30",
"maxBehotTime": "0",
"order": "new",
"isHome": "0",
"_signature": signature,
}
headers = {
"referer": f'https://www.ixigua.com/home/{link.replace("https://www.ixigua.com/home/", "")}/video/?preActiveKey=hotsoon&list_entrance=userdetail',
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.41",
}
response = requests.get(
url=url,
headers=headers,
params=params,
proxies=tunnel_proxies(),
timeout=5,
)
offset += 30
if "data" not in response.text or response.status_code != 200:
AliyunLogger.logging(
code="3000",
platform=self.platform,
mode=self.mode,
env=self.env,
message=f"get_videoList:{response.text}\n",
)
return
elif not response.json()["data"]["videoList"]:
AliyunLogger.logging(
account=link,
code="3000",
platform=self.platform,
mode=self.mode,
env=self.env,
data=response.json(),
message=f"没有更多数据啦~\n",
)
return
else:
feeds = response.json()["data"]["videoList"]
for video_obj in feeds:
try:
AliyunLogger.logging(
code="1001",
account=user_dict['uid'],
platform=self.platform,
mode=self.mode,
env=self.env,
data=video_obj,
message="扫描到一条视频",
)
date_flag = self.process_video_obj(video_obj, user_dict, "l")
if not date_flag:
return
except Exception as e:
AliyunLogger.logging(
code="3000",
platform=self.platform,
mode=self.mode,
env=self.env,
data=video_obj,
message="抓取单条视频异常, 报错原因是: {}".format(e),
)
def get_tiny_video_list(self, user_dict):
"""
获取小视频
"""
url = "https://www.ixigua.com/api/videov2/hotsoon/video"
max_behot_time = "0"
link = user_dict['link'].split("?")[0].split("_")[-1]
to_user_id = str(link.replace("https://www.ixigua.com/home/", ""))
while True:
params = {
"to_user_id": to_user_id,
"max_behot_time": max_behot_time,
"_signature": random_signature()
}
headers = {
"referer": "https://www.ixigua.com/{}?&".format(to_user_id),
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.41",
}
response = requests.get(
url=url,
headers=headers,
params=params,
proxies=tunnel_proxies(),
timeout=5,
)
if "data" not in response.text or response.status_code != 200:
AliyunLogger.logging(
code="2000",
platform=self.platform,
mode=self.mode,
env=self.env,
message=f"get_videoList:{response.text}\n",
)
return
elif not response.json()["data"]["data"]:
AliyunLogger.logging(
account=link,
code="2000",
platform=self.platform,
mode=self.mode,
env=self.env,
data=response.json(),
message=f"没有更多数据啦~\n",
)
return
else:
video_list = response.json()['data']['data']
max_behot_time = video_list[-1]["max_behot_time"]
for video_obj in video_list:
try:
AliyunLogger.logging(
code="1001",
account=user_dict['uid'],
platform=self.platform,
mode=self.mode,
env=self.env,
data=video_obj,
message="扫描到一条小视频",
)
date_flag = self.process_video_obj(video_obj, user_dict, "s")
if not date_flag:
return
except Exception as e:
AliyunLogger.logging(
code="3000",
platform=self.platform,
mode=self.mode,
env=self.env,
data=video_obj,
message="抓取单条视频异常, 报错原因是: {}".format(e),
)
def process_video_obj(self, video_obj, user_dict, f):
"""
process video_obj and extract video_url
"""
new_rule = self.rule_maker(user_dict)
trace_id = self.platform + str(uuid.uuid1())
if f == "s":
item_id = video_obj.get("id_str", "")
else:
item_id = video_obj.get("item_id", "")
if not item_id:
AliyunLogger.logging(
code="2005",
account=user_dict['uid'],
platform=self.platform,
mode=self.mode,
env=self.env,
message="无效视频",
data=video_obj,
trace_id=trace_id,
)
return
# 获取视频信息
video_dict = self.get_video_info(item_id=item_id)
video_dict["platform"] = self.platform
video_dict["strategy"] = self.mode
video_dict["out_video_id"] = video_dict["video_id"]
video_dict["width"] = video_dict["video_width"]
video_dict["height"] = video_dict["video_height"]
video_dict["crawler_rule"] = json.dumps(new_rule)
video_dict["user_id"] = user_dict["uid"]
video_dict["publish_time"] = video_dict["publish_time_str"]
video_dict["strategy_type"] = self.mode
video_dict["update_time_stamp"] = int(time.time())
if int(time.time()) - video_dict['publish_time_stamp'] > 3600 * 24 * int(
new_rule.get("period", {}).get("max", 1000)):
if not video_obj['is_top']:
"""
非置顶数据发布时间超过才退出
"""
AliyunLogger.logging(
code="2004",
account=user_dict['uid'],
platform=self.platform,
mode=self.mode,
env=self.env,
data=video_dict,
message="发布时间超过{}天".format(
int(new_rule.get("period", {}).get("max", 1000))
),
)
return False
pipeline = PiaoQuanPipeline(
platform=self.platform,
mode=self.mode,
rule_dict=new_rule,
env=self.env,
item=video_dict,
trace_id=trace_id,
)
limit_flag = self.limiter.author_limitation(user_id=video_dict['user_id'])
if limit_flag:
title_flag = pipeline.title_flag()
repeat_flag = pipeline.repeat_video()
if title_flag and repeat_flag:
if new_rule.get("special"):
if int(video_dict['play_cnt']) >= int(new_rule.get("play_cnt", {}).get("min", 100000)):
if float(video_dict['like_cnt']) / float(video_dict['play_cnt']) >= new_rule['special']:
self.mq.send_msg(video_dict)
self.download_count += 1
AliyunLogger.logging(
code="1002",
account=user_dict['uid'],
platform=self.platform,
mode=self.mode,
env=self.env,
data=video_dict,
trace_id=trace_id,
message="成功发送 MQ 至 ETL",
)
return True
else:
AliyunLogger.logging(
code="2008",
account=user_dict['uid'],
platform=self.platform,
mode=self.mode,
env=self.env,
message="不满足特殊规则, 点赞量/播放量",
data=video_dict
)
else:
if int(video_dict['play_cnt']) >= int(new_rule.get("play_cnt", {}).get("min", 100000)):
self.mq.send_msg(video_dict)
self.download_count += 1
AliyunLogger.logging(
code="1002",
account=user_dict['uid'],
platform=self.platform,
mode=self.mode,
env=self.env,
data=video_dict,
trace_id=trace_id,
message="成功发送 MQ 至 ETL",
)
return True
else:
AliyunLogger.logging(
code="2008",
account=user_dict['uid'],
platform=self.platform,
mode=self.mode,
env=self.env,
message="不满足特殊规则, 播放量",
data=video_dict
)
return True
def get_video_info(self, item_id):
"""
获取视频信息
"""
url = "https://www.ixigua.com/{}".format(item_id)
headers = {
"accept-encoding": "gzip, deflate",
"accept-language": "zh-CN,zh-Hans;q=0.9",
"cookie": "ttwid={}".format(byte_dance_cookie(item_id)),
"user-agent": FakeUserAgent().random,
"referer": "https://www.ixigua.com/{}/".format(item_id),
}
response = requests.get(
url=url,
headers=headers,
proxies=tunnel_proxies(),
timeout=5,
)
video_info = extract_info_by_re(response.text)
video_dict = {
"video_title": video_info.get("title", ""),
"video_id": video_info.get("video_id"),
"gid": str(item_id),
"play_cnt": int(video_info.get("play_count", 0)),
"like_cnt": int(video_info.get("like_count", 0)),
"comment_cnt": 0,
"share_cnt": 0,
"favorite_cnt": 0,
"duration": int(video_info.get("duration", 0)),
"video_width": 0,
"video_height": 0,
"publish_time_stamp": int(video_info.get("publish_time", 0)),
"publish_time_str": time.strftime(
"%Y-%m-%d %H:%M:%S",
time.localtime(int(video_info.get("publish_time", 0))),
),
"avatar_url": str(
video_info.get("user_info", {}).get("avatar_url", "")
),
"cover_url": video_info.get("cover_url", "").replace("\\u002F", "/"),
"video_url": video_info.get("url"),
"session": f"xigua-author-{int(time.time())}",
}
return video_dict
if __name__ == "__main__":
    # Manual run: crawl three sample accounts against the "prod" environment.
    sample_accounts = (
        ("https://www.ixigua.com/home/2779177225827568", "秋晴爱音乐"),
        ("https://www.ixigua.com/home/2885546124776780", "朗诵放歌的老山羊"),
        ("https://www.ixigua.com/home/5880938217", "天原声疗"),
    )
    # Every sample entry shares the same uid, source and mode.
    user_list = [
        {
            "uid": 6267140,
            "source": "xigua",
            "link": home_link,
            "nick_name": nick_name,
            "avatar_url": "",
            "mode": "author",
        }
        for home_link, nick_name in sample_accounts
    ]
    rule = dict(
        period=dict(min=30, max=30),
        duration=dict(min=20, max=0),
        play_cnt=dict(min=100000, max=0),
    )
    XGA = XiGuaAuthor(
        platform="xigua",
        mode="author",
        rule_dict=rule,
        env="prod",
        user_list=user_list,
    )
    XGA.get_author_list()