import json import os import re import random import sys import string import time import uuid import base64 import requests from lxml import etree from Crypto.Cipher import AES from Crypto.Util.Padding import unpad from fake_useragent import FakeUserAgent from common.mq import MQ sys.path.append(os.getcwd()) from common import AliyunLogger, PiaoQuanPipeline, tunnel_proxies from common.limit import AuthorLimit def aes_decrypt(data: str, key: str) -> str: """ XiGua AES decrypt :param data: :param key: :return: """ password = key.encode() iv = password[:16] try: ct = base64.b64decode(data.encode()) cipher = AES.new(password, AES.MODE_CBC, iv) pt = unpad(cipher.decrypt(ct), AES.block_size) return base64.b64decode(pt).decode() except Exception as e: print("Incorrect decryption {}".format(e)) return None def extract_video_url(text): """ 获取视频 video_url :param text: :return: """ HTML = etree.HTML(text) str_2 = HTML.xpath('//script[@id="SSR_HYDRATED_DATA"]/text()')[0] json_2 = str_2[str_2.find('{'):str_2.rfind('}') + 1] Irregulars = ['null', 'undefined', '=false', '=true', 'false', 'true'] # python中不规则的定义 for I in Irregulars: if I in ['=false', '=true']: json_2 = json_2.replace(I, '=' + I[1:].capitalize()) else: json_2 = json_2.replace(I, '12') dict_2 = json.loads(json_2)["anyVideo"]["gidInformation"]["packerData"]["video"]["videoResource"]["dash"] ptk = dict_2["ptk"] video_url = dict_2['dynamic_video']['main_url'] real_video_url = aes_decrypt(data=video_url, key=ptk) return real_video_url def extract_info_by_re(text): """ 通过正则表达式获取文本中的信息 :param text: :return: """ # 标题 title_match = re.search(r']*>(.*?)', text) if title_match: title_content = title_match.group(1) title_content = title_content.split(" - ")[0] title_content = bytes(title_content, "latin1").decode() else: title_content = "" # video_id video_id = re.search(r'"vid":"(.*?)"', text).group(1) # like_count like_count = re.search(r'"video_like_count":(.*?),', text).group(1) # cover_url cover_url = re.search(r'"avatar_url":"(.*?)"', text).group(1) # video_play video_watch_count = re.search(r'"video_watch_count":(.*?),', text).group(1) # "video_publish_time" publish_time = re.search(r'"video_publish_time":"(.*?)"', text).group(1) # video_duration duration = re.search(r'("video_duration":)(.*?)"', text).group(2).replace(",", "") return { "title": title_content, "url": extract_video_url(text), "video_id": video_id, "like_count": like_count, "cover_url": cover_url, "play_count": video_watch_count, "publish_time": publish_time, "duration": duration } def random_signature(): """ 随机生成签名 """ src_digits = string.digits # string_数字 src_uppercase = string.ascii_uppercase # string_大写字母 src_lowercase = string.ascii_lowercase # string_小写字母 digits_num = random.randint(1, 6) uppercase_num = random.randint(1, 26 - digits_num - 1) lowercase_num = 26 - (digits_num + uppercase_num) password = ( random.sample(src_digits, digits_num) + random.sample(src_uppercase, uppercase_num) + random.sample(src_lowercase, lowercase_num) ) random.shuffle(password) new_password = "AAAAAAAAAA" + "".join(password)[10:-4] + "AAAB" new_password_start = new_password[0:18] new_password_end = new_password[-7:] if new_password[18] == "8": new_password = new_password_start + "w" + new_password_end elif new_password[18] == "9": new_password = new_password_start + "x" + new_password_end elif new_password[18] == "-": new_password = new_password_start + "y" + new_password_end elif new_password[18] == ".": new_password = new_password_start + "z" + new_password_end else: new_password = new_password_start + "y" + new_password_end return new_password def byte_dance_cookie(item_id): """ 获取西瓜视频的 cookie :param item_id: """ sess = requests.Session() sess.headers.update({ 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36', 'referer': 'https://www.ixigua.com/home/{}/'.format(item_id), }) # 获取 cookies sess.get('https://i.snssdk.com/slardar/sdk.js?bid=xigua_video_web_pc') data = '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}' r = sess.post('https://ttwid.bytedance.com/ttwid/union/register/', data=data) # print(r.text) return r.cookies.values()[0] def get_video_url(video_info): """ 获取视频的链接 """ video_url_dict = {} # video_url if "videoResource" not in video_info: video_url_dict["video_url"] = "" video_url_dict["audio_url"] = "" video_url_dict["video_width"] = 0 video_url_dict["video_height"] = 0 elif "dash_120fps" in video_info["videoResource"]: if ( "video_list" in video_info["videoResource"]["dash_120fps"] and "video_4" in video_info["videoResource"]["dash_120fps"]["video_list"] ): video_url = video_info["videoResource"]["dash_120fps"]["video_list"][ "video_4" ]["backup_url_1"] audio_url = video_info["videoResource"]["dash_120fps"]["video_list"][ "video_4" ]["backup_url_1"] if len(video_url) % 3 == 1: video_url += "==" elif len(video_url) % 3 == 2: video_url += "=" elif len(audio_url) % 3 == 1: audio_url += "==" elif len(audio_url) % 3 == 2: audio_url += "=" video_url = base64.b64decode(video_url).decode("utf8") audio_url = base64.b64decode(audio_url).decode("utf8") video_width = video_info["videoResource"]["dash_120fps"]["video_list"][ "video_4" ]["vwidth"] video_height = video_info["videoResource"]["dash_120fps"]["video_list"][ "video_4" ]["vheight"] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif ( "video_list" in video_info["videoResource"]["dash_120fps"] and "video_3" in video_info["videoResource"]["dash_120fps"]["video_list"] ): video_url = video_info["videoResource"]["dash_120fps"]["video_list"][ "video_3" ]["backup_url_1"] audio_url = video_info["videoResource"]["dash_120fps"]["video_list"][ "video_3" ]["backup_url_1"] if len(video_url) % 3 == 1: video_url += "==" elif len(video_url) % 3 == 2: video_url += "=" elif len(audio_url) % 3 == 1: audio_url += "==" elif len(audio_url) % 3 == 2: audio_url += "=" video_url = base64.b64decode(video_url).decode("utf8") audio_url = base64.b64decode(audio_url).decode("utf8") video_width = video_info["videoResource"]["dash_120fps"]["video_list"][ "video_3" ]["vwidth"] video_height = video_info["videoResource"]["dash_120fps"]["video_list"][ "video_3" ]["vheight"] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif ( "video_list" in video_info["videoResource"]["dash_120fps"] and "video_2" in video_info["videoResource"]["dash_120fps"]["video_list"] ): video_url = video_info["videoResource"]["dash_120fps"]["video_list"][ "video_2" ]["backup_url_1"] audio_url = video_info["videoResource"]["dash_120fps"]["video_list"][ "video_2" ]["backup_url_1"] if len(video_url) % 3 == 1: video_url += "==" elif len(video_url) % 3 == 2: video_url += "=" elif len(audio_url) % 3 == 1: audio_url += "==" elif len(audio_url) % 3 == 2: audio_url += "=" video_url = base64.b64decode(video_url).decode("utf8") audio_url = base64.b64decode(audio_url).decode("utf8") video_width = video_info["videoResource"]["dash_120fps"]["video_list"][ "video_2" ]["vwidth"] video_height = video_info["videoResource"]["dash_120fps"]["video_list"][ "video_2" ]["vheight"] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif ( "video_list" in video_info["videoResource"]["dash_120fps"] and "video_1" in video_info["videoResource"]["dash_120fps"]["video_list"] ): video_url = video_info["videoResource"]["dash_120fps"]["video_list"][ "video_1" ]["backup_url_1"] audio_url = video_info["videoResource"]["dash_120fps"]["video_list"][ "video_1" ]["backup_url_1"] if len(video_url) % 3 == 1: video_url += "==" elif len(video_url) % 3 == 2: video_url += "=" elif len(audio_url) % 3 == 1: audio_url += "==" elif len(audio_url) % 3 == 2: audio_url += "=" video_url = base64.b64decode(video_url).decode("utf8") audio_url = base64.b64decode(audio_url).decode("utf8") video_width = video_info["videoResource"]["dash_120fps"]["video_list"][ "video_1" ]["vwidth"] video_height = video_info["videoResource"]["dash_120fps"]["video_list"][ "video_1" ]["vheight"] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif ( "dynamic_video" in video_info["videoResource"]["dash_120fps"] and "dynamic_video_list" in video_info["videoResource"]["dash_120fps"]["dynamic_video"] and "dynamic_audio_list" in video_info["videoResource"]["dash_120fps"]["dynamic_video"] and len( video_info["videoResource"]["dash_120fps"]["dynamic_video"][ "dynamic_video_list" ] ) != 0 and len( video_info["videoResource"]["dash_120fps"]["dynamic_video"][ "dynamic_audio_list" ] ) != 0 ): video_url = video_info["videoResource"]["dash_120fps"]["dynamic_video"][ "dynamic_video_list" ][-1]["backup_url_1"] audio_url = video_info["videoResource"]["dash_120fps"]["dynamic_video"][ "dynamic_audio_list" ][-1]["backup_url_1"] if len(video_url) % 3 == 1: video_url += "==" elif len(video_url) % 3 == 2: video_url += "=" elif len(audio_url) % 3 == 1: audio_url += "==" elif len(audio_url) % 3 == 2: audio_url += "=" video_url = base64.b64decode(video_url).decode("utf8") audio_url = base64.b64decode(audio_url).decode("utf8") video_width = video_info["videoResource"]["dash_120fps"]["dynamic_video"][ "dynamic_video_list" ][-1]["vwidth"] video_height = video_info["videoResource"]["dash_120fps"]["dynamic_video"][ "dynamic_video_list" ][-1]["vheight"] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height else: video_url_dict["video_url"] = "" video_url_dict["audio_url"] = "" video_url_dict["video_width"] = 0 video_url_dict["video_height"] = 0 elif "dash" in video_info["videoResource"]: if ( "video_list" in video_info["videoResource"]["dash"] and "video_4" in video_info["videoResource"]["dash"]["video_list"] ): video_url = video_info["videoResource"]["dash"]["video_list"]["video_4"][ "backup_url_1" ] audio_url = video_info["videoResource"]["dash"]["video_list"]["video_4"][ "backup_url_1" ] if len(video_url) % 3 == 1: video_url += "==" elif len(video_url) % 3 == 2: video_url += "=" elif len(audio_url) % 3 == 1: audio_url += "==" elif len(audio_url) % 3 == 2: audio_url += "=" video_url = base64.b64decode(video_url).decode("utf8") audio_url = base64.b64decode(audio_url).decode("utf8") video_width = video_info["videoResource"]["dash"]["video_list"]["video_4"][ "vwidth" ] video_height = video_info["videoResource"]["dash"]["video_list"]["video_4"][ "vheight" ] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif ( "video_list" in video_info["videoResource"]["dash"] and "video_3" in video_info["videoResource"]["dash"]["video_list"] ): video_url = video_info["videoResource"]["dash"]["video_list"]["video_3"][ "backup_url_1" ] audio_url = video_info["videoResource"]["dash"]["video_list"]["video_3"][ "backup_url_1" ] if len(video_url) % 3 == 1: video_url += "==" elif len(video_url) % 3 == 2: video_url += "=" elif len(audio_url) % 3 == 1: audio_url += "==" elif len(audio_url) % 3 == 2: audio_url += "=" video_url = base64.b64decode(video_url).decode("utf8") audio_url = base64.b64decode(audio_url).decode("utf8") video_width = video_info["videoResource"]["dash"]["video_list"]["video_3"][ "vwidth" ] video_height = video_info["videoResource"]["dash"]["video_list"]["video_3"][ "vheight" ] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif ( "video_list" in video_info["videoResource"]["dash"] and "video_2" in video_info["videoResource"]["dash"]["video_list"] ): video_url = video_info["videoResource"]["dash"]["video_list"]["video_2"][ "backup_url_1" ] audio_url = video_info["videoResource"]["dash"]["video_list"]["video_2"][ "backup_url_1" ] if len(video_url) % 3 == 1: video_url += "==" elif len(video_url) % 3 == 2: video_url += "=" elif len(audio_url) % 3 == 1: audio_url += "==" elif len(audio_url) % 3 == 2: audio_url += "=" video_url = base64.b64decode(video_url).decode("utf8") audio_url = base64.b64decode(audio_url).decode("utf8") video_width = video_info["videoResource"]["dash"]["video_list"]["video_2"][ "vwidth" ] video_height = video_info["videoResource"]["dash"]["video_list"]["video_2"][ "vheight" ] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif ( "video_list" in video_info["videoResource"]["dash"] and "video_1" in video_info["videoResource"]["dash"]["video_list"] ): video_url = video_info["videoResource"]["dash"]["video_list"]["video_1"][ "backup_url_1" ] audio_url = video_info["videoResource"]["dash"]["video_list"]["video_1"][ "backup_url_1" ] if len(video_url) % 3 == 1: video_url += "==" elif len(video_url) % 3 == 2: video_url += "=" elif len(audio_url) % 3 == 1: audio_url += "==" elif len(audio_url) % 3 == 2: audio_url += "=" video_url = base64.b64decode(video_url).decode("utf8") audio_url = base64.b64decode(audio_url).decode("utf8") video_width = video_info["videoResource"]["dash"]["video_list"]["video_1"][ "vwidth" ] video_height = video_info["videoResource"]["dash"]["video_list"]["video_1"][ "vheight" ] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif ( "dynamic_video" in video_info["videoResource"]["dash"] and "dynamic_video_list" in video_info["videoResource"]["dash"]["dynamic_video"] and "dynamic_audio_list" in video_info["videoResource"]["dash"]["dynamic_video"] and len( video_info["videoResource"]["dash"]["dynamic_video"][ "dynamic_video_list" ] ) != 0 and len( video_info["videoResource"]["dash"]["dynamic_video"][ "dynamic_audio_list" ] ) != 0 ): video_url = video_info["videoResource"]["dash"]["dynamic_video"][ "dynamic_video_list" ][-1]["backup_url_1"] audio_url = video_info["videoResource"]["dash"]["dynamic_video"][ "dynamic_audio_list" ][-1]["backup_url_1"] if len(video_url) % 3 == 1: video_url += "==" elif len(video_url) % 3 == 2: video_url += "=" elif len(audio_url) % 3 == 1: audio_url += "==" elif len(audio_url) % 3 == 2: audio_url += "=" video_url = base64.b64decode(video_url).decode("utf8") audio_url = base64.b64decode(audio_url).decode("utf8") video_width = video_info["videoResource"]["dash"]["dynamic_video"][ "dynamic_video_list" ][-1]["vwidth"] video_height = video_info["videoResource"]["dash"]["dynamic_video"][ "dynamic_video_list" ][-1]["vheight"] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height else: video_url_dict["video_url"] = "" video_url_dict["audio_url"] = "" video_url_dict["video_width"] = 0 video_url_dict["video_height"] = 0 elif "normal" in video_info["videoResource"]: if ( "video_list" in video_info["videoResource"]["normal"] and "video_4" in video_info["videoResource"]["normal"]["video_list"] ): video_url = video_info["videoResource"]["normal"]["video_list"]["video_4"][ "backup_url_1" ] audio_url = video_info["videoResource"]["normal"]["video_list"]["video_4"][ "backup_url_1" ] if len(video_url) % 3 == 1: video_url += "==" elif len(video_url) % 3 == 2: video_url += "=" elif len(audio_url) % 3 == 1: audio_url += "==" elif len(audio_url) % 3 == 2: audio_url += "=" video_url = base64.b64decode(video_url).decode("utf8") audio_url = base64.b64decode(audio_url).decode("utf8") video_width = video_info["videoResource"]["normal"]["video_list"][ "video_4" ]["vwidth"] video_height = video_info["videoResource"]["normal"]["video_list"][ "video_4" ]["vheight"] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif ( "video_list" in video_info["videoResource"]["normal"] and "video_3" in video_info["videoResource"]["normal"]["video_list"] ): video_url = video_info["videoResource"]["normal"]["video_list"]["video_3"][ "backup_url_1" ] audio_url = video_info["videoResource"]["normal"]["video_list"]["video_3"][ "backup_url_1" ] if len(video_url) % 3 == 1: video_url += "==" elif len(video_url) % 3 == 2: video_url += "=" elif len(audio_url) % 3 == 1: audio_url += "==" elif len(audio_url) % 3 == 2: audio_url += "=" video_url = base64.b64decode(video_url).decode("utf8") audio_url = base64.b64decode(audio_url).decode("utf8") video_width = video_info["videoResource"]["normal"]["video_list"][ "video_3" ]["vwidth"] video_height = video_info["videoResource"]["normal"]["video_list"][ "video_3" ]["vheight"] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif ( "video_list" in video_info["videoResource"]["normal"] and "video_2" in video_info["videoResource"]["normal"]["video_list"] ): video_url = video_info["videoResource"]["normal"]["video_list"]["video_2"][ "backup_url_1" ] audio_url = video_info["videoResource"]["normal"]["video_list"]["video_2"][ "backup_url_1" ] if len(video_url) % 3 == 1: video_url += "==" elif len(video_url) % 3 == 2: video_url += "=" elif len(audio_url) % 3 == 1: audio_url += "==" elif len(audio_url) % 3 == 2: audio_url += "=" video_url = base64.b64decode(video_url).decode("utf8") audio_url = base64.b64decode(audio_url).decode("utf8") video_width = video_info["videoResource"]["normal"]["video_list"][ "video_2" ]["vwidth"] video_height = video_info["videoResource"]["normal"]["video_list"][ "video_2" ]["vheight"] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif ( "video_list" in video_info["videoResource"]["normal"] and "video_1" in video_info["videoResource"]["normal"]["video_list"] ): video_url = video_info["videoResource"]["normal"]["video_list"]["video_1"][ "backup_url_1" ] audio_url = video_info["videoResource"]["normal"]["video_list"]["video_1"][ "backup_url_1" ] if len(video_url) % 3 == 1: video_url += "==" elif len(video_url) % 3 == 2: video_url += "=" elif len(audio_url) % 3 == 1: audio_url += "==" elif len(audio_url) % 3 == 2: audio_url += "=" video_url = base64.b64decode(video_url).decode("utf8") audio_url = base64.b64decode(audio_url).decode("utf8") video_width = video_info["videoResource"]["normal"]["video_list"][ "video_1" ]["vwidth"] video_height = video_info["videoResource"]["normal"]["video_list"][ "video_1" ]["vheight"] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height elif ( "dynamic_video" in video_info["videoResource"]["normal"] and "dynamic_video_list" in video_info["videoResource"]["normal"]["dynamic_video"] and "dynamic_audio_list" in video_info["videoResource"]["normal"]["dynamic_video"] and len( video_info["videoResource"]["normal"]["dynamic_video"][ "dynamic_video_list" ] ) != 0 and len( video_info["videoResource"]["normal"]["dynamic_video"][ "dynamic_audio_list" ] ) != 0 ): video_url = video_info["videoResource"]["normal"]["dynamic_video"][ "dynamic_video_list" ][-1]["backup_url_1"] audio_url = video_info["videoResource"]["normal"]["dynamic_video"][ "dynamic_audio_list" ][-1]["backup_url_1"] if len(video_url) % 3 == 1: video_url += "==" elif len(video_url) % 3 == 2: video_url += "=" elif len(audio_url) % 3 == 1: audio_url += "==" elif len(audio_url) % 3 == 2: audio_url += "=" video_url = base64.b64decode(video_url).decode("utf8") audio_url = base64.b64decode(audio_url).decode("utf8") video_width = video_info["videoResource"]["normal"]["dynamic_video"][ "dynamic_video_list" ][-1]["vwidth"] video_height = video_info["videoResource"]["normal"]["dynamic_video"][ "dynamic_video_list" ][-1]["vheight"] video_url_dict["video_url"] = video_url video_url_dict["audio_url"] = audio_url video_url_dict["video_width"] = video_width video_url_dict["video_height"] = video_height else: video_url_dict["video_url"] = "" video_url_dict["audio_url"] = "" video_url_dict["video_width"] = 0 video_url_dict["video_height"] = 0 else: video_url_dict["video_url"] = "" video_url_dict["audio_url"] = "" video_url_dict["video_width"] = 0 video_url_dict["video_height"] = 0 return video_url_dict def get_comment_cnt(item_id): """ 获取视频的评论数量 """ url = "https://www.ixigua.com/tlb/comment/article/v5/tab_comments/?" params = { "tab_index": "0", "count": "10", "offset": "10", "group_id": str(item_id), "item_id": str(item_id), "aid": "1768", "msToken": "50-JJObWB07HfHs-BMJWT1eIDX3G-6lPSF_i-QwxBIXE9VVa-iN0jbEXR5pG2DKjXBmP299n6ZTuXzY-GAy968CCvouSAYIS4GzvGQT3pNlKNejr5G4-1g==", "X-Bogus": "DFSzswVOyGtANVeWtCLMqR/F6q9U", "_signature": random_signature(), } headers = { "authority": "www.ixigua.com", "accept": "application/json, text/plain, */*", "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", "cache-control": "no-cache", "cookie": "MONITOR_WEB_ID=67cb5099-a022-4ec3-bb8e-c4de6ba51dd0; passport_csrf_token=72b2574f3c99f8ba670e42df430218fd; passport_csrf_token_default=72b2574f3c99f8ba670e42df430218fd; sid_guard=c7472b508ea631823ba765a60cf8757f%7C1680867422%7C3024002%7CFri%2C+12-May-2023+11%3A37%3A04+GMT; uid_tt=c13f47d51767f616befe32fb3e9f485a; uid_tt_ss=c13f47d51767f616befe32fb3e9f485a; sid_tt=c7472b508ea631823ba765a60cf8757f; sessionid=c7472b508ea631823ba765a60cf8757f; sessionid_ss=c7472b508ea631823ba765a60cf8757f; sid_ucp_v1=1.0.0-KGUzNWYxNmRkZGJiZjgxY2MzZWNkMTEzMTkwYjY1Yjg5OTY5NzVlNmMKFQiu3d-eqQIQ3oDAoQYYGCAMOAhACxoCaGwiIGM3NDcyYjUwOGVhNjMxODIzYmE3NjVhNjBjZjg3NTdm; ssid_ucp_v1=1.0.0-KGUzNWYxNmRkZGJiZjgxY2MzZWNkMTEzMTkwYjY1Yjg5OTY5NzVlNmMKFQiu3d-eqQIQ3oDAoQYYGCAMOAhACxoCaGwiIGM3NDcyYjUwOGVhNjMxODIzYmE3NjVhNjBjZjg3NTdm; odin_tt=b893608d4dde2e1e8df8cd5d97a0e2fbeafc4ca762ac72ebef6e6c97e2ed19859bb01d46b4190ddd6dd17d7f9678e1de; SEARCH_CARD_MODE=7168304743566296612_0; support_webp=true; support_avif=false; csrf_session_id=a5355d954d3c63ed1ba35faada452b4d; tt_scid=7Pux7s634-z8DYvCM20y7KigwH5u7Rh6D9C-RROpnT.aGMEcz6Vsxp.oai47wJqa4f86; ttwid=1%7CHHtv2QqpSGuSu8r-zXF1QoWsvjmNi1SJrqOrZzg-UCY%7C1683858689%7Ca5223fe1500578e01e138a0d71d6444692018296c4c24f5885af174a65873c95; ixigua-a-s=3; msToken=50-JJObWB07HfHs-BMJWT1eIDX3G-6lPSF_i-QwxBIXE9VVa-iN0jbEXR5pG2DKjXBmP299n6ZTuXzY-GAy968CCvouSAYIS4GzvGQT3pNlKNejr5G4-1g==; __ac_nonce=0645dcbf0005064517440; __ac_signature=_02B4Z6wo00f01FEGmAwAAIDBKchzCGqn-MBRJpyAAHAjieFC5GEg6gGiwz.I4PRrJl7f0GcixFrExKmgt6QI1i1S-dQyofPEj2ugWTCnmKUdJQv-wYuDofeKNe8VtMtZq2aKewyUGeKU-5Ud21; ixigua-a-s=3", "pragma": "no-cache", "referer": f"https://www.ixigua.com/{item_id}?logTag=3c5aa86a8600b9ab8540", "sec-ch-ua": '"Microsoft Edge";v="113", "Chromium";v="113", "Not-A.Brand";v="24"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"macOS"', "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", "tt-anti-token": "cBITBHvmYjEygzv-f9c78c1297722cf1f559c74b084e4525ce4900bdcf9e8588f20cc7c2e3234422", "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 Edg/113.0.1774.35", "x-secsdk-csrf-token": "000100000001f8e733cf37f0cd255a51aea9a81ff7bc0c09490cfe41ad827c3c5c18ec809279175e4d9f5553d8a5", } response = requests.get( url=url, headers=headers, params=params, proxies=tunnel_proxies(), timeout=5 ) response.close() if ( response.status_code != 200 or "total_number" not in response.json() or response.json() == {} ): return 0 return response.json().get("total_number", 0) class XiGuaAuthor: """ 西瓜账号爬虫 """ def __init__(self, platform, mode, rule_dict, env, user_list): self.platform = platform self.mode = mode self.rule_dict = rule_dict self.env = env self.user_list = user_list self.mq = MQ(topic_name="topic_crawler_etl_" + self.env) self.download_count = 0 self.limiter = AuthorLimit(platform=self.platform, mode=self.mode) def rule_maker(self, account): """ 通过不同的账号生成不同的规则 :param account: 输入的账号信息 {'play_cnt': {'min': 100000, 'max': 0}, 'period': {'min': 5, 'max': 5}} """ temp = account['link'].split("?")[0].split("_") if len(temp) == 1: return self.rule_dict else: flag = temp[-2] match flag: case "V1": rule_dict = { "play_cnt": {"min": 100000, "max": 0}, 'period': {"min": 90, "max": 90}, 'special': 0.02 } return rule_dict case "V2": rule_dict = { "play_cnt": {"min": 10000, "max": 0}, 'period': {"min": 90, "max": 90}, 'special': 0.01 } return rule_dict case "V3": rule_dict = { "play_cnt": {"min": 5000, "max": 0}, 'period': {"min": 90, "max": 90}, 'special': 0.01 } return rule_dict def get_author_list(self): """ 每轮只抓取定量的数据,到达数量后自己退出 获取账号列表以及账号信息 """ # max_count = int(self.rule_dict.get("videos_cnt", {}).get("min", 300)) for user_dict in self.user_list: # if self.download_count <= max_count: try: flag = user_dict["link"][0] match flag: case "V": self.get_video_list(user_dict) case "X": self.get_tiny_video_list(user_dict) case "h": self.get_video_list(user_dict) case "D": self.get_video_list(user_dict) case "B": self.get_video_list(user_dict) self.get_tiny_video_list(user_dict) except Exception as e: AliyunLogger.logging( code="3001", account=user_dict["uid"], platform=self.platform, mode=self.mode, env=self.env, message="扫描账号时出现bug, 报错是 {}".format(e) ) # time.sleep(random.randint(1, 15)) # else: # AliyunLogger.logging( # code="2000", # platform=self.platform, # mode=self.mode, # env=self.env, # message="本轮已经抓取足够数量的视频,已经自动退出", # ) # return def get_video_list(self, user_dict): """ 获取某个账号的视频列表 账号分为 3 类 """ offset = 0 signature = random_signature() link = user_dict['link'].split("?")[0].split("_")[-1] url = "https://www.ixigua.com/api/videov2/author/new_video_list?" while True: to_user_id = str(link.replace("https://www.ixigua.com/home/", "")) params = { "to_user_id": to_user_id, "offset": str(offset), "limit": "30", "maxBehotTime": "0", "order": "new", "isHome": "0", "_signature": signature, } headers = { "referer": f'https://www.ixigua.com/home/{link.replace("https://www.ixigua.com/home/", "")}/video/?preActiveKey=hotsoon&list_entrance=userdetail', "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.41", } response = requests.get( url=url, headers=headers, params=params, proxies=tunnel_proxies(), timeout=5, ) offset += 30 if "data" not in response.text or response.status_code != 200: AliyunLogger.logging( code="3000", platform=self.platform, mode=self.mode, env=self.env, message=f"get_videoList:{response.text}\n", ) return elif not response.json()["data"]["videoList"]: AliyunLogger.logging( account=link, code="3000", platform=self.platform, mode=self.mode, env=self.env, data=response.json(), message=f"没有更多数据啦~\n", ) return else: feeds = response.json()["data"]["videoList"] for video_obj in feeds: try: AliyunLogger.logging( code="1001", account=user_dict['uid'], platform=self.platform, mode=self.mode, env=self.env, data=video_obj, message="扫描到一条视频", ) date_flag = self.process_video_obj(video_obj, user_dict, "l") if not date_flag: return except Exception as e: AliyunLogger.logging( code="3000", platform=self.platform, mode=self.mode, env=self.env, data=video_obj, message="抓取单条视频异常, 报错原因是: {}".format(e), ) def get_tiny_video_list(self, user_dict): """ 获取小视频 """ url = "https://www.ixigua.com/api/videov2/hotsoon/video" max_behot_time = "0" link = user_dict['link'].split("?")[0].split("_")[-1] to_user_id = str(link.replace("https://www.ixigua.com/home/", "")) while True: params = { "to_user_id": to_user_id, "max_behot_time": max_behot_time, "_signature": random_signature() } headers = { "referer": "https://www.ixigua.com/{}?&".format(to_user_id), "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.41", } response = requests.get( url=url, headers=headers, params=params, proxies=tunnel_proxies(), timeout=5, ) if "data" not in response.text or response.status_code != 200: AliyunLogger.logging( code="2000", platform=self.platform, mode=self.mode, env=self.env, message=f"get_videoList:{response.text}\n", ) return elif not response.json()["data"]["data"]: AliyunLogger.logging( account=link, code="2000", platform=self.platform, mode=self.mode, env=self.env, data=response.json(), message=f"没有更多数据啦~\n", ) return else: video_list = response.json()['data']['data'] max_behot_time = video_list[-1]["max_behot_time"] for video_obj in video_list: try: AliyunLogger.logging( code="1001", account=user_dict['uid'], platform=self.platform, mode=self.mode, env=self.env, data=video_obj, message="扫描到一条小视频", ) date_flag = self.process_video_obj(video_obj, user_dict, "s") if not date_flag: return except Exception as e: AliyunLogger.logging( code="3000", platform=self.platform, mode=self.mode, env=self.env, data=video_obj, message="抓取单条视频异常, 报错原因是: {}".format(e), ) def process_video_obj(self, video_obj, user_dict, f): """ process video_obj and extract video_url """ new_rule = self.rule_maker(user_dict) trace_id = self.platform + str(uuid.uuid1()) if f == "s": item_id = video_obj.get("id_str", "") else: item_id = video_obj.get("item_id", "") if not item_id: AliyunLogger.logging( code="2005", account=user_dict['uid'], platform=self.platform, mode=self.mode, env=self.env, message="无效视频", data=video_obj, trace_id=trace_id, ) return # 获取视频信息 video_dict = self.get_video_info(item_id=item_id) video_dict["platform"] = self.platform video_dict["strategy"] = self.mode video_dict["out_video_id"] = video_dict["video_id"] video_dict["width"] = video_dict["video_width"] video_dict["height"] = video_dict["video_height"] video_dict["crawler_rule"] = json.dumps(new_rule) video_dict["user_id"] = user_dict["uid"] video_dict["publish_time"] = video_dict["publish_time_str"] video_dict["strategy_type"] = self.mode video_dict["update_time_stamp"] = int(time.time()) if int(time.time()) - video_dict['publish_time_stamp'] > 3600 * 24 * int( new_rule.get("period", {}).get("max", 1000)): if not video_obj['is_top']: """ 非置顶数据发布时间超过才退出 """ AliyunLogger.logging( code="2004", account=user_dict['uid'], platform=self.platform, mode=self.mode, env=self.env, data=video_dict, message="发布时间超过{}天".format( int(new_rule.get("period", {}).get("max", 1000)) ), ) return False pipeline = PiaoQuanPipeline( platform=self.platform, mode=self.mode, rule_dict=new_rule, env=self.env, item=video_dict, trace_id=trace_id, ) limit_flag = self.limiter.author_limitation(user_id=video_dict['user_id']) if limit_flag: title_flag = pipeline.title_flag() repeat_flag = pipeline.repeat_video() if title_flag and repeat_flag: if new_rule.get("special"): if int(video_dict['play_cnt']) >= int(new_rule.get("play_cnt", {}).get("min", 100000)): if float(video_dict['like_cnt']) / float(video_dict['play_cnt']) >= new_rule['special']: self.mq.send_msg(video_dict) self.download_count += 1 AliyunLogger.logging( code="1002", account=user_dict['uid'], platform=self.platform, mode=self.mode, env=self.env, data=video_dict, trace_id=trace_id, message="成功发送 MQ 至 ETL", ) return True else: AliyunLogger.logging( code="2008", account=user_dict['uid'], platform=self.platform, mode=self.mode, env=self.env, message="不满足特殊规则, 点赞量/播放量", data=video_dict ) else: if int(video_dict['play_cnt']) >= int(new_rule.get("play_cnt", {}).get("min", 100000)): self.mq.send_msg(video_dict) self.download_count += 1 AliyunLogger.logging( code="1002", account=user_dict['uid'], platform=self.platform, mode=self.mode, env=self.env, data=video_dict, trace_id=trace_id, message="成功发送 MQ 至 ETL", ) return True else: AliyunLogger.logging( code="2008", account=user_dict['uid'], platform=self.platform, mode=self.mode, env=self.env, message="不满足特殊规则, 播放量", data=video_dict ) return True def get_video_info(self, item_id): """ 获取视频信息 """ url = "https://www.ixigua.com/{}".format(item_id) headers = { "accept-encoding": "gzip, deflate", "accept-language": "zh-CN,zh-Hans;q=0.9", "cookie": "ttwid={}".format(byte_dance_cookie(item_id)), "user-agent": FakeUserAgent().random, "referer": "https://www.ixigua.com/{}/".format(item_id), } response = requests.get( url=url, headers=headers, proxies=tunnel_proxies(), timeout=5, ) video_info = extract_info_by_re(response.text) video_dict = { "video_title": video_info.get("title", ""), "video_id": video_info.get("video_id"), "gid": str(item_id), "play_cnt": int(video_info.get("play_count", 0)), "like_cnt": int(video_info.get("like_count", 0)), "comment_cnt": 0, "share_cnt": 0, "favorite_cnt": 0, "duration": int(video_info.get("duration", 0)), "video_width": 0, "video_height": 0, "publish_time_stamp": int(video_info.get("publish_time", 0)), "publish_time_str": time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime(int(video_info.get("publish_time", 0))), ), "avatar_url": str( video_info.get("user_info", {}).get("avatar_url", "") ), "cover_url": video_info.get("cover_url", "").replace("\\u002F", "/"), "video_url": video_info.get("url"), "session": f"xigua-author-{int(time.time())}", } return video_dict if __name__ == "__main__": user_list = [ { "uid": 6267140, "source": "xigua", "link": "https://www.ixigua.com/home/2779177225827568", "nick_name": "秋晴爱音乐", "avatar_url": "", "mode": "author", }, { "uid": 6267140, "source": "xigua", "link": "https://www.ixigua.com/home/2885546124776780", "nick_name": "朗诵放歌的老山羊", "avatar_url": "", "mode": "author", }, { "uid": 6267140, "source": "xigua", "link": "https://www.ixigua.com/home/5880938217", "nick_name": "天原声疗", "avatar_url": "", "mode": "author", }, ] rule = {'period': {'min': 30, 'max': 30}, 'duration': {'min': 20, 'max': 0}, 'play_cnt': {'min': 100000, 'max': 0}} XGA = XiGuaAuthor( platform="xigua", mode="author", rule_dict=rule, env="prod", user_list=user_list ) XGA.get_author_list()