# clear_redis_concurrent.py
  1. import datetime
  2. import sys
  3. import redis
  4. import concurrent.futures
  5. def process_key(r, key):
  6. """处理单个 key 的过期时间延长逻辑。"""
  7. try:
  8. ttl = r.ttl(key)
  9. if ttl is not None and 0 < ttl < threshold_second:
  10. r.expire(key, ttl + extend_second)
  11. #logging.info(f"Key: {key.decode()}, 原 TTL: {ttl} 秒, 延长 {extend_second} 秒")
  12. return 1 # 成功续期的 key 数量
  13. return 0
  14. except redis.exceptions.RedisError as e:
  15. print(f"处理 Key: {key.decode()} 时发生 Redis 错误: {e}")
  16. return 0
  17. except Exception as e:
  18. print(f"处理 Key: {key.decode()} 时发生其他错误: {e}")
  19. return 0
  20. def clear_redis_key(host='localhost', port=6379, db=0, password=None, num_workers=10):
  21. """
  22. 扫描 Redis key,如果过期时间小于 threshold_second,则过期时间延长 extend_second。
  23. 使用并行处理,尽量不改动原代码结构。
  24. """
  25. try:
  26. print(f'time = {datetime.datetime.now()}')
  27. sys.stdout.flush()
  28. sys.stderr.flush()
  29. r = redis.Redis(host=host, port=port, db=db, password=password)
  30. count = 0
  31. clear_count = 0
  32. with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
  33. futures = []
  34. for key in r.scan_iter(match='com.weiqu.longvideo.video.message.cache.*', count=1000):
  35. futures.append(executor.submit(process_key, r, key))
  36. count += 1
  37. if count % 1000000 == 0:
  38. print(f"submit count: {count} time = {datetime.datetime.now()}")
  39. sys.stdout.flush()
  40. sys.stderr.flush()
  41. for future in concurrent.futures.as_completed(futures):
  42. try:
  43. clear_count += future.result()
  44. except Exception as e:
  45. print(f"future error: {e}")
  46. sys.stdout.flush()
  47. sys.stderr.flush()
  48. print(f"scan finish count: {count} clear_count: {clear_count} time = {datetime.datetime.now()}")
  49. sys.stdout.flush()
  50. sys.stderr.flush()
  51. except redis.exceptions.ConnectionError as e:
  52. print(f"connect Redis error: {e}")
  53. sys.stdout.flush()
  54. sys.stderr.flush()
  55. except Exception as e:
  56. print(f"error: {e}")
  57. sys.stdout.flush()
  58. sys.stderr.flush()
  59. if __name__ == "__main__":
  60. num_workers = 20 # 根据 CPU 核心数和网络情况调整
  61. # test
  62. clear_redis_key(host='r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com', port=6379, db=0, password='Wqsd@2019')
  63. # prod
  64. # clear_redis_key(host='r-bp1oyhyx4mxgs6klyt561.redis.rds.aliyuncs.com', port=6379, db=0, password='Wqsd@2019')