|
@@ -12,6 +12,7 @@ import com.tzld.piaoquan.sde.common.exception.HttpServiceException;
|
|
|
import com.tzld.piaoquan.sde.integration.ContentDeconstructionClusterClient;
|
|
import com.tzld.piaoquan.sde.integration.ContentDeconstructionClusterClient;
|
|
|
import com.tzld.piaoquan.sde.mapper.*;
|
|
import com.tzld.piaoquan.sde.mapper.*;
|
|
|
import com.tzld.piaoquan.sde.model.dto.SdExecutionTaskPropertiesDTO;
|
|
import com.tzld.piaoquan.sde.model.dto.SdExecutionTaskPropertiesDTO;
|
|
|
|
|
+import com.tzld.piaoquan.sde.model.dto.cluster.ClusterExecutionConfigDTO;
|
|
|
import com.tzld.piaoquan.sde.model.dto.deconstruction.QueryResponseDataDTO;
|
|
import com.tzld.piaoquan.sde.model.dto.deconstruction.QueryResponseDataDTO;
|
|
|
import com.tzld.piaoquan.sde.model.entity.SdExecutionTask;
|
|
import com.tzld.piaoquan.sde.model.entity.SdExecutionTask;
|
|
|
import com.tzld.piaoquan.sde.model.entity.SdExecutionTaskContent;
|
|
import com.tzld.piaoquan.sde.model.entity.SdExecutionTaskContent;
|
|
@@ -35,7 +36,6 @@ import java.time.LocalDateTime;
|
|
|
import java.time.ZoneId;
|
|
import java.time.ZoneId;
|
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* @author supeng
|
|
* @author supeng
|
|
@@ -58,12 +58,13 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
private SdExecutionTaskContentMapper sdExecutionTaskContentMapper;
|
|
private SdExecutionTaskContentMapper sdExecutionTaskContentMapper;
|
|
|
|
|
|
|
|
private static final String YESTERDAY_RETURN_TOP10_VIDEO_SCOPE = "yesterday_return_top10_video_scope";
|
|
private static final String YESTERDAY_RETURN_TOP10_VIDEO_SCOPE = "yesterday_return_top10_video_scope";
|
|
|
|
|
+ private static final String MANUAL_SELECT_VIDEO_SCOPE = "manual_select_video_scope";
|
|
|
|
|
|
|
|
@Value("${yesterday.return.video.table:loghubods.lastday_return}")
|
|
@Value("${yesterday.return.video.table:loghubods.lastday_return}")
|
|
|
private String yesterdayReturnVideoTable;
|
|
private String yesterdayReturnVideoTable;
|
|
|
|
|
|
|
|
- @Value("${yesterday.return.video.table:loghubods.lastday_return}")
|
|
|
|
|
- private String clusterVideoTable;
|
|
|
|
|
|
|
+// @Value("${video.tag.table:loghubods.video_merge_tag}")
|
|
|
|
|
+// private String videoTagTable;
|
|
|
|
|
|
|
|
private static final Integer TOP_N = 10;
|
|
private static final Integer TOP_N = 10;
|
|
|
|
|
|
|
@@ -138,7 +139,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
public void executionTaskSubmitHandler() {
|
|
public void executionTaskSubmitHandler() {
|
|
|
long start = System.nanoTime();
|
|
long start = System.nanoTime();
|
|
|
log.info("executionTaskSubmitHandler start");
|
|
log.info("executionTaskSubmitHandler start");
|
|
|
- // 6 小时前
|
|
|
|
|
|
|
+ // N 小时前
|
|
|
Date hourAgo = Date.from(
|
|
Date hourAgo = Date.from(
|
|
|
LocalDateTime.now()
|
|
LocalDateTime.now()
|
|
|
.minusHours(6)
|
|
.minusHours(6)
|
|
@@ -166,7 +167,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
jobId = contentDeconstructionClusterClient.submitDeconstructionTask(sdExecutionTask);
|
|
jobId = contentDeconstructionClusterClient.submitDeconstructionTask(sdExecutionTask);
|
|
|
break;
|
|
break;
|
|
|
case CLUSTER:
|
|
case CLUSTER:
|
|
|
-// jobId = contentDeconstructionClusterClient.submitClusterTask(sdExecutionTask);
|
|
|
|
|
|
|
+ jobId = contentDeconstructionClusterClient.submitClusterTask(sdExecutionTask);
|
|
|
break;
|
|
break;
|
|
|
default:
|
|
default:
|
|
|
log.error("executionTaskTypeEnum is illegal.");
|
|
log.error("executionTaskTypeEnum is illegal.");
|
|
@@ -177,7 +178,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
update.setErrorMsg("任务提交失败:未获取到jobId");
|
|
update.setErrorMsg("任务提交失败:未获取到jobId");
|
|
|
update.setTaskStatus(ExecutionTaskStatusEnum.FAILED.getValue());
|
|
update.setTaskStatus(ExecutionTaskStatusEnum.FAILED.getValue());
|
|
|
int rows = sdExecutionTaskMapper.updateById(update);
|
|
int rows = sdExecutionTaskMapper.updateById(update);
|
|
|
- log.info("subTask submit failure, id:{} rows = {}", sdExecutionTask.getId(), rows);
|
|
|
|
|
|
|
+ log.info("executionTask submit failure, id:{} rows = {}", sdExecutionTask.getId(), rows);
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
SdExecutionTask update = new SdExecutionTask();
|
|
SdExecutionTask update = new SdExecutionTask();
|
|
@@ -185,9 +186,9 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
update.setExternalJobId(jobId);
|
|
update.setExternalJobId(jobId);
|
|
|
update.setTaskStatus(ExecutionTaskStatusEnum.SUBMITTED.getValue());
|
|
update.setTaskStatus(ExecutionTaskStatusEnum.SUBMITTED.getValue());
|
|
|
int rows = sdExecutionTaskMapper.updateById(update);
|
|
int rows = sdExecutionTaskMapper.updateById(update);
|
|
|
- log.info("subTask submit success, id:{} rows = {}", sdExecutionTask.getId(), rows);
|
|
|
|
|
|
|
+ log.info("executionTask submit success, id:{} rows = {}", sdExecutionTask.getId(), rows);
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
- log.error("subTask submit error {}", sdExecutionTask, e);
|
|
|
|
|
|
|
+ log.error("executionTask submit error {}", sdExecutionTask, e);
|
|
|
if (e instanceof HttpServiceException) {
|
|
if (e instanceof HttpServiceException) {
|
|
|
//http请求异常 不直接失败;下次重试
|
|
//http请求异常 不直接失败;下次重试
|
|
|
continue;
|
|
continue;
|
|
@@ -197,21 +198,21 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
update.setErrorMsg("任务提交失败:" + e.getMessage());
|
|
update.setErrorMsg("任务提交失败:" + e.getMessage());
|
|
|
update.setTaskStatus(ExecutionTaskStatusEnum.FAILED.getValue());
|
|
update.setTaskStatus(ExecutionTaskStatusEnum.FAILED.getValue());
|
|
|
int rows = sdExecutionTaskMapper.updateById(update);
|
|
int rows = sdExecutionTaskMapper.updateById(update);
|
|
|
- log.info("subTask submit failure, id:{} rows = {}", sdExecutionTask.getId(), rows);
|
|
|
|
|
|
|
+ log.info("executionTask submit failure, id:{} rows = {}", sdExecutionTask.getId(), rows);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
|
long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
|
|
- log.info("subTaskExecutionmitHandler finish cost={}ms", costMs);
|
|
|
|
|
|
|
+ log.info("executionTaskSubmitHandler finish cost={}ms", costMs);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
public void executionTaskSyncHandler() {
|
|
public void executionTaskSyncHandler() {
|
|
|
long start = System.nanoTime();
|
|
long start = System.nanoTime();
|
|
|
log.info("executionTaskSyncHandler start");
|
|
log.info("executionTaskSyncHandler start");
|
|
|
- // 6 小时前
|
|
|
|
|
|
|
+ // N 小时前
|
|
|
Date hourAgo = Date.from(
|
|
Date hourAgo = Date.from(
|
|
|
LocalDateTime.now()
|
|
LocalDateTime.now()
|
|
|
- .minusHours(6)
|
|
|
|
|
|
|
+ .minusHours(24)
|
|
|
.atZone(ZoneId.systemDefault())
|
|
.atZone(ZoneId.systemDefault())
|
|
|
.toInstant());
|
|
.toInstant());
|
|
|
List<Integer> taskStatusList = Arrays.asList(ExecutionTaskStatusEnum.SUBMITTED.getValue(), ExecutionTaskStatusEnum.RUNNING.getValue());
|
|
List<Integer> taskStatusList = Arrays.asList(ExecutionTaskStatusEnum.SUBMITTED.getValue(), ExecutionTaskStatusEnum.RUNNING.getValue());
|
|
@@ -334,9 +335,11 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
sdExecutionTask.setTaskType(TaskTypeEnum.DECONSTRUCT.getValue());
|
|
sdExecutionTask.setTaskType(TaskTypeEnum.DECONSTRUCT.getValue());
|
|
|
sdExecutionTask.setTaskStatus(ExecutionTaskStatusEnum.INIT.getValue());
|
|
sdExecutionTask.setTaskStatus(ExecutionTaskStatusEnum.INIT.getValue());
|
|
|
sdExecutionTask.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
sdExecutionTask.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
|
|
|
+ //属性设置
|
|
|
SdExecutionTaskPropertiesDTO propertiesDTO = new SdExecutionTaskPropertiesDTO();
|
|
SdExecutionTaskPropertiesDTO propertiesDTO = new SdExecutionTaskPropertiesDTO();
|
|
|
propertiesDTO.setContentScope(YESTERDAY_RETURN_TOP10_VIDEO_SCOPE);
|
|
propertiesDTO.setContentScope(YESTERDAY_RETURN_TOP10_VIDEO_SCOPE);
|
|
|
sdExecutionTask.setProperties(JSONObject.toJSONString(propertiesDTO));
|
|
sdExecutionTask.setProperties(JSONObject.toJSONString(propertiesDTO));
|
|
|
|
|
+ //关联内容
|
|
|
SdExecutionTaskContent sdExecutionTaskContent = new SdExecutionTaskContent();
|
|
SdExecutionTaskContent sdExecutionTaskContent = new SdExecutionTaskContent();
|
|
|
sdExecutionTaskContent.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
sdExecutionTaskContent.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
|
sdExecutionTaskContent.setContentId(videoId);
|
|
sdExecutionTaskContent.setContentId(videoId);
|
|
@@ -355,48 +358,106 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
- public void clusterExecutionTaskCreateHandler() {
|
|
|
|
|
|
|
+ public void manualClusterExecutionTaskCreateHandler(String params) {
|
|
|
long start = System.nanoTime();
|
|
long start = System.nanoTime();
|
|
|
- log.info("clusterExecutionTaskCreateHandler start");
|
|
|
|
|
- //获取昨日Top10视频
|
|
|
|
|
- LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
|
|
|
|
|
- String dt = DateUtil.formatLocalDateTime(yesterday, "yyyyMMdd");
|
|
|
|
|
- String sql = "select * from " + clusterVideoTable + " where dt='" + dt + "' ORDER BY 回流人数 DESC LIMIT " + TOP_N + ";";
|
|
|
|
|
|
|
+ log.info("manualClusterExecutionTaskCreateHandler start");
|
|
|
|
|
+ log.info("manualClusterExecutionTaskCreateHandler params={}", params);
|
|
|
|
|
+ if (Objects.isNull(params) || params.isEmpty()) {
|
|
|
|
|
+ XxlJobLogger.log("manualClusterExecutionTaskCreateHandler params is empty");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ ClusterExecutionConfigDTO clusterExecutionConfigDTO = JSONObject.parseObject(params, ClusterExecutionConfigDTO.class);
|
|
|
|
|
+ if (Objects.isNull(clusterExecutionConfigDTO) || Objects.isNull(clusterExecutionConfigDTO.getCategories())
|
|
|
|
|
+ || clusterExecutionConfigDTO.getCategories().isEmpty()) {
|
|
|
|
|
+ XxlJobLogger.log("manualClusterExecutionTaskCreateHandler categories is empty");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ List<String> categories = clusterExecutionConfigDTO.getCategories();
|
|
|
|
|
+ String contentScope = clusterExecutionConfigDTO.getContentScope();
|
|
|
|
|
+ if (Objects.isNull(contentScope) || contentScope.isEmpty()) {
|
|
|
|
|
+ //默认值
|
|
|
|
|
+ contentScope = MANUAL_SELECT_VIDEO_SCOPE;
|
|
|
|
|
+ }
|
|
|
|
|
+ Integer minThreshold = clusterExecutionConfigDTO.getMinThreshold();
|
|
|
|
|
+ Integer maxLimit = clusterExecutionConfigDTO.getMaxLimit();
|
|
|
|
|
+ //统计二级标签类别
|
|
|
|
|
+ String sql = "SELECT\n" +
|
|
|
|
|
+ " t1.videoid,\n" +
|
|
|
|
|
+ " t1.merge_leve2\n" +
|
|
|
|
|
+ "FROM loghubods.video_merge_tag t1\n" +
|
|
|
|
|
+ "INNER JOIN (\n" +
|
|
|
|
|
+ " SELECT\n" +
|
|
|
|
|
+ " t2.content_id AS videoid,\n" +
|
|
|
|
|
+ " MAX(t3.id) AS max_task_id\n" +
|
|
|
|
|
+ " FROM videoods.sd_execution_task_content t2\n" +
|
|
|
|
|
+ " INNER JOIN videoods.sd_execution_task t3\n" +
|
|
|
|
|
+ " ON t2.execution_task_id = t3.id\n" +
|
|
|
|
|
+ " WHERE\n" +
|
|
|
|
|
+ " t2.content_type = 3\n" +
|
|
|
|
|
+ " AND t2.is_deleted = 0\n" +
|
|
|
|
|
+ " AND t3.task_status = 3\n" +
|
|
|
|
|
+ " AND t3.is_deleted = 0\n" +
|
|
|
|
|
+ " GROUP BY t2.content_id\n" +
|
|
|
|
|
+ ") latest\n" +
|
|
|
|
|
+ " ON t1.videoid = latest.videoid\n" +
|
|
|
|
|
+ "ORDER BY latest.max_task_id DESC;";
|
|
|
List<Record> records = odpsManager.query(sql);
|
|
List<Record> records = odpsManager.query(sql);
|
|
|
if (Objects.isNull(records) || records.isEmpty()) {
|
|
if (Objects.isNull(records) || records.isEmpty()) {
|
|
|
- log.info("clusterExecutionTaskCreateHandler records is empty");
|
|
|
|
|
|
|
+ XxlJobLogger.log("records is empty");
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
+ XxlJobLogger.log("records size={}", records.size());
|
|
|
long topCostMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
|
long topCostMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
|
|
- log.info("clusterExecutionTaskCreateHandler get top video cost={}ms", topCostMs);
|
|
|
|
|
- //数据处理
|
|
|
|
|
|
|
+ log.info("manualClusterExecutionTaskCreateHandler get top video cost={}ms", topCostMs);
|
|
|
Map<String, List<String>> categoryVideoMap = new HashMap<>();
|
|
Map<String, List<String>> categoryVideoMap = new HashMap<>();
|
|
|
|
|
+ //数据处理
|
|
|
for (Record record : records) {
|
|
for (Record record : records) {
|
|
|
try {
|
|
try {
|
|
|
String videoId = record.getString("videoid");
|
|
String videoId = record.getString("videoid");
|
|
|
- String category = record.getString("category");
|
|
|
|
|
- List<String> categoryVideoIds = categoryVideoMap.get(category);
|
|
|
|
|
- if (categoryVideoIds == null) {
|
|
|
|
|
- categoryVideoIds = new ArrayList<>();
|
|
|
|
|
|
|
+ String category = record.getString("merge_leve2");
|
|
|
|
|
+ if (categories.contains(category) && videoId != null) {
|
|
|
|
|
+ List<String> videoIds = categoryVideoMap.get(videoId);
|
|
|
|
|
+ if (videoIds == null) {
|
|
|
|
|
+ videoIds = new ArrayList<>();
|
|
|
|
|
+ }
|
|
|
|
|
+ videoIds.add(videoId);
|
|
|
|
|
+ categoryVideoMap.put(category, videoIds);
|
|
|
}
|
|
}
|
|
|
- categoryVideoIds.add(videoId);
|
|
|
|
|
- categoryVideoMap.put(videoId, categoryVideoIds);
|
|
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
- log.error("clusterExecutionTaskCreateHandler categoryVideoMap error {}", record, e);
|
|
|
|
|
|
|
+ log.error("manualClusterExecutionTaskCreateHandler categoryVideoMap error {}", record, e);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+ if (categoryVideoMap.isEmpty()) {
|
|
|
|
|
+ XxlJobLogger.log("categoryVideoMap is empty");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ XxlJobLogger.log("categoryVideoMap size={}", categoryVideoMap.size());
|
|
|
//创建解构任务
|
|
//创建解构任务
|
|
|
int count = 0;
|
|
int count = 0;
|
|
|
for (Map.Entry<String, List<String>> entry : categoryVideoMap.entrySet()) {
|
|
for (Map.Entry<String, List<String>> entry : categoryVideoMap.entrySet()) {
|
|
|
try {
|
|
try {
|
|
|
String category = entry.getKey();
|
|
String category = entry.getKey();
|
|
|
List<String> categoryVideoIds = entry.getValue();
|
|
List<String> categoryVideoIds = entry.getValue();
|
|
|
|
|
+ //需要达到阈值
|
|
|
|
|
+ if (Objects.isNull(categoryVideoIds) || categoryVideoIds.isEmpty()
|
|
|
|
|
+ || categoryVideoIds.size() < minThreshold) {
|
|
|
|
|
+ XxlJobLogger.log("categoryVideoIds is empty, category = {}", category);
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ //最大数量限制
|
|
|
|
|
+ if (categoryVideoIds.size() > maxLimit) {
|
|
|
|
|
+ categoryVideoIds = categoryVideoIds.subList(0, maxLimit);
|
|
|
|
|
+ }
|
|
|
SdExecutionTask sdExecutionTask = new SdExecutionTask();
|
|
SdExecutionTask sdExecutionTask = new SdExecutionTask();
|
|
|
sdExecutionTask.setTaskNo(IdGeneratorUtil.generateExecutionTaskNo());
|
|
sdExecutionTask.setTaskNo(IdGeneratorUtil.generateExecutionTaskNo());
|
|
|
sdExecutionTask.setTaskType(TaskTypeEnum.CLUSTER.getValue());
|
|
sdExecutionTask.setTaskType(TaskTypeEnum.CLUSTER.getValue());
|
|
|
sdExecutionTask.setTaskStatus(ExecutionTaskStatusEnum.INIT.getValue());
|
|
sdExecutionTask.setTaskStatus(ExecutionTaskStatusEnum.INIT.getValue());
|
|
|
sdExecutionTask.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
sdExecutionTask.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
|
-// sdExecutionTask.setContentId(videoId);
|
|
|
|
|
|
|
+ //属性设置
|
|
|
|
|
+ SdExecutionTaskPropertiesDTO propertiesDTO = new SdExecutionTaskPropertiesDTO();
|
|
|
|
|
+ propertiesDTO.setContentScope(YESTERDAY_RETURN_TOP10_VIDEO_SCOPE);
|
|
|
|
|
+ sdExecutionTask.setProperties(JSONObject.toJSONString(propertiesDTO));
|
|
|
|
|
+ //关联内容:多条
|
|
|
List<SdExecutionTaskContent> contentList = new ArrayList<>();
|
|
List<SdExecutionTaskContent> contentList = new ArrayList<>();
|
|
|
for (String videoId : categoryVideoIds) {
|
|
for (String videoId : categoryVideoIds) {
|
|
|
SdExecutionTaskContent sdExecutionTaskContent = new SdExecutionTaskContent();
|
|
SdExecutionTaskContent sdExecutionTaskContent = new SdExecutionTaskContent();
|
|
@@ -406,14 +467,14 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
contentList.add(sdExecutionTaskContent);
|
|
contentList.add(sdExecutionTaskContent);
|
|
|
}
|
|
}
|
|
|
boolean createResult = executionTaskCreateService.create(sdExecutionTask, contentList);
|
|
boolean createResult = executionTaskCreateService.create(sdExecutionTask, contentList);
|
|
|
- log.info("clusterExecutionTaskCreateHandler sdExecutionTask create result={}", createResult);
|
|
|
|
|
|
|
+ log.info("manualClusterExecutionTaskCreateHandler sdExecutionTask create result={}", createResult);
|
|
|
count++;
|
|
count++;
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
-
|
|
|
|
|
|
|
+ log.error("manualClusterExecutionTaskCreateHandler error {}", entry, e);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
|
long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
|
|
- log.info("clusterExecutionTaskCreateHandler recordSize = {} count = {} finish cost = {}ms", records.size(), count, costMs);
|
|
|
|
|
|
|
+ log.info("manualClusterExecutionTaskCreateHandler recordSize = {} count = {} finish cost = {}ms", records.size(), count, costMs);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
}
|
|
}
|