فهرست منبع

kuaishou recommend etl

ehlxr 1 سال پیش
والد
کامیت
95c0884724
2 فایلهای تغییر یافته به همراه 55 افزوده شده و 12 حذف شده
  1. 29 0
      common/mq.py
  2. 26 12
      kuaishou/kuaishou_recommend/kuaishou_recommend_cut_title.py

+ 29 - 0
common/mq.py

@@ -0,0 +1,29 @@
+import json
+from mq_http_sdk.mq_exception import MQExceptionBase
+from mq_http_sdk.mq_producer import TopicMessage
+from mq_http_sdk.mq_client import MQClient
+from common.common import Common
+
+
+class MQ:  # Holds an MQClient plus a producer for one topic; send_msg() publishes a dict as a JSON message.
+    instance_id = "MQ_INST_1894469520484605_BXhXuzkZ"  # class-level instance id handed to get_producer()
+
+    def __init__(self, topic_name) -> None:  # topic_name: topic the producer is created for
+        self.mq_client = MQClient(
+            "http://1894469520484605.mqrest.cn-qingdao-public.aliyuncs.com",  # endpoint literal (first MQClient argument)
+            "LTAI4G7puhXtLyHzHQpD6H7A", "nEbq3xWNQd1qLpdy2u71qFweHkZjSG")  # NOTE(review): presumably access key id/secret hard-coded in source -- confirm, rotate, and load from config/env
+        self.producer = self.mq_client.get_producer(self.instance_id,
+                                                    topic_name)
+
+    def send_msg(self, video_dict):  # publishes video_dict; reads its "strategy", "out_video_id" and "platform" keys; returns None
+        try:
+            msg = TopicMessage(json.dumps(video_dict))  # message body is the whole dict serialized as JSON
+            msg.set_message_key(video_dict["strategy"] + "-" +  # message key is "<strategy>-<out_video_id>"
+                                video_dict["out_video_id"])
+            re_msg = self.producer.publish_message(msg)
+            Common.logger(video_dict["strategy"], video_dict["platform"]).info(  # success: log the returned message id and body MD5
+                "Publish Message Succeed. MessageID:%s, BodyMD5:%s" %
+                (re_msg.message_id, re_msg.message_body_md5))
+        except MQExceptionBase as e:  # only MQExceptionBase is handled; anything else (e.g. KeyError on a missing key) propagates
+            Common.logger(video_dict["strategy"], video_dict["platform"]).error(  # failure is logged and not re-raised
+                "Publish Message Fail. Exception:%s" % e)

+ 26 - 12
kuaishou/kuaishou_recommend/kuaishou_recommend_cut_title.py

@@ -19,6 +19,7 @@ from common.scheduling_db import MysqlHelper
 from common.publish import Publish
 from common.public import random_title, get_config_from_mysql, download_rule, get_title_score
 from common.userAgent import get_random_user_agent
+from common.mq import MQ
 
 
 class KuaiShouRecommendScheduling:
@@ -59,6 +60,7 @@ class KuaiShouRecommendScheduling:
 
     @classmethod
     def get_videoList(cls, log_type, crawler, our_uid, rule_dict, env):
+        mq = MQ(topic_name="topic_crawler_etl_dev")
         for page in range(1, 101):
             try:
                 Common.logger(log_type, crawler).info(f"正在抓取第{page}页")
@@ -173,18 +175,30 @@ class KuaiShouRecommendScheduling:
                                 Common.logger(log_type, crawler).info('视频已下载\n')
                                 Common.logging(log_type, crawler, env, '视频已下载\n')
                             else:
-                                title_score = get_title_score(log_type, crawler, "16QspO", "0usaDk", video_title)
-                                if title_score <= 0.3:
-                                    Common.logger(log_type, crawler).info(f"权重分:{title_score}<=0.3\n")
-                                    Common.logging(log_type, crawler, env, f"权重分:{title_score}<=0.3\n")
-                                    continue
-                                cls.download_publish(log_type=log_type,
-                                                     crawler=crawler,
-                                                     our_uid=our_uid,
-                                                     video_dict=video_dict,
-                                                     rule_dict=rule_dict,
-                                                     title_score=title_score,
-                                                     env=env)
+                                video_dict["out_user_id"] = video_dict["user_id"]
+                                video_dict["platform"] = crawler
+                                video_dict["strategy"] = log_type
+                                video_dict["out_video_id"] = video_dict["video_id"]
+                                video_dict["width"] = video_dict["video_width"]
+                                video_dict["height"] = video_dict["video_height"]
+                                video_dict["crawler_rule"] = json.dumps(rule_dict)
+                                video_dict["user_id"] = our_uid
+                                video_dict["publish_time"] = video_dict["publish_time_str"]
+
+                                mq.send_msg(video_dict)
+
+                                # title_score = get_title_score(log_type, crawler, "16QspO", "0usaDk", video_title)
+                                # if title_score <= 0.3:
+                                #     Common.logger(log_type, crawler).info(f"权重分:{title_score}<=0.3\n")
+                                #     Common.logging(log_type, crawler, env, f"权重分:{title_score}<=0.3\n")
+                                #     continue
+                                # cls.download_publish(log_type=log_type,
+                                #                      crawler=crawler,
+                                #                      our_uid=our_uid,
+                                #                      video_dict=video_dict,
+                                #                      rule_dict=rule_dict,
+                                #                      title_score=title_score,
+                                #                      env=env)
                         except Exception as e:
                             Common.logger(log_type, crawler).error(f"抓取单条视频异常:{e}\n")
                             Common.logging(log_type, crawler, env, f"抓取单条视频异常:{e}\n")