Browse Source

小年糕上测试代码

罗俊辉 1 year ago
parent
commit
d6762b4603
2 changed files with 302 additions and 0 deletions
  1. 108 0
      common/pipeline.py
  2. 194 0
      xiaoniangao/xiaoniangao_author/xiaoniangao_author_test.py

+ 108 - 0
common/pipeline.py

@@ -173,3 +173,111 @@ class PiaoQuanPipeline:
             # 记录相关日志
             return False
         return True
+
+
+class PiaoQuanPipelineTest:
+    def __init__(self, platform, mode, rule_dict, env, item, trace_id):
+        self.platform = platform
+        self.mode = mode
+        self.item = item
+        self.rule_dict = rule_dict
+        self.env = env
+        self.trace_id = trace_id
+
+    # 视频的发布时间限制, 属于是规则过滤
+    def publish_time_flag(self):
+        # 判断发布时间
+        publish_time_stamp = self.item["publish_time_stamp"]
+        update_time_stamp = self.item["update_time_stamp"]
+        if self.platform == "gongzhonghao":
+            if (
+                int(time.time()) - publish_time_stamp
+                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
+            ) and (
+                int(time.time()) - update_time_stamp
+                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
+            ):
+                message = "发布时间超过{}天".format(
+                    int(self.rule_dict.get("period", {}).get("max", 1000))
+                )
+                print(message)
+                return False
+        else:
+            if (
+                int(time.time()) - publish_time_stamp
+                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
+            ):
+                message = "发布时间超过{}天".format(
+                    int(self.rule_dict.get("period", {}).get("max", 1000))
+                )
+                print(message)
+                return False
+        return True
+
+    # 视频标题是否满足需求
+    def title_flag(self):
+        title = self.item["video_title"]
+        cleaned_title = re.sub(r"[^\w]", " ", title)
+        # 敏感词
+        # 获取敏感词列表
+        sensitive_words = []
+        if any(word in cleaned_title for word in sensitive_words):
+            message = "标题中包含敏感词"
+            print(message)
+            return False
+        return True
+
+    # 视频基础下载规则
+    def download_rule_flag(self):
+        for key in self.item:
+            if self.rule_dict.get(key):
+                max_value = (
+                    int(self.rule_dict[key]["max"])
+                    if int(self.rule_dict[key]["max"]) > 0
+                    else 999999999999999
+                )
+                if key == "peroid": # peroid是抓取周期天数
+                    continue
+                else:
+                    flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
+                    if not flag:
+                        message = "{}: {} <= {} <= {}, {}".format(
+                            key,
+                            self.rule_dict[key]["min"],
+                            self.item[key],
+                            max_value,
+                            flag,
+                        )
+                        print(message)
+                        return flag
+            else:
+                continue
+        return True
+
+    # 按照某个具体平台来去重
+    def repeat_video(self):
+        # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
+        out_id = self.item["out_video_id"]
+        sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
+        repeat_video = MysqlHelper.get_values(
+            log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
+        )
+        if repeat_video:
+            message = "重复的视频"
+            return False
+        return True
+
+    def process_item(self):
+        if not self.publish_time_flag():
+            # 记录相关日志
+            return False
+        if not self.title_flag():
+            # 记录相关日志
+            return False
+        if not self.repeat_video():
+            # 记录相关日志
+            return False
+        if not self.download_rule_flag():
+            # 记录相关日志
+            return False
+        return True

+ 194 - 0
xiaoniangao/xiaoniangao_author/xiaoniangao_author_test.py

