Kaynağa Gözat

benshanzhufu recommend hour etl

ehlxr 1 yıl önce
ebeveyn
işleme
d29149d784

+ 25 - 7
benshanzhufu/benshanzhufu_recommend/benshanzhufu_recommend_scheduling.py

@@ -13,6 +13,8 @@ from hashlib import md5
 from urllib import parse
 import requests
 import urllib3
+
+from common.mq import MQ
 sys.path.append(os.getcwd())
 from common.common import Common
 from common.scheduling_db import MysqlHelper
@@ -27,13 +29,15 @@ class BenshanzhufuRecommend:
 
     @classmethod
     def repeat_video(cls, log_type, crawler, video_id, env):
-        sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """
+        # sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """
         repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
         return len(repeat_video)
 
     # 推荐列表获取视频
     @classmethod
     def get_videoList(cls, log_type, crawler, our_uid, rule_dict, env):
+        mq = MQ(topic_name="topic_crawler_etl_" + env)
         # 翻页参数
         visitor_key = ""
         page = 1
@@ -117,12 +121,26 @@ class BenshanzhufuRecommend:
                     elif cls.repeat_video(log_type, crawler, video_dict["video_id"], env) != 0:
                         Common.logger(log_type, crawler).info('视频已下载\n')
                     else:
-                        cls.download_publish(log_type=log_type,
-                                             crawler=crawler,
-                                             our_uid=our_uid,
-                                             video_dict=video_dict,
-                                             rule_dict=rule_dict,
-                                             env=env)
+                        # cls.download_publish(log_type=log_type,
+                        #                      crawler=crawler,
+                        #                      our_uid=our_uid,
+                        #                      video_dict=video_dict,
+                        #                      rule_dict=rule_dict,
+                        #                      env=env)
+                        video_dict["out_user_id"] = video_dict["user_id"]
+                        video_dict["platform"] = crawler
+                        video_dict["strategy"] = log_type
+                        video_dict["out_video_id"] = video_dict["video_id"]
+                        video_dict["width"] = 0
+                        video_dict["height"] = 0
+                        video_dict["crawler_rule"] = json.dumps(rule_dict)
+                        video_dict["user_id"] = our_uid
+                        video_dict["publish_time"] = video_dict["publish_time_str"]
+                        video_dict["fans_cnt"] = 0
+                        video_dict["videos_cnt"] = 0
+
+                        mq.send_msg(video_dict)
+
                         # except Exception as e:
                         #     Common.logger(log_type, crawler).info(f"抓取单条视频异常:{e}\n")
             # except Exception as e:

+ 1 - 1
common/mq.py

@@ -18,7 +18,7 @@ class MQ:
         platform = video_dict["platform"]
         try:
             msg = TopicMessage(json.dumps(video_dict))
-            msg.set_message_key(strategy + "-" + video_dict["out_video_id"])
+            msg.set_message_key(platform + "-" + strategy + "-" + video_dict["out_video_id"])
             re_msg = self.producer.publish_message(msg)
             Common.logger(strategy, platform).info("Publish Message Succeed. MessageID:%s, BodyMD5:%s" %
                                                    (re_msg.message_id, re_msg.message_body_md5))

+ 2 - 1
douyin/douyin_author/douyin_author_scheduling.py

@@ -172,7 +172,8 @@ class DouyinauthorScheduling:
 
     @classmethod
     def repeat_video(cls, log_type, crawler, video_id, env):
-        sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """
+        # sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """
         repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
         return len(repeat_video)
 

+ 2 - 1
douyin/douyin_recommend/douyin_recommend_scheduling.py

@@ -163,7 +163,8 @@ class DouyinrecommendScheduling:
 
     @classmethod
     def repeat_video(cls, log_type, crawler, video_id, env):
-        sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """
+        # sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """
         repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
         return len(repeat_video)
 

+ 2 - 1
kuaishou/kuaishou_author/kuaishou_author_scheduling.py

@@ -222,7 +222,8 @@ class KuaishouauthorScheduling:
 
     @classmethod
     def repeat_video(cls, log_type, crawler, video_id, env):
-        sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}" """
+        # sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}" """
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """
         repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
         return len(repeat_video)
 

+ 2 - 1
kuaishou/kuaishou_recommend/kuaishou_recommend_cut_title.py

@@ -208,7 +208,8 @@ class KuaiShouRecommendScheduling:
 
     @classmethod
     def repeat_video(cls, log_type, crawler, video_id, env):
-        sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}" """
+        # sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}" """
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """
         repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
         return len(repeat_video)
 

+ 2 - 1
xiaoniangao/xiaoniangao_author/xiaoniangao_author_scheduling.py

@@ -26,7 +26,8 @@ class XiaoniangaoAuthorScheduling:
 
     @classmethod
     def repeat_video(cls, log_type, crawler, video_id, env):
-        sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """
+        # sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """
         repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
         return len(repeat_video)
 

+ 2 - 1
xiaoniangao/xiaoniangao_hour/xiaoniangao_hour_scheduling.py

@@ -31,7 +31,8 @@ class XiaoniangaoHourScheduling:
 
     @classmethod
     def repeat_video(cls, log_type, crawler, video_id, env):
-        sql = f""" select * from crawler_video where platform="小年糕" and out_video_id="{video_id}"; """
+        # sql = f""" select * from crawler_video where platform="小年糕" and out_video_id="{video_id}"; """
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """
         repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
         return len(repeat_video)
 

+ 2 - 1
xiaoniangao/xiaoniangao_play/xiaoniangao_play_scheduling.py

@@ -168,7 +168,8 @@ class XiaoniangaoplayScheduling:
 
     @classmethod
     def repeat_video(cls, log_type, crawler, video_id, env):
-        sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """
+        # sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """
         repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
         return len(repeat_video)
 

+ 2 - 1
xigua/xigua_author/xigua_author_scheduling.py

@@ -644,7 +644,8 @@ class XiguaauthorScheduling:
                     Common.logging(log_type, crawler, env, f"抓取单条视频异常:{e}\n")
     @classmethod
     def repeat_video(cls, log_type, crawler, video_id, env):
-        sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """
+        # sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """
         repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
         return len(repeat_video)
 

+ 2 - 1
xigua/xigua_recommend/xigua_recommend_scheduling.py

@@ -597,7 +597,8 @@ class XiguarecommendScheduling:
 
     @classmethod
     def repeat_video(cls, log_type, crawler, video_id, env):
-        sql = f""" select * from crawler_video where platform="西瓜视频" and out_video_id="{video_id}"; """
+        # sql = f""" select * from crawler_video where platform="西瓜视频" and out_video_id="{video_id}"; """
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """
         repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
         return len(repeat_video)
 

+ 2 - 1
xigua/xigua_search/xigua_search_scheduling.py

@@ -683,7 +683,8 @@ class XiguasearchScheduling:
 
     @classmethod
     def repeat_video(cls, log_type, crawler, video_id, env):
-        sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """
+        # sql = f""" select * from crawler_video where platform="{cls.platform}" and out_video_id="{video_id}"; """
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """
         repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env, action="")
         return len(repeat_video)