import json
import traceback
from typing import List
import asyncio

from core.utils.log.logger_manager import LoggerManager
from core.utils.trace_utils import generate_trace_id
from services.async_mysql_service import AsyncMysqlService
from spiders.spider_registry import get_spider_class


async def async_handle_topic(topic: str) -> None:
    """
    Consume loop for a single topic, running inside one coroutine.

    Responsibilities:
    - pull messages from the MQ topic;
    - look up the task's user list and rule dict via the async MySQL service;
    - instantiate and run the spider registered for this topic;
    - log failures to the Aliyun logger and ack only successfully
      processed messages (failed messages stay un-acked for redelivery).

    Args:
        topic: MQ topic name; also used as the consumer group id and as
            the key for spider-class lookup.
    """
    # get_logger() is kept for its initialization side effects even though
    # the handle only logs via aliyun_logger below.
    logger = LoggerManager.get_logger()
    aliyun_logger = LoggerManager.get_aliyun_logger()

    # Each topic gets its own consumer instance. Imported lazily so that
    # processes which never consume do not pay for the MQ dependency.
    from services.rocketmq_consumer import AsyncRocketMQConsumer

    consumer = AsyncRocketMQConsumer(topic_name=topic, group_id=topic)

    async def handle_single_message(message) -> None:
        """Process one MQ message end-to-end under a fresh trace id."""
        trace_id = generate_trace_id()
        try:
            # assumes message_body is JSON with an "id" task key — confirm
            # against the producer's schema.
            payload = json.loads(message.message_body)
            task_id = payload["id"]

            async with AsyncMysqlService("system", "crawler") as mysql:
                user_list = await mysql.get_user_list(task_id)
                rule_dict = await mysql.get_rule_dict(task_id)

            CrawlerClass = get_spider_class(topic)
            crawler = CrawlerClass(
                rule_dict=rule_dict,
                user_list=user_list,
                trace_id=trace_id,
            )
            await crawler.run()

            # Ack only after run() succeeds; on any exception above the
            # message remains un-acked so the broker can redeliver it.
            await consumer.ack_message(message.receipt_handle)
        except Exception as e:
            # Boundary handler: record the full context and swallow, so one
            # bad message cannot kill the whole consume loop.
            aliyun_logger.logging(
                code="9001",
                message=f"处理消息失败: {str(e)}",
                trace_id=trace_id,
                data={
                    "error_type": type(e).__name__,
                    "stack_trace": traceback.format_exc(),
                    "message_body": message.message_body,
                },
            )

    # Start the consume loop; runs until the consumer stops.
    await consumer.run_forever(handle_single_message)


async def run_all_topics(topics: List[str]) -> None:
    """
    Launch one listener coroutine per topic in the current process and
    wait on all of them concurrently.

    Args:
        topics: topic names handled by this process.
    """
    tasks = [asyncio.create_task(async_handle_topic(topic)) for topic in topics]
    await asyncio.gather(*tasks)


def handle_topic_group(topics: List[str]) -> None:
    """Subprocess entry point: run the async event loop for this topic group."""
    asyncio.run(run_all_topics(topics))