Sfoglia il codice sorgente

小年糕增加新日志;
小年糕增加了每一轮抓取的最大值;
小年糕增加了每一轮中用户之间的等待时间;

罗俊辉 1 anno fa
parent
commit
089e8ac5a6

+ 53 - 54
resend_msg.py

@@ -3,59 +3,58 @@ from common.mq import MQ
 from tqdm import tqdm
 
 if __name__ == "__main__":
-    # path = 'message.txt'
-    # with open(path, "r", encoding="utf-8") as f:
-    #     datas = f.readlines()
-    # for line in tqdm(datas):
-    # # for line in datas:
-    #     video_dict = {}
-    #     msg = json.loads(line)['msg']
-    #     # print(msg)
-    #     platform = json.loads(line)['platform']
-    #     strategy = json.loads(line)['strategy']
-    #     strs = msg.replace("CrawlerEtlParam", "")[1: -1]
-    #     str_list = strs.split(", ")
-    #     str_list = [i.replace("'", "") for i in str_list]
-    #     key_dict = {
-    #         "crawlerRule": "crawler_rule",
-    #         "userId": "user_id",
-    #         "outUserId": "out_user_id",
-    #         "userName": "user_name",
-    #         "avatarUrl": "avatar_url",
-    #         "outVideoId": "out_video_id",
-    #         "videoTitle": "video_title",
-    #         "coverUrl": "cover_url",
-    #         "videoUrl": "video_url",
-    #         "publishTime": "publish_time",
-    #         "playCnt": "play_cnt",
-    #         "likeCnt": "like_cnt",
-    #         "shareCnt": "share_cnt",
-    #         "collectionCnt": "collection_cnt",
-    #         "commentCnt": "comment_cnt",
-    #         "strategyType": "strategy"
-    #     }
-    #     for index, i in enumerate(str_list[:-4]):
-    #         if "=" not in i:
-    #             continue
-    #         else:
-    #             key = i.split("=")[0]
-    #             value = i[len(key) + 1:]
-    #             new_key = key_dict.get(key, key)
-    #             video_dict[new_key] = value.replace("null", "")
-    #
-    #     video_dict['strategy'] = strategy
-    #     video_dict['platform'] = platform
-    #     video_dict['crawler_rule'] = json.dumps({})
-    #     print(json.dumps(video_dict, ensure_ascii=False, indent=4))
-    #     mq = MQ(topic_name="topic_crawler_etl_" + "prod")
-    #     mq.send_msg(video_dict)
+    # Replay archived ETL messages: every non-blank line of resend.txt is a
+    # JSON record carrying the original "msg" text plus "platform"/"strategy".
+    path = 'resend.txt'
+    with open(path, "r", encoding="utf-8") as f:
+        # Skip blank lines (e.g. a trailing newline) so json.loads does not fail on them.
+        datas = [raw for raw in f if raw.strip()]
+    # camelCase field names found in the message text -> snake_case keys sent to ETL.
+    # The mapping is constant, so it is built once instead of once per line.
+    key_dict = {
+        "crawlerRule": "crawler_rule",
+        "userId": "user_id",
+        "outUserId": "out_user_id",
+        "userName": "user_name",
+        "avatarUrl": "avatar_url",
+        "outVideoId": "out_video_id",
+        "videoTitle": "video_title",
+        "coverUrl": "cover_url",
+        "videoUrl": "video_url",
+        "publishTime": "publish_time",
+        "playCnt": "play_cnt",
+        "likeCnt": "like_cnt",
+        "shareCnt": "share_cnt",
+        "collectionCnt": "collection_cnt",
+        "commentCnt": "comment_cnt",
+        "strategyType": "strategy"
+    }
+    for line in tqdm(datas):
+        # Decode the record once and reuse it for all three fields.
+        record = json.loads(line)
+        msg = record['msg']
+        platform = record['platform']
+        strategy = record['strategy']
+        video_dict = {}
+        # Drop the "CrawlerEtlParam" tag and the first/last character around the
+        # field list (presumably the wrapping brackets - confirm against real data).
+        strs = msg.replace("CrawlerEtlParam", "")[1: -1]
+        str_list = strs.split(", ")
+        str_list = [i.replace("'", "") for i in str_list]
+        # The last four "key=value" fragments are deliberately ignored.
+        for i in str_list[:-4]:
+            if "=" not in i:
+                continue
+            key = i.split("=")[0]
+            # Everything after the first "=" is the value (it may itself contain "=").
+            value = i[len(key) + 1:]
+            new_key = key_dict.get(key, key)
+            # NOTE(review): this removes "null" anywhere inside the value, not only
+            # when the whole value is "null" - confirm that is intended.
+            video_dict[new_key] = value.replace("null", "")
 
