MqConfig.java 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package com.tzld.piaoquan.api.config;
  2. import com.aliyun.openservices.ons.api.MessageListener;
  3. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  4. import com.aliyun.openservices.ons.api.bean.ConsumerBean;
  5. import com.aliyun.openservices.ons.api.bean.ProducerBean;
  6. import com.aliyun.openservices.ons.api.bean.Subscription;
  7. import com.tzld.piaoquan.api.mq.MessageCallbackCustomer;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.beans.factory.annotation.Value;
  10. import org.springframework.context.annotation.Bean;
  11. import org.springframework.context.annotation.Configuration;
  12. import java.util.HashMap;
  13. import java.util.Map;
  14. import java.util.Properties;
  15. @Configuration
  16. public class MqConfig {
  17. @Value("${rocketmq.accessKey}")
  18. private String accessKey;
  19. @Value("${rocketmq.secretKey}")
  20. private String secretKey;
  21. @Value("${rocketmq.nameSrvAddr}")
  22. private String nameSrvAddr;
  23. @Value("${pushMessage.callback.topic}")
  24. private String callbackTopic;
  25. @Value("${pushMessage.callback.tag}")
  26. private String callbackTag;
  27. @Value("${pushMessage.callback.groupId}")
  28. private String callbackGroupId;
  29. @Autowired
  30. private MessageCallbackCustomer messageCallbackCustomer;
  31. public Properties getMqProperties() {
  32. Properties properties = new Properties();
  33. properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
  34. properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
  35. properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
  36. return properties;
  37. }
  38. @Bean(initMethod = "start", destroyMethod = "shutdown")
  39. public ProducerBean buildProducer() {
  40. ProducerBean producer = new ProducerBean();
  41. producer.setProperties(getMqProperties());
  42. return producer;
  43. }
  44. @Bean(initMethod = "start", destroyMethod = "shutdown")
  45. public ConsumerBean buildConsumer() {
  46. ConsumerBean consumerBean = new ConsumerBean();
  47. //配置文件
  48. Properties properties = getMqProperties();
  49. properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
  50. properties.setProperty(PropertyKeyConst.GROUP_ID, callbackGroupId);
  51. consumerBean.setProperties(properties);
  52. //订阅关系
  53. Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
  54. Subscription subscription = new Subscription();
  55. subscription.setTopic(callbackTopic);
  56. subscription.setExpression(callbackTag);
  57. subscriptionTable.put(subscription, messageCallbackCustomer);
  58. //订阅多个topic如上面设置
  59. consumerBean.setSubscriptionTable(subscriptionTable);
  60. return consumerBean;
  61. }
  62. }