xueyiming hai 6 meses
pai
achega
bf75fbea43

+ 73 - 0
api-module/src/main/java/com/tzld/piaoquan/api/config/MqConfig.java

@@ -0,0 +1,73 @@
+package com.tzld.piaoquan.api.config;
+
+import com.aliyun.openservices.ons.api.MessageListener;
+import com.aliyun.openservices.ons.api.PropertyKeyConst;
+import com.aliyun.openservices.ons.api.bean.ConsumerBean;
+import com.aliyun.openservices.ons.api.bean.ProducerBean;
+import com.aliyun.openservices.ons.api.bean.Subscription;
+import com.tzld.piaoquan.api.mq.MessageCallbackCustomer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+@Configuration
+public class MqConfig {
+
+    @Value("${rocketmq.accessKey}")
+    private String accessKey;
+    @Value("${rocketmq.secretKey}")
+    private String secretKey;
+    @Value("${rocketmq.nameSrvAddr}")
+    private String nameSrvAddr;
+    @Value("${pushMessage.callback.topic}")
+    private String callbackTopic;
+    @Value("${pushMessage.callback.tag}")
+    private String callbackTag;
+    @Value("${pushMessage.callback.groupId}")
+    private String callbackGroupId;
+
+    @Autowired
+    private MessageCallbackCustomer messageCallbackCustomer;
+
+    public Properties getMqProperties() {
+        Properties properties = new Properties();
+        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
+        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
+        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
+        return properties;
+    }
+
+    @Bean(initMethod = "start", destroyMethod = "shutdown")
+    public ProducerBean buildProducer() {
+        ProducerBean producer = new ProducerBean();
+        producer.setProperties(getMqProperties());
+        return producer;
+    }
+
+
+    @Bean(initMethod = "start", destroyMethod = "shutdown")
+    public ConsumerBean buildConsumer() {
+        ConsumerBean consumerBean = new ConsumerBean();
+        //配置文件
+        Properties properties = getMqProperties();
+        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
+        properties.setProperty(PropertyKeyConst.GROUP_ID, callbackGroupId);
+        consumerBean.setProperties(properties);
+        //订阅关系
+        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
+        Subscription subscription = new Subscription();
+        subscription.setTopic(callbackTopic);
+        subscription.setExpression(callbackTag);
+        subscriptionTable.put(subscription, messageCallbackCustomer);
+        //订阅多个topic如上面设置
+        consumerBean.setSubscriptionTable(subscriptionTable);
+        return consumerBean;
+    }
+
+
+}

+ 124 - 0
api-module/src/main/java/com/tzld/piaoquan/api/controller/TencentWeComController.java

