#! /usr/bin/env python # -*- coding: utf-8 -*- # vim:fenc=utf-8 import abc from typing import Dict, Any, Optional from message import Message class MessageQueueBackend(abc.ABC): @abc.abstractmethod def consume(self) -> Optional[Message]: pass @abc.abstractmethod def produce(self, message: Message) -> None: pass class MemoryQueueBackend(MessageQueueBackend): """内存消息队列实现""" def __init__(self): self._queue = [] def consume(self) -> Optional[Message]: return self._queue.pop(0) if self._queue else None def produce(self, message: Message): self._queue.append(message)