-    video_dict = {'video_title': '吴尊友因病去世!吴老师,您真的不容易,千言万语,汇成一句话您走好❗️', 'video_id': '5262651713', 'duration': 49, 'play_cnt': 71, 'like_cnt': 0, 'comment_cnt': 0, 'share_cnt': 1, 'user_name': '夏日❤️莲莲', 'publish_time_stamp': 1698398572, 'publish_time_str': '2023-10-27 17:22:52', 'video_width': 537, 'video_height': 954, 'avatar_url': 'https://cdn-xphoto2.xiaoniangao.cn/4987933869?Expires=1704038400&OSSAccessKeyId=LTAI4G2W1FsgwzAWYpPoB3v6&Signature=wopOmtlcp9tGyWHYW9uy7DIXO%2Bg%3D&x-oss-process=image%2Fresize%2Cw_200%2Ch_200%2Climit_0%2Finterlace%2C1%2Fquality%2Cq_50%2Fcrop%2Cw_200%2Ch_200%2Cg_center%2Fformat%2Cjpg%2Fauto-orient%2C0', 'profile_id': 55888345, 'profile_mid': 185546, 'cover_url': 'https://cdn-xphoto2.xiaoniangao.cn/5262652619?Expires=1704038400&OSSAccessKeyId=LTAI4G2W1FsgwzAWYpPoB3v6&Signature=qIIRzRICgyv40n3uMFeMwHCY8JY%3D&x-oss-process=image%2Fresize%2Cw_690%2Ch_385%2Climit_0%2Finterlace%2C1%2Fformat%2Cjpg%2Fauto-orient%2C0', 'video_url': 'https://cdn-xalbum2.xiaoniangao.cn/5262651713?Expires=1704038400&OSSAccessKeyId=LTAI5tB7cRkYiqHcTdkVprwb&Signature=hFGFAB49mmgUYwYcF4679bE%2BgLg%3D', 'session': 'xiaoniangao-author-1698402882'}
-    video_dict['strategy'] = "author"
-    video_dict['platform'] = "xiaoniangao"
-    video_dict['user_id'] = 58528269
-    video_dict['out_video_id'] = video_dict['video_id']
-    print(json.dumps(video_dict, ensure_ascii=False, indent=4))
-    mq = MQ(topic_name="topic_crawler_etl_" + "prod")
-    mq.send_msg(video_dict)
+        # Attach the routing fields parsed from the input line; they overwrite
+        # any "strategy"/"platform" value extracted from the message text.
+        video_dict['strategy'] = strategy
+        video_dict['platform'] = platform
+        # The crawler rule is always replaced by an empty JSON object.
+        video_dict['crawler_rule'] = json.dumps({})
+        print(json.dumps(video_dict, ensure_ascii=False, indent=4))
+        # NOTE(review): a new MQ object is constructed for every input line and
+        # the topic suffix is hard-wired to "prod".
+        mq = MQ(topic_name="topic_crawler_etl_" + "prod")
+        mq.send_msg(video_dict)
 
