|
@@ -1,9 +1,10 @@
|
|
|
import asyncio
|
|
|
import logging
|
|
|
+import traceback
|
|
|
from typing import List, Optional, Callable, Any
|
|
|
from mq_http_sdk.mq_client import MQClient
|
|
|
from mq_http_sdk.mq_consumer import Message
|
|
|
-from mq_http_sdk.mq_exception import MQExceptionBase
|
|
|
+from mq_http_sdk.mq_exception import MQExceptionBase, MQClientNetworkException
|
|
|
|
|
|
from config import settings
|
|
|
from core.utils.log.logger_manager import LoggerManager
|
|
@@ -34,12 +35,21 @@ class AsyncRocketMQConsumer:
|
|
|
self.topic_name = topic_name
|
|
|
self.group_id = group_id
|
|
|
|
|
|
- # 初始化客户端
|
|
|
- self.client = MQClient(self.endpoint, self.access_key_id, self.access_key_secret)
|
|
|
- self.consumer = self.client.get_consumer(self.instance_id, self.topic_name, self.group_id)
|
|
|
-
|
|
|
self.logger = LoggerManager.get_logger()
|
|
|
self.aliyun_logger = LoggerManager.get_aliyun_logger()
|
|
|
+ self._create_consumer()
|
|
|
+ def _create_consumer(self):
|
|
|
+ self.client = MQClient(
|
|
|
+ self.endpoint,
|
|
|
+ self.access_key_id,
|
|
|
+ self.access_key_secret
|
|
|
+ )
|
|
|
+ self.consumer = self.client.get_consumer(
|
|
|
+ self.instance_id,
|
|
|
+ self.topic_name,
|
|
|
+ self.group_id
|
|
|
+ )
|
|
|
+ self.logger.info(f"[{self.topic_name}] MQ Consumer 初始化完成")
|
|
|
|
|
|
async def receive_message(self) -> Optional[Message]:
|
|
|
"""异步拉取单条消息"""
|
|
@@ -54,15 +64,36 @@ class AsyncRocketMQConsumer:
|
|
|
self.logger.info(f"[{self.topic_name}]成功拉取到1条消息")
|
|
|
return messages[0]
|
|
|
return None
|
|
|
+ except MQClientNetworkException as e:
|
|
|
+ self.logger.error(f"[{self.topic_name}] MQClientNetworkException 发生,将自动重连: {e}")
|
|
|
+ await self._handle_reconnect()
|
|
|
+ return None
|
|
|
+
|
|
|
except MQExceptionBase as e:
|
|
|
- if getattr(e, "type", "") == "MessageNotExist":
|
|
|
- # 更友好的日志输出,使用INFO级别而非ERROR
|
|
|
- self.logger.debug(f"[{self.topic_name}]当前没有可消费的消息,继续等待...")
|
|
|
+ if e.type == "MessageNotExist":
|
|
|
+ # 没有新消息
|
|
|
+ self.logger.warning(f"[{self.topic_name}] 消息为空")
|
|
|
+ await asyncio.sleep(5)
|
|
|
+ return None
|
|
|
+ else:
|
|
|
+ self.logger.error(f"[{self.topic_name}] MQExceptionBase 错误: {e} \n {traceback.extract_tb()}")
|
|
|
return None
|
|
|
- # 其他类型的异常仍按错误处理
|
|
|
- self.logger.error(f"[{self.topic_name}]拉取消息失败: {e}")
|
|
|
- raise e
|
|
|
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.error(f"[{self.topic_name}] 未知异常: {e} \n {traceback.extract_tb()}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ async def _handle_reconnect(self):
|
|
|
+ """
|
|
|
+ 在网络断开时自动重建 MQConsumer 实例
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ self.logger.warning(f"[{self.topic_name}] 正在重新初始化 MQ Consumer 实例以重连...")
|
|
|
+ self._create_consumer()
|
|
|
+ self.logger.warning(f"[{self.topic_name}] MQ Consumer 重连成功")
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.error(f"[{self.topic_name}] MQ Consumer 重连失败: {e}", exc_info=True)
|
|
|
+ await asyncio.sleep(5)
|
|
|
async def ack_message(self, receipt_handle: str) -> None:
|
|
|
"""确认消费成功"""
|
|
|
try:
|