supeng 6 月之前
父节点
当前提交
486d8291b1

+ 34 - 19
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/rocketmq/consumer/OldVersionRemainVideoUnderstandingConsumer.java

@@ -1,37 +1,52 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package com.tzld.piaoquan.content.understanding.rocketmq.consumer;
 
-
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.TypeReference;
+import com.tzld.piaoquan.content.understanding.service.ContentService;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.message.MessageView;
 import org.apache.rocketmq.client.core.RocketMQListener;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Objects;
+
+@Slf4j
 @Service
 @RocketMQMessageListener(endpoints = "${demo.trans.rocketmq.endpoints:}", topic = "${demo.trans.rocketmq.topic:}",
         consumerGroup = "${demo.trans.rocketmq.consumer-group:}", tag = "${demo.trans.rocketmq.tag:}")
 public class OldVersionRemainVideoUnderstandingConsumer implements RocketMQListener {
 
+    @Autowired
+    private ContentService contentService;
+
     @Override
     public ConsumeResult consume(MessageView messageView) {
-        System.out.println("handle my transaction message:" + messageView);
-
+        log.info("consume message = {}", messageView);
+        try {
+            ByteBuffer body = messageView.getBody();
+            Charset charset = StandardCharsets.UTF_8;
+            String message = charset.decode(body).toString();
+            Map<String, Object> messageMap = JSONObject.parseObject(message, new TypeReference<Map<String, Object>>() {});
+            Object videoIdObj = messageMap.get("videoId");
+            if (Objects.nonNull(videoIdObj)) {
+                Long videoId = Long.parseLong(videoIdObj.toString());
+                boolean flag = contentService.remainVideoUnderstandingHandler(videoId);
+                if (!flag) {
+                    return ConsumeResult.FAILURE;
+                }
+            }
+        } catch (Exception e) {
+            log.error("consume error {}", e, messageView);
+            return ConsumeResult.FAILURE;
+        }
         return ConsumeResult.SUCCESS;
     }
 }

+ 7 - 0
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/service/ContentService.java

@@ -57,5 +57,12 @@ public interface ContentService {
      */
     void remainVideoUnderstandingHandler();
 
+    /**
+     * 剩余视频处理
+     * @param videoId
+     * @return
+     */
+    boolean remainVideoUnderstandingHandler(Long videoId);
+
 
 }

+ 31 - 7
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/service/impl/ContentServiceImpl.java

@@ -93,14 +93,9 @@ public class ContentServiceImpl implements ContentService {
     @Value("${video.analyse:overwrite:0}")
     private String overwriteAnalyse;
 
-    @Value("${longvideoapi.getVideoBaseInfoUrl}")
-    private String videoBaseInfoUrl;
-
-    @Value("${video.understanding.old.version.pipelineId}")
+    @Value("${old.version.video.understanding.pipelineId:0}")
     private Long oldVersionPipelineId;
 
-    private static HttpPoolClient httpPoolClient = HttpClientUtil.create(1000, 3000, 50, 200, 1, 1000);
-
     @Autowired
     private AliyunOssManager aliyunOssManager;
 
@@ -300,7 +295,7 @@ public class ContentServiceImpl implements ContentService {
                     contentDTO.setPipelineId(oldVersionPipelineId);
                     contentDTOList.add(contentDTO);
                     submitTasksParam.setContents(contentDTOList);
-                    submitTasks(submitTasksParam);
+                    TaskResultVO taskResultVO = submitTasks(submitTasksParam);
                 } catch (Exception e) {
                     XxlJobLogger.log("record = {} error", record);
                 }
@@ -435,4 +430,33 @@ public class ContentServiceImpl implements ContentService {
     public void remainVideoUnderstandingHandler() {
 
     }
+
+    @Override
+    public boolean remainVideoUnderstandingHandler(Long videoId) {
+        try {
+            String videoUrl = getVideoUrl(videoId);
+            if (Objects.isNull(videoUrl)) {
+                XxlJobLogger.log("table = {} videoId = {} videoUrl is null", newRecommendTable, videoId);
+                return false;
+            }
+            SubmitTasksParam submitTasksParam = new SubmitTasksParam();
+            List<ContentDTO> contentDTOList = new ArrayList<>();
+            ContentDTO contentDTO = new ContentDTO();
+            contentDTO.setVideoId(videoId);
+            contentDTO.setContentType(ContentTypeEnum.VIDEO.getValue());
+            contentDTO.setContent(videoUrl);
+            contentDTO.setPipelineId(oldVersionPipelineId);
+            contentDTOList.add(contentDTO);
+            submitTasksParam.setContents(contentDTOList);
+            TaskResultVO taskResultVO = submitTasks(submitTasksParam);
+            if (Objects.nonNull(taskResultVO) && Objects.nonNull(taskResultVO.getTaskResults())
+                    && taskResultVO.getTaskResults().size() > 0 && Objects.nonNull(taskResultVO.getTaskResults().get(0))
+                    && Objects.nonNull(taskResultVO.getTaskResults().get(0).getTaskId())) {
+                return true;
+            }
+        } catch (Exception e) {
+            log.error("remainVideoUnderstandingHandler error", e);
+        }
+        return false;
+    }
 }

+ 0 - 13
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/service/impl/DeepSeekGenerateContentAction.java

@@ -78,18 +78,5 @@ public class DeepSeekGenerateContentAction implements Action {
                 throw new RuntimeException("deepSeekGenerateContentAction request error", e);
             }
         }, "deepSeekGenerateContentAction");
