12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198 |
- import json
- import os
- import re
- import random
- import sys
- import string
- import time
- import uuid
- import base64
- import requests
- from lxml import etree
- from Crypto.Cipher import AES
- from Crypto.Util.Padding import unpad
- from fake_useragent import FakeUserAgent
- from common.mq import MQ
- sys.path.append(os.getcwd())
- from common import AliyunLogger, PiaoQuanPipeline, tunnel_proxies
- from common.limit import AuthorLimit
- def aes_decrypt(data: str, key: str) -> str:
- """
- XiGua AES decrypt
- :param data:
- :param key:
- :return:
- """
- password = key.encode()
- iv = password[:16]
- try:
- ct = base64.b64decode(data.encode())
- cipher = AES.new(password, AES.MODE_CBC, iv)
- pt = unpad(cipher.decrypt(ct), AES.block_size)
- return base64.b64decode(pt).decode()
- except Exception as e:
- print("Incorrect decryption {}".format(e))
- return None
def extract_video_url(text):
    """Extract and decrypt the playable video URL from a Xigua page.

    The page embeds its state as a JavaScript object literal inside the
    ``<script id="SSR_HYDRATED_DATA">`` element.  The literal is JSON
    except for ``undefined`` values, which are mapped to ``null`` before
    parsing.  ``null``/``true``/``false`` are already valid JSON and are
    left untouched so that string values containing those substrings
    (including the encrypted base64 URLs) are not corrupted.

    :param text: HTML source of the video page
    :return: the decrypted URL, or ``None`` if decryption fails
    :raises IndexError: if the hydration script element is absent
    :raises KeyError: if the expected keys are missing from the payload
    """
    document = etree.HTML(text)
    script_text = document.xpath('//script[@id="SSR_HYDRATED_DATA"]/text()')[0]
    # Keep only the object literal between the first "{" and the last "}".
    raw_json = script_text[script_text.find('{'):script_text.rfind('}') + 1]
    raw_json = raw_json.replace('undefined', 'null')
    resource = json.loads(raw_json)["anyVideo"]["gidInformation"]["packerData"]["video"]["videoResource"]

    dash = resource.get('dash')
    if dash:
        stream = dash
        ptk = stream["ptk"]
        encrypted_url = stream['dynamic_video']['main_url']
    else:
        # No DASH stream (null/undefined/absent): use the plain stream.
        stream = resource['normal']
        ptk = stream['ptk']
        encrypted_url = stream['video_list']['video_3']['main_url']
    return aes_decrypt(data=encrypted_url, key=ptk)
def extract_info_by_re(text):
    """Pull video metadata out of a page's HTML with regular expressions.

    :param text: HTML source of the video page
    :return: dict with the keys ``title``, ``url``, ``video_id``,
        ``like_count``, ``cover_url``, ``play_count``, ``publish_time``
        and ``duration``; all scraped values are strings
    :raises AttributeError: if any field other than the title is not
        found in ``text``
    """
    # Title: text before the first " - " inside <title>.
    title_match = re.search(r'<title[^>]*>(.*?)</title>', text)
    if title_match:
        title_content = title_match.group(1).split(" - ")[0]
        try:
            # Undo mojibake from UTF-8 bytes that were decoded as latin1.
            title_content = bytes(title_content, "latin1").decode()
        except UnicodeError:
            # The text was already decoded correctly; keep it as-is
            # instead of discarding the whole record.
            pass
    else:
        title_content = ""

    video_id = re.search(r'"vid":"(.*?)"', text).group(1)
    like_count = re.search(r'"video_like_count":(.*?),', text).group(1)
    # NOTE(review): the cover is read from "avatar_url" - confirm intended.
    cover_url = re.search(r'"avatar_url":"(.*?)"', text).group(1)
    video_watch_count = re.search(r'"video_watch_count":(.*?),', text).group(1)
    publish_time = re.search(r'"video_publish_time":"(.*?)"', text).group(1)
    # The capture runs up to the next quote, so the trailing comma that
    # separates this field from the next one has to be stripped.
    duration = re.search(r'("video_duration":)(.*?)"', text).group(2).replace(",", "")

    return {
        "title": title_content,
        "url": extract_video_url(text),
        "video_id": video_id,
        "like_count": like_count,
        "cover_url": cover_url,
        "play_count": video_watch_count,
        "publish_time": publish_time,
        "duration": duration,
    }
