#! /usr/bin/env python # -*- coding: utf-8 -*- # vim:fenc=utf-8 import abc from typing import Dict, Any from message import Message class MessageQueueBackend(abc.ABC): @abc.abstractmethod def consume(self) -> Any: pass @abc.abstractmethod def produce(self, message: Message) -> None: pass class MemoryQueueBackend(MessageQueueBackend): """内存消息队列实现""" def __init__(self): self._queue = [] def consume(self): return self._queue.pop(0) if self._queue else None def produce(self, message: Message): self._queue.append(message)