message_queue_backend.py 663 B

1234567891011121314151617181920212223242526272829
  1. #! /usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # vim:fenc=utf-8
import abc
from collections import deque
from typing import Dict, Any, Optional

from message import Message
  7. class MessageQueueBackend(abc.ABC):
  8. @abc.abstractmethod
  9. def consume(self) -> Optional[Message]:
  10. pass
  11. @abc.abstractmethod
  12. def produce(self, message: Message) -> None:
  13. pass
  14. class MemoryQueueBackend(MessageQueueBackend):
  15. """内存消息队列实现"""
  16. def __init__(self):
  17. self._queue = []
  18. def consume(self) -> Optional[Message]:
  19. return self._queue.pop(0) if self._queue else None
  20. def produce(self, message: Message):
  21. self._queue.append(message)