|
|
@@ -12,8 +12,9 @@ import com.tzld.piaoquan.sde.common.enums.deconstruction.DeconstructionTaskStatu
|
|
|
import com.tzld.piaoquan.sde.common.exception.HttpServiceException;
|
|
|
import com.tzld.piaoquan.sde.integration.ContentDeconstructionClusterClient;
|
|
|
import com.tzld.piaoquan.sde.mapper.*;
|
|
|
+import com.tzld.piaoquan.sde.model.dto.ContentInputParamsDTO;
|
|
|
import com.tzld.piaoquan.sde.model.dto.SdExecutionTaskPropertiesDTO;
|
|
|
-import com.tzld.piaoquan.sde.model.dto.cluster.ClusterExecutionConfigDTO;
|
|
|
+import com.tzld.piaoquan.sde.model.dto.cluster.ManualClusterExecutionConfigDTO;
|
|
|
import com.tzld.piaoquan.sde.model.dto.deconstruction.QueryResponseDataDTO;
|
|
|
import com.tzld.piaoquan.sde.model.entity.SdExecutionTask;
|
|
|
import com.tzld.piaoquan.sde.model.entity.SdExecutionTaskContent;
|
|
|
@@ -397,11 +398,11 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
if (Objects.isNull(record)) {
|
|
|
continue;
|
|
|
}
|
|
|
- String videoId = record.getString("videoid");
|
|
|
- int count = sdExecutionTaskContentMapper.countByContentId(ContentTypeEnum.VIDEO.getValue(), videoId);
|
|
|
- if (count > 0) {
|
|
|
- continue;
|
|
|
- }
|
|
|
+ String videoId = record.getString("videoid");
|
|
|
+ int count = sdExecutionTaskContentMapper.countByContentId(ContentTypeEnum.VIDEO.getValue(), videoId);
|
|
|
+ if (count > 0) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
findRecords.add(record);
|
|
|
}
|
|
|
if (findRecords.isEmpty()) {
|
|
|
@@ -453,87 +454,36 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
XxlJobLogger.log("manualClusterExecutionTaskCreateHandler params is empty");
|
|
|
return;
|
|
|
}
|
|
|
- ClusterExecutionConfigDTO clusterExecutionConfigDTO = JSONObject.parseObject(params, ClusterExecutionConfigDTO.class);
|
|
|
- if (Objects.isNull(clusterExecutionConfigDTO) || Objects.isNull(clusterExecutionConfigDTO.getCategories())
|
|
|
- || clusterExecutionConfigDTO.getCategories().isEmpty()) {
|
|
|
- XxlJobLogger.log("manualClusterExecutionTaskCreateHandler categories is empty");
|
|
|
+ List<ManualClusterExecutionConfigDTO> configDTOs = JSONObject.parseArray(params, ManualClusterExecutionConfigDTO.class);
|
|
|
+ if (Objects.isNull(configDTOs) || configDTOs.isEmpty()) {
|
|
|
+ XxlJobLogger.log("manualClusterExecutionTaskCreateHandler configDTOs is empty");
|
|
|
return;
|
|
|
}
|
|
|
- List<String> categories = clusterExecutionConfigDTO.getCategories();
|
|
|
- String contentScope = clusterExecutionConfigDTO.getContentScope();
|
|
|
- if (Objects.isNull(contentScope) || contentScope.isEmpty()) {
|
|
|
- // 默认值
|
|
|
- contentScope = MANUAL_SELECT_VIDEO_SCOPE;
|
|
|
- }
|
|
|
- Integer minThreshold = clusterExecutionConfigDTO.getMinThreshold();
|
|
|
- Integer maxLimit = clusterExecutionConfigDTO.getMaxLimit();
|
|
|
- // 统计二级标签类别
|
|
|
- String sql = "SELECT\n" +
|
|
|
- " t1.videoid,\n" +
|
|
|
- " t1.merge_leve2\n" +
|
|
|
- "FROM loghubods.video_merge_tag t1\n" +
|
|
|
- "INNER JOIN (\n" +
|
|
|
- " SELECT\n" +
|
|
|
- " t2.content_id AS videoid,\n" +
|
|
|
- " MAX(t3.id) AS max_task_id\n" +
|
|
|
- " FROM videoods.sd_execution_task_content t2\n" +
|
|
|
- " INNER JOIN videoods.sd_execution_task t3\n" +
|
|
|
- " ON t2.execution_task_id = t3.id\n" +
|
|
|
- " WHERE\n" +
|
|
|
- " t2.content_type = 3\n" +
|
|
|
- " AND t2.is_deleted = 0\n" +
|
|
|
- " AND t3.task_status = 3\n" +
|
|
|
- " AND t3.is_deleted = 0\n" +
|
|
|
- " GROUP BY t2.content_id\n" +
|
|
|
- ") latest\n" +
|
|
|
- " ON t1.videoid = latest.videoid\n" +
|
|
|
- "ORDER BY latest.max_task_id DESC;";
|
|
|
- List<Record> records = odpsManager.query(sql);
|
|
|
- if (Objects.isNull(records) || records.isEmpty()) {
|
|
|
- XxlJobLogger.log("records is empty");
|
|
|
- return;
|
|
|
- }
|
|
|
- XxlJobLogger.log("records size={}", records.size());
|
|
|
- long topCostMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
|
|
- log.info("manualClusterExecutionTaskCreateHandler get top video cost={}ms", topCostMs);
|
|
|
- Map<String, List<String>> categoryVideoMap = new HashMap<>();
|
|
|
- // 数据处理
|
|
|
- for (Record record : records) {
|
|
|
+ for (ManualClusterExecutionConfigDTO configDTO : configDTOs) {
|
|
|
try {
|
|
|
- String videoId = record.getString("videoid");
|
|
|
- String category = record.getString("merge_leve2");
|
|
|
- if (categories.contains(category) && videoId != null) {
|
|
|
- List<String> videoIds = categoryVideoMap.get(videoId);
|
|
|
- if (videoIds == null) {
|
|
|
- videoIds = new ArrayList<>();
|
|
|
- }
|
|
|
- videoIds.add(videoId);
|
|
|
- categoryVideoMap.put(category, videoIds);
|
|
|
+ if (Objects.isNull(configDTO) || configDTO.getPatternName() == null || configDTO.getCategory() == null
|
|
|
+ || configDTO.getCategory().isEmpty() || configDTO.getContents() == null || configDTO.getContents().isEmpty()) {
|
|
|
+ XxlJobLogger.log("manualClusterExecutionTaskCreateHandler config invalid = {}", configDTO);
|
|
|
+ continue;
|
|
|
}
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("manualClusterExecutionTaskCreateHandler categoryVideoMap error {}", record, e);
|
|
|
- }
|
|
|
- }
|
|
|
- if (categoryVideoMap.isEmpty()) {
|
|
|
- XxlJobLogger.log("categoryVideoMap is empty");
|
|
|
- return;
|
|
|
- }
|
|
|
- XxlJobLogger.log("categoryVideoMap size={}", categoryVideoMap.size());
|
|
|
- // 创建解构任务
|
|
|
- int count = 0;
|
|
|
- for (Map.Entry<String, List<String>> entry : categoryVideoMap.entrySet()) {
|
|
|
- try {
|
|
|
- String category = entry.getKey();
|
|
|
- List<String> categoryVideoIds = entry.getValue();
|
|
|
+ String contentScope = configDTO.getContentScope();
|
|
|
+ if (Objects.isNull(contentScope) || contentScope.isEmpty()) {
|
|
|
+ // 默认值
|
|
|
+ contentScope = MANUAL_SELECT_VIDEO_SCOPE;
|
|
|
+ }
|
|
|
+ String patternName = configDTO.getPatternName();
|
|
|
+ String category = configDTO.getCategory();
|
|
|
+ List<ManualClusterExecutionConfigDTO.ContentDTO> contentDTOS = configDTO.getContents();
|
|
|
+ Integer minThreshold = configDTO.getMinThreshold();
|
|
|
+ Integer maxLimit = configDTO.getMaxLimit();
|
|
|
// 需要达到阈值
|
|
|
- if (Objects.isNull(categoryVideoIds) || categoryVideoIds.isEmpty()
|
|
|
- || categoryVideoIds.size() < minThreshold) {
|
|
|
- XxlJobLogger.log("categoryVideoIds is empty, category = {}", category);
|
|
|
+ if (contentDTOS.size() < minThreshold) {
|
|
|
+ XxlJobLogger.log("contentDTOS size < {}, category = {}", minThreshold, category);
|
|
|
continue;
|
|
|
}
|
|
|
// 最大数量限制
|
|
|
- if (categoryVideoIds.size() > maxLimit) {
|
|
|
- categoryVideoIds = categoryVideoIds.subList(0, maxLimit);
|
|
|
+ if (contentDTOS.size() > maxLimit) {
|
|
|
+ contentDTOS = contentDTOS.subList(0, maxLimit);
|
|
|
}
|
|
|
SdExecutionTask sdExecutionTask = new SdExecutionTask();
|
|
|
sdExecutionTask.setTaskNo(IdGeneratorUtil.generateExecutionTaskNo());
|
|
|
@@ -543,25 +493,33 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
// 属性设置
|
|
|
SdExecutionTaskPropertiesDTO propertiesDTO = new SdExecutionTaskPropertiesDTO();
|
|
|
propertiesDTO.setContentScope(contentScope);
|
|
|
+ propertiesDTO.setPatternName(patternName);
|
|
|
+ propertiesDTO.setCategory(category);
|
|
|
sdExecutionTask.setProperties(JSONObject.toJSONString(propertiesDTO));
|
|
|
// 关联内容:多条
|
|
|
List<SdExecutionTaskContent> contentList = new ArrayList<>();
|
|
|
- for (String videoId : categoryVideoIds) {
|
|
|
+ for (ManualClusterExecutionConfigDTO.ContentDTO contentDTO : contentDTOS) {
|
|
|
+ if (Objects.isNull(contentDTO) || Objects.isNull(contentDTO.getContentId())) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ String contentId = contentDTO.getContentId();
|
|
|
SdExecutionTaskContent sdExecutionTaskContent = new SdExecutionTaskContent();
|
|
|
sdExecutionTaskContent.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
|
- sdExecutionTaskContent.setContentId(videoId);
|
|
|
+ sdExecutionTaskContent.setContentId(contentId);
|
|
|
+ ContentInputParamsDTO contentInputParamsDTO = new ContentInputParamsDTO();
|
|
|
+ contentInputParamsDTO.setWeightScore(contentDTO.getWeightScore());
|
|
|
+ sdExecutionTaskContent.setInputParams(JSONObject.toJSONString(contentInputParamsDTO));
|
|
|
sdExecutionTaskContent.setContent(null);
|
|
|
contentList.add(sdExecutionTaskContent);
|
|
|
}
|
|
|
boolean createResult = executionTaskCreateService.create(sdExecutionTask, contentList);
|
|
|
- log.info("manualClusterExecutionTaskCreateHandler sdExecutionTask create result={}", createResult);
|
|
|
- count++;
|
|
|
+ log.info("manualClusterExecutionTaskCreateHandler category = {} sdExecutionTask create result={}", category, createResult);
|
|
|
} catch (Exception e) {
|
|
|
- log.error("manualClusterExecutionTaskCreateHandler error {}", entry, e);
|
|
|
+ log.error("manualClusterExecutionTaskCreateHandler error {}", configDTO, e);
|
|
|
}
|
|
|
}
|
|
|
long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
|
|
- log.info("manualClusterExecutionTaskCreateHandler recordSize = {} count = {} finish cost = {}ms", records.size(), count, costMs);
|
|
|
+ log.info("manualClusterExecutionTaskCreateHandler finish cost = {}ms", costMs);
|
|
|
}
|
|
|
|
|
|
}
|