12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- package com.tzld.piaoquan.api.config;
- import com.aliyun.openservices.ons.api.MessageListener;
- import com.aliyun.openservices.ons.api.PropertyKeyConst;
- import com.aliyun.openservices.ons.api.bean.ConsumerBean;
- import com.aliyun.openservices.ons.api.bean.ProducerBean;
- import com.aliyun.openservices.ons.api.bean.Subscription;
- import com.tzld.piaoquan.api.mq.MessageCallbackCustomer;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Properties;
- @Configuration
- public class MqConfig {
- @Value("${rocketmq.accessKey}")
- private String accessKey;
- @Value("${rocketmq.secretKey}")
- private String secretKey;
- @Value("${rocketmq.nameSrvAddr}")
- private String nameSrvAddr;
- @Value("${pushMessage.callback.topic}")
- private String callbackTopic;
- @Value("${pushMessage.callback.tag}")
- private String callbackTag;
- @Value("${pushMessage.callback.groupId}")
- private String callbackGroupId;
- @Autowired
- private MessageCallbackCustomer messageCallbackCustomer;
- public Properties getMqProperties() {
- Properties properties = new Properties();
- properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
- properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
- properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
- return properties;
- }
- @Bean(initMethod = "start", destroyMethod = "shutdown")
- public ProducerBean buildProducer() {
- ProducerBean producer = new ProducerBean();
- producer.setProperties(getMqProperties());
- return producer;
- }
- @Bean(initMethod = "start", destroyMethod = "shutdown")
- public ConsumerBean buildConsumer() {
- ConsumerBean consumerBean = new ConsumerBean();
- //配置文件
- Properties properties = getMqProperties();
- properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
- properties.setProperty(PropertyKeyConst.GROUP_ID, callbackGroupId);
- consumerBean.setProperties(properties);
- //订阅关系
- Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
- Subscription subscription = new Subscription();
- subscription.setTopic(callbackTopic);
- subscription.setExpression(callbackTag);
- subscriptionTable.put(subscription, messageCallbackCustomer);
- //订阅多个topic如上面设置
- consumerBean.setSubscriptionTable(subscriptionTable);
- return consumerBean;
- }
- }
|