123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498 |
- import json
- import os
- import re
- import random
- import sys
- import string
- import time
- import uuid
- import base64
- import requests
- from fake_useragent import FakeUserAgent
- from common.mq import MQ
- sys.path.append(os.getcwd())
- from common import PiaoQuanPipeline, tunnel_proxies
- from common.limit import AuthorLimit
def extract_info_by_re(text):
    """
    Extract video metadata from the raw text of a video page with regular
    expressions.

    :param text: page source as a string
    :return: dict with the keys ``title``, ``url``, ``video_id``,
        ``like_count``, ``cover_url``, ``play_count``, ``publish_time`` and
        ``duration``; every value is a string
    :raises TypeError: if the page contains no ``"main_url"`` field
    :raises AttributeError: if any other required field is missing
    """
    # Title: keep only the part before the first " - " separator.
    title_match = re.search(r'<title[^>]*>(.*?)</title>', text)
    if title_match:
        raw_title = title_match.group(1).split(" - ")[0]
        try:
            # Repair mojibake: UTF-8 bytes that were decoded as latin-1.
            title_content = raw_title.encode("latin1").decode()
        except UnicodeError:
            # The text was already decoded correctly (it is not UTF-8 read
            # as latin-1), so there is nothing to repair.
            title_content = raw_title
    else:
        title_content = ""

    # Video URL: stored base64-encoded in the "main_url" field. Subscripting
    # the search result keeps the original TypeError when the field is absent.
    encoded_url = re.search(r'("main_url":")(.*?)"', text)[2]
    decoded_data = base64.b64decode(encoded_url)
    try:
        # Try UTF-8 first.
        video_url = decoded_data.decode()
    except UnicodeDecodeError:
        # Fall back to latin-1, which can decode any byte sequence.
        video_url = decoded_data.decode('latin-1')

    video_id = re.search(r'"vid":"(.*?)"', text).group(1)
    like_count = re.search(r'"video_like_count":(.*?),', text).group(1)
    # NOTE(review): the cover URL is read from the "avatar_url" field -
    # confirm this is the intended source.
    cover_url = re.search(r'"avatar_url":"(.*?)"', text).group(1)
    video_watch_count = re.search(r'"video_watch_count":(.*?),', text).group(1)
    publish_time = re.search(r'"video_publish_time":"(.*?)"', text).group(1)
    # The duration capture runs up to the next double quote, so it still
    # carries the comma that separates it from the following key.
    duration = (
        re.search(r'("video_duration":)(.*?)"', text).group(2).replace(",", "")
    )

    return {
        "title": title_content,
        "url": video_url,
        "video_id": video_id,
        "like_count": like_count,
        "cover_url": cover_url,
        "play_count": video_watch_count,
        "publish_time": publish_time,
        "duration": duration
    }
def random_signature():
    """
    Generate a random 26-character signature string.

    The result is ``"AAAAAAAAAA"`` + 12 random characters + ``"AAAB"``, where
    the character at index 18 is then replaced by a marker letter: ``"w"`` if
    it was ``"8"``, ``"x"`` if it was ``"9"``, otherwise ``"y"``.

    :return: the signature string
    """
    # Pick how many digits / uppercase / lowercase characters make up the
    # 26-character pool; each class contributes at least one character.
    digit_count = random.randint(1, 6)
    upper_count = random.randint(1, 26 - digit_count - 1)
    lower_count = 26 - (digit_count + upper_count)
    pool = (
        random.sample(string.digits, digit_count)
        + random.sample(string.ascii_uppercase, upper_count)
        + random.sample(string.ascii_lowercase, lower_count)
    )
    random.shuffle(pool)

    # Keep 12 characters from the middle of the shuffled pool and wrap them
    # in the fixed prefix and suffix.
    signature = "AAAAAAAAAA" + "".join(pool)[10:-4] + "AAAB"

    # The pool only contains digits and ASCII letters, so the former "-" and
    # "." cases could never occur; every character other than "8"/"9" maps
    # to "y", exactly as before.
    marker = {"8": "w", "9": "x"}.get(signature[18], "y")
    return signature[:18] + marker + signature[-7:]
