# mq_consumer.py
  1. import time
  2. import rocketmq
  3. from pqai_agent import configs
  4. if __name__ == '__main__':
  5. credentials = rocketmq.Credentials()
  6. mq_conf = configs.get()['mq']
  7. rmq_client_conf = rocketmq.ClientConfiguration(mq_conf['endpoints'], credentials, mq_conf['instance_id'])
  8. print(rmq_client_conf)
  9. rmq_topic = 'agent_push_tasks'
  10. rmq_group = 'agent_push_generate_task'
  11. consumer = rocketmq.SimpleConsumer(rmq_client_conf, rmq_group, await_duration=5)
  12. consumer.startup()
  13. time.sleep(1)
  14. consumer.subscribe(rmq_topic)
  15. time.sleep(1)
  16. while True:
  17. t1 = time.time()
  18. msgs = consumer.receive(1, 10)
  19. if not msgs:
  20. break
  21. msg = msgs[0]
  22. for msg in msgs:
  23. msg_body = msg.body.decode('utf-8')
  24. print(f"received message: {msg_body}")
  25. consumer.ack(msg)
  26. time.sleep(1)