def random_signature():
    """Generate a random 26-character ``_signature`` request parameter.

    26 distinct alphanumeric characters are drawn (1-6 digits, at least
    one uppercase letter, the remainder lowercase) and shuffled.  Twelve
    of them are placed between the fixed prefix ``"AAAAAAAAAA"`` and the
    fixed suffix ``"AAAB"``.  The character at index 18 is then replaced:
    ``"8"`` becomes ``"w"``, ``"9"`` becomes ``"x"`` and anything else
    becomes ``"y"``.

    :return: the signature string
    """
    digit_count = random.randint(1, 6)
    upper_count = random.randint(1, 26 - digit_count - 1)
    lower_count = 26 - (digit_count + upper_count)
    pool = (
        random.sample(string.digits, digit_count)
        + random.sample(string.ascii_uppercase, upper_count)
        + random.sample(string.ascii_lowercase, lower_count)
    )
    random.shuffle(pool)
    signature = "AAAAAAAAAA" + "".join(pool)[10:-4] + "AAAB"
    # The pool is alphanumeric only, so "-" and "." can never appear at
    # index 18; every character other than "8"/"9" maps to "y".
    marker = {"8": "w", "9": "x"}.get(signature[18], "y")
    return signature[:18] + marker + signature[-7:]
def byte_dance_cookie(item_id):
    """Register with the ByteDance ttwid service and return a cookie value.

    :param item_id: id inserted into the ``referer`` header
        (``https://www.ixigua.com/home/<item_id>/``)
    :return: the first cookie value set by the register response
    :raises IndexError: if the register response sets no cookie
    :raises requests.RequestException: on network failure or timeout
    """
    register_payload = '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}'
    # The context manager closes the session's connections; the timeout
    # matches the other requests in this file and prevents hanging forever.
    with requests.Session() as sess:
        sess.headers.update({
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36',
            'referer': 'https://www.ixigua.com/home/{}/'.format(item_id),
        })
        # Warm-up request made before registering (response is not used).
        sess.get('https://i.snssdk.com/slardar/sdk.js?bid=xigua_video_web_pc', timeout=5)
        r = sess.post('https://ttwid.bytedance.com/ttwid/union/register/', data=register_payload, timeout=5)
        return r.cookies.values()[0]
def _decode_b64_url(encoded):
    """Decode a base64-encoded URL, restoring any stripped ``=`` padding."""
    padded = encoded + "=" * (-len(encoded) % 4)
    return base64.b64decode(padded).decode("utf8")


def _empty_video_url_dict():
    """Return the result used when no playable stream can be found."""
    return {"video_url": "", "audio_url": "", "video_width": 0, "video_height": 0}


