Browse Source

修复mq消息消费

zhangliang 6 hours ago
parent
commit
a1368591a6
7 changed files with 165 additions and 87 deletions
  1. 0 1
      config/base.py
  2. 1 1
      config/spiders_config.yaml
  3. 3 3
      core/utils/__init__.py
  4. 1 1
      main.py
  5. 46 35
      scheduler/async_consumer.py
  6. 84 22
      services/async_mq_consumer.py
  7. 30 24
      services/pipeline.py

+ 0 - 1
config/base.py

@@ -1,5 +1,4 @@
 import os
-from pathlib import Path
 
 from dotenv import load_dotenv
 from core.utils.path_utils import project_root,log_dir

+ 1 - 1
config/spiders_config.yaml

@@ -37,7 +37,7 @@ yuannifuqimanmanrecommend:
   method: post
   request_body:
     cursor: "{{next_cursor}}"
-  loop_times: 200
+  loop_times: 50
   loop_interval: 5
   feishu_sheetid: "golXy9"
   response_parse:

+ 3 - 3
core/utils/__init__.py

@@ -1,3 +1,3 @@
-from .log.log_codes import CODES
-
-__all__ = ['CODES']
+# from .log.log_codes import CODES
+#
+# __all__ = ['CODES']

+ 1 - 1
main.py

@@ -5,7 +5,7 @@ from typing import Dict
 from core.utils.log.logger_manager import LoggerManager
 from scheduler.process_manager import split_topics, start_worker_process
 from spiders.spider_registry import SPIDER_CLASS_MAP
-from core.utils import CODES
+
 
 
 

+ 46 - 35
scheduler/async_consumer.py

@@ -12,22 +12,21 @@ from spiders.spider_registry import get_spider_class
 logger = LoggerManager.get_logger()
 aliyun_logger = LoggerManager.get_aliyun_logger()
 
-async def async_handle_topic(topic: str,stop_event: asyncio.Event):
+
+async def async_handle_topic(topic: str, stop_event: asyncio.Event):
     """
     单个 topic 的消费逻辑,运行在协程中:
-    - 从 MQ 中消费消息;
+    - 从 MQ 中消费消息(单条处理,处理完再拉取下一条)
     - 根据消息内容执行对应爬虫;
     - 使用异步数据库服务查询配置;
     - 记录日志、确认消息。
     """
-
-
-    # 每个 topic 创建独立的 consumer 实例
+    # 每个 topic 创建独立的 consumer 实例(使用优化后的 AsyncRocketMQConsumer)
     from services.async_mq_consumer import AsyncRocketMQConsumer
-
     consumer = AsyncRocketMQConsumer(topic_name=topic, group_id=topic)
 
     async def handle_single_message(message):
+        """处理单条消息的业务逻辑(不含拉取和循环)"""
         trace_id = generate_trace_id()
         try:
             payload = json.loads(message.message_body)
@@ -41,19 +40,22 @@ async def async_handle_topic(topic: str,stop_event: asyncio.Event):
                 trace_id=trace_id,
                 account=topic
             )
+
+            # 从数据库查询配置
             async with AsyncMysqlService() as mysql:
                 user_list = await mysql.get_user_list(task_id)
                 rule_dict = await mysql.get_rule_dict(task_id)
 
+            # 执行爬虫任务
             CrawlerClass = get_spider_class(topic)
             crawler = CrawlerClass(
                 rule_dict=rule_dict,
                 user_list=user_list,
                 trace_id=trace_id
             )
-            await crawler.run()
+            await crawler.run()  # 爬虫成功执行后再确认消息
 
-            # ack 由 run 成功后执行
+            # 确认消息(单条消息处理成功后才 Ack)
             await consumer.ack_message(message.receipt_handle)
 
             logger.info(f"{trace_id} - 任务 {task_id} 执行成功并已 Ack")
@@ -61,19 +63,15 @@ async def async_handle_topic(topic: str,stop_event: asyncio.Event):
                 code="1010",
                 message="任务执行成功",
                 trace_id=trace_id,
-                data={
-                    "task_id": task_id,
-                    "topic": topic
-                },
+                data={"task_id": task_id, "topic": topic},
                 account=topic
-
             )
 
         except Exception as e:
