12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849 |
- from core.base.async_redis_client import RedisManager
- class AsyncRedisService:
- """
- RocketMQ控制组件:
- - 拉取到消息后先检测是否执行过,未执行过则标记为处理中。
- - 执行完成后标记已完成。
- - 避免同一 message_id 重复执行长任务。
- """
- def __init__(self, prefix: str = "crawler:task", ttl: int = 1 * 24 * 3600):
- """
- :param prefix: Redis key 前缀
- :param ttl: Key 存活时间(秒),默认 7 天防堆积
- """
- self.prefix = prefix
- self.ttl = ttl
- def _build_key(self, message_id: str) -> str:
- return f"{self.prefix}:{message_id}"
- async def get_status(self, message_id: str) -> str:
- """
- 获取当前执行状态:
- - "0":执行中
- - "1":已完成
- - None:未执行
- """
- key = self._build_key(message_id)
- pool = RedisManager.get_pool()
- return await pool.get(key)
- async def mark_processing(self, message_id: str):
- """
- 标记当前 message_id 为执行中(值为 "0")
- """
- key = self._build_key(message_id)
- pool = RedisManager.get_pool()
- await pool.set(key, "0", ex=self.ttl)
- async def mark_done(self, message_id: str):
- """
- 标记当前 message_id 已执行完成(值为 "1")
- """
- key = self._build_key(message_id)
- pool = RedisManager.get_pool()
- await pool.set(key, "1", ex=self.ttl)
|