123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- package com.tzld.piaoquan.api.mq;
- import com.alibaba.fastjson.JSON;
- import com.aliyun.openservices.ons.api.Message;
- import com.aliyun.openservices.ons.api.SendResult;
- import com.aliyun.openservices.ons.api.bean.ProducerBean;
- import com.tzld.piaoquan.api.model.vo.CallbackParam;
- import com.tzld.piaoquan.api.service.GhAccessTokenService;
- import com.tzld.piaoquan.growth.common.common.base.CommonResponse;
- import com.tzld.piaoquan.growth.common.common.enums.ExceptionCodeEnum;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
- import java.nio.charset.StandardCharsets;
- @Slf4j
- @Component
- public class MessageCallbackProducer {
- @Value("${pushMessage.callback.topic}")
- private String TOPIC;
- @Value("${pushMessage.callback.tag}")
- private String TAG;
- @Autowired
- private ProducerBean producer;
- @Autowired
- private GhAccessTokenService ghAccessTokenService;
- public CommonResponse<Void> sendMessage(CallbackParam param) {
- if (param == null || param.getAccessToken() == null) {
- return CommonResponse.create(ExceptionCodeEnum.PARAMS_ERROR, "参数错误");
- }
- if (!ghAccessTokenService.validateAccessToken(param.getAccessToken())) {
- return CommonResponse.create(ExceptionCodeEnum.PARAMS_ERROR, "accessToken错误或者已失效");
- }
- Message message = new Message();
- message.setTopic(TOPIC);
- message.setTag(TAG);
- message.setBody(JSON.toJSONString(param).getBytes(StandardCharsets.UTF_8));
- try {
- log.info("sendMessage = {}", message);
- producer.send(message);
- } catch (Exception e) {
- log.error("error send param = {}", param);
- log.error("error", e);
- //重试
- retry(message);
- }
- return CommonResponse.success();
- }
- private void retry(Message message) {
- for (int i = 0; i < 3; i++) {
- try {
- SendResult sendResult = producer.send(message);
- log.info("sendResult = {}", sendResult);
- return;
- } catch (Exception e) {
- log.error("retry send error {}", i, e);
- }
- }
- }
- }
|