-//        log.info("deepSeekGenerateContentAction deepSeekParam = {}", JSON.toJSONString(deepSeekParam));
-//        Optional<String> optionalS = httpPoolClient.postJson(deepSeekApiUrl, JSON.toJSONString(deepSeekParam));
-//        log.info("deepSeekGenerateContentAction optionalS = {}", optionalS);
-//        if (optionalS.isPresent()) {
-//            CommonResponse<Map<String, Object>> commonResponse = JSON.parseObject(optionalS.get(), CommonResponse.class);
-//            if (commonResponse.isSuccess() && Objects.nonNull(commonResponse.getData()) && Objects.nonNull(commonResponse.getData().get("result"))) {
-//                String content = commonResponse.getData().get("result").toString();
-//                //过滤一些特殊格式
-//                content = content.replace("```json", "").replace("```", "");
-//                return content;
-//            }
-//        }
-//        return null;
     }
 }

+ 3 - 18
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/service/impl/GeminiGenerateContentAction.java

@@ -37,6 +37,8 @@ public class GeminiGenerateContentAction implements Action {
     private static final String MODEL = "gemini-2.0-flash";
     private static final Double TEMPERATURE = 1.0;
 
+    private static final Integer RETRY_TIMES = 5;
+
     @Override
     public String execute(ActionParam param) {
         Integer type = param.getType();
@@ -76,23 +78,6 @@ public class GeminiGenerateContentAction implements Action {
                 log.error("geminiGenerateContentAction error param = {}", e, JSON.toJSONString(geminiParam));
                 throw new RuntimeException("geminiGenerateContentAction request error", e);
             }
-        }, "geminiGenerateContentAction");
-//        try {
-//            log.info("geminiGenerateContentAction geminiParam = {}", JSON.toJSONString(geminiParam));
-//            Optional<String> optionalS = httpPoolClient.postJson(geminiApiUrl, JSON.toJSONString(geminiParam));
-//            log.info("geminiGenerateContentAction optionalS = {}", optionalS);
-//            if (optionalS.isPresent()) {
-//                CommonResponse<Map<String, Object>> commonResponse = JSON.parseObject(optionalS.get(), CommonResponse.class);
-//                if (commonResponse.isSuccess() && Objects.nonNull(commonResponse.getData()) && Objects.nonNull(commonResponse.getData().get("result"))) {
-//                    String content = commonResponse.getData().get("result").toString();
-//                    //过滤一些特殊格式
-//                    content = content.replace("```json", "").replace("```", "");
-//                    return content;
-//                }
-//            }
-//        } catch (Exception e) {
-//            log.error("geminiGenerateContentAction error param = {}", e, JSON.toJSONString(geminiParam));
-//        }
-//        return null;
+        }, RETRY_TIMES, "geminiGenerateContentAction");
     }
 }