def get_video_url(video_info):
    """Pick the best available stream and decode its URLs.

    Selection rules:

    * The first of ``dash_120fps``, ``dash``, ``normal`` that is present
      in ``video_info["videoResource"]`` is used.  If that container has
      no usable stream the result is empty; later containers are NOT
      consulted.
    * Inside the container, ``video_list`` entries are preferred in the
      order ``video_4``, ``video_3``, ``video_2``, ``video_1``.  For these
      the audio URL is the same ``backup_url_1`` as the video URL.
    * Otherwise the last entries of ``dynamic_video.dynamic_video_list``
      and ``dynamic_video.dynamic_audio_list`` are used, provided both
      lists are non-empty.

    URLs are base64 text; missing ``=`` padding is restored before
    decoding.

    :param video_info: dict describing one video
    :return: dict with ``video_url``, ``audio_url``, ``video_width`` and
        ``video_height``; empty strings and zeros when nothing is found
    :raises KeyError: if the chosen entry lacks ``backup_url_1``,
        ``vwidth`` or ``vheight``
    """
    if "videoResource" not in video_info:
        return _empty_video_url_dict()
    resource = video_info["videoResource"]
    if not isinstance(resource, dict):
        return _empty_video_url_dict()

    container = None
    for container_key in ("dash_120fps", "dash", "normal"):
        if container_key in resource:
            container = resource[container_key]
            break
    if not isinstance(container, dict):
        # No container at all, or the selected one is null.
        return _empty_video_url_dict()

    video_list = container.get("video_list")
    if isinstance(video_list, dict):
        for quality in ("video_4", "video_3", "video_2", "video_1"):
            if quality in video_list:
                entry = video_list[quality]
                url = _decode_b64_url(entry["backup_url_1"])
                return {
                    "video_url": url,
                    "audio_url": url,
                    "video_width": entry["vwidth"],
                    "video_height": entry["vheight"],
                }

    dynamic = container.get("dynamic_video")
    if isinstance(dynamic, dict):
        video_streams = dynamic.get("dynamic_video_list")
        audio_streams = dynamic.get("dynamic_audio_list")
        if video_streams and audio_streams:
            best_video = video_streams[-1]
            video_url = _decode_b64_url(best_video["backup_url_1"])
            audio_url = _decode_b64_url(audio_streams[-1]["backup_url_1"])
            return {
                "video_url": video_url,
                "audio_url": audio_url,
                "video_width": best_video["vwidth"],
                "video_height": best_video["vheight"],
            }

    return _empty_video_url_dict()
