import redis from datetime import timedelta class SyncRedisHelper: _pool: redis.ConnectionPool = None _instance = None def __init__(self): if not self._instance: self._pool = self._get_pool() self._instance = self def _get_pool(self) -> redis.ConnectionPool: if self._pool is None: self._pool = redis.ConnectionPool( host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com", # 内网地址 # host="r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com", # 外网地址 port=6379, db=2, password="Wqsd@2019", ) return self._pool def get_client(self) -> redis.Redis: pool = self._get_pool() client = redis.Redis(connection_pool=pool) return client def close(self): if self._pool: self._pool.disconnect(inuse_connections=True) def store_data(platform, out_video_id): key = f"crawler:duplicate:{platform}:{out_video_id}" value = 1 timeout = timedelta(days=60) # 60天超时时间 helper = SyncRedisHelper() client = helper.get_client() client.set(key, value) client.expire(key, timeout) def get_data(platform, out_video_id): key = f"crawler:duplicate:{platform}:{out_video_id}" helper = SyncRedisHelper() client = helper.get_client() value = client.get(key) return value # 示例:存储一个数据 # store_data('xiaoniangao', '12345') # 示例:获取一个数据 value = get_data('xiaoniangao', '12345') if value is None: print("Value does not exist") else: print(f"Retrieved value: {value}")