message_queue_backend.py 583 B

123456789101112131415161718192021222324252627
  1. #! /usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # vim:fenc=utf-8
import abc
from collections import deque
from typing import Dict, Any
  6. class MessageQueueBackend(abc.ABC):
  7. @abc.abstractmethod
  8. def consume(self) -> Any:
  9. pass
  10. @abc.abstractmethod
  11. def produce(self, message: Dict) -> None:
  12. pass
  13. class MemoryQueueBackend(MessageQueueBackend):
  14. """内存消息队列实现"""
  15. def __init__(self):
  16. self._queue = []
  17. def consume(self):
  18. return self._queue.pop(0) if self._queue else None
  19. def produce(self, message: Dict):
  20. self._queue.append(message)