-            logger.error(f"{trace_id} - 任务处理失败: {e} /n {traceback.format_exc()}")
+            logger.error(f"{trace_id} - 任务处理失败: {e} \n {traceback.format_exc()}")
             aliyun_logger.logging(
                 code="9001",
-                message=f"处理消息失败: {str(e)}  /n {traceback.format_exc()}",
+                message=f"处理消息失败: {str(e)} \n {traceback.format_exc()}",
                 trace_id=trace_id,
                 data={
                     "error_type": type(e).__name__,
@@ -82,22 +80,36 @@ async def async_handle_topic(topic: str,stop_event: asyncio.Event):
                 },
                 account=topic
             )
-        # 自动重启消费循环
-        while not stop_event.is_set():
+            # 处理失败不 Ack,消息会被 MQ 重新投递(依赖 MQ 的重试机制)
+
+    # 独立的消费循环:拉取消息并调用处理函数
+    async def consume_loop():
+        logger.info(f"[{topic}] 启动消费循环,开始拉取消息...")
+        while not stop_event.is_set():  # 监听停止信号,支持优雅退出
             try:
-                await consumer.run_forever(handle_single_message)
+                # 拉取单条消息(依赖优化后的 receive_message,无消息时返回 None 不报错)
+                message = await consumer.receive_message()
+                if message:
+                    # 有消息则处理,处理完成后再进入下一次循环
+                    await handle_single_message(message)
+                else:
+                    # 无消息时短暂休眠,避免频繁空轮询
+                    await asyncio.sleep(1)
             except Exception as e:
+                # 非消息处理的异常(如 MQ 连接失败),记录并重试
+                logger.error(f"[{topic}] 消费循环异常: {e}", exc_info=True)
                 aliyun_logger.logging(
                     code="9002",
-                    message=f"{topic} 消费循环异常即将重启: {str(e)}",
-                    data={
-                        "error_type": type(e).__name__,
-                        "stack_trace": traceback.format_exc(),
-                    },
+                    message=f"{topic} 消费循环异常,即将重试: {str(e)}",
+                    data={"error_type": type(e).__name__, "stack_trace": traceback.format_exc()},
                     account=topic
                 )
-                logger.warning(f"[{topic}] 消费循环异常: {e},5秒后重启")
-                await asyncio.sleep(5)
+                await asyncio.sleep(5)  # 异常后延迟重试,减轻服务压力
+
+        logger.info(f"[{topic}] 消费循环已停止(收到退出信号)")
+
+    # 启动消费循环(这是消费逻辑的入口)
+    await consume_loop()
 
 
 async def run_all_topics(topics: List[str]):
@@ -105,6 +117,7 @@ async def run_all_topics(topics: List[str]):
     loop = asyncio.get_running_loop()
 
     def shutdown():
+        """处理停止信号(如 Ctrl+C),触发优雅退出"""
         logger.warning("[系统] 收到停止信号,准备优雅退出...")
         aliyun_logger.logging(
             code="1600",
@@ -112,33 +125,31 @@ async def run_all_topics(topics: List[str]):
         )
         stop_event.set()
 
+    # 注册信号处理(支持 Ctrl+C 和 kill 命令)
     for sig in [signal.SIGINT, signal.SIGTERM]:
         loop.add_signal_handler(sig, shutdown)
 
+    # 为每个 topic 创建独立协程任务
     tasks = [asyncio.create_task(async_handle_topic(topic, stop_event)) for topic in topics]
 
-    await stop_event.wait()  # 等待停止信号
+    await stop_event.wait()  # 等待退出信号
 
-    logger.warning(f"[系统] 正在取消所有消费任务...{tasks}")
-    aliyun_logger.logging(
-        code="1601",
-        message="[系统] 收到停止信号,准备优雅退出...",
-        data=f"任务列表{tasks}"
-    )
+    # 取消所有任务并等待结束
+    logger.warning(f"[系统] 正在取消所有消费任务...")
     for task in tasks:
         task.cancel()
 
+    # 收集任务结果,忽略取消异常
     results = await asyncio.gather(*tasks, return_exceptions=True)
-
     for idx, result in enumerate(results):
-        if isinstance(result, Exception):
+        if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
             logger.error(f"[系统] 任务 {topics[idx]} 异常退出: {result}")
 
     logger.warning(f"[系统] 所有任务已退出,进程已关闭...")
     aliyun_logger.logging(
         code="1602",
         message="[系统] 所有任务已退出,进程已关闭...",
-        data=f"任务列表{tasks}"
+        data={"task_count": len(tasks)}
     )
 
 

+ 84 - 22
services/async_mq_consumer.py

@@ -1,11 +1,13 @@
 import asyncio
