|
|
@@ -1,6 +1,7 @@
|
|
|
package com.tzld.piaoquan.sde.service.impl;
|
|
|
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.alibaba.fastjson.TypeReference;
|
|
|
import com.aliyun.odps.data.Record;
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
|
|
@@ -27,6 +28,7 @@ import com.tzld.piaoquan.sde.util.IdGeneratorUtil;
|
|
|
import com.tzld.piaoquan.sde.util.OdpsManager;
|
|
|
import com.xxl.job.core.log.XxlJobLogger;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
import org.springframework.beans.BeanUtils;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
@@ -154,7 +156,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
log.info("there is no executionTask need submit.");
|
|
|
return;
|
|
|
}
|
|
|
- //提交子任务
|
|
|
+ // 提交子任务
|
|
|
for (SdExecutionTask sdExecutionTask : list) {
|
|
|
try {
|
|
|
ExecutionTaskTypeEnum executionTaskTypeEnum = ExecutionTaskTypeEnum.getInstance(sdExecutionTask.getTaskType());
|
|
|
@@ -190,7 +192,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
} catch (Exception e) {
|
|
|
log.error("executionTask submit error {}", sdExecutionTask, e);
|
|
|
if (e instanceof HttpServiceException) {
|
|
|
- //http请求异常 不直接失败;下次重试
|
|
|
+ // http请求异常 不直接失败;下次重试
|
|
|
continue;
|
|
|
}
|
|
|
SdExecutionTask update = new SdExecutionTask();
|
|
|
@@ -291,7 +293,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
public void yesterdayTopReturnVideoExecutionTaskCreateHandler() {
|
|
|
long start = System.nanoTime();
|
|
|
log.info("yesterdayTopReturnVideoExecutionTaskCreateHandler start");
|
|
|
- //获取昨日Top10视频
|
|
|
+ // 获取昨日Top10视频
|
|
|
LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
|
|
|
String dt = DateUtil.formatLocalDateTime(yesterday, "yyyyMMdd");
|
|
|
int topN = TOP_N * 5;
|
|
|
@@ -307,7 +309,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
if (Objects.isNull(record)) {
|
|
|
continue;
|
|
|
}
|
|
|
- //TODO 待优化,只针对本策略去重
|
|
|
+ // TODO 待优化,只针对本策略去重
|
|
|
String videoId = record.getString("videoid");
|
|
|
int count = sdExecutionTaskContentMapper.countByContentId(ContentTypeEnum.VIDEO.getValue(), videoId);
|
|
|
if (count > 0) {
|
|
|
@@ -325,7 +327,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
XxlJobLogger.log("yesterdayTopReturnVideoExecutionTaskCreateHandler findRecords size={}", findRecords.size());
|
|
|
long topCostMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
|
|
log.info("yesterdayTopReturnVideoExecutionTaskCreateHandler get top video cost={}ms", topCostMs);
|
|
|
- //创建解构任务
|
|
|
+ // 创建解构任务
|
|
|
int count = 0;
|
|
|
for (Record record : findRecords) {
|
|
|
try {
|
|
|
@@ -335,11 +337,11 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
sdExecutionTask.setTaskType(TaskTypeEnum.DECONSTRUCT.getValue());
|
|
|
sdExecutionTask.setTaskStatus(ExecutionTaskStatusEnum.INIT.getValue());
|
|
|
sdExecutionTask.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
|
- //属性设置
|
|
|
+ // 属性设置
|
|
|
SdExecutionTaskPropertiesDTO propertiesDTO = new SdExecutionTaskPropertiesDTO();
|
|
|
propertiesDTO.setContentScope(YESTERDAY_RETURN_TOP10_VIDEO_SCOPE);
|
|
|
sdExecutionTask.setProperties(JSONObject.toJSONString(propertiesDTO));
|
|
|
- //关联内容
|
|
|
+ // 关联内容
|
|
|
SdExecutionTaskContent sdExecutionTaskContent = new SdExecutionTaskContent();
|
|
|
sdExecutionTaskContent.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
|
sdExecutionTaskContent.setContentId(videoId);
|
|
|
@@ -357,6 +359,92 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
log.info("yesterdayTopReturnVideoExecutionTaskCreateHandler recordSize = {} count = {} finish cost = {}ms", records.size(), count, costMs);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public void topReturnVideoExecutionTaskCreateHandler(String params) {
|
|
|
+ Map<String, String> paramJson = new HashMap<>();
|
|
|
+ try {
|
|
|
+ paramJson = JSONObject.parseObject(params, new TypeReference<Map<String, String>>() {
|
|
|
+ });
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("parse xxl job param error params: {}", params, e);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ long start = System.nanoTime();
|
|
|
+ log.info("topReturnVideoExecutionTaskCreateHandler start");
|
|
|
+
|
|
|
+ String table = paramJson.getOrDefault("table", yesterdayReturnVideoTable);
|
|
|
+ String dt = paramJson.get("dt");
|
|
|
+ if (StringUtils.isBlank(dt)) {
|
|
|
+ LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
|
|
|
+ dt = DateUtil.formatLocalDateTime(yesterday, "yyyyMMdd");
|
|
|
+ }
|
|
|
+ int topN = TOP_N * 5;
|
|
|
+ String sql = "select * from " + table + " where dt='" + dt + "' ORDER BY 回流人数 DESC LIMIT " + topN + ";";
|
|
|
+ List<Record> records = odpsManager.query(sql);
|
|
|
+ if (Objects.isNull(records) || records.isEmpty()) {
|
|
|
+ log.info("topReturnVideoExecutionTaskCreateHandler records is empty");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ String contentScope = paramJson.getOrDefault("contentScope", YESTERDAY_RETURN_TOP10_VIDEO_SCOPE);
|
|
|
+
|
|
|
+ XxlJobLogger.log("topReturnVideoExecutionTaskCreateHandler records size={}", records.size());
|
|
|
+ List<Record> findRecords = new ArrayList<>();
|
|
|
+ for (Record record : records) {
|
|
|
+ if (Objects.isNull(record)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ String videoId = record.getString("videoid");
|
|
|
+ // int count = sdExecutionTaskContentMapper.countByContentId(ContentTypeEnum.VIDEO.getValue(), videoId);
|
|
|
+ // if (count > 0) {
|
|
|
+ // continue;
|
|
|
+ // }
|
|
|
+ findRecords.add(record);
|
|
|
+ if (findRecords.size() >= TOP_N) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (findRecords.isEmpty()) {
|
|
|
+ XxlJobLogger.log("topReturnVideoExecutionTaskCreateHandler findRecords is empty");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ XxlJobLogger.log("topReturnVideoExecutionTaskCreateHandler findRecords size={}", findRecords.size());
|
|
|
+ long topCostMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
|
|
+ log.info("topReturnVideoExecutionTaskCreateHandler get top video cost={}ms", topCostMs);
|
|
|
+ // 创建解构任务
|
|
|
+ int count = 0;
|
|
|
+ for (Record record : findRecords) {
|
|
|
+ try {
|
|
|
+ String videoId = record.getString("videoid");
|
|
|
+ SdExecutionTask sdExecutionTask = new SdExecutionTask();
|
|
|
+ sdExecutionTask.setTaskNo(IdGeneratorUtil.generateExecutionTaskNo());
|
|
|
+ sdExecutionTask.setTaskType(TaskTypeEnum.DECONSTRUCT.getValue());
|
|
|
+ sdExecutionTask.setTaskStatus(ExecutionTaskStatusEnum.INIT.getValue());
|
|
|
+ sdExecutionTask.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
|
+ // 属性设置
|
|
|
+ SdExecutionTaskPropertiesDTO propertiesDTO = new SdExecutionTaskPropertiesDTO();
|
|
|
+ propertiesDTO.setContentScope(contentScope);
|
|
|
+ sdExecutionTask.setProperties(JSONObject.toJSONString(propertiesDTO));
|
|
|
+ // 关联内容
|
|
|
+ SdExecutionTaskContent sdExecutionTaskContent = new SdExecutionTaskContent();
|
|
|
+ sdExecutionTaskContent.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
|
+ sdExecutionTaskContent.setContentId(videoId);
|
|
|
+ sdExecutionTaskContent.setContent(null);
|
|
|
+ List<SdExecutionTaskContent> contentList = new ArrayList<>();
|
|
|
+ contentList.add(sdExecutionTaskContent);
|
|
|
+ boolean createResult = executionTaskCreateService.create(sdExecutionTask, contentList);
|
|
|
+ log.info("topReturnVideoExecutionTaskCreateHandler sdExecutionTask create videoId = {} result={}", videoId, createResult);
|
|
|
+ count++;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("topReturnVideoExecutionTaskCreateHandler error {}", record, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
|
|
+ log.info("topReturnVideoExecutionTaskCreateHandler recordSize = {} count = {} finish cost = {}ms", records.size(), count, costMs);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void manualClusterExecutionTaskCreateHandler(String params) {
|
|
|
long start = System.nanoTime();
|
|
|
@@ -375,12 +463,12 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
List<String> categories = clusterExecutionConfigDTO.getCategories();
|
|
|
String contentScope = clusterExecutionConfigDTO.getContentScope();
|
|
|
if (Objects.isNull(contentScope) || contentScope.isEmpty()) {
|
|
|
- //默认值
|
|
|
+ // 默认值
|
|
|
contentScope = MANUAL_SELECT_VIDEO_SCOPE;
|
|
|
}
|
|
|
Integer minThreshold = clusterExecutionConfigDTO.getMinThreshold();
|
|
|
Integer maxLimit = clusterExecutionConfigDTO.getMaxLimit();
|
|
|
- //统计二级标签类别
|
|
|
+ // 统计二级标签类别
|
|
|
String sql = "SELECT\n" +
|
|
|
" t1.videoid,\n" +
|
|
|
" t1.merge_leve2\n" +
|
|
|
@@ -410,7 +498,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
long topCostMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
|
|
log.info("manualClusterExecutionTaskCreateHandler get top video cost={}ms", topCostMs);
|
|
|
Map<String, List<String>> categoryVideoMap = new HashMap<>();
|
|
|
- //数据处理
|
|
|
+ // 数据处理
|
|
|
for (Record record : records) {
|
|
|
try {
|
|
|
String videoId = record.getString("videoid");
|
|
|
@@ -432,19 +520,19 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
return;
|
|
|
}
|
|
|
XxlJobLogger.log("categoryVideoMap size={}", categoryVideoMap.size());
|
|
|
- //创建解构任务
|
|
|
+ // 创建解构任务
|
|
|
int count = 0;
|
|
|
for (Map.Entry<String, List<String>> entry : categoryVideoMap.entrySet()) {
|
|
|
try {
|
|
|
String category = entry.getKey();
|
|
|
List<String> categoryVideoIds = entry.getValue();
|
|
|
- //需要达到阈值
|
|
|
+ // 需要达到阈值
|
|
|
if (Objects.isNull(categoryVideoIds) || categoryVideoIds.isEmpty()
|
|
|
|| categoryVideoIds.size() < minThreshold) {
|
|
|
XxlJobLogger.log("categoryVideoIds is empty, category = {}", category);
|
|
|
continue;
|
|
|
}
|
|
|
- //最大数量限制
|
|
|
+ // 最大数量限制
|
|
|
if (categoryVideoIds.size() > maxLimit) {
|
|
|
categoryVideoIds = categoryVideoIds.subList(0, maxLimit);
|
|
|
}
|
|
|
@@ -453,11 +541,11 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
sdExecutionTask.setTaskType(TaskTypeEnum.CLUSTER.getValue());
|
|
|
sdExecutionTask.setTaskStatus(ExecutionTaskStatusEnum.INIT.getValue());
|
|
|
sdExecutionTask.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
|
- //属性设置
|
|
|
+ // 属性设置
|
|
|
SdExecutionTaskPropertiesDTO propertiesDTO = new SdExecutionTaskPropertiesDTO();
|
|
|
propertiesDTO.setContentScope(contentScope);
|
|
|
sdExecutionTask.setProperties(JSONObject.toJSONString(propertiesDTO));
|
|
|
- //关联内容:多条
|
|
|
+ // 关联内容:多条
|
|
|
List<SdExecutionTaskContent> contentList = new ArrayList<>();
|
|
|
for (String videoId : categoryVideoIds) {
|
|
|
SdExecutionTaskContent sdExecutionTaskContent = new SdExecutionTaskContent();
|