+    # video_dict = {'video_title': '吴尊友因病去世!吴老师,您真的不容易,千言万语,汇成一句话您走好❗️', 'video_id': '5262651713', 'duration': 49, 'play_cnt': 71, 'like_cnt': 0, 'comment_cnt': 0, 'share_cnt': 1, 'user_name': '夏日❤️莲莲', 'publish_time_stamp': 1698398572, 'publish_time_str': '2023-10-27 17:22:52', 'video_width': 537, 'video_height': 954, 'avatar_url': 'https://cdn-xphoto2.xiaoniangao.cn/4987933869?Expires=1704038400&OSSAccessKeyId=LTAI4G2W1FsgwzAWYpPoB3v6&Signature=wopOmtlcp9tGyWHYW9uy7DIXO%2Bg%3D&x-oss-process=image%2Fresize%2Cw_200%2Ch_200%2Climit_0%2Finterlace%2C1%2Fquality%2Cq_50%2Fcrop%2Cw_200%2Ch_200%2Cg_center%2Fformat%2Cjpg%2Fauto-orient%2C0', 'profile_id': 55888345, 'profile_mid': 185546, 'cover_url': 'https://cdn-xphoto2.xiaoniangao.cn/5262652619?Expires=1704038400&OSSAccessKeyId=LTAI4G2W1FsgwzAWYpPoB3v6&Signature=qIIRzRICgyv40n3uMFeMwHCY8JY%3D&x-oss-process=image%2Fresize%2Cw_690%2Ch_385%2Climit_0%2Finterlace%2C1%2Fformat%2Cjpg%2Fauto-orient%2C0', 'video_url': 'https://cdn-xalbum2.xiaoniangao.cn/5262651713?Expires=1704038400&OSSAccessKeyId=LTAI5tB7cRkYiqHcTdkVprwb&Signature=hFGFAB49mmgUYwYcF4679bE%2BgLg%3D', 'session': 'xiaoniangao-author-1698402882'}
+    # video_dict['strategy'] = "author"
+    # video_dict['platform'] = "xiaoniangao"
+    # video_dict['user_id'] = 58528269
+    # video_dict['out_video_id'] = video_dict['video_id']
+    # print(json.dumps(video_dict, ensure_ascii=False, indent=4))
+    # mq = MQ(topic_name="topic_crawler_etl_" + "prod")
+    # mq.send_msg(video_dict)
+    #

+ 1 - 3
xiaoniangao/xiaoniangao_author/__init__.py

@@ -1,3 +1 @@
-# -*- coding: utf-8 -*-
-# @Author: wangkun
-# @Time: 2023/4/20
+from .xiaoniangao_author_v2 import XiaoNianGaoAuthor

+ 240 - 0
xiaoniangao/xiaoniangao_author/xiaoniangao_author_v2.py