@@ -0,0 +1,194 @@
+import json
+import os
+import random
+import sys
+import time
+import uuid
+import requests
+
+from common.mq import MQ
+
+sys.path.append(os.getcwd())
+from common.pipeline import PiaoQuanPipelineTest
+from common.public import get_config_from_mysql, clean_title
+
+
+def tunnel_proxies():
+    # 隧道域名:端口号
+    tunnel = "q796.kdltps.com:15818"
+
+    # 用户名密码方式
+    username = "t17772369458618"
+    password = "5zqcjkmy"
+    tunnel_proxies = {
+        "http": "http://%(user)s:%(pwd)s@%(proxy)s/"
+                % {"user": username, "pwd": password, "proxy": tunnel},
+        "https": "http://%(user)s:%(pwd)s@%(proxy)s/"
+                 % {"user": username, "pwd": password, "proxy": tunnel},
+    }
+
+    return tunnel_proxies
+
+
+class XiaoNianGaoAuthor:
+    def __init__(self, platform, mode, rule_dict, env, user_list):
+        self.platform = platform
+        self.mode = mode
+        self.rule_dict = rule_dict
+        self.env = env
+        self.user_list = user_list
+        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
+        self.download_count = 0
+
+    def get_author_list(self):
+        # 每轮只抓取定量的数据,到达数量后自己退出
+        max_count = int(self.rule_dict.get("videos_cnt", {}).get("min", 300))
+        for user_dict in self.user_list:
+            if self.download_count <= max_count:
+                self.get_video_list(user_dict)
+                time.sleep(random.randint(1, 15))
+            else:
+                message = "本轮已经抓取足够数量的视频,已经自动退出"
+                print(message)
+                return
+
+    def get_video_list(self, user_dict):
+        next_t = -1
+        # 只抓取更新的视频,如果刷到已经更新的立即退出
+        url = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public"
+        headers = {
+            "Host": "kapi-xng-app.xiaoniangao.cn",
+            "content-type": "application/json; charset=utf-8",
+            "accept": "*/*",
+            "verb": "POST",
+            "accept-language": "zh-cn",
+            "date": "Wed, 01 Nov 2023 11:53:22 GMT",
+            "x-token-id": "",
+            "x-signaturemethod": "hmac-sha1",
+        }
+        while True:
+            payload = {
+                "token": "",
+                "limit": 20,
+                "start_t": next_t,
+                "visited_mid": int(user_dict["link"]),
+                "share_width": 300,
+                "share_height": 240,
+            }
+            response = requests.request(
+                "POST",
+                url,
+                headers=headers,
+                data=json.dumps(payload),
+                proxies=tunnel_proxies(),
+            )
+            if "data" not in response.text or response.status_code != 200:
+                message = f"get_videoList:{response.text}"
+                print(message)
+                return
+            elif "list" not in response.json()["data"]:
+                message = f"get_videoList:{response.json()}"
+                print(message)
+                return
+            elif len(response.json()["data"]["list"]) == 0:
+                message = f"没有更多数据啦~"
+                print(message)
+                return
+            else:
+                next_t = response.json()["data"]["next_t"]
+                feeds = response.json()["data"]["list"]
+                for video_obj in feeds:
+                    try:
+                        message = f"扫描到一条视频"
+                        print(message)
+                        self.process_video_obj(video_obj, user_dict)
+                    except Exception as e:
+                        message = "抓取单条视频异常, 报错原因是: {}".format(e)
+                        print(message)
+
+    def process_video_obj(self, video_obj, user_dict):
+        trace_id = self.platform + str(uuid.uuid1())
+        # 标题,表情随机加在片头、片尾,或替代句子中间的标点符号
+        xiaoniangao_title = clean_title(video_obj.get("title", ""))
+        # 随机取一个表情/符号
+        emoji = random.choice(
+            get_config_from_mysql(self.mode, self.platform, self.env, "emoji")
+        )
+        # 生成最终标题,标题list[表情+title, title+表情]随机取一个
+        video_title = random.choice(
+            [f"{emoji}{xiaoniangao_title}", f"{xiaoniangao_title}{emoji}"]
+        )
+        # 发布时间
+        publish_time_stamp = int(int(video_obj.get("t", 0)) / 1000)
+        publish_time_str = time.strftime(
+            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
+        )
+        # 用户名 / 头像
+        user_name = (
+            video_obj.get("user", {})
+            .get("nick", "")
+            .strip()
+            .replace("\n", "")
+            .replace("/", "")
+            .replace(" ", "")
+            .replace(" ", "")
+            .replace("&NBSP", "")
+            .replace("\r", "")
+        )
+        video_dict = {
+            "video_title": video_title,
+            "video_id": video_obj.get("vid", ""),
+            "duration": int(video_obj.get("du", 0) / 1000),
+            "play_cnt": video_obj.get("play_pv", 0),
+            "like_cnt": video_obj.get("favor", {}).get("total", 0),
+            "comment_cnt": video_obj.get("comment_count", 0),
+            "share_cnt": video_obj.get("share", 0),
+            "user_name": user_name,
+            "publish_time_stamp": publish_time_stamp,
+            "publish_time_str": publish_time_str,
+            "update_time_stamp": int(time.time()),
+            "video_width": int(video_obj.get("w", 0)),
+            "video_height": int(video_obj.get("h", 0)),
+            "avatar_url": video_obj.get("user", {}).get("hurl", ""),
+            "profile_id": video_obj["id"],
+            "profile_mid": video_obj.get("user", {}).get("mid", ""),
+            "cover_url": video_obj.get("url", ""),
+            "video_url": video_obj.get("v_url", ""),
+            "session": f"xiaoniangao-author-{int(time.time())}",
+            "out_user_id": video_obj["id"],
+            "platform": self.platform,
+            "strategy": self.mode,
+            "out_video_id": video_obj.get("vid", ""),
+        }
+        print(video_dict)
+        pipeline = PiaoQuanPipelineTest(
+            platform=self.platform,
+            mode=self.mode,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            item=video_dict,
+            trace_id=trace_id,
+        )
+        flag = pipeline.process_item()
+        if flag:
+            video_dict["width"] = video_dict["video_width"]
+            video_dict["height"] = video_dict["video_height"]
+            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
+            video_dict["user_id"] = user_dict["uid"]
+            video_dict["publish_time"] = video_dict["publish_time_str"]
+            # print(video_dict)
+            self.mq.send_msg(video_dict)
+            self.download_count += 1
+            message = "成功发送 MQ 至 ETL"
+            print(message)
+
+
+if __name__ == "__main__":
+    XNGA = XiaoNianGaoAuthor(
+        platform="xiaoniangao",
+        mode="author",
+        rule_dict={},
+        env="prod",
+        user_list=[{"link": 295640510, "uid": "12334"}],
+    )
+    XNGA.get_author_list()