from core.base.async_redis_client import RedisManager class AsyncRedisService: """ RocketMQ控制组件: - 拉取到消息后先检测是否执行过,未执行过则标记为处理中。 - 执行完成后标记已完成。 - 避免同一 message_id 重复执行长任务。 """ def __init__(self, prefix: str = "crawler:task", ttl: int = 1 * 24 * 3600): """ :param prefix: Redis key 前缀 :param ttl: Key 存活时间(秒),默认 7 天防堆积 """ self.prefix = prefix self.ttl = ttl def _build_key(self, message_id: str) -> str: return f"{self.prefix}:{message_id}" async def get_status(self, message_id: str) -> str: """ 获取当前执行状态: - "0":执行中 - "1":已完成 - None:未执行 """ key = self._build_key(message_id) pool = RedisManager.get_pool() return await pool.get(key) async def mark_processing(self, message_id: str): """ 标记当前 message_id 为执行中(值为 "0") """ key = self._build_key(message_id) pool = RedisManager.get_pool() await pool.set(key, "0", ex=self.ttl) async def mark_done(self, message_id: str): """ 标记当前 message_id 已执行完成(值为 "1") """ key = self._build_key(message_id) pool = RedisManager.get_pool() await pool.set(key, "1", ex=self.ttl)