瀏覽代碼

xiaoniaogao play etl

ehlxr 2 年之前
父節點
當前提交
5f33e870ab

+ 51 - 30
xiaoniangao/xiaoniangao_hour/xiaoniangao_hour_scheduling.py

@@ -11,6 +11,7 @@ import time
 from hashlib import md5
 import requests
 import urllib3
+from common.mq import MQ
 sys.path.append(os.getcwd())
 from common.common import Common
 from common.feishu import Feishu
@@ -364,6 +365,21 @@ class XiaoniangaoHourScheduling:
                 Common.logger(log_type, crawler).error(f'更新{update_video_info["video_title"]}时异常:{e}\n')
                 Common.logging(log_type, crawler, env, f'更新{update_video_info["video_title"]}时异常:{e}\n')
 
+    @classmethod
+    def send_to_mq(cls, log_type, crawler, video_info_dict, rule_dict, env):
+        video_info_dict["out_user_id"] = video_info_dict["profile_id"]
+        video_info_dict["platform"] = crawler
+        video_info_dict["strategy"] = log_type
+        video_info_dict["out_video_id"] = video_info_dict["video_id"]
+        video_info_dict["width"] = video_info_dict["video_width"]
+        video_info_dict["height"] = video_info_dict["video_height"]
+        video_info_dict["crawler_rule"] = json.dumps(rule_dict)
+        video_info_dict["user_id"] = video_info_dict["uid"]
+        video_info_dict["publish_time"] = video_info_dict["publish_time_str"]
+
+        mq = MQ(topic_name="topic_crawler_etl_" + env)
+        mq.send_msg(video_info_dict)
+
     @classmethod
     def download(cls, log_type, crawler, video_info_dict, rule_dict, our_uid, env):
         # 下载视频
@@ -491,12 +507,13 @@ class XiaoniangaoHourScheduling:
         elif int(video_info_dict["play_cnt"]) >= 30000:
             Common.logger(log_type, crawler).info(f"播放量:{video_info_dict['play_cnt']} >= 30000,满足下载规则,开始下载视频")
             Common.logging(log_type, crawler, env, f"播放量:{video_info_dict['play_cnt']} >= 30000,满足下载规则,开始下载视频")
