package com.tzld.piaoquan.api.config; import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.bean.ConsumerBean; import com.aliyun.openservices.ons.api.bean.ProducerBean; import com.aliyun.openservices.ons.api.bean.Subscription; import com.tzld.piaoquan.api.mq.MessageCallbackCustomer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; import java.util.Properties; @Configuration public class MqConfig { @Value("${rocketmq.accessKey}") private String accessKey; @Value("${rocketmq.secretKey}") private String secretKey; @Value("${rocketmq.nameSrvAddr}") private String nameSrvAddr; @Value("${pushMessage.callback.topic}") private String callbackTopic; @Value("${pushMessage.callback.tag}") private String callbackTag; @Value("${pushMessage.callback.groupId}") private String callbackGroupId; @Autowired private MessageCallbackCustomer messageCallbackCustomer; public Properties getMqProperties() { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey); properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey); properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr); return properties; } @Bean(initMethod = "start", destroyMethod = "shutdown") public ProducerBean buildProducer() { ProducerBean producer = new ProducerBean(); producer.setProperties(getMqProperties()); return producer; } @Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean buildConsumer() { ConsumerBean consumerBean = new ConsumerBean(); //配置文件 Properties properties = getMqProperties(); properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20"); properties.setProperty(PropertyKeyConst.GROUP_ID, callbackGroupId); consumerBean.setProperties(properties); //订阅关系 Map subscriptionTable = new HashMap<>(); Subscription subscription = new Subscription(); subscription.setTopic(callbackTopic); subscription.setExpression(callbackTag); subscriptionTable.put(subscription, messageCallbackCustomer); //订阅多个topic如上面设置 consumerBean.setSubscriptionTable(subscriptionTable); return consumerBean; } }