#1 添加临时任务

Обединени
zhaohaipeng обедини 1 ревизии от Server/feature_20260203_zhaohaipeng_temp_task във Server/master преди 1 ден

+ 2 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/ExecutionTaskService.java

@@ -48,6 +48,8 @@ public interface ExecutionTaskService {
      */
     void yesterdayTopReturnVideoExecutionTaskCreateHandler();
 
+    void topReturnVideoExecutionTaskCreateHandler(String params);
+
     /**
      * 聚类
      */

+ 103 - 15
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/ExecutionTaskServiceImpl.java

@@ -1,6 +1,7 @@
 package com.tzld.piaoquan.sde.service.impl;
 
 import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.TypeReference;
 import com.aliyun.odps.data.Record;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
@@ -27,6 +28,7 @@ import com.tzld.piaoquan.sde.util.IdGeneratorUtil;
 import com.tzld.piaoquan.sde.util.OdpsManager;
 import com.xxl.job.core.log.XxlJobLogger;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -154,7 +156,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
             log.info("there is no executionTask need submit.");
             return;
         }
-        //提交子任务
+        // 提交子任务
         for (SdExecutionTask sdExecutionTask : list) {
             try {
                 ExecutionTaskTypeEnum executionTaskTypeEnum = ExecutionTaskTypeEnum.getInstance(sdExecutionTask.getTaskType());
@@ -190,7 +192,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
             } catch (Exception e) {
                 log.error("executionTask submit error {}", sdExecutionTask, e);
                 if (e instanceof HttpServiceException) {
-                    //http请求异常 不直接失败;下次重试
+                    // http请求异常 不直接失败;下次重试
                     continue;
                 }
                 SdExecutionTask update = new SdExecutionTask();
@@ -291,7 +293,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
     public void yesterdayTopReturnVideoExecutionTaskCreateHandler() {
         long start = System.nanoTime();
         log.info("yesterdayTopReturnVideoExecutionTaskCreateHandler start");
-        //获取昨日Top10视频
+        // 获取昨日Top10视频
         LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
         String dt = DateUtil.formatLocalDateTime(yesterday, "yyyyMMdd");
         int topN = TOP_N * 5;
@@ -307,7 +309,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
             if (Objects.isNull(record)) {
                 continue;
             }
-            //TODO 待优化,只针对本策略去重
+            // TODO 待优化,只针对本策略去重
             String videoId = record.getString("videoid");
             int count = sdExecutionTaskContentMapper.countByContentId(ContentTypeEnum.VIDEO.getValue(), videoId);
             if (count > 0) {
@@ -325,7 +327,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
         XxlJobLogger.log("yesterdayTopReturnVideoExecutionTaskCreateHandler findRecords size={}", findRecords.size());
         long topCostMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
         log.info("yesterdayTopReturnVideoExecutionTaskCreateHandler get top video cost={}ms", topCostMs);
-        //创建解构任务
+        // 创建解构任务
         int count = 0;
         for (Record record : findRecords) {
             try {
@@ -335,11 +337,11 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
                 sdExecutionTask.setTaskType(TaskTypeEnum.DECONSTRUCT.getValue());
                 sdExecutionTask.setTaskStatus(ExecutionTaskStatusEnum.INIT.getValue());
                 sdExecutionTask.setContentType(ContentTypeEnum.VIDEO.getValue());
-                //属性设置
+                // 属性设置
                 SdExecutionTaskPropertiesDTO propertiesDTO = new SdExecutionTaskPropertiesDTO();
                 propertiesDTO.setContentScope(YESTERDAY_RETURN_TOP10_VIDEO_SCOPE);
                 sdExecutionTask.setProperties(JSONObject.toJSONString(propertiesDTO));
-                //关联内容
+                // 关联内容
                 SdExecutionTaskContent sdExecutionTaskContent = new SdExecutionTaskContent();
                 sdExecutionTaskContent.setContentType(ContentTypeEnum.VIDEO.getValue());
                 sdExecutionTaskContent.setContentId(videoId);
@@ -357,6 +359,92 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
         log.info("yesterdayTopReturnVideoExecutionTaskCreateHandler recordSize = {}  count = {} finish cost = {}ms", records.size(), count, costMs);
     }
 
+    @Override
+    public void topReturnVideoExecutionTaskCreateHandler(String params) {
+        Map<String, String> paramJson = new HashMap<>();
+        try {
+            paramJson = JSONObject.parseObject(params, new TypeReference<Map<String, String>>() {
+            });
+        } catch (Exception e) {
+            log.error("parse xxl job param error params: {}", params, e);
+        }
+
+
+        long start = System.nanoTime();
+        log.info("topReturnVideoExecutionTaskCreateHandler start");
+
+        String table = paramJson.getOrDefault("table", yesterdayReturnVideoTable);
+        String dt = paramJson.get("dt");
+        if (StringUtils.isBlank(dt)) {
+            LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
+            dt = DateUtil.formatLocalDateTime(yesterday, "yyyyMMdd");
+        }
+        int topN = TOP_N * 5;
+        String sql = "select * from " + table + " where dt='" + dt + "' ORDER BY 回流人数 DESC LIMIT " + topN + ";";
+        List<Record> records = odpsManager.query(sql);
+        if (Objects.isNull(records) || records.isEmpty()) {
+            log.info("topReturnVideoExecutionTaskCreateHandler records is empty");
+            return;
+        }
+
+        String contentScope = paramJson.getOrDefault("contentScope", YESTERDAY_RETURN_TOP10_VIDEO_SCOPE);
+
+        XxlJobLogger.log("topReturnVideoExecutionTaskCreateHandler records size={}", records.size());
+        List<Record> findRecords = new ArrayList<>();
+        for (Record record : records) {
+            if (Objects.isNull(record)) {
+                continue;
+            }
+            String videoId = record.getString("videoid");
+            // int count = sdExecutionTaskContentMapper.countByContentId(ContentTypeEnum.VIDEO.getValue(), videoId);
+            // if (count > 0) {
+            //     continue;
+            // }
+            findRecords.add(record);
+            if (findRecords.size() >= TOP_N) {
+                break;
+            }
+        }
+        if (findRecords.isEmpty()) {
+            XxlJobLogger.log("topReturnVideoExecutionTaskCreateHandler findRecords is empty");
+            return;
+        }
+        XxlJobLogger.log("topReturnVideoExecutionTaskCreateHandler findRecords size={}", findRecords.size());
+        long topCostMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        log.info("topReturnVideoExecutionTaskCreateHandler get top video cost={}ms", topCostMs);
+        // 创建解构任务
+        int count = 0;
+        for (Record record : findRecords) {
+            try {
+                String videoId = record.getString("videoid");
+                SdExecutionTask sdExecutionTask = new SdExecutionTask();
+                sdExecutionTask.setTaskNo(IdGeneratorUtil.generateExecutionTaskNo());
+                sdExecutionTask.setTaskType(TaskTypeEnum.DECONSTRUCT.getValue());
+                sdExecutionTask.setTaskStatus(ExecutionTaskStatusEnum.INIT.getValue());
+                sdExecutionTask.setContentType(ContentTypeEnum.VIDEO.getValue());
+                // 属性设置
+                SdExecutionTaskPropertiesDTO propertiesDTO = new SdExecutionTaskPropertiesDTO();
+                propertiesDTO.setContentScope(contentScope);
+                sdExecutionTask.setProperties(JSONObject.toJSONString(propertiesDTO));
+                // 关联内容
+                SdExecutionTaskContent sdExecutionTaskContent = new SdExecutionTaskContent();
+                sdExecutionTaskContent.setContentType(ContentTypeEnum.VIDEO.getValue());
+                sdExecutionTaskContent.setContentId(videoId);
+                sdExecutionTaskContent.setContent(null);
+                List<SdExecutionTaskContent> contentList = new ArrayList<>();
+                contentList.add(sdExecutionTaskContent);
+                boolean createResult = executionTaskCreateService.create(sdExecutionTask, contentList);
+                log.info("topReturnVideoExecutionTaskCreateHandler sdExecutionTask create videoId = {} result={}", videoId, createResult);
+                count++;
+            } catch (Exception e) {
+                log.error("topReturnVideoExecutionTaskCreateHandler error {}", record, e);
+            }
+        }
+        long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        log.info("topReturnVideoExecutionTaskCreateHandler recordSize = {}  count = {} finish cost = {}ms", records.size(), count, costMs);
+
+    }
+
     @Override
     public void manualClusterExecutionTaskCreateHandler(String params) {
         long start = System.nanoTime();
@@ -375,12 +463,12 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
         List<String> categories = clusterExecutionConfigDTO.getCategories();
         String contentScope = clusterExecutionConfigDTO.getContentScope();
         if (Objects.isNull(contentScope) || contentScope.isEmpty()) {
-            //默认值
+            // 默认值
             contentScope = MANUAL_SELECT_VIDEO_SCOPE;
         }
         Integer minThreshold = clusterExecutionConfigDTO.getMinThreshold();
         Integer maxLimit = clusterExecutionConfigDTO.getMaxLimit();
-        //统计二级标签类别
+        // 统计二级标签类别
         String sql = "SELECT\n" +
                 "    t1.videoid,\n" +
                 "    t1.merge_leve2\n" +
@@ -410,7 +498,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
         long topCostMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
         log.info("manualClusterExecutionTaskCreateHandler get top video cost={}ms", topCostMs);
         Map<String, List<String>> categoryVideoMap = new HashMap<>();
-        //数据处理
+        // 数据处理
         for (Record record : records) {
             try {
                 String videoId = record.getString("videoid");
@@ -432,19 +520,19 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
             return;
         }
         XxlJobLogger.log("categoryVideoMap size={}", categoryVideoMap.size());
-        //创建解构任务
+        // 创建解构任务
         int count = 0;
         for (Map.Entry<String, List<String>> entry : categoryVideoMap.entrySet()) {
             try {
                 String category = entry.getKey();
                 List<String> categoryVideoIds = entry.getValue();
-                //需要达到阈值
+                // 需要达到阈值
                 if (Objects.isNull(categoryVideoIds) || categoryVideoIds.isEmpty()
                         || categoryVideoIds.size() < minThreshold) {
                     XxlJobLogger.log("categoryVideoIds is empty, category = {}", category);
                     continue;
                 }
-                //最大数量限制
+                // 最大数量限制
                 if (categoryVideoIds.size() > maxLimit) {
                     categoryVideoIds = categoryVideoIds.subList(0, maxLimit);
                 }
@@ -453,11 +541,11 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
                 sdExecutionTask.setTaskType(TaskTypeEnum.CLUSTER.getValue());
                 sdExecutionTask.setTaskStatus(ExecutionTaskStatusEnum.INIT.getValue());
                 sdExecutionTask.setContentType(ContentTypeEnum.VIDEO.getValue());
-                //属性设置
+                // 属性设置
                 SdExecutionTaskPropertiesDTO propertiesDTO = new SdExecutionTaskPropertiesDTO();
                 propertiesDTO.setContentScope(contentScope);
                 sdExecutionTask.setProperties(JSONObject.toJSONString(propertiesDTO));
-                //关联内容:多条
+                // 关联内容:多条
                 List<SdExecutionTaskContent> contentList = new ArrayList<>();
                 for (String videoId : categoryVideoIds) {
                     SdExecutionTaskContent sdExecutionTaskContent = new SdExecutionTaskContent();

+ 21 - 0
supply-demand-engine-job/src/main/java/com/tzld/piaoquan/sde/job/ExecutionTaskJob.java

@@ -79,6 +79,27 @@ public class ExecutionTaskJob {
         return ReturnT.SUCCESS;
     }
 
+    /**
+     * 昨日回流Top内容定时解构
+     *
+     * @param params
+     * @return
+     */
+    @XxlJob("topReturnVideoExecutionTaskCreateHandler")
+    public ReturnT<String> topReturnVideoExecutionTaskCreateHandler(String params) {
+        XxlJobLogger.log("returnVideoExecutionTaskCreateHandler start");
+        try {
+            executionTaskService.topReturnVideoExecutionTaskCreateHandler(params);
+        } catch (Exception e) {
+            log.error("topReturnVideoExecutionTaskCreateHandler error", e);
+            XxlJobLogger.log("topReturnVideoExecutionTaskCreateHandler error", e);
+            return ReturnT.FAIL;
+        } finally {
+            XxlJobLogger.log("topReturnVideoExecutionTaskCreateHandler end");
+        }
+        return ReturnT.SUCCESS;
+    }
+
     /**
      * 手动创建聚类任务
      *