import asyncio import json import traceback from typing import List import signal from core.utils.log.logger_manager import LoggerManager from core.utils.trace_utils import generate_trace_id, TraceContext from services.async_mysql_service import AsyncMysqlService from spiders.spider_registry import get_spider_class logger = LoggerManager.get_logger() aliyun_logger = LoggerManager.get_aliyun_logger() async def async_handle_topic(topic: str, stop_event: asyncio.Event): """ 单个 topic 的消费逻辑,运行在协程中: - 从 MQ 中消费消息(单条处理,处理完再拉取下一条); - 根据消息内容执行对应爬虫; - 使用异步数据库服务查询配置; - 记录日志、确认消息。 """ # 每个 topic 创建独立的 consumer 实例(使用优化后的 AsyncRocketMQConsumer) from services.async_mq_consumer import AsyncRocketMQConsumer consumer = AsyncRocketMQConsumer(topic_name=topic, group_id=topic) async def handle_single_message(message): """处理单条消息的业务逻辑(不含拉取和循环)""" with TraceContext() as trace_id: # 生成 trace_id 并绑定到上下文 try: payload = json.loads(message.message_body) task_id = payload["id"] logger.info(f"[{topic}]接收到任务消息: {task_id}") aliyun_logger.logging( code="1000", message="任务接收成功", data=payload, account=topic ) # 从数据库查询配置 async with AsyncMysqlService() as mysql: user_list = await mysql.get_user_list(task_id) rule_dict = await mysql.get_rule_dict(task_id) # 执行爬虫任务 CrawlerClass = get_spider_class(topic) crawler = CrawlerClass( rule_dict=rule_dict, user_list=user_list, ) await crawler.run() # 爬虫成功执行后再确认消息 # 确认消息(单条消息处理成功后才 Ack) await consumer.ack_message(message.receipt_handle) logger.info(f"[{topic}]任务 {task_id} 执行成功并已 Ack") aliyun_logger.logging( code="1010", message="任务执行成功", data={"task_id": task_id, "topic": topic}, account=topic ) except Exception as e: logger.error(f"[{topic}]任务处理失败: {e} \n {traceback.format_exc()}") aliyun_logger.logging( code="9001", message=f"处理消息失败: {str(e)} \n {traceback.format_exc()}", data={ "error_type": type(e).__name__, "stack_trace": traceback.format_exc(), "message_body": message.message_body }, account=topic ) # 处理失败不 Ack,消息会被 MQ 重新投递(依赖 MQ 的重试机制) # 独立的消费循环:拉取消息并调用处理函数 async def consume_loop(): logger.info(f"[{topic}] 启动消费循环,开始拉取消息...") while not stop_event.is_set(): # 监听停止信号,支持优雅退出 try: # 拉取单条消息(依赖优化后的 receive_message,无消息时返回 None 不报错) message = await consumer.receive_message() if message: # 有消息则处理,处理完成后再进入下一次循环 await handle_single_message(message) else: # 无消息时短暂休眠,避免频繁空轮询 await asyncio.sleep(1) except Exception as e: # 非消息处理的异常(如 MQ 连接失败),记录并重试 logger.error(f"[{topic}] 消费循环异常: {e}", exc_info=True) aliyun_logger.logging( code="9002", message=f"{topic} 消费循环异常,即将重试: {str(e)}", data={"error_type": type(e).__name__, "stack_trace": traceback.format_exc()}, account=topic ) await asyncio.sleep(5) # 异常后延迟重试,减轻服务压力 logger.info(f"[{topic}] 消费循环已停止(收到退出信号)") # 启动消费循环(这是消费逻辑的入口) await consume_loop() async def run_all_topics(topics: List[str]): stop_event = asyncio.Event() loop = asyncio.get_running_loop() def shutdown(): """处理停止信号(如 Ctrl+C),触发优雅退出""" logger.warning("[系统] 收到停止信号,准备优雅退出...") aliyun_logger.logging( code="1600", message="[系统] 收到停止信号,准备优雅退出...", ) stop_event.set() # 注册信号处理(支持 Ctrl+C 和 kill 命令) for sig in [signal.SIGINT, signal.SIGTERM]: loop.add_signal_handler(sig, shutdown) # 为每个 topic 创建独立协程任务 tasks = [asyncio.create_task(async_handle_topic(topic, stop_event)) for topic in topics] await stop_event.wait() # 等待退出信号 # 取消所有任务并等待结束 logger.warning(f"[系统] 正在取消所有消费任务...") for task in tasks: task.cancel() # 收集任务结果,忽略取消异常 results = await asyncio.gather(*tasks, return_exceptions=True) for idx, result in enumerate(results): if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError): logger.error(f"[系统] 任务 {topics[idx]} 异常退出: {result}") logger.warning(f"[系统] 所有任务已退出,进程已关闭...") aliyun_logger.logging( code="1602", message="[系统] 所有任务已退出,进程已关闭...", data={"task_count": len(tasks)} ) def handle_topic_group(topics: List[str]): """子进程入口函数:启动异步事件循环处理该组 topics。""" asyncio.run(run_all_topics(topics))