import redis from common.feishu_form import Material class SyncRedisHelper: _pool: redis.ConnectionPool = None _instance = None def __init__(self): if not self._instance: self._pool = self._get_pool() self._instance = self def _get_pool(self) -> redis.ConnectionPool: if self._pool is None: self._pool = redis.ConnectionPool( host="r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com", # 外网地址 # host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com", # 内网地址 port=6379, db=0, password="Wqsd@2019", # password="Qingqu2019", ) return self._pool def get_client(self) -> redis.Redis: pool = self._get_pool() client = redis.Redis(connection_pool=pool) return client def close(self): if self._pool: self._pool.disconnect(inuse_connections=True) def insert_carry_data(dt, REDIS_NAME,FS_SHEET, NAME): data = Material.get_carry_data(dt, FS_SHEET,NAME) if not data: return 0 helper = SyncRedisHelper() client = helper.get_client() client.rpush(REDIS_NAME, *data) return len(data) def get_carry_data(REDIS_NAME): """获取一条需要打标签的视频""" helper = SyncRedisHelper() client = helper.get_client() ret = client.lpop(REDIS_NAME) return ret def in_carry_video_data(REDIS_NAME, ret): """获取/处理失败重新写入""" helper = SyncRedisHelper() client = helper.get_client() client.rpush(REDIS_NAME, str(ret))