def get_comment_cnt(item_id):
    """Fetch the number of comments on a video.

    :param item_id: video id, sent as both ``group_id`` and ``item_id``
    :return: the ``total_number`` field of the response, or ``0`` when the
        status is not 200, the body is not a JSON object, or the field is
        absent
    :raises requests.RequestException: on network failure or timeout
    """
    url = "https://www.ixigua.com/tlb/comment/article/v5/tab_comments/?"
    params = {
        "tab_index": "0",
        "count": "10",
        "offset": "10",
        "group_id": str(item_id),
        "item_id": str(item_id),
        "aid": "1768",
        "msToken": "50-JJObWB07HfHs-BMJWT1eIDX3G-6lPSF_i-QwxBIXE9VVa-iN0jbEXR5pG2DKjXBmP299n6ZTuXzY-GAy968CCvouSAYIS4GzvGQT3pNlKNejr5G4-1g==",
        "X-Bogus": "DFSzswVOyGtANVeWtCLMqR/F6q9U",
        "_signature": random_signature(),
    }
    # NOTE(review): the cookie and token headers below are hard-coded
    # session credentials captured from a browser; they will expire and
    # should not live in source control.
    headers = {
        "authority": "www.ixigua.com",
        "accept": "application/json, text/plain, */*",
        "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
        "cache-control": "no-cache",
        "cookie": "MONITOR_WEB_ID=67cb5099-a022-4ec3-bb8e-c4de6ba51dd0; passport_csrf_token=72b2574f3c99f8ba670e42df430218fd; passport_csrf_token_default=72b2574f3c99f8ba670e42df430218fd; sid_guard=c7472b508ea631823ba765a60cf8757f%7C1680867422%7C3024002%7CFri%2C+12-May-2023+11%3A37%3A04+GMT; uid_tt=c13f47d51767f616befe32fb3e9f485a; uid_tt_ss=c13f47d51767f616befe32fb3e9f485a; sid_tt=c7472b508ea631823ba765a60cf8757f; sessionid=c7472b508ea631823ba765a60cf8757f; sessionid_ss=c7472b508ea631823ba765a60cf8757f; sid_ucp_v1=1.0.0-KGUzNWYxNmRkZGJiZjgxY2MzZWNkMTEzMTkwYjY1Yjg5OTY5NzVlNmMKFQiu3d-eqQIQ3oDAoQYYGCAMOAhACxoCaGwiIGM3NDcyYjUwOGVhNjMxODIzYmE3NjVhNjBjZjg3NTdm; ssid_ucp_v1=1.0.0-KGUzNWYxNmRkZGJiZjgxY2MzZWNkMTEzMTkwYjY1Yjg5OTY5NzVlNmMKFQiu3d-eqQIQ3oDAoQYYGCAMOAhACxoCaGwiIGM3NDcyYjUwOGVhNjMxODIzYmE3NjVhNjBjZjg3NTdm; odin_tt=b893608d4dde2e1e8df8cd5d97a0e2fbeafc4ca762ac72ebef6e6c97e2ed19859bb01d46b4190ddd6dd17d7f9678e1de; SEARCH_CARD_MODE=7168304743566296612_0; support_webp=true; support_avif=false; csrf_session_id=a5355d954d3c63ed1ba35faada452b4d; tt_scid=7Pux7s634-z8DYvCM20y7KigwH5u7Rh6D9C-RROpnT.aGMEcz6Vsxp.oai47wJqa4f86; ttwid=1%7CHHtv2QqpSGuSu8r-zXF1QoWsvjmNi1SJrqOrZzg-UCY%7C1683858689%7Ca5223fe1500578e01e138a0d71d6444692018296c4c24f5885af174a65873c95; ixigua-a-s=3; msToken=50-JJObWB07HfHs-BMJWT1eIDX3G-6lPSF_i-QwxBIXE9VVa-iN0jbEXR5pG2DKjXBmP299n6ZTuXzY-GAy968CCvouSAYIS4GzvGQT3pNlKNejr5G4-1g==; __ac_nonce=0645dcbf0005064517440; __ac_signature=_02B4Z6wo00f01FEGmAwAAIDBKchzCGqn-MBRJpyAAHAjieFC5GEg6gGiwz.I4PRrJl7f0GcixFrExKmgt6QI1i1S-dQyofPEj2ugWTCnmKUdJQv-wYuDofeKNe8VtMtZq2aKewyUGeKU-5Ud21; ixigua-a-s=3",
        "pragma": "no-cache",
        "referer": f"https://www.ixigua.com/{item_id}?logTag=3c5aa86a8600b9ab8540",
        "sec-ch-ua": '"Microsoft Edge";v="113", "Chromium";v="113", "Not-A.Brand";v="24"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"macOS"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
        "tt-anti-token": "cBITBHvmYjEygzv-f9c78c1297722cf1f559c74b084e4525ce4900bdcf9e8588f20cc7c2e3234422",
        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 Edg/113.0.1774.35",
        "x-secsdk-csrf-token": "000100000001f8e733cf37f0cd255a51aea9a81ff7bc0c09490cfe41ad827c3c5c18ec809279175e4d9f5553d8a5",
    }
    response = requests.get(
        url=url, headers=headers, params=params, proxies=tunnel_proxies(), timeout=5
    )
    # The body has already been read (no streaming), so closing here only
    # releases the connection.
    response.close()
    if response.status_code != 200:
        return 0
    try:
        payload = response.json()  # parse once instead of three times
    except ValueError:
        # Non-JSON body (e.g. an anti-bot page): treat as "no count".
        return 0
    if not isinstance(payload, dict):
        return 0
    return payload.get("total_number", 0)
- class XiGuaAuthor:
- """
- 西瓜账号爬虫
- """
def __init__(self, platform, mode, rule_dict, env, user_list):
    """Store the crawl configuration and create the helper objects.

    :param platform: platform name, passed to the logger and the limiter
    :param mode: crawl mode, passed to the logger and the limiter
    :param rule_dict: default filtering rules
    :param env: environment name; also the suffix of the MQ topic
    :param user_list: account records to scan
    """
    self.platform, self.mode = platform, mode
    self.rule_dict = rule_dict
    self.env = env
    self.user_list = user_list
    # The ETL topic is per environment: "topic_crawler_etl_<env>".
    self.mq = MQ(topic_name="topic_crawler_etl_" + env)
    self.download_count = 0
    self.limiter = AuthorLimit(platform=platform, mode=mode)
