Explorar o código

增加redis 二次去重

zhangyong hai 1 ano
pai
achega
1ed044c7a4
Modificáronse 2 ficheiros con 79 adicións e 0 borrados
  1. 62 0
      common/redis_db.py
  2. 17 0
      xiaoniangao/xiaoniangao_author/xiaoniangao_author_v2.py

+ 62 - 0
common/redis_db.py

@@ -0,0 +1,62 @@
+import redis
+from datetime import timedelta
+
+
+class SyncRedisHelper(object):
+    _pool: redis.ConnectionPool = None
+    _instance = None
+
+    def __new__(cls, *args, **kwargs):
+        if cls._instance is None:
+            cls._instance = super().__new__(cls, *args, **kwargs)
+        return cls._instance
+
+    def _get_pool(self) -> redis.ConnectionPool:
+        if self._pool is None:
+            self._pool = redis.ConnectionPool(
+                # host="r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com",  # 外网地址
+                host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com",  # 内网地址
+                port=6379,
+                db=1,
+                password="Qingqu2019",
+            )
+        return self._pool
+
+    def get_client(self) -> redis.Redis:
+        pool = self._get_pool()
+        client = redis.Redis(connection_pool=pool)
+        return client
+
+    def close(self):
+        if self._pool:
+            self._pool.disconnect(inuse_connections=True)
+
+
+def store_data(platform, out_video_id):
+    key = f"crawler:duplicate:{platform}:{out_video_id}"
+    value = 1
+    timeout = timedelta(days=60)  # 60天超时时间
+    helper = SyncRedisHelper()
+    client = helper.get_client()
+
+    client.set(key, value)
+    client.expire(key, timeout)
+
+
+def get_data(platform, out_video_id):
+    key = f"crawler:duplicate:{platform}:{out_video_id}"
+    helper = SyncRedisHelper()
+    client = helper.get_client()
+    value = client.get(key)
+    return value
+
+
+# 示例:存储一个数据
+# store_data('xiaoniangao', '12345')
+
+# 示例:获取一个数据
+# value = get_data('xiaoniangao', '12345')
+# if value is None:
+#     print("Value does not exist")
+# else:
+#     print(f"Retrieved value: {value}")

+ 17 - 0
xiaoniangao/xiaoniangao_author/xiaoniangao_author_v2.py

@@ -7,6 +7,7 @@ import uuid
 import requests
 
 from common.mq import MQ
+from common.redis_db import SyncRedisHelper
 
 sys.path.append(os.getcwd())
 
@@ -194,6 +195,7 @@ class XiaoNianGaoAuthor:
                         )
 
     def process_video_obj(self, video_obj, user_dict):
+        sp = SyncRedisHelper()
         trace_id = self.platform + str(uuid.uuid1())
         # 标题,表情随机加在片头、片尾,或替代句子中间的标点符号
         xiaoniangao_title = clean_title(video_obj.get("title", ""))
@@ -247,6 +249,19 @@ class XiaoNianGaoAuthor:
             "strategy": self.mode,
             "out_video_id": video_obj.get("vid", ""),
         }
+        # 判断redis中是否有该条内容
+        value = sp.get_data(self.platform, video_obj.get("vid", ""))
+        if value:
+            AliyunLogger.logging(
+                code="2004",
+                trace_id=trace_id,
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                data=video_dict,
+                message="redis重复视频:{}".format(video_dict),
+            )
+            return False
         pipeline = PiaoQuanPipeline(
             platform=self.platform,
             mode=self.mode,
@@ -300,6 +315,8 @@ class XiaoNianGaoAuthor:
             limit_flag = self.limiter.author_limitation(user_id=video_dict['user_id'])
             if limit_flag:
                 self.mq.send_msg(video_dict)
+                # 将渠道+视频id写入 redis 中
+                sp.store_data(self.platform, video_obj.get("vid", ""))
                 self.download_count += 1
                 AliyunLogger.logging(
                     code="1002",