|
@@ -5,6 +5,7 @@ import com.aliyun.odps.data.Record;
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
|
|
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
|
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
|
|
|
|
+import com.google.common.collect.Sets;
|
|
|
import com.tzld.piaoquan.sde.common.api.CommonRequest;
|
|
import com.tzld.piaoquan.sde.common.api.CommonRequest;
|
|
|
import com.tzld.piaoquan.sde.common.enums.*;
|
|
import com.tzld.piaoquan.sde.common.enums.*;
|
|
|
import com.tzld.piaoquan.sde.common.enums.deconstruction.DeconstructionTaskStatusEnum;
|
|
import com.tzld.piaoquan.sde.common.enums.deconstruction.DeconstructionTaskStatusEnum;
|
|
@@ -14,6 +15,7 @@ import com.tzld.piaoquan.sde.mapper.*;
|
|
|
import com.tzld.piaoquan.sde.model.dto.ContentInputParamsDTO;
|
|
import com.tzld.piaoquan.sde.model.dto.ContentInputParamsDTO;
|
|
|
import com.tzld.piaoquan.sde.model.dto.SdExecutionTaskPropertiesDTO;
|
|
import com.tzld.piaoquan.sde.model.dto.SdExecutionTaskPropertiesDTO;
|
|
|
import com.tzld.piaoquan.sde.model.dto.cluster.ManualClusterExecutionConfigDTO;
|
|
import com.tzld.piaoquan.sde.model.dto.cluster.ManualClusterExecutionConfigDTO;
|
|
|
|
|
+import com.tzld.piaoquan.sde.model.dto.deconstruction.DeconstructionODPSRecordInfo;
|
|
|
import com.tzld.piaoquan.sde.model.dto.deconstruction.QueryResponseDataDTO;
|
|
import com.tzld.piaoquan.sde.model.dto.deconstruction.QueryResponseDataDTO;
|
|
|
import com.tzld.piaoquan.sde.model.dto.task.XxlJobParamDto;
|
|
import com.tzld.piaoquan.sde.model.dto.task.XxlJobParamDto;
|
|
|
import com.tzld.piaoquan.sde.model.entity.ContentProfile;
|
|
import com.tzld.piaoquan.sde.model.entity.ContentProfile;
|
|
@@ -26,6 +28,7 @@ import com.tzld.piaoquan.sde.model.vo.SdExecutionTaskVO;
|
|
|
import com.tzld.piaoquan.sde.service.ContentProfileService;
|
|
import com.tzld.piaoquan.sde.service.ContentProfileService;
|
|
|
import com.tzld.piaoquan.sde.service.ExecutionTaskCreateService;
|
|
import com.tzld.piaoquan.sde.service.ExecutionTaskCreateService;
|
|
|
import com.tzld.piaoquan.sde.service.ExecutionTaskService;
|
|
import com.tzld.piaoquan.sde.service.ExecutionTaskService;
|
|
|
|
|
+import com.tzld.piaoquan.sde.util.CoverUtil;
|
|
|
import com.tzld.piaoquan.sde.util.DateUtil;
|
|
import com.tzld.piaoquan.sde.util.DateUtil;
|
|
|
import com.tzld.piaoquan.sde.util.IdGeneratorUtil;
|
|
import com.tzld.piaoquan.sde.util.IdGeneratorUtil;
|
|
|
import com.tzld.piaoquan.sde.util.OdpsManager;
|
|
import com.tzld.piaoquan.sde.util.OdpsManager;
|
|
@@ -196,6 +199,9 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
update.setTaskStatus(ExecutionTaskStatusEnum.FAILED.getValue());
|
|
update.setTaskStatus(ExecutionTaskStatusEnum.FAILED.getValue());
|
|
|
int rows = sdExecutionTaskMapper.updateById(update);
|
|
int rows = sdExecutionTaskMapper.updateById(update);
|
|
|
log.info("executionTask submit failure, id:{} rows = {}", sdExecutionTask.getId(), rows);
|
|
log.info("executionTask submit failure, id:{} rows = {}", sdExecutionTask.getId(), rows);
|
|
|
|
|
+
|
|
|
|
|
+ // 同步更新内容库状态
|
|
|
|
|
+ contentProfileService.updateStatusByExecutionTask(sdExecutionTask, ExecutionTaskStatusEnum.FAILED);
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
SdExecutionTask update = new SdExecutionTask();
|
|
SdExecutionTask update = new SdExecutionTask();
|
|
@@ -215,6 +221,9 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
update.setErrorMsg("任务提交失败:" + e.getMessage());
|
|
update.setErrorMsg("任务提交失败:" + e.getMessage());
|
|
|
update.setTaskStatus(ExecutionTaskStatusEnum.FAILED.getValue());
|
|
update.setTaskStatus(ExecutionTaskStatusEnum.FAILED.getValue());
|
|
|
int rows = sdExecutionTaskMapper.updateById(update);
|
|
int rows = sdExecutionTaskMapper.updateById(update);
|
|
|
|
|
+
|
|
|
|
|
+ // 同步更新内容库状态
|
|
|
|
|
+ contentProfileService.updateStatusByExecutionTask(sdExecutionTask, ExecutionTaskStatusEnum.FAILED);
|
|
|
log.info("executionTask submit failure, id:{} rows = {}", sdExecutionTask.getId(), rows);
|
|
log.info("executionTask submit failure, id:{} rows = {}", sdExecutionTask.getId(), rows);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -292,6 +301,12 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
|
|
|
|
|
contentProfileService.deconstructSelectTopicResultSync(sdExecutionTask, queryResponseDataDTO);
|
|
contentProfileService.deconstructSelectTopicResultSync(sdExecutionTask, queryResponseDataDTO);
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ // 任务执行完成之后,更新内容库状态
|
|
|
|
|
+ if (Sets.newHashSet(ExecutionTaskStatusEnum.SUCCESS, ExecutionTaskStatusEnum.FAILED).contains(executionTaskStatusEnum)) {
|
|
|
|
|
+ contentProfileService.updateStatusByExecutionTask(sdExecutionTask, executionTaskStatusEnum);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
log.error("executionTask sync error {}", sdExecutionTask, e);
|
|
log.error("executionTask sync error {}", sdExecutionTask, e);
|
|
|
if (e instanceof HttpServiceException) {
|
|
if (e instanceof HttpServiceException) {
|
|
@@ -302,6 +317,9 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
update.setTaskStatus(ExecutionTaskStatusEnum.FAILED.getValue());
|
|
update.setTaskStatus(ExecutionTaskStatusEnum.FAILED.getValue());
|
|
|
update.setErrorMsg(e.getMessage());
|
|
update.setErrorMsg(e.getMessage());
|
|
|
int rows = sdExecutionTaskMapper.updateById(update);
|
|
int rows = sdExecutionTaskMapper.updateById(update);
|
|
|
|
|
+
|
|
|
|
|
+ contentProfileService.updateStatusByExecutionTask(sdExecutionTask, ExecutionTaskStatusEnum.FAILED);
|
|
|
|
|
+
|
|
|
log.error("executionTask failure, id:{} rows = {}", sdExecutionTask.getId(), rows);
|
|
log.error("executionTask failure, id:{} rows = {}", sdExecutionTask.getId(), rows);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -329,7 +347,6 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
if (Objects.isNull(record)) {
|
|
if (Objects.isNull(record)) {
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
- // TODO 待优化,只针对本策略去重
|
|
|
|
|
String videoId = record.getString("videoid");
|
|
String videoId = record.getString("videoid");
|
|
|
int count = sdExecutionTaskContentMapper.countByContentId(ContentTypeEnum.VIDEO.getValue(), videoId);
|
|
int count = sdExecutionTaskContentMapper.countByContentId(ContentTypeEnum.VIDEO.getValue(), videoId);
|
|
|
if (count > 0) {
|
|
if (count > 0) {
|
|
@@ -430,7 +447,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
String videoId = record.getString("videoid");
|
|
String videoId = record.getString("videoid");
|
|
|
- ContentProfile contentProfile = contentProfileService.findContentProfileByIdAndStage(videoId, ContentProfileStageEnum.DECONSTRUCTION_SELECT_TOPIC);
|
|
|
|
|
|
|
+ ContentProfile contentProfile = contentProfileService.findContentProfileByIdAndStage(videoId, TaskStageEnum.SELECT_TOPIC);
|
|
|
if (Objects.nonNull(contentProfile)) {
|
|
if (Objects.nonNull(contentProfile)) {
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
@@ -456,14 +473,14 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
ContentProfile insertContentProfile = new ContentProfile();
|
|
ContentProfile insertContentProfile = new ContentProfile();
|
|
|
insertContentProfile.setContentId(videoId);
|
|
insertContentProfile.setContentId(videoId);
|
|
|
insertContentProfile.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
insertContentProfile.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
|
- insertContentProfile.setStage(ContentProfileStageEnum.DECONSTRUCTION_SELECT_TOPIC.getValue());
|
|
|
|
|
|
|
+ insertContentProfile.setStage(TaskStageEnum.SELECT_TOPIC.getValue());
|
|
|
contentProfileService.insertOrUpdate(insertContentProfile);
|
|
contentProfileService.insertOrUpdate(insertContentProfile);
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
log.error("historyTopReturnVideoExecutionTaskCreateHandler error {}", record, e);
|
|
log.error("historyTopReturnVideoExecutionTaskCreateHandler error {}", record, e);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
LambdaQueryWrapper<ContentProfile> wrapper = new LambdaQueryWrapper<>();
|
|
LambdaQueryWrapper<ContentProfile> wrapper = new LambdaQueryWrapper<>();
|
|
|
- wrapper.eq(ContentProfile::getStage, ContentProfileStageEnum.DECONSTRUCTION_SELECT_TOPIC.getValue());
|
|
|
|
|
|
|
+ wrapper.eq(ContentProfile::getStage, TaskStageEnum.SELECT_TOPIC.getValue());
|
|
|
wrapper.eq(ContentProfile::getContentType, ContentTypeEnum.VIDEO.getValue());
|
|
wrapper.eq(ContentProfile::getContentType, ContentTypeEnum.VIDEO.getValue());
|
|
|
wrapper.eq(ContentProfile::getIsDeleted, IsDeleteEnum.NORMAL.getValue());
|
|
wrapper.eq(ContentProfile::getIsDeleted, IsDeleteEnum.NORMAL.getValue());
|
|
|
long count = contentProfileMapper.selectCount(wrapper);
|
|
long count = contentProfileMapper.selectCount(wrapper);
|
|
@@ -480,62 +497,163 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
- public void videoExecutionTaskCreateHandler(String params) {
|
|
|
|
|
- XxlJobParamDto paramJson = JSONObject.parseObject(params, XxlJobParamDto.class);
|
|
|
|
|
|
|
+ public void videoDeconstructionSelectTopicExecutionTaskCreateHandler(String params) {
|
|
|
|
|
+ log.info("videoExecutionTaskCreateHandler start: {}", params);
|
|
|
|
|
+ long start = System.nanoTime();
|
|
|
|
|
|
|
|
|
|
|
|
|
- long start = System.nanoTime();
|
|
|
|
|
- log.info("videoExecutionTaskCreateHandler start: {}", JSONObject.toJSONString(paramJson));
|
|
|
|
|
|
|
+ XxlJobParamDto paramJson = JSONObject.parseObject(params, XxlJobParamDto.class);
|
|
|
|
|
|
|
|
- String table = paramJson.getTableOrDefault(yesterdayReturnVideoTable);
|
|
|
|
|
|
|
+ String table = paramJson.getTable();
|
|
|
String dt = paramJson.getDt();
|
|
String dt = paramJson.getDt();
|
|
|
|
|
+ String contentScope = paramJson.getContentScope();
|
|
|
|
|
+ if (StringUtils.isAnyBlank(table, contentScope)) {
|
|
|
|
|
+ log.error("table and contentScopre must not null");
|
|
|
|
|
+ XxlJobLogger.log("table and contentScopre must not null");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
if (StringUtils.isBlank(dt)) {
|
|
if (StringUtils.isBlank(dt)) {
|
|
|
- LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
|
|
|
|
|
- dt = DateUtil.formatLocalDateTime(yesterday, "yyyyMMdd");
|
|
|
|
|
|
|
+ dt = DateUtil.formatLocalDateTime(LocalDateTime.now().minusDays(1), "yyyyMMdd");
|
|
|
}
|
|
}
|
|
|
- String sql = "select * from " + table + " where dt='" + dt + "' ORDER BY sort_field DESC;";
|
|
|
|
|
|
|
+
|
|
|
|
|
+ String sql = String.format("select * from %s where dt='%s' ORDER BY sort_field DESC;", table, dt);
|
|
|
List<Record> records = odpsManager.query(sql);
|
|
List<Record> records = odpsManager.query(sql);
|
|
|
- if (Objects.isNull(records) || records.isEmpty()) {
|
|
|
|
|
|
|
+ if (CollectionUtils.isEmpty(records)) {
|
|
|
log.info("videoExecutionTaskCreateHandler records is empty");
|
|
log.info("videoExecutionTaskCreateHandler records is empty");
|
|
|
|
|
+ XxlJobLogger.log("videoExecutionTaskCreateHandler records is empty");
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- String contentScope = paramJson.getContentScopeOrDefault(YESTERDAY_RETURN_TOP10_VIDEO_SCOPE);
|
|
|
|
|
|
|
+ List<DeconstructionODPSRecordInfo> recordInfos = records.stream()
|
|
|
|
|
+ .filter(Objects::nonNull)
|
|
|
|
|
+ .map(CoverUtil::deconstructionODPSCoverRecordInfo)
|
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
|
|
|
|
XxlJobLogger.log("videoExecutionTaskCreateHandler records size={}", records.size());
|
|
XxlJobLogger.log("videoExecutionTaskCreateHandler records size={}", records.size());
|
|
|
- List<Record> findRecords = new ArrayList<>();
|
|
|
|
|
- for (Record record : records) {
|
|
|
|
|
- if (Objects.isNull(record)) {
|
|
|
|
|
- continue;
|
|
|
|
|
- }
|
|
|
|
|
- String videoId = record.getString("videoid");
|
|
|
|
|
- // int count = sdExecutionTaskContentMapper.countByContentId(ContentTypeEnum.VIDEO.getValue(), videoId);
|
|
|
|
|
- // if (count > 0) {
|
|
|
|
|
- // continue;
|
|
|
|
|
- // }
|
|
|
|
|
|
|
|
|
|
- ContentProfile contentProfile = contentProfileService.findContentProfileByIdAndStage(videoId, ContentProfileStageEnum.DECONSTRUCTION_SELECT_TOPIC);
|
|
|
|
|
|
|
+ int count = this.deconstructionTaskCreateHandler(recordInfos, contentScope, TaskStageEnum.SELECT_TOPIC);
|
|
|
|
|
+
|
|
|
|
|
+ long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
|
|
|
|
+ log.info("videoExecutionTaskCreateHandler recordSize = {} count= {} finish cost = {}ms", records.size(), count, costMs);
|
|
|
|
|
+
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void videoDeconstructionContentCreationExecutionTaskCreateHandler(String params) {
|
|
|
|
|
+ log.info("videoDeconstructionContentCreationExecutionTaskCreateHandler start: {}", params);
|
|
|
|
|
+ long start = System.nanoTime();
|
|
|
|
|
+
|
|
|
|
|
+ XxlJobParamDto paramJson = JSONObject.parseObject(params, XxlJobParamDto.class);
|
|
|
|
|
+ String table = paramJson.getTable();
|
|
|
|
|
+ String dt = paramJson.getDt();
|
|
|
|
|
+ String contentScope = paramJson.getContentScope();
|
|
|
|
|
+ if (StringUtils.isAnyBlank(table, contentScope)) {
|
|
|
|
|
+ log.error("table and contentScopre must not null");
|
|
|
|
|
+ XxlJobLogger.log("table and contentScopre must not null");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ if (StringUtils.isBlank(dt)) {
|
|
|
|
|
+ dt = DateUtil.formatLocalDateTime(LocalDateTime.now().minusDays(1), "yyyyMMdd");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ String sql = String.format("select * from %s where dt='%s' ORDER BY sort_field DESC;", table, dt);
|
|
|
|
|
+ List<Record> records = odpsManager.query(sql);
|
|
|
|
|
+ if (CollectionUtils.isEmpty(records)) {
|
|
|
|
|
+ log.info("videoDeconstructionContentCreationExecutionTaskCreateHandler records is empty");
|
|
|
|
|
+ XxlJobLogger.log("videoDeconstructionContentCreationExecutionTaskCreateHandler records is empty");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ List<DeconstructionODPSRecordInfo> recordInfos = records.stream()
|
|
|
|
|
+ .filter(Objects::nonNull)
|
|
|
|
|
+ .map(CoverUtil::deconstructionODPSCoverRecordInfo)
|
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ // 判断前置的解构任务是否都已完成
|
|
|
|
|
+ List<String> videoIds = recordInfos.stream()
|
|
|
|
|
+ .map(DeconstructionODPSRecordInfo::getChannelContentId)
|
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
|
+ LambdaQueryWrapper<ContentProfile> queryWrapper = new LambdaQueryWrapper<>();
|
|
|
|
|
+ queryWrapper.in(ContentProfile::getContentId, videoIds)
|
|
|
|
|
+ .eq(ContentProfile::getIsDeleted, IsDeleteEnum.NORMAL.getValue());
|
|
|
|
|
+ List<ContentProfile> contentProfiles = contentProfileMapper.selectList(queryWrapper);
|
|
|
|
|
+
|
|
|
|
|
+ // 存在未解构的视频
|
|
|
|
|
+ if (CollectionUtils.isEmpty(contentProfiles) || recordInfos.size() != contentProfiles.size()) {
|
|
|
|
|
+ XxlJobLogger.log("videoDeconstructionContentCreationExecutionTaskCreateHandler exist not deconstruction video");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 存在未解构完成视频
|
|
|
|
|
+ Set<Integer> taskCompleteStatus = Sets.newHashSet(ExecutionTaskStatusEnum.SUCCESS.getValue(), ExecutionTaskStatusEnum.FAILED.getValue());
|
|
|
|
|
+ long taskNotCompleteCnt = contentProfiles.stream()
|
|
|
|
|
+ .filter(i -> !taskCompleteStatus.contains(i.getStatus()))
|
|
|
|
|
+ .count();
|
|
|
|
|
+ if (taskNotCompleteCnt > 0) {
|
|
|
|
|
+ log.error("videoDeconstructionContentCreationExecutionTaskCreateHandler depend task exist not complete");
|
|
|
|
|
+ XxlJobLogger.log("videoDeconstructionContentCreationExecutionTaskCreateHandler depend task exist not complete");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 获取解构成功,vov top10的视频
|
|
|
|
|
+ Set<String> deconstructionSuccessContentIds = contentProfiles.stream()
|
|
|
|
|
+ .filter(i -> ExecutionTaskStatusEnum.SUBMITTED.getValue().equals(i.getStatus()))
|
|
|
|
|
+ .map(ContentProfile::getContentId)
|
|
|
|
|
+ .collect(Collectors.toSet());
|
|
|
|
|
+
|
|
|
|
|
+ if (CollectionUtils.isEmpty(deconstructionSuccessContentIds)) {
|
|
|
|
|
+ XxlJobLogger.log("videoDeconstructionContentCreationExecutionTaskCreateHandler video all not deconstruction success");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ log.info("videoDeconstructionContentCreationExecutionTaskCreateHandler deconstruction success video size {}", deconstructionSuccessContentIds.size());
|
|
|
|
|
+
|
|
|
|
|
+ List<DeconstructionODPSRecordInfo> top10RecordInfos = recordInfos.stream()
|
|
|
|
|
+ .filter(i -> deconstructionSuccessContentIds.contains(i.getChannelContentId()))
|
|
|
|
|
+ .sorted(Comparator.comparingDouble(DeconstructionODPSRecordInfo::getVov).reversed())
|
|
|
|
|
+ .limit(10)
|
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
|
+ XxlJobLogger.log("videoDeconstructionContentCreationExecutionTaskCreateHandler records size={}", records.size());
|
|
|
|
|
+ int count = this.deconstructionTaskCreateHandler(top10RecordInfos, contentScope, TaskStageEnum.CONTENT_CREATION);
|
|
|
|
|
+
|
|
|
|
|
+ long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
|
|
|
|
+ log.info("videoDeconstructionContentCreationExecutionTaskCreateHandler recordSize = {} count= {} finish cost = {}ms", records.size(), count, costMs);
|
|
|
|
|
+
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private int deconstructionTaskCreateHandler(List<DeconstructionODPSRecordInfo> recordInfos, String contentScope, TaskStageEnum taskStage) {
|
|
|
|
|
+ if (CollectionUtils.isEmpty(recordInfos) || Objects.isNull(taskStage)) {
|
|
|
|
|
+ log.error("deconstructionTaskCreateHandler recordInfos and taskStage must not null");
|
|
|
|
|
+ XxlJobLogger.log("deconstructionTaskCreateHandler recordInfos and taskStage must not null");
|
|
|
|
|
+ return 0;
|
|
|
|
|
+ }
|
|
|
|
|
+ // 全局过滤,已经创建过同stage的视频不再创建
|
|
|
|
|
+ List<DeconstructionODPSRecordInfo> needCreateTaskRecords = new ArrayList<>();
|
|
|
|
|
+ for (DeconstructionODPSRecordInfo record : recordInfos) {
|
|
|
|
|
+ String videoId = record.getChannelContentId();
|
|
|
|
|
+ ContentProfile contentProfile = contentProfileService.findContentProfileByIdAndStage(videoId, taskStage);
|
|
|
if (Objects.nonNull(contentProfile)) {
|
|
if (Objects.nonNull(contentProfile)) {
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- findRecords.add(record);
|
|
|
|
|
|
|
+ needCreateTaskRecords.add(record);
|
|
|
}
|
|
}
|
|
|
- if (findRecords.isEmpty()) {
|
|
|
|
|
- XxlJobLogger.log("videoExecutionTaskCreateHandler findRecords is empty");
|
|
|
|
|
- return;
|
|
|
|
|
|
|
+
|
|
|
|
|
+ if (CollectionUtils.isEmpty(needCreateTaskRecords)) {
|
|
|
|
|
+ log.info("deconstructionTaskCreateHandler needCreateTaskRecords is empty");
|
|
|
|
|
+ XxlJobLogger.log("deconstructionTaskCreateHandler needCreateTaskRecords is empty");
|
|
|
|
|
+ return 0;
|
|
|
}
|
|
}
|
|
|
- XxlJobLogger.log("videoExecutionTaskCreateHandler findRecords size={}", findRecords.size());
|
|
|
|
|
- long topCostMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
|
|
|
|
- log.info("videoExecutionTaskCreateHandler get top video cost={}ms", topCostMs);
|
|
|
|
|
- // 创建解构任务
|
|
|
|
|
|
|
+
|
|
|
|
|
+ XxlJobLogger.log("deconstructionTaskCreateHandler needCreateTaskRecords size={}", needCreateTaskRecords.size());
|
|
|
int count = 0;
|
|
int count = 0;
|
|
|
- for (Record record : findRecords) {
|
|
|
|
|
|
|
+
|
|
|
|
|
+ for (DeconstructionODPSRecordInfo record : recordInfos) {
|
|
|
try {
|
|
try {
|
|
|
- String videoId = record.getString("videoid");
|
|
|
|
|
|
|
+ String videoId = record.getChannelContentId();
|
|
|
SdExecutionTask sdExecutionTask = new SdExecutionTask();
|
|
SdExecutionTask sdExecutionTask = new SdExecutionTask();
|
|
|
sdExecutionTask.setTaskNo(IdGeneratorUtil.generateExecutionTaskNo());
|
|
sdExecutionTask.setTaskNo(IdGeneratorUtil.generateExecutionTaskNo());
|
|
|
sdExecutionTask.setTaskType(TaskTypeEnum.DECONSTRUCT.getValue());
|
|
sdExecutionTask.setTaskType(TaskTypeEnum.DECONSTRUCT.getValue());
|
|
|
|
|
+ sdExecutionTask.setTaskStage(taskStage.getValue());
|
|
|
sdExecutionTask.setTaskStatus(ExecutionTaskStatusEnum.INIT.getValue());
|
|
sdExecutionTask.setTaskStatus(ExecutionTaskStatusEnum.INIT.getValue());
|
|
|
sdExecutionTask.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
sdExecutionTask.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
|
// 属性设置
|
|
// 属性设置
|
|
@@ -550,23 +668,22 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
List<SdExecutionTaskContent> contentList = new ArrayList<>();
|
|
List<SdExecutionTaskContent> contentList = new ArrayList<>();
|
|
|
contentList.add(sdExecutionTaskContent);
|
|
contentList.add(sdExecutionTaskContent);
|
|
|
boolean createResult = executionTaskCreateService.create(sdExecutionTask, contentList);
|
|
boolean createResult = executionTaskCreateService.create(sdExecutionTask, contentList);
|
|
|
- log.info("videoExecutionTaskCreateHandler sdExecutionTask create videoId = {} result={}", videoId, createResult);
|
|
|
|
|
|
|
+ log.info("deconstructionTaskCreateHandler sdExecutionTask create videoId = {} result={}", videoId, createResult);
|
|
|
|
|
|
|
|
// 内容库添加数据
|
|
// 内容库添加数据
|
|
|
ContentProfile contentProfile = new ContentProfile();
|
|
ContentProfile contentProfile = new ContentProfile();
|
|
|
contentProfile.setContentId(videoId);
|
|
contentProfile.setContentId(videoId);
|
|
|
contentProfile.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
contentProfile.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
|
- contentProfile.setStage(ContentProfileStageEnum.DECONSTRUCTION_SELECT_TOPIC.getValue());
|
|
|
|
|
|
|
+ contentProfile.setStage(taskStage.getValue());
|
|
|
contentProfileService.insertOrUpdate(contentProfile);
|
|
contentProfileService.insertOrUpdate(contentProfile);
|
|
|
|
|
|
|
|
count++;
|
|
count++;
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
- log.error("videoExecutionTaskCreateHandler error {}", record, e);
|
|
|
|
|
|
|
+ log.error("deconstructionTaskCreateHandler error {}", record, e);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
|
|
|
|
- log.info("videoExecutionTaskCreateHandler recordSize = {} count = {} finish cost = {}ms", records.size(), count, costMs);
|
|
|
|
|
|
|
|
|
|
|
|
+ return count;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -676,7 +793,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
paramJson.put("table", "loghubods.supply_demand_task_video_collect_exp_top");
|
|
paramJson.put("table", "loghubods.supply_demand_task_video_collect_exp_top");
|
|
|
paramJson.put("contentScope", contentScope);
|
|
paramJson.put("contentScope", contentScope);
|
|
|
paramJson.put("dt", dt);
|
|
paramJson.put("dt", dt);
|
|
|
- this.videoExecutionTaskCreateHandler(paramJson.toJSONString());
|
|
|
|
|
|
|
+ this.videoDeconstructionSelectTopicExecutionTaskCreateHandler(paramJson.toJSONString());
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|