def rule_maker(self, account):
    """Return the filtering rules that apply to one account.

    The account link may carry a tier prefix separated by ``_`` (for
    example ``"V1_https://www.ixigua.com/home/123"``).  The token just
    before the last ``_``-separated part selects the tier:

    * ``V1``: play count >= 100000, special 0.02
    * ``V2``: play count >= 10000, special 0.01
    * ``V3``: play count >= 5000, special 0.01

    all with a period of 90.  Links without a prefix, and links whose
    prefix is not one of the tiers above, use ``self.rule_dict``.
    Previously an unrecognised prefix returned ``None``.

    :param account: account record; only ``account['link']`` is read
    :return: a rule dict such as
        ``{'play_cnt': {'min': 100000, 'max': 0},
        'period': {'min': 90, 'max': 90}, 'special': 0.02}``
    """
    # (minimum play count, "special" ratio) per tier.
    tiers = {
        "V1": (100000, 0.02),
        "V2": (10000, 0.01),
        "V3": (5000, 0.01),
    }
    parts = account['link'].split("?")[0].split("_")
    if len(parts) == 1:
        return self.rule_dict
    tier = tiers.get(parts[-2])
    if tier is None:
        return self.rule_dict
    min_play_cnt, special = tier
    # Built fresh on every call so callers may mutate the result.
    return {
        "play_cnt": {"min": min_play_cnt, "max": 0},
        'period': {"min": 90, "max": 90},
        'special': special,
    }
def get_author_list(self):
    """Scan every account in ``self.user_list``.

    The first character of the account link selects what is crawled:

    * ``V``, ``h``, ``D``: regular videos (``get_video_list``)
    * ``X``: short videos (``get_tiny_video_list``)
    * ``B``: both, regular videos first

    Any other first character is ignored.  A failure while scanning one
    account is logged with code ``3001`` and does not stop the loop.
    """
    # Method names rather than bound methods so that attribute lookup
    # happens inside the try block, as the per-account isolation requires.
    handlers_by_flag = {
        "V": ("get_video_list",),
        "X": ("get_tiny_video_list",),
        "h": ("get_video_list",),
        "D": ("get_video_list",),
        "B": ("get_video_list", "get_tiny_video_list"),
    }
    for user_dict in self.user_list:
        try:
            flag = user_dict["link"][0]
            for handler_name in handlers_by_flag.get(flag, ()):
                getattr(self, handler_name)(user_dict)
        except Exception as e:
            # Read "uid" defensively: a malformed record must not raise
            # inside the handler and abort the remaining accounts.
            account = user_dict.get("uid") if isinstance(user_dict, dict) else None
            AliyunLogger.logging(
                code="3001",
                account=account,
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message="扫描账号时出现bug, 报错是 {}".format(e)
            )
def get_video_list(self, user_dict):
    """Page through one account's video list and process each video.

    Pages of 30 are requested, newest first, until one of these happens:

    * the response is not HTTP 200 or its text does not contain ``data``
    * the page has an empty ``videoList``
    * ``process_video_obj`` returns a falsy value for a video

    A failure while processing a single video is logged with code
    ``3000`` and the loop continues with the next video.

    :param user_dict: account record; ``link`` and ``uid`` are read
    :raises requests.RequestException: on network failure or timeout
    """
    offset = 0
    signature = random_signature()
    link = user_dict['link'].split("?")[0].split("_")[-1]
    # Loop-invariant values are computed once rather than once per page.
    to_user_id = str(link.replace("https://www.ixigua.com/home/", ""))
    url = "https://www.ixigua.com/api/videov2/author/new_video_list?"
    headers = {
        "referer": f'https://www.ixigua.com/home/{to_user_id}/video/?preActiveKey=hotsoon&list_entrance=userdetail',
        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.41",
    }
    while True:
        params = {
            "to_user_id": to_user_id,
            "offset": str(offset),
            "limit": "30",
            "maxBehotTime": "0",
            "order": "new",
            "isHome": "0",
            "_signature": signature,
        }
        response = requests.get(
            url=url,
            headers=headers,
            params=params,
            proxies=tunnel_proxies(),
            timeout=5,
        )
        offset += 30
        if "data" not in response.text or response.status_code != 200:
            AliyunLogger.logging(
                code="3000",
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message=f"get_videoList:{response.text}\n",
            )
            return

        payload = response.json()  # parse the body once per page
        feeds = payload["data"]["videoList"]
        if not feeds:
            AliyunLogger.logging(
                account=link,
                code="3000",
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                data=payload,
                message="没有更多数据啦~\n",
            )
            return

        for video_obj in feeds:
            try:
                AliyunLogger.logging(
                    code="1001",
                    account=user_dict['uid'],
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=video_obj,
                    message="扫描到一条视频",
                )
                date_flag = self.process_video_obj(video_obj, user_dict, "l")
                if not date_flag:
                    # A falsy result ends the scan of this account.
                    return
            except Exception as e:
                AliyunLogger.logging(
                    code="3000",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=video_obj,
                    message="抓取单条视频异常, 报错原因是: {}".format(e),
                )
