rabbitmq_consumer.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. import pika
  2. import asyncio
  3. import json
  4. from .universal_crawler import AsyncCrawler
  5. from .utils.log_config import setup_logger
  6. class RabbitMQConsumer:
  7. def __init__(self, config_path: str):
  8. self.config_path = config_path
  9. self.aliyun_log = setup_logger("rabbitmq_consumer", "system")
  10. self.consumer_tag = None
  11. def connect(self):
  12. """连接到RabbitMQ"""
  13. try:
  14. with open('config/rabbitmq_config.yaml', 'r', encoding='utf-8') as f:
  15. rabbit_config = json.load(f)
  16. self.connection = pika.BlockingConnection(
  17. pika.ConnectionParameters(
  18. host=rabbit_config.get('host', 'localhost'),
  19. port=rabbit_config.get('port', 5672),
  20. credentials=pika.PlainCredentials(
  21. rabbit_config.get('username', 'guest'),
  22. rabbit_config.get('password', 'guest')
  23. )
  24. )
  25. )
  26. self.channel = self.connection.channel()
  27. self.aliyun_log.info("成功连接到RabbitMQ")
  28. return True
  29. except Exception as e:
  30. self.aliyun_log.error(f"连接RabbitMQ失败: {str(e)}")
  31. return False
  32. async def process_message(self, ch, method, properties, body):
  33. """处理消息"""
  34. task = json.loads(body)
  35. self.aliyun_log.info(f"收到任务: {task.get('task_id', '未知ID')}")
  36. platform = task.get('platform', 'unknown_platform')
  37. mode = task.get('mode', 'recommend')
  38. crawler = AsyncCrawler(platform, mode, self.config_path)
  39. try:
  40. await crawler.run()
  41. ch.basic_ack(delivery_tag=method.delivery_tag)
  42. self.aliyun_log.info(f"任务完成: {task.get('task_id', '未知ID')}")
  43. except Exception as e:
  44. self.aliyun_log.error(f"处理任务异常: {str(e)}")
  45. # 重新排队
  46. ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
  47. def start_consuming(self):
  48. """开始消费消息"""
  49. if not self.connect():
  50. return
  51. queue_name = self.setup_queue()
  52. if not queue_name:
  53. return
  54. try:
  55. self.channel.basic_consume(
  56. queue=queue_name,
  57. on_message_callback=self._sync_process_message,
  58. auto_ack=False
  59. )
  60. self.aliyun_log.info(f"开始消费队列: {queue_name}")
  61. self.channel.start_consuming()
  62. except KeyboardInterrupt:
  63. self.channel.stop_consuming()
  64. except Exception as e:
  65. self.aliyun_log.error(f"消费消息失败: {str(e)}")
  66. finally:
  67. self.connection.close()
  68. def _sync_process_message(self, ch, method, properties, body):
  69. """同步包装异步处理函数"""
  70. asyncio.run(self.process_message(ch, method, properties, body))
  71. def main():
  72. consumer = RabbitMQConsumer("config/platform_config.yaml")
  73. consumer.start_consuming()
  74. if __name__ == "__main__":
  75. main()