import pika import asyncio import json from .universal_crawler import AsyncCrawler from .utils.log_config import setup_logger class RabbitMQConsumer: def __init__(self, config_path: str): self.config_path = config_path self.aliyun_log = setup_logger("rabbitmq_consumer", "system") self.consumer_tag = None def connect(self): """连接到RabbitMQ""" try: with open('config/rabbitmq_config.yaml', 'r', encoding='utf-8') as f: rabbit_config = json.load(f) self.connection = pika.BlockingConnection( pika.ConnectionParameters( host=rabbit_config.get('host', 'localhost'), port=rabbit_config.get('port', 5672), credentials=pika.PlainCredentials( rabbit_config.get('username', 'guest'), rabbit_config.get('password', 'guest') ) ) ) self.channel = self.connection.channel() self.aliyun_log.info("成功连接到RabbitMQ") return True except Exception as e: self.aliyun_log.error(f"连接RabbitMQ失败: {str(e)}") return False async def process_message(self, ch, method, properties, body): """处理消息""" task = json.loads(body) self.aliyun_log.info(f"收到任务: {task.get('task_id', '未知ID')}") platform = task.get('platform', 'unknown_platform') mode = task.get('mode', 'recommend') crawler = AsyncCrawler(platform, mode, self.config_path) try: await crawler.run() ch.basic_ack(delivery_tag=method.delivery_tag) self.aliyun_log.info(f"任务完成: {task.get('task_id', '未知ID')}") except Exception as e: self.aliyun_log.error(f"处理任务异常: {str(e)}") # 重新排队 ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) def start_consuming(self): """开始消费消息""" if not self.connect(): return queue_name = self.setup_queue() if not queue_name: return try: self.channel.basic_consume( queue=queue_name, on_message_callback=self._sync_process_message, auto_ack=False ) self.aliyun_log.info(f"开始消费队列: {queue_name}") self.channel.start_consuming() except KeyboardInterrupt: self.channel.stop_consuming() except Exception as e: self.aliyun_log.error(f"消费消息失败: {str(e)}") finally: self.connection.close() def _sync_process_message(self, ch, method, properties, body): """同步包装异步处理函数""" asyncio.run(self.process_message(ch, method, properties, body)) def main(): consumer = RabbitMQConsumer("config/platform_config.yaml") consumer.start_consuming() if __name__ == "__main__": main()