def get_tiny_video_list(self, user_dict):
    """
    Crawl the short ("tiny") videos of one account, page by page.

    The account id is derived from ``user_dict['link']`` exactly as in
    ``get_video_list``.  The cursor for the next page is the
    ``max_behot_time`` of the last video on the current page; a fresh
    signature is generated for every request.  Every listed video is
    logged (code ``1001``) and handed to ``process_video_obj`` with the
    ``"s"`` flag.

    Paging stops when:

    * the response is not HTTP 200 or its text does not contain ``data``,
    * the page's video list is empty, or
    * ``process_video_obj`` returns a falsy value.

    An exception raised while handling a single video is logged (code
    ``3000``) and the next video is processed.  Errors raised by the HTTP
    request or by decoding the response body are not caught here.

    :param user_dict: account entry; its ``link`` and ``uid`` are read.
    :return: None
    """
    url = "https://www.ixigua.com/api/videov2/hotsoon/video"
    max_behot_time = "0"
    link = user_dict['link'].split("?")[0].split("_")[-1]
    to_user_id = link.replace("https://www.ixigua.com/home/", "")
    # The headers do not change between pages, so build them once.
    headers = {
        "referer": "https://www.ixigua.com/{}?&".format(to_user_id),
        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.41",
    }
    while True:
        params = {
            "to_user_id": to_user_id,
            "max_behot_time": max_behot_time,
            "_signature": random_signature(),
        }
        response = requests.get(
            url=url,
            headers=headers,
            params=params,
            proxies=tunnel_proxies(),
            timeout=5,
        )
        if "data" not in response.text or response.status_code != 200:
            AliyunLogger.logging(
                code="2000",
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message=f"get_videoList:{response.text}\n",
            )
            return

        # Decode the body once; it is needed for the emptiness check, the
        # log payload and the iteration below.
        payload = response.json()
        video_list = payload["data"]["data"]
        if not video_list:
            AliyunLogger.logging(
                account=link,
                code="2000",
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                data=payload,
                message="没有更多数据啦~\n",
            )
            return

        # Cursor for the next request.
        max_behot_time = video_list[-1]["max_behot_time"]
        for video_obj in video_list:
            try:
                AliyunLogger.logging(
                    code="1001",
                    account=user_dict['uid'],
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=video_obj,
                    message="扫描到一条小视频",
                )
                date_flag = self.process_video_obj(video_obj, user_dict, "s")
                if not date_flag:
                    # A falsy result ends the crawl of this account.
                    return
            except Exception as e:
                AliyunLogger.logging(
                    code="3000",
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=video_obj,
                    message="抓取单条视频异常, 报错原因是: {}".format(e),
                )