@@ -0,0 +1,240 @@
+import json
+import os
+import random
+import sys
+import time
+import uuid
+import requests
+
+from common.mq import MQ
+
+sys.path.append(os.getcwd())
+from common.common import Common
+from common import AliyunLogger, PiaoQuanPipeline
+from common.public import get_config_from_mysql, clean_title
+
+
+def tunnel_proxies():
+    """Return the ``proxies`` mapping that sends HTTP and HTTPS traffic through the tunnel proxy."""
+    # Tunnel endpoint as "host:port".
+    endpoint = "q796.kdltps.com:15818"
+
+    # Username/password authentication.
+    # NOTE(review): the credentials are hard-coded in source; consider loading them from configuration.
+    account = "t17772369458618"
+    secret = "5zqcjkmy"
+
+    # Both schemes use the same authenticated proxy URL (itself an http:// URL).
+    proxy_url = f"http://{account}:{secret}@{endpoint}/"
+    return {"http": proxy_url, "https": proxy_url}
+
+
+class XiaoNianGaoAuthor:
+    """Crawl the public videos of a list of XiaoNianGao authors.
+
+    Each video returned by the list API is turned into a flat ``video_dict``,
+    run through ``PiaoQuanPipeline`` and, when the pipeline accepts it, sent
+    to the ``topic_crawler_etl_<env>`` MQ topic.
+
+    Args:
+        platform: Platform name used in logs and in the outgoing message.
+        mode: Crawl mode/strategy name used in logs and in the outgoing message.
+        rule_dict: Crawl rules; ``rule_dict["videos_cnt"]["min"]`` is read as the
+            per-round quota (200 when absent).
+        env: Environment suffix of the ETL topic.
+        user_list: Dicts with at least ``"link"`` (author mid) and ``"uid"``.
+    """
+
+    # Seconds to wait for the list API before abandoning the request, so a
+    # stalled proxy cannot hang the crawl forever.
+    REQUEST_TIMEOUT = 30
+
+    def __init__(self, platform, mode, rule_dict, env, user_list):
+        self.platform = platform
+        self.mode = mode
+        self.rule_dict = rule_dict
+        self.env = env
+        self.user_list = user_list
+        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
+        # Number of videos sent to ETL during this round; compared with the quota.
+        self.download_count = 0
+
+    def get_author_list(self):
+        """Crawl the users one by one and stop once the per-round quota is reached.
+
+        The quota is only checked between users, so the last user crawled may
+        push the total above it.
+        """
+        # Each round only fetches a fixed amount of data and exits by itself afterwards.
+        max_count = int(self.rule_dict.get("videos_cnt", {}).get("min", 200))
+        for user_dict in self.user_list:
+            # ">=": the quota is already met when the counter equals it
+            # (the original "<=" test let one extra user through).
+            if self.download_count >= max_count:
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=self.platform,
+                    mode=self.mode,
+                    env=self.env,
+                    message="本轮已经抓取足够数量的视频,已经自动退出",
+                )
+                return
+            self.get_video_list(user_dict)
+            # Random pause between users.
+            time.sleep(random.randint(1, 15))
+
+    def get_video_list(self, user_dict):
+        """Page through one author's public videos and process every entry.
+
+        Returns (after logging) on a failed request, on an unexpected response,
+        when a page is empty, or when the response carries no ``next_t`` cursor.
+        Errors raised while processing a single video are logged and do not
+        stop the page.
+        """
+        next_t = -1
+        url = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public"
+        headers = {
+            "Host": "kapi-xng-app.xiaoniangao.cn",
+            "content-type": "application/json; charset=utf-8",
+            "accept": "*/*",
+            "verb": "POST",
+            "accept-language": "zh-cn",
+            "date": "Wed, 01 Nov 2023 11:53:22 GMT",
+            "x-token-id": "",
+            "x-signaturemethod": "hmac-sha1",
+        }
+        while True:
+            payload = {
+                "token": "",
+                "limit": 20,
+                "start_t": next_t,
+                "visited_mid": int(user_dict["link"]),
+                "share_width": 300,
+                "share_height": 240,
+            }
+            try:
+                response = requests.request(
+                    "POST",
+                    url,
+                    headers=headers,
+                    data=json.dumps(payload),
+                    proxies=tunnel_proxies(),
+                    timeout=self.REQUEST_TIMEOUT,
+                )
+            except requests.RequestException as e:
+                # Network/proxy failure or timeout: give up on this user instead
+                # of hanging or killing the whole round.
+                Common.logger(self.mode, self.platform).info(f"get_videoList:{e}\n")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=self.platform,
+                    mode=self.mode,
+                    env=self.env,
+                    message=f"get_videoList:{e}\n",
+                )
+                return
+            if "data" not in response.text or response.status_code != 200:
+                Common.logger(self.mode, self.platform).info(
+                    f"get_videoList:{response.text}\n"
+                )
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=self.platform,
+                    mode=self.mode,
+                    env=self.env,
+                    message=f"get_videoList:{response.text}\n",
+                )
+                return
+            # Decode the body once; a body that merely contains the text "data"
+            # may still be invalid JSON or lack the "data" object.
+            try:
+                data = response.json()["data"]
+            except (ValueError, KeyError, TypeError):
+                data = None
+            if not isinstance(data, dict) or "list" not in data:
+                Common.logger(self.mode, self.platform).info(
+                    f"get_videoList:{response.text}\n"
+                )
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=self.platform,
+                    mode=self.mode,
+                    env=self.env,
+                    message=f"get_videoList:{response.text}\n",
+                )
+                return
+            feeds = data["list"]
+            if not feeds:
+                Common.logger(self.mode, self.platform).info(f"没有更多数据啦~\n")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=self.platform,
+                    mode=self.mode,
+                    env=self.env,
+                    message=f"没有更多数据啦~\n",
+                )
+                return
+            # Cursor for the next page; None when the API did not send one.
+            next_t = data.get("next_t")
+            for video_obj in feeds:
+                try:
+                    AliyunLogger.logging(
+                        code="1001",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        message="扫描到一条视频",
+                    )
+                    self.process_video_obj(video_obj, user_dict)
+                except Exception as e:
+                    # Best effort: one broken entry must not abort the page.
+                    AliyunLogger.logging(
+                        code="3000",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        data=video_obj,
+                        message="抓取单条视频异常, 报错原因是: {}".format(e),
+                    )
+            if next_t is None:
+                # Without a cursor the next request could not advance; stop here.
+                return
+
+    def process_video_obj(self, video_obj, user_dict):
+        """Build the message for one video, run the pipeline and send it when accepted.
+
+        Increments ``self.download_count`` for every message sent, which is what
+        lets ``get_author_list`` enforce the per-round quota.
+        """
+        trace_id = self.platform + str(uuid.uuid1())
+        # Title: an emoji is randomly put in front of or behind the cleaned title.
+        xiaoniangao_title = clean_title(video_obj.get("title", ""))
+        # Pick a random emoji/symbol from the configured list.
+        emoji = random.choice(
+            get_config_from_mysql(self.mode, self.platform, self.env, "emoji")
+        )
+        # Final title: randomly one of [emoji + title, title + emoji].
+        video_title = random.choice(
+            [f"{emoji}{xiaoniangao_title}", f"{xiaoniangao_title}{emoji}"]
+        )
+        # Publish time: "t" is divided by 1000 (presumably milliseconds) to get seconds.
+        publish_time_stamp = int(int(video_obj.get("t", 0)) / 1000)
+        publish_time_str = time.strftime(
+            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
+        )
+        # User name with whitespace, slashes and "&NBSP" removed.
+        user_name = (
+            video_obj.get("user", {})
+            .get("nick", "")
+            .strip()
+            .replace("\n", "")
+            .replace("/", "")
+            .replace(" ", "")
+            .replace(" ", "")
+            .replace("&NBSP", "")
+            .replace("\r", "")
+        )
+        video_dict = {
+            "video_title": video_title,
+            "video_id": video_obj.get("vid", ""),
+            "duration": int(video_obj.get("du", 0) / 1000),
+            "play_cnt": video_obj.get("play_pv", 0),
+            "like_cnt": video_obj.get("favor", {}).get("total", 0),
+            "comment_cnt": video_obj.get("comment_count", 0),
+            "share_cnt": video_obj.get("share", 0),
+            "user_name": user_name,
+            "publish_time_stamp": publish_time_stamp,
+            "publish_time_str": publish_time_str,
+            "update_time_stamp": int(time.time()),
+            "video_width": int(video_obj.get("w", 0)),
+            "video_height": int(video_obj.get("h", 0)),
+            "avatar_url": video_obj.get("user", {}).get("hurl", ""),
+            "profile_id": video_obj["id"],
+            "profile_mid": video_obj.get("user", {}).get("mid", ""),
+            "cover_url": video_obj.get("url", ""),
+            "video_url": video_obj.get("v_url", ""),
+            "session": f"xiaoniangao-author-{int(time.time())}",
+            "out_user_id": video_obj["id"],
+            "platform": self.platform,
+            "strategy": self.mode,
+            "out_video_id": video_obj.get("vid", ""),
+        }
+        for k, v in video_dict.items():
+            Common.logger(self.mode, self.platform).info(f"{k}:{v}")
+        pipeline = PiaoQuanPipeline(
+            platform=self.platform,
+            mode=self.mode,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            item=video_dict,
+            trace_id=trace_id,
+        )
+        flag = pipeline.process_item()
+        if flag:
+            video_dict["width"] = video_dict["video_width"]
+            video_dict["height"] = video_dict["video_height"]
+            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
+            video_dict["user_id"] = user_dict["uid"]
+            video_dict["publish_time"] = video_dict["publish_time_str"]
+            self.mq.send_msg(video_dict)
+            # Count the video towards the per-round quota (the counter was never
+            # updated before, so the quota could not take effect).
+            self.download_count += 1
+            AliyunLogger.logging(
+                code="1002",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                data=video_dict,
+                trace_id=trace_id,
+                message="成功发送 MQ 至 ETL",
+            )
+
+
+if __name__ == "__main__":
+    # Manual run of the crawler for a single hard-coded author.
+    # NOTE(review): this targets the "prod" environment.
+    demo_users = [{"link": 295640510, "uid": "12334"}]
+    crawler = XiaoNianGaoAuthor(
+        platform="xiaoniangao",
+        mode="author",
+        rule_dict={},
+        env="prod",
+        user_list=demo_users,
+    )
+    crawler.get_author_list()

