瀏覽代碼

update cluster

supeng 1 周之前
父節點
當前提交
53bb603d11

+ 5 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/ExecutionTaskService.java

@@ -47,4 +47,9 @@ public interface ExecutionTaskService {
      * 昨日回流Top内容定时解构
      */
     void yesterdayTopReturnVideoExecutionTaskCreateHandler();
+
+    /**
+     * 聚类
+     */
+    void clusterExecutionTaskCreateHandler();
 }

+ 65 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/ExecutionTaskServiceImpl.java

@@ -56,6 +56,9 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
     @Value("${yesterday.return.video.table:loghubods.lastday_return}")
     private String yesterdayReturnVideoTable;
 
+    @Value("${yesterday.return.video.table:loghubods.lastday_return}")
+    private String clusterVideoTable;
+
     private static final Integer TOP_N = 10;
 
     @Autowired
@@ -320,4 +323,66 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
         log.info("yesterdayTopReturnVideoExecutionTaskCreateHandler recordSize = {}  count = {} finish cost = {}ms", records.size(), count, costMs);
     }
 
+    @Override
+    public void clusterExecutionTaskCreateHandler() {
+        long start = System.nanoTime();
+        log.info("clusterExecutionTaskCreateHandler start");
+        //获取昨日Top10视频
+        LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
+        String dt = DateUtil.formatLocalDateTime(yesterday, "yyyyMMdd");
+        String sql = "select * from " + clusterVideoTable + " where dt='" + dt + "' ORDER BY 回流人数 DESC LIMIT " + TOP_N + ";";
+        List<Record> records = odpsManager.query(sql);
+        if (Objects.isNull(records) || records.isEmpty()) {
+            log.info("clusterExecutionTaskCreateHandler records is empty");
+            return;
+        }
+        long topCostMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        log.info("clusterExecutionTaskCreateHandler get top video cost={}ms", topCostMs);
+        //数据处理
+        Map<String, List<String>> categoryVideoMap = new HashMap<>();
+        for (Record record : records) {
+            try {
+                String videoId = record.getString("videoid");
+                String category = record.getString("category");
+                List<String> categoryVideoIds = categoryVideoMap.get(category);
+                if (categoryVideoIds == null) {
+                    categoryVideoIds = new ArrayList<>();
+                }
+                categoryVideoIds.add(videoId);
+                categoryVideoMap.put(videoId, categoryVideoIds);
+            } catch (Exception e) {
+                log.error("clusterExecutionTaskCreateHandler categoryVideoMap error {}", record, e);
+            }
+        }
+        //创建解构任务
+        int count = 0;
+        for (Map.Entry<String, List<String>> entry : categoryVideoMap.entrySet()) {
+            try {
+                String category = entry.getKey();
+                List<String> categoryVideoIds = entry.getValue();
+                SdExecutionTask sdExecutionTask = new SdExecutionTask();
+                sdExecutionTask.setTaskNo(IdGeneratorUtil.generateExecutionTaskNo());
+                sdExecutionTask.setTaskType(TaskTypeEnum.CLUSTER.getValue());
+                sdExecutionTask.setTaskStatus(ExecutionTaskStatusEnum.INIT.getValue());
+                sdExecutionTask.setContentType(ContentTypeEnum.VIDEO.getValue());
+//                sdExecutionTask.setContentId(videoId);
+                List<SdExecutionTaskContent> contentList = new ArrayList<>();
+                for (String videoId : categoryVideoIds) {
+                    SdExecutionTaskContent sdExecutionTaskContent = new SdExecutionTaskContent();
+                    sdExecutionTaskContent.setContentType(ContentTypeEnum.VIDEO.getValue());
+                    sdExecutionTaskContent.setContentId(videoId);
+                    sdExecutionTaskContent.setContent(null);
+                    contentList.add(sdExecutionTaskContent);
+                }
+                boolean createResult = executionTaskCreateService.create(sdExecutionTask, contentList);
+                log.info("clusterExecutionTaskCreateHandler sdExecutionTask create result={}", createResult);
+                count++;
+            } catch (Exception e) {
+
+            }
+        }
+        long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        log.info("clusterExecutionTaskCreateHandler recordSize = {}  count = {} finish cost = {}ms", records.size(), count, costMs);
+    }
+
 }

+ 15 - 0
supply-demand-engine-job/src/main/java/com/tzld/piaoquan/sde/job/ExecutionTaskJob.java

@@ -77,4 +77,19 @@ public class ExecutionTaskJob {
         }
         return ReturnT.SUCCESS;
     }
+
+
+    @XxlJob("clusterExecutionTaskCreateHandler")
+    public ReturnT<String> clusterExecutionTaskCreateHandler(String params) {
+        XxlJobLogger.log("clusterExecutionTaskCreateHandler start");
+        try {
+            executionTaskService.clusterExecutionTaskCreateHandler();
+        } catch (Exception e) {
+            XxlJobLogger.log("clusterExecutionTaskCreateHandler error", e);
+            return ReturnT.FAIL;
+        } finally {
+            XxlJobLogger.log("clusterExecutionTaskCreateHandler end");
+        }
+        return ReturnT.SUCCESS;
+    }
 }

+ 1 - 1
supply-demand-engine-job/src/main/java/com/tzld/piaoquan/sde/job/WorkflowTaskJob.java

@@ -19,7 +19,7 @@ public class WorkflowTaskJob {
     private WorkflowTaskService workflowTaskService;
 
     /**
-     * 任务状态检测更新
+     * 流程任务状态检测更新
      *
      * @param params
      * @return