Browse Source

bug修复

zhangliang 12 hours ago
parent
commit
e1e92d4208
3 changed files with 120 additions and 3 deletions
  1. 14 0
      requirements.txt
  2. 5 3
      scheduler/async_consumer.py
  3. 101 0
      services/rocket_mq_pullconsumer.py

+ 14 - 0
requirements.txt

@@ -16,12 +16,24 @@ dotenv==0.9.9
 elastic-transport==8.17.1
 elasticsearch==9.0.2
 frozenlist==1.7.0
+googleapis-common-protos==1.70.0
+grpcio==1.73.1
+grpcio-tools==1.71.2
 idna==3.10
+importlib_metadata==8.7.0
 jmespath==0.10.0
 jsonpath-ng==1.7.0
 loguru==0.7.3
 mq-http-sdk==1.0.3
 multidict==6.4.4
+opentelemetry-api==1.34.1
+opentelemetry-exporter-otlp==1.34.1
+opentelemetry-exporter-otlp-proto-common==1.34.1
+opentelemetry-exporter-otlp-proto-grpc==1.34.1
+opentelemetry-exporter-otlp-proto-http==1.34.1
+opentelemetry-proto==1.34.1
+opentelemetry-sdk==1.34.1
+opentelemetry-semantic-conventions==0.55b1
 ply==3.11
 propcache==0.3.2
 protobuf==3.20.3
@@ -38,6 +50,7 @@ redis==6.2.0
 regex==2024.11.6
 requests==2.32.4
 rocketmq-client-python==2.0.0
+rocketmq-python-client==5.0.6
 six==1.17.0
 tenacity==9.1.2
 typing-inspection==0.4.1
@@ -45,3 +58,4 @@ typing_extensions==4.14.0
 tzlocal==5.3.1
 urllib3==2.4.0
 yarl==1.20.1
+zipp==3.23.0

+ 5 - 3
scheduler/async_consumer.py

@@ -40,6 +40,8 @@ async def async_handle_topic(topic: str, stop_event: asyncio.Event):
                     data=payload,
                     account=topic
                 )
+                # 确认消息(单条消息处理成功后才 Ack) 由于获取到消息到确认消息有超时问题,暂时先把ask提前,后面改为使用sdk
+                await consumer.ack_message(message.receipt_handle)
 
                 # 从数据库查询配置
                 async with AsyncMysqlService() as mysql:
@@ -52,10 +54,10 @@ async def async_handle_topic(topic: str, stop_event: asyncio.Event):
                     rule_dict=rule_dict,
                     user_list=user_list,
                 )
-                await crawler.run()  # 爬虫成功执行后再确认消息
 
-                # 确认消息(单条消息处理成功后才 Ack)
-                await consumer.ack_message(message.receipt_handle)
+
+
+                await crawler.run()
 
                 logger.info(f"[{topic}]任务 {task_id} 执行成功并已 Ack")
                 aliyun_logger.logging(

+ 101 - 0
services/rocket_mq_pullconsumer.py

@@ -0,0 +1,101 @@
+# import asyncio
+# import logging
+# from typing import Callable, Awaitable
+#
+# from rocketmq.client import PushConsumer, Message
+# from core.utils.log.logger_manager import LoggerManager
+#
+# logger = LoggerManager.get_logger()
+# aliyun_logger = LoggerManager.get_aliyun_logger()
+#
+#
+# class RocketMQAsyncPullConsumer:
+#     def __init__(self, topic: str, group_id: str, max_workers: int = 5):
+#         """
+#         RocketMQ TCP SDK 异步拉取消费者封装
+#         - 拉取消息后执行任务,执行完成再确认 ack
+#         - 使用 asyncio + Semaphore 控制并发执行
+#         """
+#         self.topic = topic
+#         self.group_id = group_id
+#         self.max_workers = max_workers
+#         self.semaphore = asyncio.Semaphore(max_workers)
+#         self.shutdown_event = asyncio.Event()
+#
+#         # 初始化 RocketMQ Pull Consumer
+#         self.consumer = PushConsumer(group_id)
+#         self.consumer.set_name_server_address('')
+#         self.consumer.subscribe(topic)
+#
+#     async def process_message(self, msg: Message, handler: Callable[[Message], Awaitable[None]]):
+#         async with self.semaphore:
+#             try:
+#                 logger.info(f"[{self.topic}] 收到消息: {msg.id}")
+#                 await handler(msg)
+#                 msg.ack()
+#                 logger.info(f"[{self.topic}] 消息 {msg.id} 执行完毕并已确认 ack")
+#             except Exception as e:
+#                 logger.error(f"[{self.topic}] 消息处理失败: {e}", exc_info=True)
+#                 aliyun_logger.logging(
+#                     code="9001",
+#                     message=f"[{self.topic}] 消息处理失败: {e}",
+#                     data={"msg_id": msg.id, "body": msg.body.decode()},
+#                 )
+#
+#     async def _consume_loop(self, handler: Callable[[Message], Awaitable[None]]):
+#         loop = asyncio.get_running_loop()
+#
+#         def callback(msg: Message):
+#             asyncio.run_coroutine_threadsafe(self.process_message(msg, handler), loop)
+#             return None
+#
+#         self.consumer.register_message_listener(callback)
+#         self.consumer.start()
+#
+#         logger.info(f"[{self.topic}] RocketMQ PullConsumer 已启动,开始消费消息...")
+#         aliyun_logger.logging(
+#             code="1500",
+#             message=f"[{self.topic}] RocketMQ PullConsumer 已启动",
+#         )
+#
+#         await self.shutdown_event.wait()
+#
+#         self.consumer.shutdown()
+#         logger.info(f"[{self.topic}] PullConsumer 已优雅退出")
+#         aliyun_logger.logging(
+#             code="1602",
+#             message=f"[{self.topic}] PullConsumer 已优雅退出",
+#         )
+#
+#     def stop(self):
+#         """外部调用以优雅退出"""
+#         self.shutdown_event.set()
+#
+#     async def run(self, handler: Callable[[Message], Awaitable[None]]):
+#         await self._consume_loop(handler)
+#
+#
+# # === 使用示例 ===
+# if __name__ == '__main__':
+#     import signal
+#
+#     async def handle_message(msg: Message):
+#         logger.info(f"处理消息内容: {msg.body.decode()}")
+#         await asyncio.sleep(2)  # 模拟执行耗时任务
+#
+#     consumer = RocketMQAsyncPullConsumer(
+#         topic="ynfqmm_recommend_prod",
+#         group_id="ynfqmm_recommend_prod",
+#         max_workers=5
+#     )
+#
+#     loop = asyncio.get_event_loop()
+#
+#     def shutdown_handler():
+#         logger.warning("收到退出信号,准备优雅关闭消费者...")
+#         consumer.stop()
+#
+#     for sig in [signal.SIGINT, signal.SIGTERM]:
+#         loop.add_signal_handler(sig, shutdown_handler)
+#
+#     loop.run_until_complete(consumer.run(handle_message))