|
@@ -11,21 +11,21 @@ from rocketmq import ClientConfiguration, Credentials, SimpleConsumer
|
|
|
from pqai_agent import configs
|
|
|
from pqai_agent import logging_service
|
|
|
from pqai_agent.logging_service import logger
|
|
|
-from pqai_agent.mq_message import Message, MessageType, MessageChannel
|
|
|
+from pqai_agent.mq_message import MqMessage, MessageType, MessageChannel
|
|
|
|
|
|
|
|
|
|
|
|
class MessageQueueBackend(abc.ABC):
|
|
|
@abc.abstractmethod
|
|
|
- def consume(self) -> Optional[Message]:
|
|
|
+ def consume(self) -> Optional[MqMessage]:
|
|
|
pass
|
|
|
|
|
|
@abc.abstractmethod
|
|
|
- def ack(self, message: Message) -> None:
|
|
|
+ def ack(self, message: MqMessage) -> None:
|
|
|
pass
|
|
|
|
|
|
@abc.abstractmethod
|
|
|
- def produce(self, message: Message, msg_group: Optional[str] = None) -> None:
|
|
|
+ def produce(self, message: MqMessage, msg_group: Optional[str] = None) -> None:
|
|
|
pass
|
|
|
|
|
|
@abc.abstractmethod
|
|
@@ -37,13 +37,13 @@ class MemoryQueueBackend(MessageQueueBackend):
|
|
|
def __init__(self):
|
|
|
self._queue = []
|
|
|
|
|
|
- def consume(self) -> Optional[Message]:
|
|
|
+ def consume(self) -> Optional[MqMessage]:
|
|
|
return self._queue.pop(0) if self._queue else None
|
|
|
|
|
|
- def ack(self, message: Message):
|
|
|
+ def ack(self, message: MqMessage):
|
|
|
return
|
|
|
|
|
|
- def produce(self, message: Message, msg_group: Optional[str] = None):
|
|
|
+ def produce(self, message: MqMessage, msg_group: Optional[str] = None):
|
|
|
self._queue.append(message)
|
|
|
|
|
|
def shutdown(self):
|
|
@@ -78,7 +78,7 @@ class AliyunRocketMQQueueBackend(MessageQueueBackend):
|
|
|
def __del__(self):
|
|
|
self.shutdown()
|
|
|
|
|
|
- def consume(self, invisible_duration=60) -> Optional[Message]:
|
|
|
+ def consume(self, invisible_duration=60) -> Optional[MqMessage]:
|
|
|
if not self.has_consumer:
|
|
|
raise Exception("Consumer not initialized.")
|
|
|
# TODO(zhoutian): invisible_duration实际是不同消息类型不同的,有些消息预期的处理时间会更长
|
|
@@ -89,7 +89,7 @@ class AliyunRocketMQQueueBackend(MessageQueueBackend):
|
|
|
body = rmq_message.body.decode('utf-8')
|
|
|
logger.debug("[{}]recv message body: {}".format(self.topic, body))
|
|
|
try:
|
|
|
- message = Message.from_json(body)
|
|
|
+ message = MqMessage.from_json(body)
|
|
|
message._rmq_message = rmq_message
|
|
|
except Exception as e:
|
|
|
logger.error("Invalid message: {}. Parsing error: {}".format(body, e))
|
|
@@ -98,13 +98,13 @@ class AliyunRocketMQQueueBackend(MessageQueueBackend):
|
|
|
return None
|
|
|
return message
|
|
|
|
|
|
- def ack(self, message: Message):
|
|
|
+ def ack(self, message: MqMessage):
|
|
|
if not message._rmq_message:
|
|
|
- raise ValueError("Message not set with _rmq_message.")
|
|
|
+ raise ValueError("MqMessage not set with _rmq_message.")
|
|
|
logger.debug("[{}]ack message: {}".format(self.topic, message))
|
|
|
self.consumer.ack(message._rmq_message)
|
|
|
|
|
|
- def produce(self, message: Message, msg_group: Optional[str] = None) -> None:
|
|
|
+ def produce(self, message: MqMessage, msg_group: Optional[str] = None) -> None:
|
|
|
if not self.has_producer:
|
|
|
raise Exception("Producer not initialized.")
|
|
|
message.model_config['use_enum_values'] = False
|
|
@@ -148,16 +148,16 @@ if __name__ == '__main__':
|
|
|
else:
|
|
|
break
|
|
|
|
|
|
- send_message = Message.build(MessageType.AGGREGATION_TRIGGER, MessageChannel.CORP_WECHAT,
|
|
|
+ send_message = MqMessage.build(MessageType.AGGREGATION_TRIGGER, MessageChannel.CORP_WECHAT,
|
|
|
"user_id_1", "staff_id_0",
|
|
|
- None, int(time.time() * 1000))
|
|
|
+ None, int(time.time() * 1000))
|
|
|
queue.produce(send_message)
|
|
|
recv_message = queue.consume()
|
|
|
print(recv_message)
|
|
|
if recv_message:
|
|
|
queue.ack(recv_message)
|
|
|
|
|
|
- send_message = Message.build(MessageType.TEXT, MessageChannel.CORP_WECHAT,
|
|
|
+ send_message = MqMessage.build(MessageType.TEXT, MessageChannel.CORP_WECHAT,
|
|
|
"user_id_1", "staff_id_0",
|
|
|
"message_queue_backend test", int(time.time() * 1000))
|
|
|
queue.produce(send_message)
|