- def process_video_obj(self, video_obj, user_dict, f):
- """
- Fetch the details of one listed video, apply the account's crawl rule
- and, when the video qualifies, send it to the message queue.
- video_obj: one entry of a list-API response; "is_top" is read from it.
- user_dict: the account entry being crawled; "uid" is read from it.
- f: "s" takes the video id from "id_str", any other value from "item_id".
- Returns None when the entry carries no video id, False when a video
- that is not pinned is older than the rule's period.max (in days), and
- True right after a successful send. Both list crawlers stop paging on
- a falsy result.
- NOTE(review): indentation is lost in this copy of the file, so the
- nesting of the final `return True` (and therefore the result on the
- remaining paths) cannot be confirmed from here.
- """
- new_rule = self.rule_maker(user_dict)  # crawl rule for this account, derived from its link
- trace_id = self.platform + str(uuid.uuid1())  # id passed to the log calls and the pipeline below
- if f == "s":
- item_id = video_obj.get("id_str", "")
- else:
- item_id = video_obj.get("item_id", "")
- if not item_id:
- AliyunLogger.logging(
- code="2005",
- account=user_dict['uid'],
- platform=self.platform,
- mode=self.mode,
- env=self.env,
- message="无效视频",
- data=video_obj,
- trace_id=trace_id,
- )
- return  # bare return -> None, which the callers treat as falsy
- # Fetch the video details
- video_dict = self.get_video_info(item_id=item_id)
- video_dict["platform"] = self.platform
- video_dict["strategy"] = self.mode
- video_dict["out_video_id"] = video_dict["video_id"]  # copies of fields already present in video_dict
- video_dict["width"] = video_dict["video_width"]
- video_dict["height"] = video_dict["video_height"]
- video_dict["crawler_rule"] = json.dumps(new_rule)
- video_dict["user_id"] = user_dict["uid"]
- video_dict["publish_time"] = video_dict["publish_time_str"]
- video_dict["strategy_type"] = self.mode
- video_dict["update_time_stamp"] = int(time.time())
- if int(time.time()) - video_dict['publish_time_stamp'] > 3600 * 24 * int(
- new_rule.get("period", {}).get("max", 1000)):  # age limit: period.max days, 1000 when the rule has none
- if not video_obj['is_top']:  # pinned videos are exempt from the age cut-off
- """
- 非置顶数据发布时间超过才退出
- """
- AliyunLogger.logging(
- code="2004",
- account=user_dict['uid'],
- platform=self.platform,
- mode=self.mode,
- env=self.env,
- data=video_dict,
- message="发布时间超过{}天".format(
- int(new_rule.get("period", {}).get("max", 1000))
- ),
- )
- return False
- pipeline = PiaoQuanPipeline(
- platform=self.platform,
- mode=self.mode,
- rule_dict=new_rule,
- env=self.env,
- item=video_dict,
- trace_id=trace_id,
- )
- limit_flag = self.limiter.author_limitation(user_id=video_dict['user_id'])  # NOTE(review): presumably a per-author quota check; limiter is defined outside this chunk
- if limit_flag:
- title_flag = pipeline.title_flag()
- repeat_flag = pipeline.repeat_video()
- if title_flag and repeat_flag:
- if new_rule.get("special"):  # "special" is a minimum like/play ratio
- if int(video_dict['play_cnt']) >= int(new_rule.get("play_cnt", {}).get("min", 100000)):  # play-count floor, 100000 when the rule has none
- if float(video_dict['like_cnt']) / float(video_dict['play_cnt']) >= new_rule['special']:
- self.mq.send_msg(video_dict)
- self.download_count += 1
- AliyunLogger.logging(
- code="1002",
- account=user_dict['uid'],
- platform=self.platform,
- mode=self.mode,
- env=self.env,
- data=video_dict,
- trace_id=trace_id,
- message="成功发送 MQ 至 ETL",
- )
- return True
- else:
- AliyunLogger.logging(
- code="2008",
- account=user_dict['uid'],
- platform=self.platform,
- mode=self.mode,
- env=self.env,
- message="不满足特殊规则, 点赞量/播放量",
- data=video_dict
- )
- else:
- if int(video_dict['play_cnt']) >= int(new_rule.get("play_cnt", {}).get("min", 100000)):  # without "special" only the play-count floor applies
- self.mq.send_msg(video_dict)
- self.download_count += 1
- AliyunLogger.logging(
- code="1002",
- account=user_dict['uid'],
- platform=self.platform,
- mode=self.mode,
- env=self.env,
- data=video_dict,
- trace_id=trace_id,
- message="成功发送 MQ 至 ETL",
- )
- return True
- else:
- AliyunLogger.logging(
- code="2008",
- account=user_dict['uid'],
- platform=self.platform,
- mode=self.mode,
- env=self.env,
- message="不满足特殊规则, 播放量",
- data=video_dict
- )
- return True  # NOTE(review): nesting level unknown in this copy; confirm against the original file
def get_video_info(self, item_id):
    """
    Download the page of one video and turn it into a flat dict.

    The page ``https://www.ixigua.com/<item_id>`` is requested through the
    tunnel proxy with a random user agent and a ``ttwid`` cookie, then the
    call sleeps for 1-5 seconds.  The fields are read from whatever
    ``extract_info_by_re`` pulls out of the page text.

    ``comment_cnt``, ``share_cnt``, ``favorite_cnt``, ``video_width`` and
    ``video_height`` are always 0 in the result.

    :param item_id: id of the video; also stored (as a string) under ``gid``.
    :return: dict describing the video.
    """
    page_url = f"https://www.ixigua.com/{item_id}"
    request_headers = {
        "accept-encoding": "gzip, deflate",
        "accept-language": "zh-CN,zh-Hans;q=0.9",
        "cookie": f"ttwid={byte_dance_cookie(item_id)}",
        "user-agent": FakeUserAgent().random,
        "referer": f"https://www.ixigua.com/{item_id}/",
    }
    page = requests.get(
        url=page_url,
        headers=request_headers,
        proxies=tunnel_proxies(),
        timeout=5,
    )
    # Pause between page fetches.
    time.sleep(random.randint(1, 5))

    info = extract_info_by_re(page.text)

    # Extract the individual fields in the order they appear in the result.
    title = info.get("title", "")
    video_id = info.get("video_id")
    gid = str(item_id)
    play_cnt = int(info.get("play_count", 0))
    like_cnt = int(info.get("like_count", 0))
    duration = int(info.get("duration", 0))
    publish_ts = int(info.get("publish_time", 0))
    publish_str = time.strftime(
        "%Y-%m-%d %H:%M:%S",
        time.localtime(int(info.get("publish_time", 0))),
    )
    avatar_url = str(info.get("user_info", {}).get("avatar_url", ""))
    # The page escapes "/" as \u002F inside the cover URL.
    cover_url = info.get("cover_url", "").replace("\\u002F", "/")
    video_url = info.get("url")

    return {
        "video_title": title,
        "video_id": video_id,
        "gid": gid,
        "play_cnt": play_cnt,
        "like_cnt": like_cnt,
        "comment_cnt": 0,
        "share_cnt": 0,
        "favorite_cnt": 0,
        "duration": duration,
        "video_width": 0,
        "video_height": 0,
        "publish_time_stamp": publish_ts,
        "publish_time_str": publish_str,
        "avatar_url": avatar_url,
        "cover_url": cover_url,
        "video_url": video_url,
        "session": f"xigua-author-{int(time.time())}",
    }
if __name__ == "__main__":
    # Manual run against three sample accounts.  Note that it uses
    # env="prod".  The accounts differ only in home link and nickname.
    sample_accounts = (
        ("https://www.ixigua.com/home/2779177225827568", "秋晴爱音乐"),
        ("https://www.ixigua.com/home/2885546124776780", "朗诵放歌的老山羊"),
        ("https://www.ixigua.com/home/5880938217", "天原声疗"),
    )
    user_list = [
        {
            "uid": 6267140,
            "source": "xigua",
            "link": home_link,
            "nick_name": nick_name,
            "avatar_url": "",
            "mode": "author",
        }
        for home_link, nick_name in sample_accounts
    ]
    rule = {
        'period': {'min': 30, 'max': 30},
        'duration': {'min': 20, 'max': 0},
        'play_cnt': {'min': 100000, 'max': 0},
    }
    XGA = XiGuaAuthor(
        platform="xigua",
        mode="author",
        rule_dict=rule,
        env="prod",
        user_list=user_list,
    )
    XGA.get_author_list()
|