import datetime import redis def extend_redis_key_expiry(host='localhost', port=6379, db=0, password=None, threshold_second=60, extend_second=300): """ 扫描 Redis key,如果过期时间小于 threshold_days 天,则过期时间延长 extend_days 天。 Args: host: Redis 主机地址。 port: Redis 端口。 db: Redis 数据库编号。 password: Redis 密码(如果没有则为 None)。 threshold_second: 过期时间阈值(秒)。 extend_second: 延长时间(秒)。 """ try: print(f'time = {datetime.datetime.now()}') r = redis.Redis(host=host, port=port, db=db, password=password) count = 0 renew_count = 0 # 获取所有 key (生产环境谨慎使用 keys '*') # 建议使用 scan_iter 迭代 key,避免阻塞 Redis for key in r.scan_iter(match='mid:generate:timestamp*', count=1000): # for key in r.scan_iter(match='renew_1*', count=1000): ttl = r.ttl(key) print(f'key: {key}, ttl: {ttl}') if ttl is not None and ttl > 0: # 检查 key 是否设置了过期时间 if ttl < threshold_second: r.expire(key, ttl + extend_second) # 在原过期时间基础上延长 print(f"Key: {key.decode()}, 原 TTL: {ttl} 秒, 延长 {extend_second} 秒") renew_count += 1 count += 1 if count % 1000000 == 0: print(f"count: {count}") print(f"扫描完成 count: {count} renew_count: {renew_count} time = {datetime.datetime.now()}") except redis.exceptions.ConnectionError as e: print(f"连接 Redis 失败: {e}") except Exception as e: print(f"发生错误: {e}") if __name__ == "__main__": # 或者指定 Redis 连接参数 threshold_second = 7 * 24 * 60 * 60 extend_second = 20 * 24 * 60 * 60 # test # extend_redis_key_expiry(host='r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com', port=6379, db=0, password='Wqsd@2019', threshold_second=threshold_second, extend_second=extend_second) # prod extend_redis_key_expiry(host='r-bp1j1vsznx8h813ddk.redis.rds.aliyuncs.com', port=6379, db=0, password='Wqsd@2019', threshold_second=threshold_second, extend_second=extend_second)