supeng 1 month ago
parent
commit
921df3102d

+ 62 - 0
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/aop/RequestLogAspect.java

@@ -0,0 +1,62 @@
+package com.tzld.piaoquan.content.understanding.aop;
+
+import com.alibaba.fastjson.JSON;
+import com.google.common.base.Stopwatch;
+import com.tzld.piaoquan.content.understanding.service.LoghubService;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.annotation.Pointcut;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.springframework.web.context.request.RequestContextHolder;
+import org.springframework.web.context.request.ServletRequestAttributes;
+
+import javax.servlet.http.HttpServletRequest;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+@Aspect
+@Component
+public class RequestLogAspect {
+
+    @Autowired
+    private LoghubService loghubService;
+
+    @Value("aliyun.log.project")
+    private String aliyunLogProject;
+    @Value("aliyun.log.logstore.request")
+    private String aliyunLogLogstoreRequest;
+
+    @Pointcut("execution(* com.tzld.piaoquan.content.understanding..*.*(..)) && !within(com.tzld.piaoquan.content.understanding.controller.IndexController)")
+    public void allController() {
+    }
+
+    @Around("allController()")
+    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
+        // 获取请求信息
+        HttpServletRequest request = Objects.requireNonNull((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
+        String uri = request.getRequestURI();
+        String clientIp = request.getRemoteAddr();
+        String queryString = request.getQueryString();
+        String method = joinPoint.getSignature().getName();
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        // 执行目标方法
+        Object result = joinPoint.proceed();
+        // 计算耗时
+        long executionTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
+        Map<String, Object> data = new HashMap<>();
+        data.put("uri", uri);
+        data.put("clientIp", clientIp);
+        data.put("method", method);
+        data.put("executionTime", executionTime);
+        data.put("queryString", queryString);
+        data.put("requestBody", JSON.toJSONString(joinPoint.getArgs()));
+        data.put("responseBody", JSON.toJSONString(result));
+        loghubService.asyncSubmitLog(aliyunLogProject, aliyunLogLogstoreRequest, "", data);
+        return result;
+    }
+}

+ 33 - 0
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/handle/GlobalExceptionHandle.java

@@ -1,12 +1,17 @@
 package com.tzld.piaoquan.content.understanding.handle;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import javax.servlet.http.HttpServletRequest;
 
 import com.tzld.piaoquan.content.understanding.common.base.CommonResponse;
+import com.tzld.piaoquan.content.understanding.service.LoghubService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.util.CollectionUtils;
 import org.springframework.validation.BindException;
 import org.springframework.validation.ObjectError;
@@ -29,6 +34,14 @@ public class GlobalExceptionHandle {
 
     private static Logger LOGGER = LoggerFactory.getLogger(GlobalExceptionHandle.class);
 
+    @Autowired
+    private LoghubService loghubService;
+
+    @Value("aliyun.log.project")
+    private String aliyunLogProject;
+    @Value("aliyun.log.logstore.error")
+    private String aliyunLogLogstoreError;
+
     @ExceptionHandler
     public Object handleException(HttpServletRequest req, Exception exception) throws Exception {
         String uri = req.getRequestURI();
@@ -39,6 +52,11 @@ public class GlobalExceptionHandle {
             response.setCode(e.getCode());
             response.setMsg(e.getMsg());
             LOGGER.warn("uri:" + uri + "\n" + "CustomException log.", exception);
+            Map<String, Object> logMap = new HashMap<>();
+            logMap.put("uri", "uri");
+            logMap.put("code", e.getCode());
+            logMap.put("msg", e.getMsg());
+            loghubService.asyncSubmitLog(aliyunLogProject, aliyunLogLogstoreError, "" , logMap);
         } else if (exception instanceof MethodArgumentNotValidException) {
             // 参数校验异常
             MethodArgumentNotValidException e = (MethodArgumentNotValidException) exception;
@@ -53,6 +71,11 @@ public class GlobalExceptionHandle {
             response.setCode(ExceptionEnum.PARAMS_INVALID.getCode());
             response.setMsg(errorMsg.toString());
             LOGGER.warn("uri:" + uri + "\n" + "MethodArgumentNotValidException log.", exception);
+            Map<String, Object> logMap = new HashMap<>();
+            logMap.put("uri", "uri");
+            logMap.put("code", ExceptionEnum.PARAMS_INVALID.getCode());
+            logMap.put("msg", errorMsg.toString());
+            loghubService.asyncSubmitLog(aliyunLogProject, aliyunLogLogstoreError, "" , logMap);
         } else if (exception instanceof BindException) {
             // 参数绑定异常
             BindException e = (BindException) exception;
@@ -67,10 +90,20 @@ public class GlobalExceptionHandle {
             response.setCode(ExceptionEnum.PARAMS_INVALID.getCode());
             response.setMsg(errorMsg.toString());
             LOGGER.warn("uri:" + uri + "\n" + "BindException log.", exception);
+            Map<String, Object> logMap = new HashMap<>();
+            logMap.put("uri", "uri");
+            logMap.put("code", ExceptionEnum.PARAMS_INVALID.getCode());
+            logMap.put("msg", errorMsg.toString());
+            loghubService.asyncSubmitLog(aliyunLogProject, aliyunLogLogstoreError, "" , logMap);
         } else {
             response.setCode(ExceptionEnum.SYSTEM_ERROR.getCode());
             response.setMsg(ExceptionEnum.SYSTEM_ERROR.getMsg());
             LOGGER.error("uri:" + uri + "\n" + "unknow exception log.", exception);
+            Map<String, Object> logMap = new HashMap<>();
+            logMap.put("uri", "uri");
+            logMap.put("code", ExceptionEnum.SYSTEM_ERROR.getCode());
+            logMap.put("msg", ExceptionEnum.SYSTEM_ERROR.getMsg());
+            loghubService.asyncSubmitLog(aliyunLogProject, aliyunLogLogstoreError, "" , logMap);
         }
         return response;
     }

+ 30 - 33
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/rocketmq/consumer/OldVersionRemainVideoUnderstandingConsumer.java

@@ -1,6 +1,6 @@
 package com.tzld.piaoquan.content.understanding.rocketmq.consumer;
 
-import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.TypeReference;
 import com.tzld.piaoquan.content.understanding.service.ContentService;
 import lombok.extern.slf4j.Slf4j;
@@ -8,58 +8,55 @@ import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.apis.message.MessageView;
-import org.apache.rocketmq.client.core.RocketMQClientTemplate;
 import org.apache.rocketmq.client.core.RocketMQListener;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Service;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
 /**
  * 旧版视频理解 需要延迟处理的视频 消费者
+ *
+ * @author supeng
  */
 @Slf4j
-@Service
-public class OldVersionRemainVideoUnderstandingConsumer implements CommandLineRunner {
-    @Autowired
-    private RocketMQClientTemplate rocketMQClientTemplate;
+@Component
+@RocketMQMessageListener(endpoints = "${rocketmq.consumer.endpoints:rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080}", topic = "${rocketmq.consumer.oldversion.topic:topic_old_version_remain_video_understanding_test}",
+        consumerGroup = "${rocketmq.consumer.oldversion.group:group_old_version_remain_video_understanding_test}", tag = "*")
+public class OldVersionRemainVideoUnderstandingConsumer implements RocketMQListener {
 
     @Autowired
     private ContentService contentService;
 
     @Override
-    public void run(String... args) throws Exception {
-        while (true) {
-            final List<MessageView> messages = rocketMQClientTemplate.receive(10, Duration.ofSeconds(15));
-            log.info("Received {} message(s)", messages.size());
-            for (MessageView message : messages) {
-                log.info("receive message, topic:" + message.getTopic() + " messageId:" + message.getMessageId());
-                final MessageId messageId = message.getMessageId();
-                try {
-                    ByteBuffer body = message.getBody();
-                    Charset charset = StandardCharsets.UTF_8;
-                    String messageStr = charset.decode(body).toString();
-                    Map<String, Object> messageMap = JSONObject.parseObject(messageStr, new TypeReference<Map<String, Object>>() {});
-                    Object videoIdObj = messageMap.get("videoId");
-                    if (Objects.nonNull(videoIdObj)) {
-                        Long videoId = Long.parseLong(videoIdObj.toString());
-                        boolean flag = contentService.remainVideoUnderstandingHandler(videoId);
-                        if (flag) {
-                            rocketMQClientTemplate.ack(message);
-                            log.info("Message is acknowledged successfully, messageId={}", messageId);
-                        }
-                    }
-                } catch (Throwable t) {
-                    log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
-                }
+    public ConsumeResult consume(MessageView messageView) {
+        log.info("Receive message {}", messageView);
+        MessageId messageId = messageView.getMessageId();
+        try {
+            ByteBuffer body = messageView.getBody();
+            Charset charset = StandardCharsets.UTF_8;
+            String messageStr = charset.decode(body).toString();
+            Map<String, Object> messageMap = JSON.parseObject(messageStr, new TypeReference<Map<String, Object>>() {
+            });
+            Object videoIdObj = messageMap.get("videoId");
+            if (Objects.isNull(videoIdObj)) {
+                //错误消息 清理掉
+                return ConsumeResult.SUCCESS;
+            }
+            Long videoId = Long.parseLong(videoIdObj.toString());
+            boolean flag = contentService.remainVideoUnderstandingHandler(videoId);
+            if (flag) {
+                log.info("Message is acknowledged successfully, messageId={}", messageId);
+                return ConsumeResult.SUCCESS;
             }
+        } catch (Exception e) {
+            log.error("Message is failed to be acknowledged, messageId={}", messageId, e);
         }
+        return ConsumeResult.FAILURE;
     }
 }

+ 2 - 2
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/rocketmq/producer/OldVersionRemainVideoUnderstandingProducer.java

@@ -23,14 +23,14 @@ public class OldVersionRemainVideoUnderstandingProducer {
     @Autowired
     private RocketMQClientTemplate rocketMQClientTemplate;
 
-    @Value("${old.version.remain.video.understanding.topic:}")
+    @Value("${rocketmq.producer.oldversion.topic:topic_old_version_remain_video_understanding}")
     private String oldVersionRemainTopic;
 
     public MessageId syncSendDelayMessage(Map<String, Object> messageMap, Duration duration) {
         return RetryUtil.executeWithRetry(() -> {
             SendReceipt sendReceipt = rocketMQClientTemplate.syncSendDelayMessage(oldVersionRemainTopic, JSON.toJSONString(messageMap), duration);
             if (Objects.isNull(sendReceipt) || Objects.isNull(sendReceipt.getMessageId())) {
-                throw new RuntimeException("transcode syncSendDelayMessage failure messageMap = " + messageMap);
+                throw new RuntimeException("OldVersionRemainVideoUnderstandingProducer syncSendDelayMessage failure messageMap = " + messageMap);
             }
             return sendReceipt.getMessageId();
         }, "oldVersionRemainVideoSendDelayMessage");

+ 6 - 2
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/service/impl/PipelineServiceImpl.java

@@ -280,7 +280,9 @@ public class PipelineServiceImpl implements PipelineService {
                         logMap.put("data", result);
                         logMap.put("status", "success");
                         logMap.put("taskId", taskId);
-                        logMap.putAll(dto.getExtMap());
+                        if (Objects.nonNull(dto.getExtMap())) {
+                            logMap.putAll(dto.getExtMap());
+                        }
                         loghubService.asyncSubmitLog(aliyunLogProject, aliyunLogStoreResult, "", logMap);
 
                     } catch (Exception e) {
@@ -301,7 +303,9 @@ public class PipelineServiceImpl implements PipelineService {
                     logMap.put("error_msg", reason);
                     logMap.put("status", "failure");
                     logMap.put("taskId", taskId);
-                    logMap.putAll(dto.getExtMap());
+                    if (Objects.nonNull(dto.getExtMap())) {
+                        logMap.putAll(dto.getExtMap());
+                    }
                     loghubService.asyncSubmitLog(aliyunLogProject, aliyunLogStoreResult, "", logMap);
                 }
                 continue;

+ 12 - 3
content-understanding-server/src/main/resources/application-dev.yml

@@ -1,5 +1,5 @@
 server:
-  port: 8081
+  port: 8080
 
 eureka:
   instance:
@@ -46,6 +46,7 @@ aliyun:
       action: action-log-test
       request: request-log-test
       result: result-log-test
+      error: error-log-test
     topic:
 
 xxl:
@@ -61,8 +62,16 @@ xxl:
       logpath: /datalog/weblog/${project.name}/xxl-job/
       logretentiondays: 30
 
-
 longvideoapi:
   getVideoBaseInfoUrl: http://videotest-internal.yishihui.com/longvideoapi/openapi/video/getBaseInfo
 
-
+rocketmq:
+  consumer:
+    endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
+    oldversion:
+      topic: topic_old_version_remain_video_understanding_test
+      group: group_old_version_remain_video_understanding_test
+  producer:
+    endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
+    oldversion:
+      topic: topic_old_version_remain_video_understanding_test

+ 13 - 2
content-understanding-server/src/main/resources/application-pre.yml

@@ -44,6 +44,7 @@ aliyun:
       action: action-log-pre
       request: request-log-pre
       result: result-log-pre
+      error: error-log-pre
     topic:
 
 
@@ -60,6 +61,16 @@ xxl:
       logpath: /datalog/weblog/${project.name}/xxl-job/
       logretentiondays: 30
 
-
 longvideoapi:
-  getVideoBaseInfoUrl: http://videopre-internal.piaoquantv.com/longvideoapi/openapi/video/getBaseInfo
+  getVideoBaseInfoUrl: http://videopre-internal.piaoquantv.com/longvideoapi/openapi/video/getBaseInfo
+
+rocketmq:
+  consumer:
+    endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
+    oldversion:
+      topic: topic_old_version_remain_video_understanding_pre
+      group: group_old_version_remain_video_understanding_pre
+  producer:
+    endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
+    oldversion:
+      topic: topic_old_version_remain_video_understanding_pre

+ 13 - 1
content-understanding-server/src/main/resources/application-prod.yml

@@ -45,6 +45,7 @@ aliyun:
       action: action-log
       request: request-log
       result: result-log
+      error: error-log
     topic:
 
 xxl:
@@ -61,4 +62,15 @@ xxl:
       logretentiondays: 30
 
 longvideoapi:
-  getVideoBaseInfoUrl: http://longvideoapi-internal.piaoquantv.com/longvideoapi/openapi/video/getBaseInfo
+  getVideoBaseInfoUrl: http://longvideoapi-internal.piaoquantv.com/longvideoapi/openapi/video/getBaseInfo
+
+rocketmq:
+  consumer:
+    endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
+    oldversion:
+      topic: topic_old_version_remain_video_understanding
+      group: group_old_version_remain_video_understanding
+  producer:
+    endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
+    oldversion:
+      topic: topic_old_version_remain_video_understanding

+ 13 - 1
content-understanding-server/src/main/resources/application-stress.yml

@@ -46,6 +46,7 @@ aliyun:
       action: action-log-test
       request: request-log-test
       result: result-log-test
+      error: error-log-test
     topic:
 
 xxl:
@@ -62,4 +63,15 @@ xxl:
       logretentiondays: 30
 
 longvideoapi:
-  getVideoBaseInfoUrl: http://videotest-internal.yishihui.com/longvideoapi/openapi/video/getBaseInfo
+  getVideoBaseInfoUrl: http://videotest-internal.yishihui.com/longvideoapi/openapi/video/getBaseInfo
+
+rocketmq:
+  consumer:
+    endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
+    oldversion:
+      topic: topic_old_version_remain_video_understanding_test
+      group: group_old_version_remain_video_understanding_test
+  producer:
+    endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
+    oldversion:
+      topic: topic_old_version_remain_video_understanding_test

+ 13 - 2
content-understanding-server/src/main/resources/application-test.yml

@@ -46,6 +46,7 @@ aliyun:
       action: action-log-test
       request: request-log-test
       result: result-log-test
+      error: error-log-test
     topic:
 
 
@@ -62,6 +63,16 @@ xxl:
       logpath: /datalog/weblog/${project.name}/xxl-job/
       logretentiondays: 30
 
-
 longvideoapi:
-  getVideoBaseInfoUrl: http://videotest-internal.yishihui.com/longvideoapi/openapi/video/getBaseInfo
+  getVideoBaseInfoUrl: http://videotest-internal.yishihui.com/longvideoapi/openapi/video/getBaseInfo
+
+rocketmq:
+  consumer:
+    endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
+    oldversion:
+      topic: topic_old_version_remain_video_understanding_test
+      group: group_old_version_remain_video_understanding_test
+  producer:
+    endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
+    oldversion:
+      topic: topic_old_version_remain_video_understanding_test

+ 30 - 0
content-understanding-server/src/test/java/com/tzld/piaoquan/content/understanding/service/RocketmqTest.java

@@ -0,0 +1,30 @@
+package com.tzld.piaoquan.content.understanding.service;
+
+import com.alibaba.fastjson.JSON;
+import com.tzld.piaoquan.content.understanding.BaseTest;
+import com.tzld.piaoquan.content.understanding.rocketmq.producer.OldVersionRemainVideoUnderstandingProducer;
+import org.apache.rocketmq.client.apis.message.MessageId;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.client.core.RocketMQClientTemplate;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class RocketmqTest extends BaseTest {
+
+    @Autowired
+    OldVersionRemainVideoUnderstandingProducer oldVersionRemainVideoUnderstandingProducer;
+
+    @Test
+    public void testSend() {
+        Map<String, Object> messageMap = new HashMap<>();
+        messageMap.put("videoId", 9043967L);
+        Duration duration = Duration.ofMinutes(1);
+        MessageId messageId = oldVersionRemainVideoUnderstandingProducer.syncSendDelayMessage(messageMap, duration);
+        System.out.println(messageId);
+    }
+}