-from typing import List, Optional, Callable
-
+import logging
+from typing import List, Optional, Callable, Any
 from mq_http_sdk.mq_client import MQClient
 from mq_http_sdk.mq_consumer import Message
 from mq_http_sdk.mq_exception import MQExceptionBase
 
 from config import settings
+from core.utils.log.logger_manager import LoggerManager
+
 
 
 class AsyncRocketMQConsumer:
@@ -14,6 +16,7 @@ class AsyncRocketMQConsumer:
     - 支持自动读取环境变量
     - 基于 asyncio 实现原生异步消费模型
     - 手动确认消费
+    - 提供单条消息处理模式
     """
 
     def __init__(
@@ -21,57 +24,116 @@ class AsyncRocketMQConsumer:
             topic_name: Optional[str],
             group_id: Optional[str],
             wait_seconds: Optional[int] = None,
-            batch: Optional[int] = None,
     ):
         # 从环境变量读取配置
         self.endpoint = settings.ROCKETMQ_ENDPOINT
         self.access_key_id = settings.ROCKETMQ_ACCESS_KEY_ID
         self.access_key_secret = settings.ROCKETMQ_ACCESS_KEY_SECRET
         self.instance_id = settings.ROCKETMQ_INSTANCE_ID
-        self.wait_seconds = settings.ROCKETMQ_WAIT_SECONDS
-        self.batch = batch or settings.ROCKETMQ_BATCH
+        self.wait_seconds = wait_seconds or settings.ROCKETMQ_WAIT_SECONDS
         self.topic_name = topic_name
         self.group_id = group_id
+
         # 初始化客户端
         self.client = MQClient(self.endpoint, self.access_key_id, self.access_key_secret)
         self.consumer = self.client.get_consumer(self.instance_id, self.topic_name, self.group_id)
 
-    async def receive_messages(self) -> List[Message]:
-        """异步封装消息拉取"""
+        self.logger = LoggerManager.get_logger()
+        self.aliyun_logger = LoggerManager.get_aliyun_logger()
+
+    async def receive_message(self) -> Optional[Message]:
+        """异步拉取单条消息"""
         try:
-            return await asyncio.to_thread(
+            self.logger.debug(f"开始拉取单条消息,等待时间: {self.wait_seconds}秒")
+            messages = await asyncio.to_thread(
                 self.consumer.consume_message,
-                self.batch,
+                settings.ROCKETMQ_BATCH,
                 self.wait_seconds,
             )
+            if messages:
+                self.logger.debug(f"成功拉取到1条消息")
+                return messages[0]
+            return None
         except MQExceptionBase as e:
             if getattr(e, "type", "") == "MessageNotExist":
-                return []
+                # 更友好的日志输出,使用INFO级别而非ERROR
+                self.logger.info("当前没有可消费的消息,继续等待...")
+                return None
+            # 其他类型的异常仍按错误处理
+            self.logger.error(f"拉取消息失败: {e}")
             raise e
 
     async def ack_message(self, receipt_handle: str) -> None:
         """确认消费成功"""
         try:
             await asyncio.to_thread(self.consumer.ack_message, [receipt_handle])
+            self.logger.debug(f"消息确认成功")
         except Exception as e:
+            self.logger.error(f"确认消息失败: {e}")
             raise RuntimeError(f"确认消息失败: {e}")
 
-    async def run_forever(self, handler: Callable[[Message], asyncio.Future]):
+    async def process_single_message(self, handler: Callable[[Message], Any]) -> bool:
+        """
+        处理单条消息的完整流程:拉取、处理、确认
+
+        返回值:是否成功处理并确认消息
         """
-        无限循环拉取消息并处理,适合开发调试或小批量任务
+        message = await self.receive_message()
+        if not message:
+            return False
+
+        try:
+            self.logger.info(f"收到消息 ID: {message.message_id}")
+            # 执行消息处理任务
+            await handler(message)
+            # 任务成功后确认消息
+            await self.ack_message(message.receipt_handle)
+            self.logger.info(f"消息 ID: {message.message_id} 处理并确认成功")
+            return True
+        except Exception as e:
+            self.logger.error(f"处理消息失败: {e}", exc_info=True)
+            # 消息处理失败,不会确认消息,RocketMQ会在可见时间后重新投递
+            return False
+
+
+
+    async def run_single_threaded(self, handler: Callable[[Message], Any], max_retries: int = 3):
+        """
+        单线程模式处理消息:获取一条消息,处理完成后再获取下一条
 
         :param handler: 异步消息处理函数 async def handler(msg: Message)
