|
@@ -1,6 +1,5 @@
|
|
|
package com.tzld.piaoquan.content.understanding.service.impl;
|
|
|
|
|
|
-import com.alibaba.fastjson.JSON;
|
|
|
import com.aliyun.odps.data.Record;
|
|
|
import com.aliyun.oss.model.ObjectMetadata;
|
|
|
import com.aliyun.oss.model.StorageClass;
|
|
@@ -25,16 +24,10 @@ import com.tzld.piaoquan.content.understanding.service.ContentService;
|
|
|
import com.tzld.piaoquan.content.understanding.service.LoghubService;
|
|
|
import com.tzld.piaoquan.content.understanding.service.ODPSManager;
|
|
|
import com.tzld.piaoquan.content.understanding.service.PipelineService;
|
|
|
-import com.tzld.piaoquan.content.understanding.util.HttpClientUtil;
|
|
|
-import com.tzld.piaoquan.content.understanding.util.HttpPoolClient;
|
|
|
-import com.tzld.piaoquan.content.understanding.util.RetryUtil;
|
|
|
import com.tzld.piaoquan.content.understanding.util.UrlUtil;
|
|
|
import com.xxl.job.core.log.XxlJobLogger;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.apache.rocketmq.client.apis.message.Message;
|
|
|
import org.apache.rocketmq.client.apis.message.MessageId;
|
|
|
-import org.apache.rocketmq.client.apis.producer.SendReceipt;
|
|
|
-import org.apache.rocketmq.client.core.RocketMQClientTemplate;
|
|
|
import org.springframework.beans.BeanUtils;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
@@ -307,6 +300,44 @@ public class ContentServiceImpl implements ContentService {
|
|
|
XxlJobLogger.log("table = {} execute time = {}", topReturnTable, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public void historyVideoUnderstandingHandler() {
|
|
|
+ Stopwatch stopwatch = Stopwatch.createStarted();
|
|
|
+ List<Record> newRecommendRecords = odpsManager.tableTunnelQuery("loghubods", topHistoryTable);
|
|
|
+ if (Objects.nonNull(newRecommendRecords) && !newRecommendRecords.isEmpty()) {
|
|
|
+ XxlJobLogger.log("table = {} records size = {}", topHistoryTable, newRecommendRecords.size());
|
|
|
+ for (Record record : newRecommendRecords) {
|
|
|
+ try {
|
|
|
+ Long videoId = record.getBigint("videoid");
|
|
|
+ if (Objects.isNull(videoId)) {
|
|
|
+ XxlJobLogger.log("table = {} videoId 为 null", topHistoryTable);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ String videoUrl = getVideoUrl(videoId);
|
|
|
+ if (Objects.isNull(videoUrl)) {
|
|
|
+ XxlJobLogger.log("table = {} videoId = {} videoUrl is null", topHistoryTable, videoId);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ SubmitTasksParam submitTasksParam = new SubmitTasksParam();
|
|
|
+ List<ContentDTO> contentDTOList = new ArrayList<>();
|
|
|
+ ContentDTO contentDTO = new ContentDTO();
|
|
|
+ contentDTO.setVideoId(videoId);
|
|
|
+ contentDTO.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
|
+ contentDTO.setContent(videoUrl);
|
|
|
+ contentDTO.setPipelineId(oldVersionPipelineId);
|
|
|
+ contentDTOList.add(contentDTO);
|
|
|
+ submitTasksParam.setContents(contentDTOList);
|
|
|
+ TaskResultVO taskResultVO = submitTasks(submitTasksParam);
|
|
|
+ } catch (Exception e) {
|
|
|
+ XxlJobLogger.log("record = {} error", record);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ XxlJobLogger.log("table = {} records is empty", topHistoryTable);
|
|
|
+ }
|
|
|
+ XxlJobLogger.log("table = {} execute time = {}", topHistoryTable, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 获取视频URL
|
|
|
* <p>
|