@@ -0,0 +1,124 @@
+package com.tzld.piaoquan.api.controller;
+
+import com.alibaba.fastjson.JSONObject;
+import com.tzld.piaoquan.common.common.constant.WeComServerConstant;
+import com.tzld.piaoquan.api.service.UserService;
+import com.tzld.piaoquan.common.utils.wecom.WXBizMsgCrypt;
+import com.tzld.piaoquan.common.utils.wecom.WxUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.Map;
+
+import static com.tzld.piaoquan.common.common.enums.CorpEnum.HNWQ;
+
+@Slf4j
+@RestController
+@RequestMapping("/wecom/server")
+public class TencentWeComController {
+
+
+    @Autowired
+    private UserService userService;
+
+    @GetMapping("/verify")
+    public void verifyGet(HttpServletRequest request, HttpServletResponse response) {
+        try {
+            // 微信加密签名
+            String msgSignature = request.getParameter("msg_signature");
+            // 时间戳
+            String timestamp = request.getParameter("timestamp");
+            // 随机数
+            String nonce = request.getParameter("nonce");
+            // 随机字符串
+            // 如果是刷新,需返回原echostr
+            String echoStr = request.getParameter("echostr");
+            // 微信加密签名
+            WXBizMsgCrypt wxcpt = new WXBizMsgCrypt(WeComServerConstant.TOKEN,
+                    WeComServerConstant.ENCODING_AES_KEY,
+                    WeComServerConstant.CORP_ID);
+
+            String sEchoStr = ""; //需要返回的明文
+            PrintWriter out;
+
+            sEchoStr = wxcpt.VerifyURL(msgSignature, timestamp,
+                    nonce, echoStr);
+            log.info("verifyurl echostr: " + sEchoStr);
+
+            // 验证URL成功,将sEchoStr返回
+            out = response.getWriter();
+            out.print(sEchoStr);
+        } catch (Exception e) {
+            //验证URL失败,错误原因请查看异常
+            log.error("verifyGet error", e);
+        }
+
+    }
+
+    /**
+     * 刷新 ticket
+     */
+    @PostMapping(value = "/verify")
+    public String verifyPost(HttpServletRequest request) {
+
+        try {
+            // 微信加密签名
+            String msg_signature = request.getParameter("msg_signature");
+            // 时间戳
+            String timestamp = request.getParameter("timestamp");
+            // 随机数
+            String nonce = request.getParameter("nonce");
+
+            String id = WeComServerConstant.CORP_ID;
+
+            WXBizMsgCrypt wxcpt = new WXBizMsgCrypt(WeComServerConstant.TOKEN, WeComServerConstant.ENCODING_AES_KEY, id);
+
+            StringBuilder postData = new StringBuilder();   // 密文,对应POST请求的数据
+            //1.获取加密的请求消息:使用输入流获得加密请求消息postData
+            ServletInputStream in = request.getInputStream();
+            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+
+            String tempStr = "";   //作为输出字符串的临时串,用于判断是否读取完毕
+            while (null != (tempStr = reader.readLine())) {
+                postData.append(tempStr);
+            }
+
+            String suiteXml = wxcpt.DecryptMsg(msg_signature, timestamp, nonce, postData.toString());
+            log.info("suiteXml: " + suiteXml);
+
+            Map suiteMap = WxUtil.transferXmlToMap(suiteXml);
+            log.info("suiteMap = {}", JSONObject.toJSONString(suiteMap));
+            if (suiteMap != null) {
+                String changeType = (String) suiteMap.get("ChangeType");
+                if (StringUtils.isNotEmpty(changeType) && changeType.equals("add_external_contact")) {
+                    String userId = (String) suiteMap.get("UserID");
+                    String externalUserId = (String) suiteMap.get("ExternalUserID");
+                    log.info("addStaffWithUser userId={} externalUserId={}", userId, externalUserId);
+                    userService.addStaffWithUser(externalUserId, userId, HNWQ.getId());
+                }
+
+                if (StringUtils.isNotEmpty(changeType) && changeType.equals("del_follow_user")) {
+                    String userId = (String) suiteMap.get("UserID");
+                    String externalUserId = (String) suiteMap.get("ExternalUserID");
+                    log.info("delStaffWithUser userId={} externalUserId={}", userId, externalUserId);
+                    userService.delStaffWithUser(externalUserId, userId, System.currentTimeMillis());
+                }
+            }
+        } catch (Exception e) {
+            log.error("verifyPost error", e);
+        }
+        String success = "success";
+        return success;
+    }
+}

+ 46 - 0
api-module/src/main/java/com/tzld/piaoquan/api/controller/ThirdPartyController.java

@@ -0,0 +1,46 @@
+package com.tzld.piaoquan.api.controller;
+
+
+import com.tzld.piaoquan.api.model.vo.*;
+import com.tzld.piaoquan.api.mq.MessageCallbackProducer;
+import com.tzld.piaoquan.api.service.AccessTokenService;
+import com.tzld.piaoquan.api.service.ThirdPartyService;
+import com.tzld.piaoquan.common.common.base.CommonResponse;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.List;
+
+@RestController
+@RequestMapping("/3rdParty")
+public class ThirdPartyController {
+
+    @Autowired
+    private AccessTokenService accessTokenService;
+
+    @Autowired
+    private ThirdPartyService thirdPartyService;
+
+    @Autowired
+    private MessageCallbackProducer messageCallbackProducer;
+
+    @PostMapping("/accessToken/get")
+    public CommonResponse<AccessTokenVo> getAccessToken(@RequestBody AccessTokenParam param) {
+        return accessTokenService.getAccessToken(param);
+    }
+
+    @PostMapping("/pushMessage/get")
+    public CommonResponse<List<PushMessageVo>> getPushMessage(@RequestBody PushMessageParam param) {
+        return thirdPartyService.getPushMessage(param);
+    }
+
+    @PostMapping("/pushMessage/callback")
+    public CommonResponse<Void> pushMessageCallback(@RequestBody CallbackParam param) {
+        return messageCallbackProducer.sendMessage(param);
+    }
+
+    @GetMapping("/report/uv")
+    public CommonResponse<List<ReportUvVo>> getReportUv(@RequestParam String date, @RequestParam String accessToken) {
+        return thirdPartyService.getReportUv(date, accessToken);
+    }
+}

