123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- import pika
- import asyncio
- import json
- from .universal_crawler import AsyncCrawler
- from .utils.log_config import setup_logger
- class RabbitMQConsumer:
- def __init__(self, config_path: str):
- self.config_path = config_path
- self.aliyun_log = setup_logger("rabbitmq_consumer", "system")
- self.consumer_tag = None
- def connect(self):
- """连接到RabbitMQ"""
- try:
- with open('config/rabbitmq_config.yaml', 'r', encoding='utf-8') as f:
- rabbit_config = json.load(f)
- self.connection = pika.BlockingConnection(
- pika.ConnectionParameters(
- host=rabbit_config.get('host', 'localhost'),
- port=rabbit_config.get('port', 5672),
- credentials=pika.PlainCredentials(
- rabbit_config.get('username', 'guest'),
- rabbit_config.get('password', 'guest')
- )
- )
- )
- self.channel = self.connection.channel()
- self.aliyun_log.info("成功连接到RabbitMQ")
- return True
- except Exception as e:
- self.aliyun_log.error(f"连接RabbitMQ失败: {str(e)}")
- return False
- async def process_message(self, ch, method, properties, body):
- """处理消息"""
- task = json.loads(body)
- self.aliyun_log.info(f"收到任务: {task.get('task_id', '未知ID')}")
- platform = task.get('platform', 'unknown_platform')
- mode = task.get('mode', 'recommend')
- crawler = AsyncCrawler(platform, mode, self.config_path)
- try:
- await crawler.run()
- ch.basic_ack(delivery_tag=method.delivery_tag)
- self.aliyun_log.info(f"任务完成: {task.get('task_id', '未知ID')}")
- except Exception as e:
- self.aliyun_log.error(f"处理任务异常: {str(e)}")
- # 重新排队
- ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
- def start_consuming(self):
- """开始消费消息"""
- if not self.connect():
- return
- queue_name = self.setup_queue()
- if not queue_name:
- return
- try:
- self.channel.basic_consume(
- queue=queue_name,
- on_message_callback=self._sync_process_message,
- auto_ack=False
- )
- self.aliyun_log.info(f"开始消费队列: {queue_name}")
- self.channel.start_consuming()
- except KeyboardInterrupt:
- self.channel.stop_consuming()
- except Exception as e:
- self.aliyun_log.error(f"消费消息失败: {str(e)}")
- finally:
- self.connection.close()
- def _sync_process_message(self, ch, method, properties, body):
- """同步包装异步处理函数"""
- asyncio.run(self.process_message(ch, method, properties, body))
- def main():
- consumer = RabbitMQConsumer("config/platform_config.yaml")
- consumer.start_consuming()
- if __name__ == "__main__":
- main()
|