"""Xigua (ixigua.com) author crawler: lists an author's videos and scrapes each video page."""
import base64
import json
import os
import random
import re
import string
import sys
import time
import uuid

import requests
from fake_useragent import FakeUserAgent

# Put the current working directory on sys.path before importing the
# project-local ``common`` package, so every ``common`` import can resolve.
sys.path.append(os.getcwd())

from common import PiaoQuanPipeline, tunnel_proxies  # noqa: E402
from common.limit import AuthorLimit  # noqa: E402
from common.mq import MQ  # noqa: E402

# Timeout (seconds) applied to every outgoing HTTP request.
REQUEST_TIMEOUT = 5
# Prefix stripped from an author's home link to obtain the author id.
HOME_URL_PREFIX = "https://www.ixigua.com/home/"
# User agent sent with the list-API requests.
LIST_USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.41"
)


def _required_group(pattern, text, field, group=1):
    """Return capture *group* of *pattern* in *text*.

    :param pattern: regular expression to search for
    :param text: text to search in
    :param field: field name used in the error message
    :param group: index of the capture group to return
    :raises ValueError: if the pattern does not occur in *text*
    """
    match = re.search(pattern, text)
    if match is None:
        raise ValueError(f"field {field!r} not found in page source")
    return match.group(group)


def extract_info_by_re(text):
    """Extract video metadata from the source of a video page with regular expressions.

    :param text: page source (HTML with embedded JSON) of a video page
    :return: dict with the keys ``title``, ``url``, ``video_id``, ``like_count``,
        ``cover_url``, ``play_count``, ``publish_time`` and ``duration``;
        everything except ``title`` and ``url`` is the raw matched string
    :raises ValueError: if a required field is missing from *text*
    """
    # Title: text of the <title> element, up to the first " - " separator.
    title_match = re.search(r'<title[^>]*>(.*?)</title>', text)
    if title_match:
        title_content = title_match.group(1).split(" - ")[0]
        try:
            # Undo mojibake: the page text may have been decoded as latin-1
            # although the bytes are UTF-8.
            title_content = bytes(title_content, "latin1").decode()
        except (UnicodeEncodeError, UnicodeDecodeError):
            # Not latin-1 mojibake; keep the title exactly as matched.
            pass
    else:
        title_content = ""

    # Video URL: stored base64-encoded under "main_url".
    encoded_url = _required_group(
        r'("main_url":")(.*?)"', text, "main_url", group=2
    )
    decoded_data = base64.b64decode(encoded_url)
    try:
        video_url = decoded_data.decode()
    except UnicodeDecodeError:
        # Fall back to latin-1, which can decode any byte sequence.
        video_url = decoded_data.decode('latin-1')

    video_id = _required_group(r'"vid":"(.*?)"', text, "vid")
    like_count = _required_group(
        r'"video_like_count":(.*?),', text, "video_like_count"
    )
    # NOTE(review): the cover URL is read from the "avatar_url" key; this looks
    # like it may be the wrong key -- confirm against a real page.
    cover_url = _required_group(r'"avatar_url":"(.*?)"', text, "avatar_url")
    video_watch_count = _required_group(
        r'"video_watch_count":(.*?),', text, "video_watch_count"
    )
    publish_time = _required_group(
        r'"video_publish_time":"(.*?)"', text, "video_publish_time"
    )
    # The capture runs up to the next double quote, so it carries the
    # trailing comma of the JSON value; strip it.
    duration = _required_group(
        r'("video_duration":)(.*?)"', text, "video_duration", group=2
    ).replace(",", "")

    return {
        "title": title_content,
        "url": video_url,
        "video_id": video_id,
        "like_count": like_count,
        "cover_url": cover_url,
        "play_count": video_watch_count,
        "publish_time": publish_time,
        "duration": duration,
    }


def random_signature():
    """Generate a random 26-character ``_signature`` request parameter.

    The result is ten ``"A"`` characters, eight random alphanumerics, one
    character derived from the 19th random character, three more random
    alphanumerics and the fixed tail ``"AAAB"``.

    :return: the signature string
    """
    # Choose how many digits / uppercase / lowercase characters to draw so
    # that the total is always 26.
    digits_num = random.randint(1, 6)
    uppercase_num = random.randint(1, 26 - digits_num - 1)
    lowercase_num = 26 - (digits_num + uppercase_num)
    password = (
        random.sample(string.digits, digits_num)
        + random.sample(string.ascii_uppercase, uppercase_num)
        + random.sample(string.ascii_lowercase, lowercase_num)
    )
    random.shuffle(password)
    new_password = "AAAAAAAAAA" + "".join(password)[10:-4] + "AAAB"
    # The character at index 18 is replaced according to this table; any
    # character not listed becomes "y".
    replacements = {"8": "w", "9": "x", "-": "y", ".": "z"}
    pivot = replacements.get(new_password[18], "y")
    return new_password[:18] + pivot + new_password[-7:]


