supeng 4 dienas atpakaļ
vecāks
revīzija
a2c6687b27

+ 14 - 0
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/job/VideoContentUnderstandingJob.java

@@ -48,6 +48,20 @@ public class VideoContentUnderstandingJob {
         return ReturnT.SUCCESS;
     }
 
+    @XxlJob("dayTopVideoUnderstandingHandler")
+    public ReturnT<String> historyVideoUnderstandingHandler(String params) {
+        XxlJobLogger.log("historyVideoUnderstandingHandler start");
+        try {
+            contentService.historyVideoUnderstandingHandler();
+        } catch (Exception e) {
+            XxlJobLogger.log("historyVideoUnderstandingHandler error", e);
+            return ReturnT.FAIL;
+        } finally {
+            XxlJobLogger.log("historyVideoUnderstandingHandler end");
+        }
+        return ReturnT.SUCCESS;
+    }
+
 
     @XxlJob("remainVideoUnderstandingHandler")
     public ReturnT<String> remainVideoUnderstandingHandler(String params) {

+ 9 - 0
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/model/dto/ContentAnalyseDTO.java

@@ -2,6 +2,11 @@ package com.tzld.piaoquan.content.understanding.model.dto;
 
 import lombok.Data;
 
+import java.util.Map;
+
+/**
+ * @author supeng
+ */
 @Data
 public class ContentAnalyseDTO {
     /**
@@ -17,4 +22,8 @@ public class ContentAnalyseDTO {
      * 任务ID
      */
     private String taskId;
+    /**
+     * 自定义扩展信息
+     */
+    private Map<String, Object> extMap;
 }

+ 4 - 0
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/model/dto/ContentDTO.java

@@ -2,6 +2,8 @@ package com.tzld.piaoquan.content.understanding.model.dto;
 
 import lombok.Data;
 
+import java.util.Map;
+
 /**
  * 内容信息
  *
@@ -29,4 +31,6 @@ public class ContentDTO {
      * 非必需
      */
     private Integer contentType;
+
+    private Map<String,Object> extMap;
 }

+ 3 - 0
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/rocketmq/consumer/OldVersionRemainVideoUnderstandingConsumer.java

