123456789101112131415161718192021222324252627 |
- #! /usr/bin/env python
- # -*- coding: utf-8 -*-
- # vim:fenc=utf-8
- import abc
- from typing import Dict, Any
- class MessageQueueBackend(abc.ABC):
- @abc.abstractmethod
- def consume(self) -> Any:
- pass
- @abc.abstractmethod
- def produce(self, message: Dict) -> None:
- pass
- class MemoryQueueBackend(MessageQueueBackend):
- """内存消息队列实现"""
- def __init__(self):
- self._queue = []
- def consume(self):
- return self._queue.pop(0) if self._queue else None
- def produce(self, message: Dict):
- self._queue.append(message)
|