Browse Source

西瓜视频——cookie 逆向, 解决 cookie 过期问题

罗俊辉 1 year ago
parent
commit
e904458226
2 changed files with 533 additions and 32 deletions
  1. 35 32
      xigua/xigua_author/xigua_author.py
  2. 498 0
      xigua/xigua_author/xigua_dev.py

+ 35 - 32
xigua/xigua_author/xigua_author.py

@@ -47,13 +47,13 @@ def extract_info_by_re(text):
     video_id = re.search(r'"vid":"(.*?)"', text).group(1)
 
     # like_count
-    like_count = re.search(r'"video_like_count":"(.*?)"', text).group(1)
+    like_count = re.search(r'"video_like_count":(.*?),', text).group(1)
 
     # cover_url
     cover_url = re.search(r'"avatar_url":"(.*?)"', text).group(1)
 
     # video_play
-    video_watch_count = re.search(r'"video_watch_count":"(.*?)"', text).group(1)
+    video_watch_count = re.search(r'"video_watch_count":(.*?),', text).group(1)
 
     # "video_publish_time"
     publish_time = re.search(r'"video_publish_time":"(.*?)"', text).group(1)
@@ -102,6 +102,23 @@ def random_signature():
         new_password = new_password_start + "y" + new_password_end
     return new_password
 
