|
@@ -44,20 +44,20 @@ class AsyncRocketMQConsumer:
|
|
|
async def receive_message(self) -> Optional[Message]:
|
|
|
"""异步拉取单条消息"""
|
|
|
try:
|
|
|
- self.logger.debug(f"[{self.topic_name}]开始拉取单条消息,等待时间: {self.wait_seconds}秒")
|
|
|
+ self.logger.info(f"[{self.topic_name}]开始拉取单条消息,等待时间: {self.wait_seconds}秒")
|
|
|
messages = await asyncio.to_thread(
|
|
|
self.consumer.consume_message,
|
|
|
settings.ROCKETMQ_BATCH,
|
|
|
self.wait_seconds,
|
|
|
)
|
|
|
if messages:
|
|
|
- self.logger.debug(f"[{self.topic_name}]成功拉取到1条消息")
|
|
|
+ self.logger.info(f"[{self.topic_name}]成功拉取到1条消息")
|
|
|
return messages[0]
|
|
|
return None
|
|
|
except MQExceptionBase as e:
|
|
|
if getattr(e, "type", "") == "MessageNotExist":
|
|
|
# 更友好的日志输出,使用INFO级别而非ERROR
|
|
|
- self.logger.info(f"[{self.topic_name}]当前没有可消费的消息,继续等待...")
|
|
|
+ self.logger.debug(f"[{self.topic_name}]当前没有可消费的消息,继续等待...")
|
|
|
return None
|
|
|
# 其他类型的异常仍按错误处理
|
|
|
self.logger.error(f"[{self.topic_name}]拉取消息失败: {e}")
|