async_redis_service.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. from core.base.async_redis_client import RedisManager
  2. class AsyncRedisService:
  3. """
  4. RocketMQ控制组件:
  5. - 拉取到消息后先检测是否执行过,未执行过则标记为处理中。
  6. - 执行完成后标记已完成。
  7. - 避免同一 message_id 重复执行长任务。
  8. """
  9. def __init__(self, prefix: str = "crawler:task", ttl: int = 1 * 24 * 3600):
  10. """
  11. :param prefix: Redis key 前缀
  12. :param ttl: Key 存活时间(秒),默认 7 天防堆积
  13. """
  14. self.prefix = prefix
  15. self.ttl = ttl
  16. def _build_key(self, message_id: str) -> str:
  17. return f"{self.prefix}:{message_id}"
  18. async def get_status(self, message_id: str) -> str:
  19. """
  20. 获取当前执行状态:
  21. - "0":执行中
  22. - "1":已完成
  23. - None:未执行
  24. """
  25. key = self._build_key(message_id)
  26. pool = RedisManager.get_pool()
  27. return await pool.get(key)
  28. async def mark_processing(self, message_id: str):
  29. """
  30. 标记当前 message_id 为执行中(值为 "0")
  31. """
  32. key = self._build_key(message_id)
  33. pool = RedisManager.get_pool()
  34. await pool.set(key, "0", ex=self.ttl)
  35. async def mark_done(self, message_id: str):
  36. """
  37. 标记当前 message_id 已执行完成(值为 "1")
  38. """
  39. key = self._build_key(message_id)
  40. pool = RedisManager.get_pool()
  41. await pool.set(key, "1", ex=self.ttl)