|
@@ -54,7 +54,8 @@ class AliyunRocketMQQueueBackend(MessageQueueBackend):
|
|
|
def __init__(self, endpoints: str, instance_id: str, topic: str,
|
|
|
has_consumer: bool = False, has_producer: bool = False,
|
|
|
group_id: Optional[str] = None,
|
|
|
- ak:Optional[str] = None, sk: Optional[str] = None):
|
|
|
+ ak:Optional[str] = None, sk: Optional[str] = None,
|
|
|
+ topic_type: Optional[str] = None):
|
|
|
if not has_consumer and not has_producer:
|
|
|
raise ValueError("At least one of has_consumer or has_producer must be True.")
|
|
|
self.has_consumer = has_consumer
|
|
@@ -71,6 +72,7 @@ class AliyunRocketMQQueueBackend(MessageQueueBackend):
|
|
|
if has_producer:
|
|
|
self.producer = rocketmq.Producer(mq_config, (topic,))
|
|
|
self.producer.startup()
|
|
|
+ self.topic_type = topic_type
|
|
|
|
|
|
def __del__(self):
|
|
|
self.shutdown()
|
|
@@ -84,7 +86,7 @@ class AliyunRocketMQQueueBackend(MessageQueueBackend):
|
|
|
return None
|
|
|
rmq_message = messages[0]
|
|
|
body = rmq_message.body.decode('utf-8')
|
|
|
- logger.debug("recv message body: {}".format(body))
|
|
|
+ logger.debug("[{}]recv message body: {}".format(self.topic, body))
|
|
|
try:
|
|
|
message = Message.from_json(body)
|
|
|
message._rmq_message = rmq_message
|
|
@@ -98,7 +100,7 @@ class AliyunRocketMQQueueBackend(MessageQueueBackend):
|
|
|
def ack(self, message: Message):
|
|
|
if not message._rmq_message:
|
|
|
raise ValueError("Message not set with _rmq_message.")
|
|
|
- logger.debug("ack message: {}".format(message))
|
|
|
+ logger.debug("[{}]ack message: {}".format(self.topic, message))
|
|
|
self.consumer.ack(message._rmq_message)
|
|
|
|
|
|
def produce(self, message: Message) -> None:
|
|
@@ -109,14 +111,18 @@ class AliyunRocketMQQueueBackend(MessageQueueBackend):
|
|
|
rmq_message = rocketmq.Message()
|
|
|
rmq_message.topic = self.topic
|
|
|
rmq_message.body = json_str.encode('utf-8')
|
|
|
- # 顺序消息队列必须指定消息组
|
|
|
- rmq_message.message_group = "agent_system"
|
|
|
+ if self.topic_type == 'FIFO':
|
|
|
+ # 顺序消息队列必须指定消息组
|
|
|
+ rmq_message.message_group = "agent_system"
|
|
|
+ elif self.topic_type == 'DELAY':
|
|
|
+ # 延时消息队列必须指定投递时间(秒)
|
|
|
+ rmq_message.delivery_timestamp = int(message.sendTime / 1000)
|
|
|
self.producer.send(rmq_message)
|
|
|
|
|
|
def shutdown(self):
|
|
|
- if self.has_consumer:
|
|
|
+ if self.has_consumer and self.consumer.is_running:
|
|
|
self.consumer.shutdown()
|
|
|
- if self.has_producer:
|
|
|
+ if self.has_producer and self.producer.is_running:
|
|
|
self.producer.shutdown()
|
|
|
|
|
|
if __name__ == '__main__':
|