def byte_dance_cookie(item_id):
    """Register with the ttwid service and return the cookie value it issues.

    :param item_id: id placed in the ``referer`` header of the requests
    :return: value of the first cookie set by the register response
    :raises ValueError: if the register response sets no cookie
    :raises requests.RequestException: on network failure or timeout
    """
    # The context manager closes the session's connections when done.
    with requests.Session() as sess:
        sess.headers.update({
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36',
            'referer': 'https://www.ixigua.com/home/{}/'.format(item_id),
        })
        # Initial request made with the same session before registering.
        sess.get(
            'https://i.snssdk.com/slardar/sdk.js?bid=xigua_video_web_pc',
            timeout=REQUEST_TIMEOUT,
        )
        data = '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}'
        r = sess.post(
            'https://ttwid.bytedance.com/ttwid/union/register/',
            data=data,
            timeout=REQUEST_TIMEOUT,
        )
        cookie_values = r.cookies.values()
    if not cookie_values:
        raise ValueError("ttwid register response did not set any cookie")
    return cookie_values[0]


class XiGuaAuthor(object):
    """Crawler for the videos published by a list of Xigua author accounts."""

    # Link flag -> (minimum play count, "special" threshold) of the rule used
    # for accounts carrying that flag.
    _SPECIAL_RULES = {
        "V1": (100000, 0.02),
        "V2": (10000, 0.01),
        "V3": (5000, 0.01),
    }

    def __init__(self, platform, mode, rule_dict, env, user_list):
        """Store the crawl configuration and create the MQ client and limiter.

        :param platform: platform name, used in trace ids and output records
        :param mode: crawl mode, stored in output records
        :param rule_dict: default rule applied to accounts without a rule flag
        :param env: environment name, appended to the MQ topic name
        :param user_list: list of account dicts; each needs ``link`` and ``uid``
        """
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.env = env
        self.user_list = user_list
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.download_count = 0
        self.limiter = AuthorLimit(platform=self.platform, mode=self.mode)

    @staticmethod
    def _to_user_id(user_dict):
        """Return the author id: the part of the link after the last "_", minus the home prefix."""
        link = user_dict['link'].split("_")[-1]
        return str(link.replace(HOME_URL_PREFIX, ""))

    def _fetch_data(self, url, headers, params):
        """GET a list API through the tunnel proxy and return its ``data`` object.

        :return: the ``data`` dict of the JSON response (``{}`` if ``data`` is
            not a dict), or ``None`` if the request failed, the status is not
            200, or the body is not the expected JSON
        """
        try:
            response = requests.get(
                url=url,
                headers=headers,
                params=params,
                proxies=tunnel_proxies(),
                timeout=REQUEST_TIMEOUT,
            )
        except requests.RequestException as e:
            print(f"get_videoList:{e}\n")
            return None
        if "data" not in response.text or response.status_code != 200:
            print(f"get_videoList:{response.text}\n")
            return None
        try:
            data = response.json()["data"]
        except (ValueError, KeyError, TypeError) as e:
            # ValueError covers a body that is not valid JSON.
            print(f"get_videoList:{e}\n")
            return None
        return data if isinstance(data, dict) else {}

    def rule_maker(self, account):
        """Return the crawl rule for an account, based on the flag in its link.

        The link is split on "_". A link without "_" uses the default rule.
        Otherwise the second-to-last part is the flag: "V1", "V2" and "V3"
        select a dedicated rule, and any other flag uses the default rule.

        :param account: account dict with a ``link`` entry
        :return: rule dict, e.g.
            ``{'play_cnt': {'min': 100000, 'max': 0}, 'period': {'min': 90, 'max': 90}, 'special': 0.02}``
        """
        parts = account['link'].split("_")
        if len(parts) == 1:
            return self.rule_dict
        special_rule = self._SPECIAL_RULES.get(parts[-2])
        if special_rule is None:
            # Unknown flag: fall back to the default rule rather than
            # returning None, which callers cannot handle.
            return self.rule_dict
        min_play_cnt, special = special_rule
        return {
            "play_cnt": {"min": min_play_cnt, "max": 0},
            'period': {"min": 90, "max": 90},
            'special': special,
        }

    def get_author_list(self):
        """Crawl every account in ``self.user_list``.

        The first character of the account link selects what is crawled:
        "V", "h" and "D" crawl the video list, "X" crawls the short-video
        list and "B" crawls both. Other links are skipped.

        NOTE: no per-round cap on the number of videos is applied here.
        """
        for user_dict in self.user_list:
            # Slicing keeps an empty link from raising IndexError.
            flag = user_dict["link"][:1]
            print(user_dict)
            print(flag)
            if flag in ("V", "h", "D"):
                self.get_video_list(user_dict)
            elif flag == "X":
                self.get_tiny_video_list(user_dict)
            elif flag == "B":
                self.get_video_list(user_dict)
                self.get_tiny_video_list(user_dict)

    def get_video_list(self, user_dict):
        """Page through an author's video list and process every video.

        Stops when a request fails, when a page is empty, or when
        ``process_video_obj`` returns a falsy value.

        :param user_dict: account dict with ``link`` and ``uid``
        """
        offset = 0
        # One signature is reused for all pages of this author.
        signature = random_signature()
        to_user_id = self._to_user_id(user_dict)
        url = "https://www.ixigua.com/api/videov2/author/new_video_list?"
        headers = {
            "referer": f'https://www.ixigua.com/home/{to_user_id}/video/?preActiveKey=hotsoon&list_entrance=userdetail',
            "user-agent": LIST_USER_AGENT,
        }
        while True:
            params = {
                "to_user_id": to_user_id,
                "offset": str(offset),
                "limit": "30",
                "maxBehotTime": "0",
                "order": "new",
                "isHome": "0",
                "_signature": signature,
            }
            data = self._fetch_data(url, headers, params)
            offset += 30
            if data is None:
                return
            feeds = data.get("videoList")
            if not feeds:
                print("没有更多数据啦~\n")
                print(params)
                return
            for video_obj in feeds:
                print("扫描到一条视频")
                try:
                    date_flag = self.process_video_obj(video_obj, user_dict, "l")
                except Exception as e:
                    # A single broken video must not abort the whole crawl.
                    print("抓取单条视频异常, 报错原因是: {}".format(e))
                    continue
                if not date_flag:
                    return

    def get_tiny_video_list(self, user_dict):
        """Page through an author's short-video list and process every video.

        Stops when a request fails, when a page is empty, or when
        ``process_video_obj`` returns a falsy value. Progress is reported
        with ``print``, like ``get_video_list``.

        :param user_dict: account dict with ``link`` and ``uid``
        """
        url = "https://www.ixigua.com/api/videov2/hotsoon/video"
        max_behot_time = "0"
        to_user_id = self._to_user_id(user_dict)
        headers = {
            "referer": "https://www.ixigua.com/{}?&".format(to_user_id),
            "user-agent": LIST_USER_AGENT,
        }
        while True:
            params = {
                "to_user_id": to_user_id,
                "max_behot_time": max_behot_time,
                # A fresh signature is generated for every page.
                "_signature": random_signature(),
            }
            data = self._fetch_data(url, headers, params)
            if data is None:
                return
            video_list = data.get("data")
            if not video_list:
                print("没有更多数据啦~\n")
                return
            # The last entry's max_behot_time is the cursor for the next page.
            max_behot_time = video_list[-1]["max_behot_time"]
            for video_obj in video_list:
                print("扫描到一条小视频")
                try:
                    date_flag = self.process_video_obj(video_obj, user_dict, "s")
                except Exception as e:
                    # A single broken video must not abort the whole crawl.
                    print("抓取单条视频异常, 报错原因是: {}".format(e))
                    continue
                if not date_flag:
                    return

    def process_video_obj(self, video_obj, user_dict, f):
        """Scrape one listed video, build its record and print it.

        :param video_obj: one entry of a list-API response
        :param user_dict: account dict with ``link`` and ``uid``
        :param f: ``"s"`` for a short-video entry (id under ``id_str``);
            anything else reads the id from ``item_id``
        :return: ``False`` when a non-pinned video is older than the rule's
            maximum period (the caller stops scanning), otherwise ``True``
        """
        new_rule = self.rule_maker(user_dict)
        trace_id = self.platform + str(uuid.uuid1())
        item_id = video_obj.get("id_str" if f == "s" else "item_id", "")
        if not item_id:
            print("无效视频")
            # Skip this entry but let the caller keep scanning.
            return True

        video_dict = self.get_video_info(item_id=item_id, trace_id=trace_id)
        now = int(time.time())
        video_dict["platform"] = self.platform
        video_dict["strategy"] = self.mode
        video_dict["out_video_id"] = video_dict["video_id"]
        video_dict["width"] = video_dict["video_width"]
        video_dict["height"] = video_dict["video_height"]
        video_dict["crawler_rule"] = json.dumps(new_rule)
        video_dict["user_id"] = user_dict["uid"]
        video_dict["publish_time"] = video_dict["publish_time_str"]
        video_dict["strategy_type"] = self.mode
        video_dict["update_time_stamp"] = now

        max_period_days = int(new_rule.get("period", {}).get("max", 1000))
        expired = now - video_dict['publish_time_stamp'] > 3600 * 24 * max_period_days
        # Only a non-pinned video that is too old ends the scan; a pinned
        # video can be old while newer videos still follow it.
        if expired and not video_obj.get('is_top'):
            print("发布时间超过{}天".format(max_period_days))
            return False

        # NOTE: the publish step is disabled in this version. The pipeline
        # and the limiter result below are not used to filter the video and
        # nothing is sent to self.mq; the record is only printed.
        pipeline = PiaoQuanPipeline(  # noqa: F841
            platform=self.platform,
            mode=self.mode,
            rule_dict=new_rule,
            env=self.env,
            item=video_dict,
            trace_id=trace_id,
        )
        limit_flag = self.limiter.author_limitation(  # noqa: F841
            user_id=video_dict['user_id']
        )
        print(json.dumps(video_dict, ensure_ascii=False, indent=4))
        return True

    def get_video_info(self, item_id, trace_id):
        """Download a video page and turn the scraped fields into a record.

        :param item_id: id of the video; also used to build the page URL
        :param trace_id: accepted for the caller's convenience; not used here
        :return: dict describing the video (title, ids, counters, URLs, ...)
        :raises ValueError: if a required field cannot be found or parsed
        :raises requests.RequestException: on network failure or timeout
        """
        url = "https://www.ixigua.com/{}".format(item_id)
        headers = {
            "accept-encoding": "gzip, deflate",
            "accept-language": "zh-CN,zh-Hans;q=0.9",
            "user-agent": FakeUserAgent().random,
            "cookie": "ttwid={}".format(byte_dance_cookie(item_id)),
            "referer": "https://www.ixigua.com/{}/".format(item_id),
        }
        response = requests.get(
            url=url,
            headers=headers,
            proxies=tunnel_proxies(),
            timeout=REQUEST_TIMEOUT,
        )
        video_info = extract_info_by_re(response.text)
        publish_time_stamp = int(video_info.get("publish_time", 0))
        video_dict = {
            "video_title": video_info.get("title", ""),
            "video_id": video_info.get("video_id"),
            "gid": str(item_id),
            "play_cnt": int(video_info.get("play_count", 0)),
            "like_cnt": int(video_info.get("like_count", 0)),
            "comment_cnt": 0,
            "share_cnt": 0,
            "favorite_cnt": 0,
            # Go through float so a fractional duration does not raise.
            "duration": int(float(video_info.get("duration", 0))),
            "video_width": 0,
            "video_height": 0,
            "publish_time_stamp": publish_time_stamp,
            "publish_time_str": time.strftime(
                "%Y-%m-%d %H:%M:%S",
                time.localtime(publish_time_stamp),
            ),
            # extract_info_by_re returns no "user_info" entry, so this is
            # currently always an empty string.
            "avatar_url": str(
                video_info.get("user_info", {}).get("avatar_url", "")
            ),
            "cover_url": video_info.get("cover_url", ""),
            "video_url": video_info.get("url"),
            "session": f"xigua-search-{int(time.time())}",
        }
        return video_dict


if __name__ == "__main__":
    user_list = [
        {
            "uid": 6267140,
            "source": "xigua",
            "link": "https://www.ixigua.com/home/2779177225827568",
            "nick_name": "秋晴爱音乐",
            "avatar_url": "",
            "mode": "author",
        },
        {
            "uid": 6267140,
            "source": "xigua",
            "link": "https://www.ixigua.com/home/2885546124776780",
            "nick_name": "朗诵放歌的老山羊",
            "avatar_url": "",
            "mode": "author",
        },
        {
            "uid": 6267140,
            "source": "xigua",
            "link": "https://www.ixigua.com/home/5880938217",
            "nick_name": "天原声疗",
            "avatar_url": "",
            "mode": "author",
        },
    ]
    rule = {
        'period': {'min': 30, 'max': 30},
        'duration': {'min': 20, 'max': 0},
        'play_cnt': {'min': 100000, 'max': 0},
    }
    crawler = XiGuaAuthor(
        platform="xigua",
        mode="author",
        rule_dict=rule,
        env="prod",
        user_list=user_list,
    )
    crawler.get_author_list()