
suisuiniannianyingfuqi recommend etl

ehlxr 1 year ago
commit a2d8ab1150

+ 22 - 7
suisuiniannianyingfuqi/suisuiniannianyingfuqi_recommend/suisuiniannianyingfuqi_recommend_scheduling.py

@@ -10,6 +10,8 @@ from hashlib import md5
 import requests
 import urllib3
 from requests.adapters import HTTPAdapter
+
+from common.mq import MQ
 sys.path.append(os.getcwd())
 from common.common import Common
 from common.feishu import Feishu
@@ -23,12 +25,14 @@ class SuisuiniannianyingfuqiRecommendScheduling:
 
     @classmethod
     def repeat_video(cls, log_type, crawler, video_id, env):
-        sql = f""" select * from crawler_video where platform="岁岁年年迎福气" and out_video_id="{video_id}"; """
+        # sql = f""" select * from crawler_video where platform="岁岁年年迎福气" and out_video_id="{video_id}"; """
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """
         repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
         return len(repeat_video)
 
     @classmethod
     def get_videoList(cls, log_type, crawler, our_uid, rule_dict, env):
+        mq = MQ(topic_name="topic_crawler_etl_" + env)
         page = 1
         while True:
             try:
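Note on the dedup query above: the rewrite widens the duplicate check from the hard-coded platform name "岁岁年年迎福气" to an IN clause covering both the crawler name and cls.platform, but it still builds the statement via f-string interpolation. Below is a hypothetical parameterized equivalent, for illustration only; MysqlHelper's internals are not shown in this diff, so a plain PyMySQL connection and the helper name is_duplicate are assumptions, not part of the repo.

# Hypothetical sketch: the same dedup check with bound parameters
# instead of f-string interpolation, assuming a standard PyMySQL connection.
import pymysql

def is_duplicate(conn, crawler: str, platform: str, video_id: str) -> bool:
    # Match either platform alias, exactly like the IN clause in the diff.
    sql = (
        "SELECT 1 FROM crawler_video "
        "WHERE platform IN (%s, %s) AND out_video_id = %s LIMIT 1"
    )
    with conn.cursor() as cursor:
        cursor.execute(sql, (crawler, platform, video_id))
        return cursor.fetchone() is not None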
@@ -97,12 +101,23 @@ class SuisuiniannianyingfuqiRecommendScheduling:
                             elif cls.repeat_video(log_type, crawler, video_dict["video_id"], env) != 0:
                                 Common.logger(log_type, crawler).info('视频已下载\n')
                             else:
-                                cls.download_publish(log_type=log_type,
-                                                     crawler=crawler,
-                                                     our_uid=our_uid,
-                                                     video_dict=video_dict,
-                                                     rule_dict=rule_dict,
-                                                     env=env)
+                                # cls.download_publish(log_type=log_type,
+                                #                      crawler=crawler,
+                                #                      our_uid=our_uid,
+                                #                      video_dict=video_dict,
+                                #                      rule_dict=rule_dict,
+                                #                      env=env)
+                                video_dict["out_user_id"] = video_dict["user_id"]
+                                video_dict["platform"] = crawler
+                                video_dict["strategy"] = log_type
+                                video_dict["out_video_id"] = video_dict["video_id"]
+                                video_dict["width"] = 0
+                                video_dict["height"] = 0
+                                video_dict["crawler_rule"] = json.dumps(rule_dict)
+                                video_dict["user_id"] = our_uid
+                                video_dict["publish_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp))
+
+                                mq.send_msg(video_dict)
                         except Exception as e:
                             Common.logger(log_type, crawler).error(f"抓取单条视频异常:{e}\n")
             except Exception as e:
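The net effect of this last hunk is that the direct download_publish call is replaced by a message to a per-environment ETL topic ("topic_crawler_etl_" + env). A condensed, self-contained sketch of that flow follows; the field mapping mirrors the diff above, while the assumptions are that MQ.send_msg accepts a dict payload and that publish_time_stamp is an epoch-seconds integer (both defined elsewhere in the file).

# Condensed sketch of the new flow: normalize video_dict, then hand it to
# the ETL consumer via MQ instead of calling download_publish directly.
import json
import time

from common.mq import MQ

def publish_to_etl(video_dict, crawler, log_type, our_uid, rule_dict,
                   publish_time_stamp, env):
    mq = MQ(topic_name="topic_crawler_etl_" + env)  # one topic per environment
    video_dict["out_user_id"] = video_dict["user_id"]  # original uploader id
    video_dict["platform"] = crawler
    video_dict["strategy"] = log_type
    video_dict["out_video_id"] = video_dict["video_id"]
    video_dict["width"] = 0   # resolution left for the ETL side to fill in
    video_dict["height"] = 0
    video_dict["crawler_rule"] = json.dumps(rule_dict)
    video_dict["user_id"] = our_uid  # must be set after out_user_id is copied
    video_dict["publish_time"] = time.strftime(
        "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp))
    mq.send_msg(video_dict)

Note the ordering constraint preserved from the diff: out_user_id is copied from the original user_id before user_id is overwritten with our_uid.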