+        :param max_retries: 消息处理失败后的最大重试次数
         """
-        print(f"[AsyncRocketMQConsumer] 启动消费: Topic={self.topic_name}, Group={self.group_id}")
+        self.logger.info(f"[AsyncRocketMQConsumer] 启动单线程消费模式: Topic={self.topic_name}, Group={self.group_id}")
+
         while True:
             try:
-                messages = await self.receive_messages()
-                for msg in messages:
-                    try:
-                        await handler(msg)
-                        await self.ack_message(msg.receipt_handle)
-                    except Exception as e:
-                        print(f"[处理失败] {e}\n消息: {msg.message_body}")
+                success = await self.process_single_message(handler)
+                if not success:
+                    # 没有消息或处理失败,适当等待避免频繁请求
+                    await asyncio.sleep(1)
             except Exception as e:
-                print(f"[拉取失败] {e}")
-                await asyncio.sleep(2)
+                self.logger.error(f"消费循环异常: {e}", exc_info=True)
+                await asyncio.sleep(5)  # 发生异常时等待较长时间
+
+
+async def process_message(message: Message) -> None:
+    """示例消息处理函数"""
+    self.logger.info(f"开始处理消息: {message.message_body}")
+    # 模拟处理耗时操作
+    await asyncio.sleep(2)
+    self.logger.info(f"消息处理完成")
+
+
+if __name__ == '__main__':
+
+    async def run_consumer():
+        consumer = AsyncRocketMQConsumer(
+            topic_name="ynfqmm_recommend_prod",
+            group_id="ynfqmm_recommend_prod",
+            wait_seconds=10,  # 长轮询等待时间
+        )
+        await consumer.run_single_threaded(process_message)
+
+
+    asyncio.run(run_consumer())

+ 30 - 24
services/pipeline.py

@@ -142,30 +142,36 @@ class PiaoQuanPipeline:
         return True
 
     def download_rule_flag(self) -> bool:
-        for key, rule in self.rule_dict.items():
-            if key == "period":
-                continue
-            item_value = int(self.item.get(key, 0))
-            min_v = int(rule.get("min", 0))
-            max_v = int(rule.get("max", 999999999))
-            if not (min_v <= item_value <= max_v):
-                msg = f"[{key} 校验失败] value={item_value}, expected=[{min_v}, {max_v}]"
-                self.logger.warning(msg)
-                self.aliyun_log.logging(
-                    code="2004",
-                    trace_id=self.trace_id,
-                    data={
-                        "item": self.item,
-                        "field": key,
-                        "item_value": item_value,
-                        "min": min_v,
-                        "max": max_v
-                    },
-                    message=msg,
-                    account=self.account
+        """
+          视频基础下载规则
+          :return:
+        """
+        for key in self.item:
+            if self.rule_dict.get(key):
+                max_value = (
+                    int(self.rule_dict[key]["max"])
+                    if int(self.rule_dict[key]["max"]) > 0
+                    else 999999999999999
                 )
-                return False
-        return True
+                if key == "peroid":  # peroid是抓取周期天数
+                    continue
+                else:
+                    flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
+                    if not flag:
+                        self.aliyun_log.logging(
+                            code="2004",
+                            trace_id=self.trace_id,
+                            data=self.item,
+                            message="{}: {} <= {} <= {}, {}".format(
+                                key,
+                                self.rule_dict[key]["min"],
+                                self.item[key],
+                                max_value,
+                                flag,
+                            ),
+                            account=self.account
+                        )
+                        return flag
 
     async def repeat_video(self) -> bool:
         out_id = self.item.get("out_video_id")
@@ -173,7 +179,7 @@ class PiaoQuanPipeline:
 
         bypass_platforms = {
             "zhufuniannianshunxinjixiang", "weiquanshipin", "piaoquangushi", "lepaoledong", "zhufukuaizhuan",
-            "linglingkuailezhufu", "lepaoledongdijie", "jierizhufuhuakaifugui", "haoyunzhufuduo",
+            "linglingkuailezhufu", "lepaoledongdijie", "jierizhufuhuakaifugui","yuannifuqimanman", "haoyunzhufuduo",
             "quzhuan", "zhufudewenhou", "jierizhufuxingfujixiang", "haoyoushipin", "xinshiquan",
             "laonianshenghuokuaile", "laonianquan"
         }