-            cls.download(log_type=log_type,
-                         crawler=crawler,
-                         video_info_dict=video_info_dict,
-                         rule_dict=rule_dict,
-                         our_uid=our_uid,
-                         env=env)
+            # cls.download(log_type=log_type,
+            #              crawler=crawler,
+            #              video_info_dict=video_info_dict,
+            #              rule_dict=rule_dict,
+            #              our_uid=our_uid,
+            #              env=env)
+            cls.send_to_mq(log_type=log_type, crawler=crawler, video_info_dict=video_info_dict, rule_dict=rule_dict, env=env)
 
         # 上升榜判断逻辑,任意时间段上升量>=5000,连续两个时间段上升量>=2000
         elif int(update_video_info['ten_play_cnt']) >= 3000 or int(
@@ -505,24 +522,26 @@ class XiaoniangaoHourScheduling:
             Common.logging(log_type, crawler, env, f"10:00 or 15:00 or 20:00 数据上升量:{int(update_video_info['ten_play_cnt'])} or {int(update_video_info['fifteen_play_cnt'])} or {int(update_video_info['twenty_play_cnt'])} >= 3000")
             Common.logger(log_type, crawler).info("满足下载规则,开始下载视频")
             Common.logging(log_type, crawler, env, "满足下载规则,开始下载视频")
-            cls.download(log_type=log_type,
-                         crawler=crawler,
-                         video_info_dict=video_info_dict,
-                         rule_dict=rule_dict,
-                         our_uid=our_uid,
-                         env=env)
+            # cls.download(log_type=log_type,
+            #              crawler=crawler,
+            #              video_info_dict=video_info_dict,
+            #              rule_dict=rule_dict,
+            #              our_uid=our_uid,
+            #              env=env)
+            cls.send_to_mq(log_type=log_type, crawler=crawler, video_info_dict=video_info_dict, rule_dict=rule_dict, env=env)
 
         elif int(update_video_info['ten_play_cnt']) >= 1000 and int(update_video_info['fifteen_play_cnt']) >= 1000:
             Common.logger(log_type, crawler).info(f"10:00 and 15:00 数据上升量:{int(update_video_info['ten_play_cnt'])} and {int(update_video_info['fifteen_play_cnt'])} >= 1000")
             Common.logging(log_type, crawler, env, f"10:00 and 15:00 数据上升量:{int(update_video_info['ten_play_cnt'])} and {int(update_video_info['fifteen_play_cnt'])} >= 1000")
             Common.logger(log_type, crawler).info("满足下载规则,开始下载视频")
             Common.logging(log_type, crawler, env, "满足下载规则,开始下载视频")
-            cls.download(log_type=log_type,
-                         crawler=crawler,
-                         video_info_dict=video_info_dict,
-                         rule_dict=rule_dict,
-                         our_uid=our_uid,
-                         env=env)
+            # cls.download(log_type=log_type,
+            #              crawler=crawler,
+            #              video_info_dict=video_info_dict,
+            #              rule_dict=rule_dict,
+            #              our_uid=our_uid,
+            #              env=env)
+            cls.send_to_mq(log_type=log_type, crawler=crawler, video_info_dict=video_info_dict, rule_dict=rule_dict, env=env)
 
         elif int(update_video_info['fifteen_play_cnt']) >= 1000 and int(update_video_info['twenty_play_cnt']) >= 1000:
             Common.logger(log_type, crawler).info(
@@ -530,12 +549,13 @@ class XiaoniangaoHourScheduling:
             Common.logging(log_type, crawler, env, f"15:00 and 20:00 数据上升量:{int(update_video_info['fifteen_play_cnt'])} and {int(update_video_info['twenty_play_cnt'])} >= 1000")
             Common.logger(log_type, crawler).info("满足下载规则,开始下载视频")
             Common.logging(log_type, crawler, env, "满足下载规则,开始下载视频")
-            cls.download(log_type=log_type,
-                         crawler=crawler,
-                         video_info_dict=video_info_dict,
-                         rule_dict=rule_dict,
-                         our_uid=our_uid,
-                         env=env)
+            # cls.download(log_type=log_type,
+            #              crawler=crawler,
+            #              video_info_dict=video_info_dict,
+            #              rule_dict=rule_dict,
+            #              our_uid=our_uid,
+            #              env=env)
+            cls.send_to_mq(log_type=log_type, crawler=crawler, video_info_dict=video_info_dict, rule_dict=rule_dict, env=env)
 
         elif int(update_video_info['ten_play_cnt']) >= 1000 and int(update_video_info['twenty_play_cnt']) >= 1000:
             Common.logger(log_type, crawler).info(
@@ -543,12 +563,13 @@ class XiaoniangaoHourScheduling:
             Common.logging(log_type, crawler, env, f"今日10:00 / 20:00数据上升量:{int(update_video_info['ten_play_cnt'])} and {int(update_video_info['twenty_play_cnt'])} >= 1000")
             Common.logger(log_type, crawler).info("满足下载规则,开始下载视频")
             Common.logging(log_type, crawler, env, "满足下载规则,开始下载视频")
-            cls.download(log_type=log_type,
-                         crawler=crawler,
-                         video_info_dict=video_info_dict,
-                         rule_dict=rule_dict,
-                         our_uid=our_uid,
-                         env=env)
+            # cls.download(log_type=log_type,
+            #              crawler=crawler,
+            #              video_info_dict=video_info_dict,
+            #              rule_dict=rule_dict,
+            #              our_uid=our_uid,
+            #              env=env)
+            cls.send_to_mq(log_type=log_type, crawler=crawler, video_info_dict=video_info_dict, rule_dict=rule_dict, env=env)
 
         else:
             Common.logger(log_type, crawler).info("上升量不满足下载规则")

+ 20 - 6
xiaoniangao/xiaoniangao_play/xiaoniangao_play_scheduling.py

@@ -10,6 +10,8 @@ import time
 from hashlib import md5
 import requests
 import urllib3
+
+from common.mq import MQ
 sys.path.append(os.getcwd())
 from common.common import Common
 from common.feishu import Feishu
@@ -29,6 +31,7 @@ class XiaoniangaoplayScheduling:
     # 获取列表
     @classmethod
     def get_videoList(cls, log_type, crawler, rule_dict, our_uid, env):
+        mq = MQ(topic_name="topic_crawler_etl_" + env)
         # uid_token_dict = cls.uid_token_dict
         for page in range(1, 101):
             try:
@@ -138,12 +141,23 @@ class XiaoniangaoplayScheduling:
                                 Common.logger(log_type, crawler).info('视频已下载\n')
                                 Common.logging(log_type, crawler, env, '视频已下载\n')
                             else:
-                                cls.download_publish(log_type=log_type,
-                                                     crawler=crawler,
-                                                     video_dict=video_dict,
-                                                     rule_dict=rule_dict,
-                                                     our_uid=our_uid,
-                                                     env=env)
+                                # cls.download_publish(log_type=log_type,
+                                #                      crawler=crawler,
+                                #                      video_dict=video_dict,
+                                #                      rule_dict=rule_dict,
+                                #                      our_uid=our_uid,
+                                #                      env=env)
+                                video_dict["out_user_id"] = video_dict["profile_id"]
+                                video_dict["platform"] = crawler
+                                video_dict["strategy"] = log_type
+                                video_dict["out_video_id"] = video_dict["video_id"]
+                                video_dict["width"] = video_dict["video_width"]
+                                video_dict["height"] = video_dict["video_height"]
+                                video_dict["crawler_rule"] = json.dumps(rule_dict)
+                                video_dict["user_id"] = our_uid
+                                video_dict["publish_time"] = video_dict["publish_time_str"]
+
+                                mq.send_msg(video_dict)
                         except Exception as e:
                             Common.logger(log_type, crawler).error(f"抓取单条视频异常:{e}\n")
                             Common.logging(log_type, crawler, env, f"抓取单条视频异常:{e}\n")