@@ -22,6 +22,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+/**
+ * 旧版视频理解 需要延迟处理的视频 消费者
+ */
 @Slf4j
 @Service
 public class OldVersionRemainVideoUnderstandingConsumer implements CommandLineRunner {

+ 5 - 0
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/rocketmq/producer/OldVersionRemainVideoUnderstandingProducer.java

@@ -13,6 +13,11 @@ import java.time.Duration;
 import java.util.Map;
 import java.util.Objects;
 
+/**
+ * 旧版视频理解 需要延迟处理的视频 生产者
+ *
+ * @author supeng
+ */
 @Component
 public class OldVersionRemainVideoUnderstandingProducer {
     @Autowired

+ 10 - 0
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/service/LoghubService.java

@@ -1,7 +1,17 @@
 package com.tzld.piaoquan.content.understanding.service;
 
+import java.util.Map;
+
 /**
  * @author supeng
  */
 public interface LoghubService {
+    /**
+     *
+     * @param project
+     * @param logStore
+     * @param topic
+     * @param data
+     */
+    void asyncSubmitLog(String project, String logStore, String topic, Map<String, Object> data);
 }

+ 82 - 9
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/service/impl/ContentServiceImpl.java

@@ -24,6 +24,7 @@ import com.tzld.piaoquan.content.understanding.service.ContentService;
 import com.tzld.piaoquan.content.understanding.service.LoghubService;
 import com.tzld.piaoquan.content.understanding.service.ODPSManager;
 import com.tzld.piaoquan.content.understanding.service.PipelineService;
+import com.tzld.piaoquan.content.understanding.util.DateUtil;
 import com.tzld.piaoquan.content.understanding.util.UrlUtil;
 import com.xxl.job.core.log.XxlJobLogger;
 import lombok.extern.slf4j.Slf4j;
@@ -34,6 +35,8 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
@@ -227,12 +230,16 @@ public class ContentServiceImpl implements ContentService {
     @Override
     public void newRecommendVideoUnderstandingHandler() {
         Stopwatch stopwatch = Stopwatch.createStarted();
-        List<Record> newRecommendRecords = odpsManager.tableTunnelQuery("loghubods", newRecommendTable);
+        String dt = DateUtil.formatLocalDateTime(LocalDateTime.now(), "yyyyMMddHH");
+        List<Record> newRecommendRecords = odpsManager.tableTunnelQuery("loghubods", newRecommendTable, "dt=" + dt);
         if (Objects.nonNull(newRecommendRecords) && !newRecommendRecords.isEmpty()) {
             XxlJobLogger.log("table = {} records size = {}", newRecommendTable, newRecommendRecords.size());
             for (Record record : newRecommendRecords) {
                 try {
                     Long videoId = record.getBigint("videoid");
+                    String type = record.getString("type");
+                    String title = record.getString("title");
+                    String videoPath = record.getString("video_path");
                     if (Objects.isNull(videoId)) {
                         XxlJobLogger.log("table = {} videoId 为 null", newRecommendTable);
                         continue;
@@ -249,6 +256,25 @@ public class ContentServiceImpl implements ContentService {
                     contentDTO.setContentType(ContentTypeEnum.VIDEO.getValue());
                     contentDTO.setContent(videoUrl);
                     contentDTO.setPipelineId(oldVersionPipelineId);
+                    //上报日志
+//                    1. type 类型
+//                    2. video_id 视频ID
+//                    3. partition 分区
+//                    4. data 数据结果
+//                    5. error_msg 错误信息
+//                    6. status 状态
+//                       success failure
+//                    7. pipelineId
+//                    8. taskId
+//                    9. video_title
+//                    10. video_url
+                    Map<String, Object> extMap = new HashMap<>();
+                    extMap.put("partition", dt);
+                    extMap.put("pipelineId", oldVersionPipelineId);
+                    extMap.put("type", type);
+                    extMap.put("video_title", title);
+                    extMap.put("video_url", videoPath);
+                    contentDTO.setExtMap(extMap);
                     contentDTOList.add(contentDTO);
                     submitTasksParam.setContents(contentDTOList);
                     submitTasks(submitTasksParam);
@@ -265,12 +291,17 @@ public class ContentServiceImpl implements ContentService {
     @Override
     public void dayTopVideoUnderstandingHandler() {
         Stopwatch stopwatch = Stopwatch.createStarted();
-        List<Record> newRecommendRecords = odpsManager.tableTunnelQuery("loghubods", topReturnTable);
-        if (Objects.nonNull(newRecommendRecords) && !newRecommendRecords.isEmpty()) {
-            XxlJobLogger.log("table = {} records size = {}", topReturnTable, newRecommendRecords.size());
-            for (Record record : newRecommendRecords) {
+        String dt = DateUtil.formatLocalDateTime(LocalDateTime.now(), "yyyyMMdd");
+        List<Record> topReturnRecords = odpsManager.tableTunnelQuery("loghubods", topReturnTable, "dt=" + dt);
+        if (Objects.nonNull(topReturnRecords) && !topReturnRecords.isEmpty()) {
+            XxlJobLogger.log("table = {} records size = {}", topReturnTable, topReturnRecords.size());
+            for (Record record : topReturnRecords) {
                 try {
+                    //
                     Long videoId = record.getBigint("videoid");
+                    String type = record.getString("type");
+                    String title = record.getString("title");
+                    String videoPath = record.getString("video_path");
                     if (Objects.isNull(videoId)) {
                         XxlJobLogger.log("table = {} videoId 为 null", topReturnTable);
                         continue;
@@ -287,6 +318,25 @@ public class ContentServiceImpl implements ContentService {
                     contentDTO.setContentType(ContentTypeEnum.VIDEO.getValue());
                     contentDTO.setContent(videoUrl);
                     contentDTO.setPipelineId(oldVersionPipelineId);
+                    //上报日志
+//                    1. type 类型
+//                    2. video_id 视频ID
+//                    3. partition 分区
+//                    4. data 数据结果
+//                    5. error_msg 错误信息
+//                    6. status 状态
+//                       success failure
+//                    7. pipelineId
+//                    8. taskId
+//                    9. video_title
+//                    10. video_url
+                    Map<String, Object> extMap = new HashMap<>();
+                    extMap.put("partition", dt);
+                    extMap.put("pipelineId", oldVersionPipelineId);
+                    extMap.put("type", type);
+                    extMap.put("video_title", title);
+                    extMap.put("video_url", videoPath);
+                    contentDTO.setExtMap(extMap);
                     contentDTOList.add(contentDTO);
                     submitTasksParam.setContents(contentDTOList);
                     TaskResultVO taskResultVO = submitTasks(submitTasksParam);
@@ -303,12 +353,16 @@ public class ContentServiceImpl implements ContentService {
     @Override
     public void historyVideoUnderstandingHandler() {
         Stopwatch stopwatch = Stopwatch.createStarted();
-        List<Record> newRecommendRecords = odpsManager.tableTunnelQuery("loghubods", topHistoryTable);
-        if (Objects.nonNull(newRecommendRecords) && !newRecommendRecords.isEmpty()) {
-            XxlJobLogger.log("table = {} records size = {}", topHistoryTable, newRecommendRecords.size());
-            for (Record record : newRecommendRecords) {
+        String dt = DateUtil.formatLocalDateTime(LocalDateTime.now(), "yyyyMMdd");
+        List<Record> topHistoryRecords = odpsManager.tableTunnelQuery("loghubods", topHistoryTable, "dt=" + dt);
+        if (Objects.nonNull(topHistoryRecords) && !topHistoryRecords.isEmpty()) {
+            XxlJobLogger.log("table = {} records size = {}", topHistoryTable, topHistoryRecords.size());
+            for (Record record : topHistoryRecords) {
                 try {
                     Long videoId = record.getBigint("videoid");
+                    String type = record.getString("type");
+                    String title = record.getString("title");
+                    String videoPath = record.getString("video_path");
                     if (Objects.isNull(videoId)) {
                         XxlJobLogger.log("table = {} videoId 为 null", topHistoryTable);
                         continue;
@@ -325,6 +379,25 @@ public class ContentServiceImpl implements ContentService {
                     contentDTO.setContentType(ContentTypeEnum.VIDEO.getValue());
                     contentDTO.setContent(videoUrl);
                     contentDTO.setPipelineId(oldVersionPipelineId);
+                    //上报日志
+//                    1. type 类型
+//                    2. video_id 视频ID
+//                    3. partition 分区
+//                    4. data 数据结果
+//                    5. error_msg 错误信息
+//                    6. status 状态
+//                       success failure
+//                    7. pipelineId
+//                    8. taskId
+//                    9. video_title
+//                    10. video_url
+                    Map<String, Object> extMap = new HashMap<>();
+                    extMap.put("partition", dt);
+                    extMap.put("pipelineId", oldVersionPipelineId);
+                    extMap.put("type", type);
+                    extMap.put("video_title", title);
+                    extMap.put("video_url", videoPath);
+                    contentDTO.setExtMap(extMap);
                     contentDTOList.add(contentDTO);
                     submitTasksParam.setContents(contentDTOList);
                     TaskResultVO taskResultVO = submitTasks(submitTasksParam);

+ 36 - 0
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/service/impl/LoghubServiceImpl.java

@@ -1,5 +1,6 @@
 package com.tzld.piaoquan.content.understanding.service.impl;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.tzld.commons.aliyun.log.AliyunLogManager;
 import com.tzld.piaoquan.content.understanding.service.LoghubService;
 import lombok.extern.slf4j.Slf4j;
@@ -7,6 +8,11 @@ import org.checkerframework.checker.units.qual.A;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Map;
+import java.util.concurrent.*;
+
 /**
  * @author supeng
  */
@@ -17,4 +23,34 @@ public class LoghubServiceImpl implements LoghubService {
     @Autowired
     private AliyunLogManager aliyunLogManager;
 
+    /**
+     * 线程池队列大小
+     */
+    private static final int QUEUE_MAX_SIZE = 100000;
+    /**
+     * 线程命名
+     */
+    private static final ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("loghub-service-pool-%d").build();
+    /**
+     * 线程池
+     */
+    private static ExecutorService pool;
+
+    @PostConstruct
+    public void init() {
+        //init thread pool
+        pool = new ThreadPoolExecutor(32, 32,
+                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(QUEUE_MAX_SIZE), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.AbortPolicy());
+    }
+
+    @PreDestroy
+    public void destroy() {
+        //gracefully shutdown
+        pool.shutdown();
+    }
+
+    @Override
+    public void asyncSubmitLog(String project, String logStore, String topic, Map<String, Object> data) {
+        pool.execute(() -> aliyunLogManager.sendLog(project, logStore, topic, data));
+    }
 }

+ 40 - 13
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/service/impl/PipelineServiceImpl.java

@@ -17,10 +17,12 @@ import com.tzld.piaoquan.content.understanding.model.param.ActionParam;
 import com.tzld.piaoquan.content.understanding.model.param.ContentAnalyseParam;
 import com.tzld.piaoquan.content.understanding.model.po.*;
 import com.tzld.piaoquan.content.understanding.service.Action;
+import com.tzld.piaoquan.content.understanding.service.LoghubService;
 import com.tzld.piaoquan.content.understanding.service.PipelineService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.mongodb.core.MongoTemplate;
 import org.springframework.stereotype.Service;
 
@@ -56,6 +58,14 @@ public class PipelineServiceImpl implements PipelineService {
     @Autowired
     private CuTaskMapper cuTaskMapper;
 
+    @Autowired
+    private LoghubService loghubService;
+
+    @Value("aliyun.log.project")
+    private String aliyunLogProject;
+    @Value("aliyun.log.logstore.result")
+    private String aliyunLogStoreResult;
+
     /**
      * 线程池队列大小
      */
@@ -116,7 +126,7 @@ public class PipelineServiceImpl implements PipelineService {
         cuTask.setPipelineId(pipelineId);
         cuTask.setInput(content);
         int insertCount = cuTaskMapper.insertSelective(cuTask);
-        String reason = "";
+//        String reason = "";
         if (insertCount <= 0) {
             throw new CommonException(ExceptionEnum.SYSTEM_ERROR, "新增任务失败");
         }
@@ -141,6 +151,7 @@ public class PipelineServiceImpl implements PipelineService {
             dto1.setTaskId(taskId);
             dto1.setVideoId(contentDTO.getVideoId());
             dto1.setContent(content);
+            dto1.setExtMap(contentDTO.getExtMap());
             // 4.按照步骤执行pipeline每一步动作
             executeTaskTreeNodeBFS(root, dto1);
         });
@@ -241,18 +252,19 @@ public class PipelineServiceImpl implements PipelineService {
             CuPipelineStep step = currentNode.getStep();
             Integer type = currentNode.getType();
             String input = currentNode.getInput() == null ? "" : currentNode.getInput();
-            // 执行action
-            String result = executeTaskStep(step, input, type);
+            String reason = "处理失败";
+            String result = null;
+            try {
+                // 执行action
+                result = executeTaskStep(step, input, type);
+            } catch (Exception e) {
+                reason = e.getMessage();
+                log.error("step execute error step = {}, reason = {}", e, JSON.toJSONString(step), reason);
+            }
             if (Objects.isNull(currentNode.getChildren()) || currentNode.getChildren().isEmpty()) {
                 if (Objects.nonNull(result)) {
                     //异常不阻塞其他分支执行
                     try {
-                        Map<String, Object> resultMap = JSON.parseObject(result, new TypeReference<Map<String, Object>>() {
-                        });
-                        resultMap.put("video_id", videoId);
-                        //保存数据到mongo
-                        mongoTemplate.save(resultMap, COLLECTION_NAME);
-                        mongoTemplate.insert(resultMap, COLLECTION_NAME);
                         //保存数据到mysql
                         CuTaskExample example = new CuTaskExample();
                         example.createCriteria().andTaskIdEqualTo(taskId);
@@ -261,21 +273,36 @@ public class PipelineServiceImpl implements PipelineService {
                         cuTask.setTaskStatus(TaskStatusEnum.SUCCESS.getValue());
                         int update = cuTaskMapper.updateByExampleSelective(cuTask, example);
                         if (update <= 0) {
-                            log.error("step execute error step = {}, result = {} insert = {}", JSON.toJSONString(step), result, update);
+                            log.error("step execute update error step = {}, result = {} update = {}", JSON.toJSONString(step), result, update);
                         }
+                        Map<String, Object> logMap = new HashMap<>();
+                        logMap.put("video_id", videoId);
+                        logMap.put("data", result);
+                        logMap.put("status", "success");
+                        logMap.put("taskId", taskId);
+                        logMap.putAll(dto.getExtMap());
+                        loghubService.asyncSubmitLog(aliyunLogProject, aliyunLogStoreResult, "", logMap);
+
                     } catch (Exception e) {
-                        log.error("step execute error step = {}, result = {}", e, JSON.toJSONString(step), result);
+                        log.error("step execute update error step = {}, result = {}", e, JSON.toJSONString(step), result);
                     }
                 } else {
                     CuTaskExample example = new CuTaskExample();
                     example.createCriteria().andTaskIdEqualTo(taskId);
                     CuTask cuTask = new CuTask();
-                    cuTask.setOutput(result);
+                    cuTask.setReason(reason);
                     cuTask.setTaskStatus(TaskStatusEnum.FAILURE.getValue());
                     int update = cuTaskMapper.updateByExampleSelective(cuTask, example);
                     if (update <= 0) {
-                        log.error("step execute error step = {}, result = {} insert = {}", JSON.toJSONString(step), result, update);
+                        log.error("step execute update error step = {}, result = {} update = {}", JSON.toJSONString(step), result, update);
                     }
+                    Map<String, Object> logMap = new HashMap<>();
+                    logMap.put("video_id", videoId);
+                    logMap.put("error_msg", reason);
+                    logMap.put("status", "failure");
+                    logMap.put("taskId", taskId);
+                    logMap.putAll(dto.getExtMap());
+                    loghubService.asyncSubmitLog(aliyunLogProject, aliyunLogStoreResult, "", logMap);
                 }
                 continue;
             }