package com.tzld.piaoquan.api.mq; import com.alibaba.fastjson.JSON; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.bean.ProducerBean; import com.tzld.piaoquan.api.model.vo.CallbackParam; import com.tzld.piaoquan.api.service.GhAccessTokenService; import com.tzld.piaoquan.growth.common.common.base.CommonResponse; import com.tzld.piaoquan.growth.common.common.enums.ExceptionCodeEnum; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; @Slf4j @Component public class MessageCallbackProducer { @Value("${pushMessage.callback.topic}") private String TOPIC; @Value("${pushMessage.callback.tag}") private String TAG; @Autowired private ProducerBean producer; @Autowired private GhAccessTokenService ghAccessTokenService; public CommonResponse sendMessage(CallbackParam param) { if (param == null || param.getAccessToken() == null) { return CommonResponse.create(ExceptionCodeEnum.PARAMS_ERROR, "参数错误"); } if (!ghAccessTokenService.validateAccessToken(param.getAccessToken())) { return CommonResponse.create(ExceptionCodeEnum.PARAMS_ERROR, "accessToken错误或者已失效"); } Message message = new Message(); message.setTopic(TOPIC); message.setTag(TAG); message.setBody(JSON.toJSONString(param).getBytes(StandardCharsets.UTF_8)); try { log.info("sendMessage = {}", message); producer.send(message); } catch (Exception e) { log.error("error send param = {}", param); log.error("error", e); //重试 retry(message); } return CommonResponse.success(); } private void retry(Message message) { for (int i = 0; i < 3; i++) { try { SendResult sendResult = producer.send(message); log.info("sendResult = {}", sendResult); return; } catch (Exception e) { log.error("retry send error {}", i, e); } } } }