123456789101112131415161718192021222324252627282930313233 |
- import pytest
- import asyncio
- from config import settings
- from services.async_redis_service import AsyncRedisService
- from core.base.async_redis_client import RedisManager
@pytest.mark.asyncio
async def test_async_redis_service_mark_and_get_status():
    """Check the status lifecycle of one message in ``AsyncRedisService``.

    The status read back through ``get_status`` is expected to be ``None``
    before anything is recorded, ``"0"`` after ``mark_processing`` and
    ``"1"`` after ``mark_done``.

    The test initialises ``RedisManager`` with ``settings.redis_url``, so it
    needs whatever backend that URL points at to be reachable.
    """
    await RedisManager.init(redis_url=settings.redis_url)
    service = AsyncRedisService(prefix="crawler:task", ttl=20)
    test_message_id = "test_123456"
    pool = RedisManager.get_pool()
    key = service._build_key(test_message_id)

    # Start from a clean slate in case an earlier run left the key behind.
    await pool.delete(key)
    try:
        # Nothing has been recorded yet, so no status should be returned.
        status = await service.get_status(test_message_id)
        assert status is None

        # Mark the message as being processed.
        await service.mark_processing(test_message_id)
        status = await service.get_status(test_message_id)
        assert status == "0"

        # Mark the message as done.
        await service.mark_done(test_message_id)
        status = await service.get_status(test_message_id)
        assert status == "1"
    finally:
        # Always remove the test key, even when an assertion above fails,
        # so the test does not leak state into later runs.
        await pool.delete(key)
|