def byte_dance_cookie(item_id):
    """
    Obtain a cookie value for Xigua video requests.

    Opens a session, requests the slardar SDK script, then posts a
    registration payload to the ttwid register endpoint and returns the first
    cookie value of that response.

    :param item_id: id interpolated into the ``referer`` header
    :return: value of the first cookie set by the register response
    :raises IndexError: if the register response sets no cookie
    """
    data = '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}'
    # The context manager closes the session's pooled connections; the
    # timeouts match the 5 seconds used by the other requests in this file so
    # a stalled endpoint cannot hang the crawler.
    with requests.Session() as sess:
        sess.headers.update({
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36',
            'referer': 'https://www.ixigua.com/home/{}/'.format(item_id),
        })
        # Warm-up request made before registering.
        sess.get(
            'https://i.snssdk.com/slardar/sdk.js?bid=xigua_video_web_pc',
            timeout=5,
        )
        r = sess.post(
            'https://ttwid.bytedance.com/ttwid/union/register/',
            data=data,
            timeout=5,
        )
        return r.cookies.values()[0]
- class XiGuaAuthor(object):
- """
- 西瓜账号爬虫
- """
- def __init__(self, platform, mode, rule_dict, env, user_list):
- self.platform = platform
- self.mode = mode
- self.rule_dict = rule_dict
- self.env = env
- self.user_list = user_list
- self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
- self.download_count = 0
- self.limiter = AuthorLimit(platform=self.platform, mode=self.mode)
- def rule_maker(self, account):
- """
- 通过不同的账号生成不同的规则
- :param account: 输入的账号信息
- {'play_cnt': {'min': 100000, 'max': 0}, 'period': {'min': 5, 'max': 5}}
- """
- temp = account['link'].split("_")
- if len(temp) == 1:
- return self.rule_dict
- else:
- flag = temp[-2]
- match flag:
- case "V1":
- rule_dict = {
- "play_cnt": {"min": 100000, "max": 0},
- 'period': {"min": 90, "max": 90},
- 'special': 0.02
- }
- return rule_dict
- case "V2":
- rule_dict = {
- "play_cnt": {"min": 10000, "max": 0},
- 'period': {"min": 90, "max": 90},
- 'special': 0.01
- }
- return rule_dict
- case "V3":
- rule_dict = {
- "play_cnt": {"min": 5000, "max": 0},
- 'period': {"min": 90, "max": 90},
- 'special': 0.01
- }
- return rule_dict
- def get_author_list(self):
- """
- 每轮只抓取定量的数据,到达数量后自己退出
- 获取账号列表以及账号信息
- """
- # max_count = int(self.rule_dict.get("videos_cnt", {}).get("min", 300))
- for user_dict in self.user_list:
- # if self.download_count <= max_count:
- flag = user_dict["link"][0]
- print(user_dict)
- print(flag)
- match flag:
- case "V":
- self.get_video_list(user_dict)
- case "X":
- self.get_tiny_video_list(user_dict)
- case "h":
- self.get_video_list(user_dict)
- case "D":
- self.get_video_list(user_dict)
- case "B":
- self.get_video_list(user_dict)
- self.get_tiny_video_list(user_dict)
- # time.sleep(random.randint(1, 15))
- # else:
- # AliyunLogger.logging(
- # code="2000",
- # platform=self.platform,
- # mode=self.mode,
- # env=self.env,
- # message="本轮已经抓取足够数量的视频,已经自动退出",
- # )
- # return
- def get_video_list(self, user_dict):
- """
- 获取某个账号的视频列表
- 账号分为 3 类
- """
- offset = 0
- signature = random_signature()
- link = user_dict['link'].split("_")[-1]
- url = "https://www.ixigua.com/api/videov2/author/new_video_list?"
- while True:
- to_user_id = str(link.replace("https://www.ixigua.com/home/", ""))
- params = {
- "to_user_id": to_user_id,
- "offset": str(offset),
- "limit": "30",
- "maxBehotTime": "0",
- "order": "new",
- "isHome": "0",
- "_signature": signature,
- }
- headers = {
- "referer": f'https://www.ixigua.com/home/{link.replace("https://www.ixigua.com/home/", "")}/video/?preActiveKey=hotsoon&list_entrance=userdetail',
- "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.41",
- }
- response = requests.get(
- url=url,
- headers=headers,
- params=params,
- proxies=tunnel_proxies(),
- timeout=5,
- )
- offset += 30
- if "data" not in response.text or response.status_code != 200:
- message = f"get_videoList:{response.text}\n"
- print(message)
- return
- elif not response.json()["data"]["videoList"]:
- message = f"没有更多数据啦~\n"
- print(params)
- return
- else:
- feeds = response.json()["data"]["videoList"]
- for video_obj in feeds:
- message = "扫描到一条视频"
- print(message)
- date_flag = self.process_video_obj(video_obj, user_dict, "l")
- if not date_flag:
- return
- def get_tiny_video_list(self, user_dict):
- """
- 获取小视频
- """
- url = "https://www.ixigua.com/api/videov2/hotsoon/video"
- max_behot_time = "0"
- link = user_dict['link'].split("_")[-1]
- to_user_id = str(link.replace("https://www.ixigua.com/home/", ""))
- while True:
- params = {
- "to_user_id": to_user_id,
- "max_behot_time": max_behot_time,
- "_signature": random_signature()
- }
- headers = {
- "referer": "https://www.ixigua.com/{}?&".format(to_user_id),
- "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.41",
- }
- response = requests.get(
- url=url,
- headers=headers,
- params=params,
- proxies=tunnel_proxies(),
- timeout=5,
- )
- if "data" not in response.text or response.status_code != 200:
- AliyunLogger.logging(
- code="2000",
- platform=self.platform,
- mode=self.mode,
- env=self.env,
- message=f"get_videoList:{response.text}\n",
- )
- return
- elif not response.json()["data"]["data"]:
- AliyunLogger.logging(
- code="2000",
- platform=self.platform,
- mode=self.mode,
- env=self.env,
- message=f"没有更多数据啦~\n",
- )
- return
- else:
- video_list = response.json()['data']['data']
- max_behot_time = video_list[-1]["max_behot_time"]
- for video_obj in video_list:
- try:
- AliyunLogger.logging(
- code="1001",
- account=user_dict['uid'],
- platform=self.platform,
- mode=self.mode,
- env=self.env,
- data=video_obj,
- message="扫描到一条小视频",
- )
- date_flag = self.process_video_obj(video_obj, user_dict, "s")
- if not date_flag:
- return
- except Exception as e:
- AliyunLogger.logging(
- code="3000",
- platform=self.platform,
- mode=self.mode,
- env=self.env,
- data=video_obj,
- message="抓取单条视频异常, 报错原因是: {}".format(e),
- )
- def process_video_obj(self, video_obj, user_dict, f):
- """
- process video_obj and extract video_url
- """
- new_rule = self.rule_maker(user_dict)
- trace_id = self.platform + str(uuid.uuid1())
- if f == "s":
- item_id = video_obj.get("id_str", "")
- else:
- item_id = video_obj.get("item_id", "")
- if not item_id:
- message="无效视频"
- print(message)
- return
- # 获取视频信息
- video_dict = self.get_video_info(item_id=item_id, trace_id=trace_id)
- # video_dict["out_user_id"] = video_dict["user_id"]
- video_dict["platform"] = self.platform
- video_dict["strategy"] = self.mode
- video_dict["out_video_id"] = video_dict["video_id"]
- video_dict["width"] = video_dict["video_width"]
- video_dict["height"] = video_dict["video_height"]
- video_dict["crawler_rule"] = json.dumps(new_rule)
- video_dict["user_id"] = user_dict["uid"]
- video_dict["publish_time"] = video_dict["publish_time_str"]
- video_dict["strategy_type"] = self.mode
- video_dict["update_time_stamp"] = int(time.time())
- if int(time.time()) - video_dict['publish_time_stamp'] > 3600 * 24 * int(
- new_rule.get("period", {}).get("max", 1000)):
- if not video_obj['is_top']:
- """
- 非置顶数据发布时间超过才退出
- """
- message = "发布时间超过{}天".format(
- int(new_rule.get("period", {}).get("max", 1000))
- )
- print(message)
- return False
- pipeline = PiaoQuanPipeline(
- platform=self.platform,
- mode=self.mode,
- rule_dict=new_rule,
- env=self.env,
- item=video_dict,
- trace_id=trace_id,
- )
- limit_flag = self.limiter.author_limitation(user_id=video_dict['user_id'])
- print(json.dumps(video_dict, ensure_ascii=False, indent=4))
- # if limit_flag:
- # title_flag = pipeline.title_flag()
- # repeat_flag = pipeline.repeat_video()
- # if title_flag and repeat_flag:
- # if new_rule.get("special"):
- # if int(video_dict['play_cnt']) >= int(new_rule.get("play_cnt", {}).get("min", 100000)):
- # if float(video_dict['like_cnt']) / float(video_dict['play_cnt']) >= new_rule['special']:
- # print(json.dumps(video_dict, ensure_ascii=False, indent=4))
- # # self.mq.send_msg(video_dict)
- # self.download_count += 1
- #
- # return True
- # else:
- # message="不满足特殊规则, 点赞量/播放量"
- # print(json.dumps(video_dict, ensure_ascii=False, indent=4))
- # print(message)
- # return False
- #
- # else:
- # if int(video_dict['play_cnt']) >= int(new_rule.get("play_cnt", {}).get("min", 100000)):
- # self.mq.send_msg(video_dict)
- # self.download_count += 1
- # message="成功发送 MQ 至 ETL",
- # )
- # return True
- # else:
- # AliyunLogger.logging(
- # code="2008",
- # account=user_dict['uid'],
- # platform=self.platform,
- # mode=self.mode,
- # env=self.env,
- # message="不满足特殊规则, 播放量",
- # data=video_dict
- # )
- # return True
- def get_video_info(self, item_id, trace_id):
- """
- 获取视频信息
- """
- url = "https://www.ixigua.com/{}".format(item_id)
- headers = {
- "accept-encoding": "gzip, deflate",
- "accept-language": "zh-CN,zh-Hans;q=0.9",
- "user-agent": FakeUserAgent().random,
- "cookie": "ttwid={}".format(byte_dance_cookie(item_id)),
- "referer": "https://www.ixigua.com/{}/".format(item_id),
- }
- response = requests.get(
- url=url,
- headers=headers,
- proxies=tunnel_proxies(),
- timeout=5,
- )
- video_info = extract_info_by_re(response.text)
- video_dict = {
- "video_title": video_info.get("title", ""),
- "video_id": video_info.get("video_id"),
- "gid": str(item_id),
- "play_cnt": int(video_info.get("play_count", 0)),
- "like_cnt": int(video_info.get("like_count", 0)),
- "comment_cnt": 0,
- "share_cnt": 0,
- "favorite_cnt": 0,
- "duration": int(video_info.get("duration", 0)),
- "video_width": 0,
- "video_height": 0,
- "publish_time_stamp": int(video_info.get("publish_time", 0)),
- "publish_time_str": time.strftime(
- "%Y-%m-%d %H:%M:%S",
- time.localtime(int(video_info.get("publish_time", 0))),
- ),
- "avatar_url": str(
- video_info.get("user_info", {}).get("avatar_url", "")
- ),
- "cover_url": video_info.get("cover_url", ""),
- "video_url": video_info.get("url"),
- "session": f"xigua-search-{int(time.time())}",
- }
- return video_dict
if __name__ == "__main__":
    # Manual run: crawl three sample author accounts with a fixed rule.
    sample_accounts = (
        ("https://www.ixigua.com/home/2779177225827568", "秋晴爱音乐"),
        ("https://www.ixigua.com/home/2885546124776780", "朗诵放歌的老山羊"),
        ("https://www.ixigua.com/home/5880938217", "天原声疗"),
    )
    # Every sample account shares the same uid, source and mode.
    accounts = [
        {
            "uid": 6267140,
            "source": "xigua",
            "link": home_link,
            "nick_name": nick_name,
            "avatar_url": "",
            "mode": "author",
        }
        for home_link, nick_name in sample_accounts
    ]
    crawl_rule = {
        'period': {'min': 30, 'max': 30},
        'duration': {'min': 20, 'max': 0},
        'play_cnt': {'min': 100000, 'max': 0},
    }
    crawler = XiGuaAuthor(
        platform="xigua",
        mode="author",
        rule_dict=crawl_rule,
        env="prod",
        user_list=accounts,
    )
    crawler.get_author_list()
|