# redis.py (1.0 KB)

  1. import redis
  class RedisHelper(object):
      """Hand out Redis clients that all draw from one shared connection pool.

      The pool is stored on the class, not on the instance, so creating many
      helpers (one per call, for example) reuses the same pool instead of
      building a fresh pool each time.
      """

      # Class-level slots: the shared pool and the first helper ever created.
      _pool: "redis.ConnectionPool | None" = None
      _instance: "RedisHelper | None" = None

      def __init__(self):
          """Record the first helper created and make sure the shared pool exists."""
          cls = type(self)
          # Assign on the class: writing ``self._instance`` would only create
          # an instance attribute and leave the class-level slot at None.
          if cls._instance is None:
              cls._instance = self
          self._get_pool()

      def _get_pool(self) -> "redis.ConnectionPool":
          """Return the shared connection pool, creating it on first use.

          Returns:
              The class-wide ``redis.ConnectionPool``.
          """
          cls = type(self)
          if cls._pool is None:
              # NOTE(review): the endpoint and password are hard-coded in
              # source; consider loading them from configuration or a secret
              # store instead.
              cls._pool = redis.ConnectionPool(
                  host="r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com",  # public (external network) address
                  port=6379,
                  db=0,
                  password="Wqsd@2019",
                  max_connections=100,
              )
          return cls._pool

      def get_client(self) -> "redis.Redis":
          """Return a new ``redis.Redis`` client bound to the shared pool."""
          return redis.Redis(connection_pool=self._get_pool())

      def close(self):
          """Disconnect the pool's connections, including ones currently in use.

          Does nothing when no pool has been created. Because the pool is
          shared, this affects every client obtained from any helper.
          """
          pool = type(self)._pool
          if pool is not None:
              pool.disconnect(inuse_connections=True)
  def content_video_data(ret):
      """Write a video whose analysis failed back into Redis.

      Appends ``ret`` to the tail of the ``task:video_insight`` list (RPUSH)
      using a client obtained from ``RedisHelper``.

      Args:
          ret: The value to push; it is passed to ``rpush`` unchanged.
              Presumably a serialized task record -- confirm with callers.
      """
      queue_key = "task:video_insight"
      RedisHelper().get_client().rpush(queue_key, ret)