import asyncio
import json
import signal
import traceback
from typing import List

from core.utils.log.logger_manager import LoggerManager
from core.utils.trace_utils import generate_trace_id
from services.async_mysql_service import AsyncMysqlService
from spiders.spider_registry import get_spider_class


async def async_handle_topic(topic: str, stop_event: asyncio.Event):
    """Consume a single topic inside a coroutine.

    For every message delivered by the consumer this coroutine:

    - parses the JSON body and reads the task id from its ``"id"`` key;
    - loads the user list and rule dict for that task through
      ``AsyncMysqlService``;
    - instantiates the spider class registered for ``topic`` and runs it;
    - acks the message and logs the outcome.

    The consume loop is restarted after a 5 second pause whenever
    ``consumer.run_forever`` raises, until ``stop_event`` is set.

    Args:
        topic: Topic name; also used as the consumer group id and as the key
            for looking up the spider class.
        stop_event: Event checked before each (re)start of the consume loop.
    """
    logger = LoggerManager.get_logger()
    aliyun_logger = LoggerManager.get_aliyun_logger()

    # Each topic gets its own consumer instance. The import is kept local to
    # this function, exactly as in the original code.
    from services.rocketmq_consumer import AsyncRocketMQConsumer

    consumer = AsyncRocketMQConsumer(topic_name=topic, group_id=topic)

    async def handle_single_message(message):
        """Run the crawler described by one message and ack it on success."""
        trace_id = generate_trace_id()
        try:
            payload = json.loads(message.message_body)
            task_id = payload["id"]
            logger.info(f"{trace_id} - 接收到任务消息: {task_id}")

            # The database handle is only used for these two lookups.
            async with AsyncMysqlService("system", "crawler") as mysql:
                user_list = await mysql.get_user_list(task_id)
                rule_dict = await mysql.get_rule_dict(task_id)

            CrawlerClass = get_spider_class(topic)
            crawler = CrawlerClass(
                rule_dict=rule_dict,
                user_list=user_list,
                trace_id=trace_id,
            )
            await crawler.run()

            # Ack only after run() has returned without raising; a failed
            # task is left un-acked.
            await consumer.ack_message(message.receipt_handle)
            logger.info(f"{trace_id} - 任务 {task_id} 执行成功并已 Ack")
            aliyun_logger.logging(
                code="2000",
                message="任务执行成功",
                trace_id=trace_id,
                data={
                    "task_id": task_id,
                    "topic": topic,
                },
            )
        except Exception as e:
            # Format the traceback once and reuse it for both log sinks.
            # The original message contained the literal text
            # "/n traceback.format_exc()" instead of the actual stack trace.
            stack_trace = traceback.format_exc()
            logger.error(f"{trace_id} - 任务处理失败: {e}\n{stack_trace}")
            aliyun_logger.logging(
                code="9001",
                message=f"处理消息失败: {str(e)}",
                trace_id=trace_id,
                data={
                    "error_type": type(e).__name__,
                    "stack_trace": stack_trace,
                    "message_body": message.message_body,
                },
            )

    # Self-restarting consume loop.
    # NOTE(review): if run_forever() ever returns normally while stop_event is
    # not set, it is called again immediately with no delay - confirm that
    # this is the intended behaviour of the consumer.
    while not stop_event.is_set():
        try:
            await consumer.run_forever(handle_single_message)
        except Exception as e:
            aliyun_logger.logging(
                code="9002",
                message=f"{topic} 消费循环异常即将重启: {str(e)}",
                data={
                    "error_type": type(e).__name__,
                    "stack_trace": traceback.format_exc(),
                },
            )
            logger.warning(f"[{topic}] 消费循环异常: {e},5秒后重启")
            await asyncio.sleep(5)


async def run_all_topics(topics: List[str]):
    """Run one consumer task per topic until SIGINT or SIGTERM arrives.

    A shared ``asyncio.Event`` is set by the signal handler. Once it is set,
    every consumer task is cancelled, awaited, and any task that finished
    with an ``Exception`` is reported on stdout.

    Args:
        topics: Topic names; one ``async_handle_topic`` task is created for
            each entry.
    """
    stop_event = asyncio.Event()
    loop = asyncio.get_running_loop()

    def shutdown():
        print("[系统] 收到停止信号,准备优雅退出...")
        stop_event.set()

    # NOTE: loop.add_signal_handler is documented as Unix-only.
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown)

    tasks = [
        asyncio.create_task(async_handle_topic(topic, stop_event))
        for topic in topics
    ]

    await stop_event.wait()  # Block until a stop signal has been received.

    print("[系统] 正在取消所有消费任务...")
    for task in tasks:
        task.cancel()

    # return_exceptions=True keeps one failing task from hiding the others.
    # gather() returns results in the order of `tasks`, which mirrors `topics`.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for topic, result in zip(topics, results):
        if isinstance(result, Exception):
            print(f"[系统] 任务 {topic} 异常退出: {result}")

    print("[系统] 所有任务已退出,进程关闭")


def handle_topic_group(topics: List[str]):
    """Child-process entry point: run an event loop serving ``topics``."""
    asyncio.run(run_all_topics(topics))