MessageCallbackProducer.java 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package com.tzld.piaoquan.api.mq;
  2. import com.alibaba.fastjson.JSON;
  3. import com.aliyun.openservices.ons.api.Message;
  4. import com.aliyun.openservices.ons.api.SendResult;
  5. import com.aliyun.openservices.ons.api.bean.ProducerBean;
  6. import com.tzld.piaoquan.api.model.vo.CallbackParam;
  7. import com.tzld.piaoquan.api.service.GhAccessTokenService;
  8. import com.tzld.piaoquan.growth.common.common.base.CommonResponse;
  9. import com.tzld.piaoquan.growth.common.common.enums.ExceptionCodeEnum;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.beans.factory.annotation.Value;
  13. import org.springframework.stereotype.Component;
  14. import java.nio.charset.StandardCharsets;
  15. @Slf4j
  16. @Component
  17. public class MessageCallbackProducer {
  18. @Value("${pushMessage.callback.topic}")
  19. private String TOPIC;
  20. @Value("${pushMessage.callback.tag}")
  21. private String TAG;
  22. @Autowired
  23. private ProducerBean producer;
  24. @Autowired
  25. private GhAccessTokenService ghAccessTokenService;
  26. public CommonResponse<Void> sendMessage(CallbackParam param) {
  27. if (param == null || param.getAccessToken() == null) {
  28. return CommonResponse.create(ExceptionCodeEnum.PARAMS_ERROR, "参数错误");
  29. }
  30. if (!ghAccessTokenService.validateAccessToken(param.getAccessToken())) {
  31. return CommonResponse.create(ExceptionCodeEnum.PARAMS_ERROR, "accessToken错误或者已失效");
  32. }
  33. Message message = new Message();
  34. message.setTopic(TOPIC);
  35. message.setTag(TAG);
  36. message.setBody(JSON.toJSONString(param).getBytes(StandardCharsets.UTF_8));
  37. try {
  38. log.info("sendMessage = {}", message);
  39. producer.send(message);
  40. } catch (Exception e) {
  41. log.error("error send param = {}", param);
  42. log.error("error", e);
  43. //重试
  44. retry(message);
  45. }
  46. return CommonResponse.success();
  47. }
  48. private void retry(Message message) {
  49. for (int i = 0; i < 3; i++) {
  50. try {
  51. SendResult sendResult = producer.send(message);
  52. log.info("sendResult = {}", sendResult);
  53. return;
  54. } catch (Exception e) {
  55. log.error("retry send error {}", i, e);
  56. }
  57. }
  58. }
  59. }