+ 141 - 0
xiaoniangao/xiaoniangao_main/run_xng_author_v2.py

@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+# @Author: luojunhui
+# @Time: 2023/10/23
+import argparse
+import random
+from mq_http_sdk.mq_client import *
+from mq_http_sdk.mq_consumer import *
+from mq_http_sdk.mq_exception import MQExceptionBase
+
+sys.path.append(os.getcwd())
+from common.public import task_fun_mq, get_consumer, ack_message
+from common.scheduling_db import MysqlHelper
+from common.aliyun_log import AliyunLogger
+from xiaoniangao.xiaoniangao_author import XiaoNianGaoAuthor
+
+
+def main(my_platform, mode, topic_name, group_id, env):
+    """Consume scheduling messages forever and run one author crawl per message.
+
+    For every message received: log it, ack it, parse the task and rule
+    dictionaries from its body, load the task's users from ``crawler_user_v3``
+    and run ``XiaoNianGaoAuthor.get_author_list``.
+
+    Args:
+        my_platform: Platform name used for logging and passed to the crawler.
+        mode: Crawl mode; also passed as ``log_type`` to ``ack_message``.
+        topic_name: MQ topic to consume from.
+        group_id: MQ consumer group id.
+        env: Environment name passed to logging, MySQL access and the crawler.
+
+    Only ``MQExceptionBase`` is handled here; any other exception propagates
+    and ends the loop.
+    """
+    consumer = get_consumer(topic_name, group_id)
+    # Long polling: when the topic has no message the request is held on the
+    # server for up to wait_seconds and returns as soon as a message arrives.
+    # 30 seconds is the maximum allowed value.
+    wait_seconds = 30
+    # Messages fetched per poll (the maximum allowed is 16).
+    batch = 1
+    AliyunLogger.logging(
+        code="1000",
+        platform=my_platform,
+        mode=mode,
+        env=env,
+        message=f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+        f"WaitSeconds:{wait_seconds}\n"
+        f"TopicName:{topic_name}\n"
+        f"MQConsumer:{group_id}",
+    )
+    while True:
+        try:
+            # Long-poll for messages.
+            recv_msgs = consumer.consume_message(batch, wait_seconds)
+            for msg in recv_msgs:
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=my_platform,
+                    mode=mode,
+                    env=env,
+                    message=f"Receive\n"
+                    f"MessageId:{msg.message_id}\n"
+                    f"MessageBodyMD5:{msg.message_body_md5}\n"
+                    f"MessageTag:{msg.message_tag}\n"
+                    f"ConsumedTimes:{msg.consumed_times}\n"
+                    f"PublishTime:{msg.publish_time}\n"
+                    f"Body:{msg.message_body}\n"
+                    f"NextConsumeTime:{msg.next_consume_time}\n"
+                    f"ReceiptHandle:{msg.receipt_handle}\n"
+                    f"Properties:{msg.properties}",
+                )
+                # Ack before crawling, so the message is not redelivered even
+                # if the crawl below fails.
+                ack_message(
+                    log_type=mode,
+                    crawler=my_platform,
+                    recv_msgs=recv_msgs,
+                    consumer=consumer,
+                )
+                # Parse the message body once and reuse the result for both dicts.
+                task = task_fun_mq(msg.message_body)
+                task_dict = task["task_dict"]
+                AliyunLogger.logging(
+                    "1000", my_platform, mode, env, f"调度任务:{task_dict}"
+                )
+                rule_dict = task["rule_dict"]
+                AliyunLogger.logging(
+                    "1000", my_platform, mode, env, f"抓取规则:{rule_dict}\n"
+                )
+                # Load the users attached to this task.
+                # NOTE(review): task_id comes from the message body and is
+                # interpolated into the SQL text; use a parameterized query if
+                # MysqlHelper offers one.
+                task_id = task_dict["id"]
+                select_user_sql = (
+                    f"""select * from crawler_user_v3 where task_id={task_id}"""
+                )
+                user_list = MysqlHelper.get_values(
+                    mode, my_platform, select_user_sql, env, action=""
+                )
+                # The unused `random.choice([])` that used to sit here was
+                # removed: it raised IndexError on its always-empty list and
+                # stopped the consumer before any crawl could start.
+                AliyunLogger.logging(
+                    code="1003",
+                    platform=my_platform,
+                    mode=mode,
+                    env=env,
+                    message="成功获取信息,启动爬虫,开始一轮抓取",
+                )
+                XNGAuthor = XiaoNianGaoAuthor(
+                    platform=my_platform,
+                    mode=mode,
+                    env=env,
+                    rule_dict=rule_dict,
+                    user_list=user_list
+                )
+                XNGAuthor.get_author_list()
+                AliyunLogger.logging(
+                    code="1004",
+                    platform=my_platform,
+                    mode=mode,
+                    env=env,
+                    message="成功抓取完一轮",
+                )
+
+        except MQExceptionBase as err:
+            # No message available in the topic: poll again immediately.
+            if err.type == "MessageNotExist":
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=my_platform,
+                    mode=mode,
+                    env=env,
+                    message=f"No new message! RequestId:{err.req_id}\n",
+                )
+                continue
+            # Any other MQ error: log it and retry after a short pause.
+            AliyunLogger.logging(
+                code="1000",
+                platform=my_platform,
+                mode=mode,
+                env=env,
+                message=f"Consume Message Fail! Exception:{err}\n",
+            )
+            time.sleep(2)
+            continue
+
+
+if __name__ == "__main__":
+    # Command-line entry point: every option is an optional string value.
+    parser = argparse.ArgumentParser()
+    for option in ("--log_type", "--crawler", "--topic_name", "--group_id", "--env"):
+        parser.add_argument(option, type=str)
+    # Values come from the command line.
+    args = parser.parse_args()
+    # --crawler is passed on as the platform and --log_type as the mode.
+    main(
+        my_platform=args.crawler,
+        mode=args.log_type,
+        topic_name=args.topic_name,
+        group_id=args.group_id,
+        env=args.env,
+    )