supeng 1 hete
szülő
commit
abe2bdf2d4

+ 2 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/mapper/SdExecutionTaskContentMapper.java

@@ -10,4 +10,6 @@ import java.util.List;
 public interface SdExecutionTaskContentMapper extends BaseMapper<SdExecutionTaskContent> {
 
     int insertBatchCustom(List<SdExecutionTaskContent> list);
+
+    int countByContentId(Integer contentType, String contentId);
 }

+ 20 - 2
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/ExecutionTaskServiceImpl.java

@@ -34,6 +34,7 @@ import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * @author supeng
@@ -291,17 +292,34 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
         //获取昨日Top10视频
         LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
         String dt = DateUtil.formatLocalDateTime(yesterday, "yyyyMMdd");
-        String sql = "select * from " + yesterdayReturnVideoTable + " where dt='" + dt + "' ORDER BY 回流人数 DESC LIMIT " + TOP_N + ";";
+        int topN = TOP_N * 5;
+        String sql = "select * from " + yesterdayReturnVideoTable + " where dt='" + dt + "' ORDER BY 回流人数 DESC LIMIT " + topN + ";";
         List<Record> records = odpsManager.query(sql);
         if (Objects.isNull(records) || records.isEmpty()) {
             log.info("yesterdayTopReturnVideoExecutionTaskCreateHandler records is empty");
             return;
         }
+        List<Record> findRecords = new ArrayList<>();
+        for (Record record : records) {
+            if (Objects.isNull(record)) {
+                continue;
+            }
+            //TODO 待优化,只针对本策略去重
+            String videoId = record.getString("videoid");
+            int count = sdExecutionTaskContentMapper.countByContentId(ContentTypeEnum.VIDEO.getValue(), videoId);
+            if (count > 0) {
+                continue;
+            }
+            findRecords.add(record);
+            if (findRecords.size() >= topN) {
+                break;
+            }
+        }
         long topCostMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
         log.info("yesterdayTopReturnVideoExecutionTaskCreateHandler get top video cost={}ms", topCostMs);
         //创建解构任务
         int count = 0;
-        for (Record record : records) {
+        for (Record record : findRecords) {
             try {
                 String videoId = record.getString("videoid");
                 SdExecutionTask sdExecutionTask = new SdExecutionTask();

+ 8 - 0
supply-demand-engine-core/src/main/resources/mapper/SdExecutionTaskContentMapper.xml

@@ -19,4 +19,12 @@
             )
         </foreach>
     </insert>
+    <select id="countByContentId" resultType="java.lang.Integer">
+        select count(*)
+        from sd_execution_task_content t1
+                 inner join sd_execution_task t2
+                            on t1.execution_task_id = t2.id and t1.is_deleted = 0 and t2.is_deleted = 0 and
+                               t1.content_type = #{contentType}
+                                and t2.content_id = #{contentId};
+    </select>
 </mapper>