+ 12 - 105
api-module/src/main/java/com/tzld/piaoquan/api/controller/WeComController.java

@@ -1,124 +1,31 @@
 package com.tzld.piaoquan.api.controller;
 
-import com.alibaba.fastjson.JSONObject;
-import com.tzld.piaoquan.common.common.constant.WeComServerConstant;
-import com.tzld.piaoquan.api.service.UserService;
-import com.tzld.piaoquan.common.utils.wecom.WXBizMsgCrypt;
-import com.tzld.piaoquan.common.utils.wecom.WxUtil;
+import com.tzld.piaoquan.api.model.vo.WeComPushMessageParam;
+import com.tzld.piaoquan.api.model.vo.WeComPushMessageVo;
+import com.tzld.piaoquan.api.service.WeComService;
+import com.tzld.piaoquan.common.common.base.CommonResponse;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
-import javax.servlet.ServletInputStream;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.util.Map;
-
-import static com.tzld.piaoquan.common.common.enums.CorpEnum.HNWQ;
+import java.util.List;
 
 @Slf4j
 @RestController
-@RequestMapping("/wecom/server")
+@RequestMapping("/wecom")
 public class WeComController {
 
 
     @Autowired
-    private UserService userService;
-
-    @GetMapping("/verify")
-    public void verifyGet(HttpServletRequest request, HttpServletResponse response) {
-        try {
-            // 微信加密签名
-            String msgSignature = request.getParameter("msg_signature");
-            // 时间戳
-            String timestamp = request.getParameter("timestamp");
-            // 随机数
-            String nonce = request.getParameter("nonce");
-            // 随机字符串
-            // 如果是刷新,需返回原echostr
-            String echoStr = request.getParameter("echostr");
-            // 微信加密签名
-            WXBizMsgCrypt wxcpt = new WXBizMsgCrypt(WeComServerConstant.TOKEN,
-                    WeComServerConstant.ENCODING_AES_KEY,
-                    WeComServerConstant.CORP_ID);
-
-            String sEchoStr = ""; //需要返回的明文
-            PrintWriter out;
-
-            sEchoStr = wxcpt.VerifyURL(msgSignature, timestamp,
-                    nonce, echoStr);
-            log.info("verifyurl echostr: " + sEchoStr);
-
-            // 验证URL成功,将sEchoStr返回
-            out = response.getWriter();
-            out.print(sEchoStr);
-        } catch (Exception e) {
-            //验证URL失败,错误原因请查看异常
-            log.error("verifyGet error", e);
-        }
-
-    }
-
-    /**
-     * 刷新 ticket
-     */
-    @PostMapping(value = "/verify")
-    public String verifyPost(HttpServletRequest request) {
-
-        try {
-            // 微信加密签名
-            String msg_signature = request.getParameter("msg_signature");
-            // 时间戳
-            String timestamp = request.getParameter("timestamp");
-            // 随机数
-            String nonce = request.getParameter("nonce");
-
-            String id = WeComServerConstant.CORP_ID;
-
-            WXBizMsgCrypt wxcpt = new WXBizMsgCrypt(WeComServerConstant.TOKEN, WeComServerConstant.ENCODING_AES_KEY, id);
-
-            StringBuilder postData = new StringBuilder();   // 密文,对应POST请求的数据
-            //1.获取加密的请求消息:使用输入流获得加密请求消息postData
-            ServletInputStream in = request.getInputStream();
-            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
-
-            String tempStr = "";   //作为输出字符串的临时串,用于判断是否读取完毕
-            while (null != (tempStr = reader.readLine())) {
-                postData.append(tempStr);
-            }
-
-            String suiteXml = wxcpt.DecryptMsg(msg_signature, timestamp, nonce, postData.toString());
-            log.info("suiteXml: " + suiteXml);
+    private WeComService weComService;
 
-            Map suiteMap = WxUtil.transferXmlToMap(suiteXml);
-            log.info("suiteMap = {}", JSONObject.toJSONString(suiteMap));
-            if (suiteMap != null) {
-                String changeType = (String) suiteMap.get("ChangeType");
-                if (StringUtils.isNotEmpty(changeType) && changeType.equals("add_external_contact")) {
-                    String userId = (String) suiteMap.get("UserID");
-                    String externalUserId = (String) suiteMap.get("ExternalUserID");
-                    log.info("addStaffWithUser userId={} externalUserId={}", userId, externalUserId);
-                    userService.addStaffWithUser(externalUserId, userId, HNWQ.getId());
-                }
 
-                if (StringUtils.isNotEmpty(changeType) && changeType.equals("del_follow_user")) {
-                    String userId = (String) suiteMap.get("UserID");
-                    String externalUserId = (String) suiteMap.get("ExternalUserID");
-                    log.info("delStaffWithUser userId={} externalUserId={}", userId, externalUserId);
-                    userService.delStaffWithUser(externalUserId, userId, System.currentTimeMillis());
-                }
-            }
-        } catch (Exception e) {
-            log.error("verifyPost error", e);
-        }
-        String success = "success";
-        return success;
+    @PostMapping("/pushMessage/get")
+    public CommonResponse<List<WeComPushMessageVo>> getPushMessage(@RequestBody WeComPushMessageParam param) {
+        log.info("param={}", param);
+        return weComService.getPushMessage(param);
     }
 }

+ 21 - 0
api-module/src/main/java/com/tzld/piaoquan/api/model/vo/CallbackParam.java

@@ -0,0 +1,21 @@
+package com.tzld.piaoquan.api.model.vo;
+
+import com.tzld.piaoquan.api.model.bo.ReplyInfo;
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class CallbackParam {
+
+    private String ghId;
+
+    private String accessToken;
+
+    private String openId;
+
+    private Long timestamp;
+
+    private List<ReplyInfo> replyInfo;
+
+}

+ 58 - 0
api-module/src/main/java/com/tzld/piaoquan/api/mq/MessageCallbackCustomer.java

@@ -0,0 +1,58 @@
+package com.tzld.piaoquan.api.mq;
+
+
+import com.alibaba.fastjson.JSONObject;
+
+import com.aliyun.openservices.ons.api.Action;
+import com.aliyun.openservices.ons.api.ConsumeContext;
+import com.aliyun.openservices.ons.api.Message;
+import com.aliyun.openservices.ons.api.MessageListener;
+import com.tzld.piaoquan.api.dao.mapper.NewPushMessageCallbackMapper;
+import com.tzld.piaoquan.api.model.bo.ReplyInfo;
+import com.tzld.piaoquan.api.model.po.PushMessageCallback;
+import com.tzld.piaoquan.api.model.vo.CallbackParam;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeanUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+@Component
+public class MessageCallbackCustomer implements MessageListener {
+
+
+    @Autowired
+    private NewPushMessageCallbackMapper newPushMessageCallbackMapper;
+
+    @Override
+    public Action consume(Message message, ConsumeContext consumeContext) {
+        CallbackParam param = JSONObject.parseObject(new String(message.getBody()), CallbackParam.class);
+        log.info("param = {}", param);
+        if (CollectionUtils.isEmpty(param.getReplyInfo())) {
+            log.error("CallbackParam replyInfo is empty {}", param);
+        }
+        PushMessageCallback pushMessageCallback = new PushMessageCallback();
+        pushMessageCallback.setGhId(param.getGhId());
+        pushMessageCallback.setTimestamp(param.getTimestamp());
+        pushMessageCallback.setOpenId(param.getOpenId());
+        List<PushMessageCallback> insertList = new ArrayList<>();
+        for (ReplyInfo replyInfo : param.getReplyInfo()) {
+            PushMessageCallback insertPushMessageCallback = new PushMessageCallback();
+            BeanUtils.copyProperties(pushMessageCallback, insertPushMessageCallback);
+            insertPushMessageCallback.setMsgType(replyInfo.getMsgType());
+            insertPushMessageCallback.setVideoId(replyInfo.getMiniVideoId());
+            insertList.add(insertPushMessageCallback);
+        }
+        try {
+            newPushMessageCallbackMapper.insertList(insertList);
+        } catch (Exception e) {
+            log.error("PushMessageCallback newPushMessageCallbackMapper insert pushMessageCallback={}, error={}", pushMessageCallback, e.getMessage());
+            return Action.ReconsumeLater;
+        }
+        return Action.CommitMessage;
+    }
+}

+ 69 - 0
api-module/src/main/java/com/tzld/piaoquan/api/mq/MessageCallbackProducer.java

@@ -0,0 +1,69 @@
+package com.tzld.piaoquan.api.mq;
+
+import com.alibaba.fastjson.JSON;
+import com.aliyun.openservices.ons.api.Message;
+import com.aliyun.openservices.ons.api.SendResult;
+import com.aliyun.openservices.ons.api.bean.ProducerBean;
+
+import com.tzld.piaoquan.api.model.vo.CallbackParam;
+import com.tzld.piaoquan.api.service.AccessTokenService;
+import com.tzld.piaoquan.common.common.base.CommonResponse;
+import com.tzld.piaoquan.common.common.enums.ExceptionCodeEnum;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.nio.charset.StandardCharsets;
+
+@Slf4j
+@Component
+public class MessageCallbackProducer {
+
+    @Value("${pushMessage.callback.topic}")
+    private String TOPIC;
+
+    @Value("${pushMessage.callback.tag}")
+    private String TAG;
+
+    @Autowired
+    private ProducerBean producer;
+
+    @Autowired
+    private AccessTokenService accessTokenService;
+
+    public CommonResponse<Void> sendMessage(CallbackParam param) {
+        if (param == null || param.getAccessToken() == null) {
+            return CommonResponse.create(ExceptionCodeEnum.PARAMS_ERROR, "参数错误");
+        }
+        if (!accessTokenService.validateAccessToken(param.getAccessToken())) {
+            return CommonResponse.create(ExceptionCodeEnum.PARAMS_ERROR, "accessToken错误或者已失效");
+        }
+        Message message = new Message();
+        message.setTopic(TOPIC);
+        message.setTag(TAG);
+        message.setBody(JSON.toJSONString(param).getBytes(StandardCharsets.UTF_8));
+        try {
+            log.info("sendMessage = {}", message);
+            producer.send(message);
+        } catch (Exception e) {
+            log.error("error send param = {}", param);
+            log.error("error", e);
+            //重试
+            retry(message);
+        }
+        return CommonResponse.success();
+    }
+
+    private void retry(Message message) {
+        for (int i = 0; i < 3; i++) {
+            try {
+                SendResult sendResult = producer.send(message);
+                log.info("sendResult = {}", sendResult);
+                return;
+            } catch (Exception e) {
+                log.error("retry send error {}", i, e);
+            }
+        }
+    }
+}

+ 4 - 0
api-module/src/main/resources/application-prod.properties

@@ -10,5 +10,9 @@ xxl.job.admin.addresses=http://xxl-job-internal.piaoquantv.com/xxl-job-admin
 
 small_page_url=https://api.piaoquantv.com
 
+pushMessage.callback.topic=3rd_party_push_message_callback_prod
+pushMessage.callback.groupId=GID_3RD_PARTY_PUSH_MESSAGE_CALLBACK_PROD
+pushMessage.callback.tag=mini
+
 
 

+ 5 - 1
api-module/src/main/resources/application-test.properties

@@ -10,4 +10,8 @@ spring.redis.password=Qingqu2019
 
 xxl.job.admin.addresses=http://xxl-job-test-internal.piaoquantv.com/xxl-job-admin
 
-small_page_url=https://testapi.piaoquantv.com
+small_page_url=https://testapi.piaoquantv.com
+
+pushMessage.callback.topic=3rd_party_push_message_callback_prod
+pushMessage.callback.groupId=GID_3RD_PARTY_PUSH_MESSAGE_CALLBACK_PROD
+pushMessage.callback.tag=mini

+ 5 - 0
api-module/src/main/resources/application.properties

@@ -32,3 +32,8 @@ xxl.job.executor.logretentiondays=30
 
 
 
+rocketmq.accessKey=LTAI4G7puhXtLyHzHQpD6H7A
+rocketmq.secretKey=nEbq3xWNQd1qLpdy2u71qFweHkZjSG
+rocketmq.nameSrvAddr=http://MQ_INST_1894469520484605_BXhXuzkZ.mq-internet-access.mq-internet.aliyuncs.com:80
+
+

+ 6 - 0
pom.xml

@@ -168,6 +168,12 @@
             <version>2.0.6</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.aliyun.openservices</groupId>
+            <artifactId>ons-client</artifactId>
+            <version>1.8.4.Final</version>
+        </dependency>
+
 
 
     </dependencies>