+def byte_dance_cookie(item_id):
+    """
+    获取西瓜视频的 cookie
+    :param item_id:
+    """
+    sess = requests.Session()
+    sess.headers.update({
+        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36',
+        'referer': 'https://www.ixigua.com/home/{}/'.format(item_id),
+    })
+
+    # 获取 cookies
+    sess.get('https://i.snssdk.com/slardar/sdk.js?bid=xigua_video_web_pc')
+    data = '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}'
+    r = sess.post('https://ttwid.bytedance.com/ttwid/union/register/', data=data)
+    # print(r.text)
+    return r.cookies.values()[0]
 
 def get_video_url(video_info):
     """
@@ -950,8 +967,7 @@ class XiGuaAuthor:
             )
             return
         # 获取视频信息
-        video_dict = self.get_video_info(item_id=item_id, trace_id=trace_id)
-        video_dict["out_user_id"] = video_dict["user_id"]
+        video_dict = self.get_video_info(item_id=item_id)
         video_dict["platform"] = self.platform
         video_dict["strategy"] = self.mode
         video_dict["out_video_id"] = video_dict["video_id"]
@@ -1046,7 +1062,7 @@ class XiGuaAuthor:
                         )
             return True
 
-    def get_video_info(self, item_id, trace_id):
+    def get_video_info(self, item_id):
         """
         获取视频信息
         """
@@ -1054,6 +1070,7 @@ class XiGuaAuthor:
         headers = {
             "accept-encoding": "gzip, deflate",
             "accept-language": "zh-CN,zh-Hans;q=0.9",
+            "cookie": "ttwid={}".format(byte_dance_cookie(item_id)),
             "user-agent": FakeUserAgent().random,
             "referer": "https://www.ixigua.com/{}/".format(item_id),
         }
@@ -1063,23 +1080,9 @@ class XiGuaAuthor:
             proxies=tunnel_proxies(),
             timeout=5,
         )
-        if (
-                response.status_code != 200
-                or "data" not in response.json()
-                or response.json()["data"] == {}
-        ):
-            AliyunLogger.logging(
-                code="2000",
-                platform=self.platform,
-                mode=self.mode,
-                env=self.env,
-                message="获取视频信息失败",
-                trace_id=trace_id,
-            )
-            return None
-        else:
-            video_info = extract_info_by_re(response.text)
-            video_dict = {
+
+        video_info = extract_info_by_re(response.text)
+        video_dict = {
                 "video_title": video_info.get("title", ""),
                 "video_id": video_info.get("video_id"),
                 "gid": str(item_id),
@@ -1103,7 +1106,7 @@ class XiGuaAuthor:
                 "video_url": video_info.get("url"),
                 "session": f"xigua-search-{int(time.time())}",
             }
-            return video_dict
+        return video_dict
 
 
 if __name__ == "__main__":
@@ -1133,12 +1136,12 @@ if __name__ == "__main__":
             "mode": "author",
         },
     ]
-    # rule = {'period': {'min': 30, 'max': 30}, 'duration': {'min': 20, 'max': 0}, 'play_cnt': {'min': 100000, 'max': 0}}
-    # XGA = XiGuaAuthor(
-    #     platform="xigua",
-    #     mode="author",
-    #     rule_dict=rule,
-    #     env="prod",
-    #     user_list=user_list
-    # )
-    # XGA.get_author_list()
+    rule = {'period': {'min': 30, 'max': 30}, 'duration': {'min': 20, 'max': 0}, 'play_cnt': {'min': 100000, 'max': 0}}
+    XGA = XiGuaAuthor(
+        platform="xigua",
+        mode="author",
+        rule_dict=rule,
+        env="prod",
+        user_list=user_list
+    )
+    XGA.get_author_list()

+ 498 - 0
xigua/xigua_author/xigua_dev.py

@@ -0,0 +1,498 @@
+import json
+import os
+import re
+import random
+import sys
+import string
+import time
+import uuid
+import base64
+import requests
+from fake_useragent import FakeUserAgent
+
+from common.mq import MQ
+
+sys.path.append(os.getcwd())
+
+from common import PiaoQuanPipeline, tunnel_proxies
+from common.limit import AuthorLimit
+
+
+def extract_info_by_re(text):
+    """
+    通过正则表达式获取文本中的信息
+    :param text:
+    :return:
+    """
+    # 标题
+    title_match = re.search(r'<title[^>]*>(.*?)</title>', text)
+    if title_match:
+        title_content = title_match.group(1)
+        title_content = title_content.split(" - ")[0]
+        title_content = bytes(title_content, "latin1").decode()
+    else:
+        title_content = ""
+    # video_url
+    main_url = re.search(r'("main_url":")(.*?)"', text)[0]
+    main_url = main_url.split(":")[1]
+    decoded_data = base64.b64decode(main_url)
+    try:
+        # 尝试使用utf-8解码
+        video_url = decoded_data.decode()
+    except UnicodeDecodeError:
+        # 如果utf-8解码失败,尝试使用其他编码方式
+        video_url = decoded_data.decode('latin-1')
+
+    # video_id
+    video_id = re.search(r'"vid":"(.*?)"', text).group(1)
+
+    # like_count
+    like_count = re.search(r'"video_like_count":(.*?),', text).group(1)
+
+    # cover_url
+    cover_url = re.search(r'"avatar_url":"(.*?)"', text).group(1)
+
+    # video_play
+    video_watch_count = re.search(r'"video_watch_count":(.*?),', text).group(1)
+
+    # "video_publish_time"
+    publish_time = re.search(r'"video_publish_time":"(.*?)"', text).group(1)
+
+    # video_duration
+    duration = re.search(r'("video_duration":)(.*?)"', text).group(2).replace(",", "")
+    return {
+        "title": title_content,
+        "url": video_url,
+        "video_id": video_id,
+        "like_count": like_count,
+        "cover_url": cover_url,
+        "play_count": video_watch_count,
+        "publish_time": publish_time,
+        "duration": duration
+    }
+
+
+def random_signature():
+    """
+    随机生成签名
+    """
+    src_digits = string.digits  # string_数字
+    src_uppercase = string.ascii_uppercase  # string_大写字母
+    src_lowercase = string.ascii_lowercase  # string_小写字母
+    digits_num = random.randint(1, 6)
+    uppercase_num = random.randint(1, 26 - digits_num - 1)
+    lowercase_num = 26 - (digits_num + uppercase_num)
+    password = (
+            random.sample(src_digits, digits_num)
+            + random.sample(src_uppercase, uppercase_num)
+            + random.sample(src_lowercase, lowercase_num)
+    )
+    random.shuffle(password)
+    new_password = "AAAAAAAAAA" + "".join(password)[10:-4] + "AAAB"
+    new_password_start = new_password[0:18]
+    new_password_end = new_password[-7:]
+    if new_password[18] == "8":
+        new_password = new_password_start + "w" + new_password_end
+    elif new_password[18] == "9":
+        new_password = new_password_start + "x" + new_password_end
+    elif new_password[18] == "-":
+        new_password = new_password_start + "y" + new_password_end
+    elif new_password[18] == ".":
+        new_password = new_password_start + "z" + new_password_end
+    else:
+        new_password = new_password_start + "y" + new_password_end
+    return new_password
+
+
+def byte_dance_cookie(item_id):
+    """
+    获取西瓜视频的 cookie
+    :param item_id:
+    """
+    sess = requests.Session()
+    sess.headers.update({
+        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36',
+        'referer': 'https://www.ixigua.com/home/{}/'.format(item_id),
+    })
+
+    # 获取 cookies
+    sess.get('https://i.snssdk.com/slardar/sdk.js?bid=xigua_video_web_pc')
+    data = '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}'
+    r = sess.post('https://ttwid.bytedance.com/ttwid/union/register/', data=data)
+    # print(r.text)
+    return r.cookies.values()[0]
+
+
+class XiGuaAuthor(object):
+    """
+    西瓜账号爬虫
+    """
+
+    def __init__(self, platform, mode, rule_dict, env, user_list):
+        self.platform = platform
+        self.mode = mode
+        self.rule_dict = rule_dict
+        self.env = env
+        self.user_list = user_list
+        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
+        self.download_count = 0
+        self.limiter = AuthorLimit(platform=self.platform, mode=self.mode)
+
+    def rule_maker(self, account):
+        """
+        通过不同的账号生成不同的规则
+        :param account: 输入的账号信息
+        {'play_cnt': {'min': 100000, 'max': 0}, 'period': {'min': 5, 'max': 5}}
+        """
+        temp = account['link'].split("_")
+        if len(temp) == 1:
+            return self.rule_dict
+        else:
+            flag = temp[-2]
+            match flag:
+                case "V1":
+                    rule_dict = {
+                        "play_cnt": {"min": 100000, "max": 0},
+                        'period': {"min": 90, "max": 90},
+                        'special': 0.02
+                    }
+                    return rule_dict
+                case "V2":
+                    rule_dict = {
+                        "play_cnt": {"min": 10000, "max": 0},
+                        'period': {"min": 90, "max": 90},
+                        'special': 0.01
+                    }
+                    return rule_dict
+                case "V3":
+                    rule_dict = {
+                        "play_cnt": {"min": 5000, "max": 0},
+                        'period': {"min": 90, "max": 90},
+                        'special': 0.01
+                    }
+                    return rule_dict
+
+    def get_author_list(self):
+        """
+        每轮只抓取定量的数据,到达数量后自己退出
+        获取账号列表以及账号信息
+        """
+        # max_count = int(self.rule_dict.get("videos_cnt", {}).get("min", 300))
+        for user_dict in self.user_list:
+            # if self.download_count <= max_count:
+
+            flag = user_dict["link"][0]
+            print(user_dict)
+            print(flag)
+            match flag:
+                case "V":
+                    self.get_video_list(user_dict)
+                case "X":
+                    self.get_tiny_video_list(user_dict)
+                case "h":
+                    self.get_video_list(user_dict)
+                case "D":
+                    self.get_video_list(user_dict)
+                case "B":
+                    self.get_video_list(user_dict)
+                    self.get_tiny_video_list(user_dict)
+
+            #     time.sleep(random.randint(1, 15))
+            # else:
+            #     AliyunLogger.logging(
+            #         code="2000",
+            #         platform=self.platform,
+            #         mode=self.mode,
+            #         env=self.env,
+            #         message="本轮已经抓取足够数量的视频,已经自动退出",
+            #     )
+            #     return
+
+    def get_video_list(self, user_dict):
+        """
+        获取某个账号的视频列表
+        账号分为 3 类
+        """
+        offset = 0
+        signature = random_signature()
+        link = user_dict['link'].split("_")[-1]
+        url = "https://www.ixigua.com/api/videov2/author/new_video_list?"
+        while True:
+            to_user_id = str(link.replace("https://www.ixigua.com/home/", ""))
+            params = {
+                "to_user_id": to_user_id,
+                "offset": str(offset),
+                "limit": "30",
+                "maxBehotTime": "0",
+                "order": "new",
+                "isHome": "0",
+                "_signature": signature,
+            }
+            headers = {
+                "referer": f'https://www.ixigua.com/home/{link.replace("https://www.ixigua.com/home/", "")}/video/?preActiveKey=hotsoon&list_entrance=userdetail',
+                "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.41",
+            }
+            response = requests.get(
+                url=url,
+                headers=headers,
+                params=params,
+                proxies=tunnel_proxies(),
+                timeout=5,
+            )
+            offset += 30
+            if "data" not in response.text or response.status_code != 200:
+                message = f"get_videoList:{response.text}\n"
+                print(message)
+                return
+            elif not response.json()["data"]["videoList"]:
+                message = f"没有更多数据啦~\n"
+                print(params)
+                return
+            else:
+                feeds = response.json()["data"]["videoList"]
+                for video_obj in feeds:
+                    message = "扫描到一条视频"
+                    print(message)
+                    date_flag = self.process_video_obj(video_obj, user_dict, "l")
+                    if not date_flag:
+                        return
+
+    def get_tiny_video_list(self, user_dict):
+        """
+        获取小视频
+        """
+        url = "https://www.ixigua.com/api/videov2/hotsoon/video"
+        max_behot_time = "0"
+        link = user_dict['link'].split("_")[-1]
+        to_user_id = str(link.replace("https://www.ixigua.com/home/", ""))
+        while True:
+            params = {
+                "to_user_id": to_user_id,
+                "max_behot_time": max_behot_time,
+                "_signature": random_signature()
+            }
+            headers = {
+                "referer": "https://www.ixigua.com/{}?&".format(to_user_id),
+                "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.41",
+            }
+            response = requests.get(
+                url=url,
+                headers=headers,
+                params=params,
+                proxies=tunnel_proxies(),
+                timeout=5,
+            )
+            if "data" not in response.text or response.status_code != 200:
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=self.platform,
+                    mode=self.mode,
+                    env=self.env,
+                    message=f"get_videoList:{response.text}\n",
+                )
+                return
+            elif not response.json()["data"]["data"]:
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=self.platform,
+                    mode=self.mode,
+                    env=self.env,
+                    message=f"没有更多数据啦~\n",
+                )
+                return
+            else:
+                video_list = response.json()['data']['data']
+                max_behot_time = video_list[-1]["max_behot_time"]
+                for video_obj in video_list:
+                    try:
+                        AliyunLogger.logging(
+                            code="1001",
+                            account=user_dict['uid'],
+                            platform=self.platform,
+                            mode=self.mode,
+                            env=self.env,
+                            data=video_obj,
+                            message="扫描到一条小视频",
+                        )
+                        date_flag = self.process_video_obj(video_obj, user_dict, "s")
+                        if not date_flag:
+                            return
+                    except Exception as e:
+                        AliyunLogger.logging(
+                            code="3000",
+                            platform=self.platform,
+                            mode=self.mode,
+                            env=self.env,
+                            data=video_obj,
+                            message="抓取单条视频异常, 报错原因是: {}".format(e),
+                        )
+
+    def process_video_obj(self, video_obj, user_dict, f):
+        """
+        process video_obj and extract video_url
+        """
+        new_rule = self.rule_maker(user_dict)
+        trace_id = self.platform + str(uuid.uuid1())
+        if f == "s":
+            item_id = video_obj.get("id_str", "")
+        else:
+            item_id = video_obj.get("item_id", "")
+        if not item_id:
+            message="无效视频"
+            print(message)
+            return
+        # 获取视频信息
+        video_dict = self.get_video_info(item_id=item_id, trace_id=trace_id)
+        # video_dict["out_user_id"] = video_dict["user_id"]
+        video_dict["platform"] = self.platform
+        video_dict["strategy"] = self.mode
+        video_dict["out_video_id"] = video_dict["video_id"]
+        video_dict["width"] = video_dict["video_width"]
+        video_dict["height"] = video_dict["video_height"]
+        video_dict["crawler_rule"] = json.dumps(new_rule)
+        video_dict["user_id"] = user_dict["uid"]
+        video_dict["publish_time"] = video_dict["publish_time_str"]
+        video_dict["strategy_type"] = self.mode
+        video_dict["update_time_stamp"] = int(time.time())
+        if int(time.time()) - video_dict['publish_time_stamp'] > 3600 * 24 * int(
+                new_rule.get("period", {}).get("max", 1000)):
+            if not video_obj['is_top']:
+                """
+                非置顶数据发布时间超过才退出
+                """
+
+                message = "发布时间超过{}天".format(
+                    int(new_rule.get("period", {}).get("max", 1000))
+                )
+                print(message)
+
+                return False
+        pipeline = PiaoQuanPipeline(
+            platform=self.platform,
+            mode=self.mode,
+            rule_dict=new_rule,
+            env=self.env,
+            item=video_dict,
+            trace_id=trace_id,
+        )
+        limit_flag = self.limiter.author_limitation(user_id=video_dict['user_id'])
+        print(json.dumps(video_dict, ensure_ascii=False, indent=4))
+        # if limit_flag:
+        #     title_flag = pipeline.title_flag()
+        #     repeat_flag = pipeline.repeat_video()
+        #     if title_flag and repeat_flag:
+        #         if new_rule.get("special"):
+        #             if int(video_dict['play_cnt']) >= int(new_rule.get("play_cnt", {}).get("min", 100000)):
+        #                 if float(video_dict['like_cnt']) / float(video_dict['play_cnt']) >= new_rule['special']:
+        #                     print(json.dumps(video_dict, ensure_ascii=False, indent=4))
+        #                     # self.mq.send_msg(video_dict)
+        #                     self.download_count += 1
+        #
+        #                     return True
+        #                 else:
+        #                     message="不满足特殊规则, 点赞量/播放量"
+        #                     print(json.dumps(video_dict, ensure_ascii=False, indent=4))
+        #                     print(message)
+        #                     return False
+        #
+        #         else:
+        #             if int(video_dict['play_cnt']) >= int(new_rule.get("play_cnt", {}).get("min", 100000)):
+        #                 self.mq.send_msg(video_dict)
+        #                 self.download_count += 1
+        #                     message="成功发送 MQ 至 ETL",
+        #                 )
+        #                 return True
+        #             else:
+        #                 AliyunLogger.logging(
+        #                     code="2008",
+        #                     account=user_dict['uid'],
+        #                     platform=self.platform,
+        #                     mode=self.mode,
+        #                     env=self.env,
+        #                     message="不满足特殊规则, 播放量",
+        #                     data=video_dict
+        #                 )
+        #     return True
+
+    def get_video_info(self, item_id, trace_id):
+        """
+        获取视频信息
+        """
+        url = "https://www.ixigua.com/{}".format(item_id)
+        headers = {
+            "accept-encoding": "gzip, deflate",
+            "accept-language": "zh-CN,zh-Hans;q=0.9",
+            "user-agent": FakeUserAgent().random,
+            "cookie": "ttwid={}".format(byte_dance_cookie(item_id)),
+            "referer": "https://www.ixigua.com/{}/".format(item_id),
+        }
+        response = requests.get(
+            url=url,
+            headers=headers,
+            proxies=tunnel_proxies(),
+            timeout=5,
+        )
+        video_info = extract_info_by_re(response.text)
+        video_dict = {
+            "video_title": video_info.get("title", ""),
+            "video_id": video_info.get("video_id"),
+            "gid": str(item_id),
+            "play_cnt": int(video_info.get("play_count", 0)),
+            "like_cnt": int(video_info.get("like_count", 0)),
+            "comment_cnt": 0,
+            "share_cnt": 0,
+            "favorite_cnt": 0,
+            "duration": int(video_info.get("duration", 0)),
+            "video_width": 0,
+            "video_height": 0,
+            "publish_time_stamp": int(video_info.get("publish_time", 0)),
+            "publish_time_str": time.strftime(
+                "%Y-%m-%d %H:%M:%S",
+                time.localtime(int(video_info.get("publish_time", 0))),
+            ),
+            "avatar_url": str(
+                video_info.get("user_info", {}).get("avatar_url", "")
+            ),
+            "cover_url": video_info.get("cover_url", ""),
+            "video_url": video_info.get("url"),
+            "session": f"xigua-search-{int(time.time())}",
+        }
+        return video_dict
+
+
+if __name__ == "__main__":
+    user_list = [
+        {
+            "uid": 6267140,
+            "source": "xigua",
+            "link": "https://www.ixigua.com/home/2779177225827568",
+            "nick_name": "秋晴爱音乐",
+            "avatar_url": "",
+            "mode": "author",
+        },
+        {
+            "uid": 6267140,
+            "source": "xigua",
+            "link": "https://www.ixigua.com/home/2885546124776780",
+            "nick_name": "朗诵放歌的老山羊",
+            "avatar_url": "",
+            "mode": "author",
+        },
+        {
+            "uid": 6267140,
+            "source": "xigua",
+            "link": "https://www.ixigua.com/home/5880938217",
+            "nick_name": "天原声疗",
+            "avatar_url": "",
+            "mode": "author",
+        },
+    ]
+    rule = {'period': {'min': 30, 'max': 30}, 'duration': {'min': 20, 'max': 0}, 'play_cnt': {'min': 100000, 'max': 0}}
+    XGA = XiGuaAuthor(
+        platform="xigua",
+        mode="author",
+        rule_dict=rule,
+        env="prod",
+        user_list=user_list
+    )
+    XGA.get_author_list()