Browse Source

增加重试

supeng 2 tháng trước cách đây
mục cha
commit
1e77172c36

+ 36 - 12
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/service/impl/DeepSeekGenerateContentAction.java

@@ -10,6 +10,7 @@ import com.tzld.piaoquan.content.understanding.model.param.GeminiParam;
 import com.tzld.piaoquan.content.understanding.service.Action;
 import com.tzld.piaoquan.content.understanding.util.HttpClientUtil;
 import com.tzld.piaoquan.content.understanding.util.HttpPoolClient;
+import com.tzld.piaoquan.content.understanding.util.RetryUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
@@ -54,18 +55,41 @@ public class DeepSeekGenerateContentAction implements Action {
         deepSeekParam.setTemperature(TEMPERATURE);
         //reasonser模型不支持 json format
 //        deepSeekParam.setResponseFormat(ResponseFormatTypeEnum.JSON.getValue());
-        log.info("deepSeekGenerateContentAction deepSeekParam = {}", JSON.toJSONString(deepSeekParam));
-        Optional<String> optionalS = httpPoolClient.postJson(deepSeekApiUrl, JSON.toJSONString(deepSeekParam));
-        log.info("deepSeekGenerateContentAction optionalS = {}", optionalS);
-        if (optionalS.isPresent()) {
-            CommonResponse<Map<String, Object>> commonResponse = JSON.parseObject(optionalS.get(), CommonResponse.class);
-            if (commonResponse.isSuccess() && Objects.nonNull(commonResponse.getData()) && Objects.nonNull(commonResponse.getData().get("result"))) {
-                String content = commonResponse.getData().get("result").toString();
-                //过滤一些特殊格式
-                content = content.replace("```json", "").replace("```", "");
-                return content;
+        return RetryUtil.executeWithRetry(()-> {
+            try {
+                log.info("deepSeekGenerateContentAction deepSeekParam = {}", JSON.toJSONString(deepSeekParam));
+                Optional<String> optionalS = httpPoolClient.postJson(deepSeekApiUrl, JSON.toJSONString(deepSeekParam));
+                log.info("deepSeekGenerateContentAction optionalS = {}", optionalS);
+                if (optionalS.isPresent()) {
+                    CommonResponse<Map<String, Object>> commonResponse = JSON.parseObject(optionalS.get(), CommonResponse.class);
+                    if (commonResponse.isSuccess() && Objects.nonNull(commonResponse.getData()) && Objects.nonNull(commonResponse.getData().get("result"))) {
+                        String content = commonResponse.getData().get("result").toString();
+                        //过滤一些特殊格式
+                        content = content.replace("```json", "").replace("```", "");
+                        return content;
+                    } else {
+                        throw new RuntimeException("deepSeekGenerateContentAction request failure geminiParam = " + deepSeekParam);
+                    }
+                } else {
+                    throw new RuntimeException("deepSeekGenerateContentAction optionalS is null geminiParam = " + deepSeekParam);
+                }
+            } catch (Exception e) {
+                log.error("deepSeekGenerateContentAction error deepSeekParam = {}", deepSeekParam);
+                throw new RuntimeException("deepSeekGenerateContentAction request error", e);
             }
-        }
-        return null;
+        }, "deepSeekGenerateContentAction");
+//        log.info("deepSeekGenerateContentAction deepSeekParam = {}", JSON.toJSONString(deepSeekParam));
+//        Optional<String> optionalS = httpPoolClient.postJson(deepSeekApiUrl, JSON.toJSONString(deepSeekParam));
+//        log.info("deepSeekGenerateContentAction optionalS = {}", optionalS);
+//        if (optionalS.isPresent()) {
+//            CommonResponse<Map<String, Object>> commonResponse = JSON.parseObject(optionalS.get(), CommonResponse.class);
+//            if (commonResponse.isSuccess() && Objects.nonNull(commonResponse.getData()) && Objects.nonNull(commonResponse.getData().get("result"))) {
+//                String content = commonResponse.getData().get("result").toString();
+//                //过滤一些特殊格式
+//                content = content.replace("```json", "").replace("```", "");
+//                return content;
+//            }
+//        }
+//        return null;
     }
 }

+ 40 - 12
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/service/impl/GeminiGenerateContentAction.java

@@ -8,6 +8,7 @@ import com.tzld.piaoquan.content.understanding.model.param.GeminiParam;
 import com.tzld.piaoquan.content.understanding.service.Action;
 import com.tzld.piaoquan.content.understanding.util.HttpClientUtil;
 import com.tzld.piaoquan.content.understanding.util.HttpPoolClient;
+import com.tzld.piaoquan.content.understanding.util.RetryUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
@@ -53,18 +54,45 @@ public class GeminiGenerateContentAction implements Action {
         geminiParam.setApiKey(apiKey);
         geminiParam.setModel(MODEL);
         geminiParam.setTemperature(TEMPERATURE);
-        log.info("geminiGenerateContentAction geminiParam = {}", JSON.toJSONString(geminiParam));
-        Optional<String> optionalS = httpPoolClient.postJson(geminiApiUrl, JSON.toJSONString(geminiParam));
-        log.info("geminiGenerateContentAction optionalS = {}", optionalS);
-        if (optionalS.isPresent()) {
-            CommonResponse<Map<String, Object>> commonResponse = JSON.parseObject(optionalS.get(), CommonResponse.class);
-            if (commonResponse.isSuccess() && Objects.nonNull(commonResponse.getData()) && Objects.nonNull(commonResponse.getData().get("result"))) {
-                String content = commonResponse.getData().get("result").toString();
-                //过滤一些特殊格式
-                content = content.replace("```json", "").replace("```", "");
-                return content;
+        return RetryUtil.executeWithRetry(() -> {
+            try {
+                log.info("geminiGenerateContentAction geminiParam = {}", JSON.toJSONString(geminiParam));
+                Optional<String> optionalS = httpPoolClient.postJson(geminiApiUrl, JSON.toJSONString(geminiParam));
+                log.info("geminiGenerateContentAction optionalS = {}", optionalS);
+                if (optionalS.isPresent()) {
+                    CommonResponse<Map<String, Object>> commonResponse = JSON.parseObject(optionalS.get(), CommonResponse.class);
+                    if (commonResponse.isSuccess() && Objects.nonNull(commonResponse.getData()) && Objects.nonNull(commonResponse.getData().get("result"))) {
+                        String content = commonResponse.getData().get("result").toString();
+                        //过滤一些特殊格式
+                        content = content.replace("```json", "").replace("```", "");
+                        return content;
+                    } else {
+                        throw new RuntimeException("geminiGenerateContentAction request failure geminiParam = " + JSON.toJSONString(geminiParam));
+                    }
+                } else {
+                    throw new RuntimeException("geminiGenerateContentAction optionalS is null geminiParam = " + JSON.toJSONString(geminiParam));
+                }
+            } catch (Exception e) {
+                log.error("geminiGenerateContentAction error param = {}", e, JSON.toJSONString(geminiParam));
+                throw new RuntimeException("geminiGenerateContentAction request error", e);
             }
-        }
-        return null;
+        }, "geminiGenerateContentAction");
+//        try {
+//            log.info("geminiGenerateContentAction geminiParam = {}", JSON.toJSONString(geminiParam));
+//            Optional<String> optionalS = httpPoolClient.postJson(geminiApiUrl, JSON.toJSONString(geminiParam));
+//            log.info("geminiGenerateContentAction optionalS = {}", optionalS);
+//            if (optionalS.isPresent()) {
+//                CommonResponse<Map<String, Object>> commonResponse = JSON.parseObject(optionalS.get(), CommonResponse.class);
+//                if (commonResponse.isSuccess() && Objects.nonNull(commonResponse.getData()) && Objects.nonNull(commonResponse.getData().get("result"))) {
+//                    String content = commonResponse.getData().get("result").toString();
+//                    //过滤一些特殊格式
+//                    content = content.replace("```json", "").replace("```", "");
+//                    return content;
+//                }
+//            }
+//        } catch (Exception e) {
+//            log.error("geminiGenerateContentAction error param = {}", e, JSON.toJSONString(geminiParam));
+//        }
+//        return null;
     }
 }

+ 3 - 3
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/service/impl/PipelineServiceImpl.java

@@ -256,10 +256,10 @@ public class PipelineServiceImpl implements PipelineService {
                         CuTaskExample example = new CuTaskExample();
                         example.createCriteria().andTaskIdEqualTo(taskId);
                         CuTask cuTask = new CuTask();
-                        cuTask.setOutput(JSON.toJSONString(resultMap));
+                        cuTask.setOutput(result);
                         cuTask.setTaskStatus(TaskStatusEnum.SUCCESS.getValue());
-                        int insert = cuTaskMapper.updateByExampleSelective(cuTask, example);
-                        if (insert <= 0) {
+                        int update = cuTaskMapper.updateByExampleSelective(cuTask, example);
+                        if (update <= 0) {
                             log.error("step execute error step = {}, result = {} insert = {}", JSON.toJSONString(step), result, insert);
                         }
                     } catch (Exception e) {

+ 33 - 33
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/util/RetryUtil.java

@@ -1,52 +1,52 @@
 package com.tzld.piaoquan.content.understanding.util;
 
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.TimeUnit;
 import java.util.function.LongSupplier;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 
+@Slf4j
 public class RetryUtil {
 
+    private static final Integer MAX_RETRIES = 3;
+
     /**
-     * 重试执行操作,直到成功或达到最大重试次数。
+     * 重试
      *
-     * @param operation      需要执行的操作,返回操作结果。
-     * @param successPredicate 判断操作结果是否成功的断言。
-     * @param maxRetries     最大重试次数。
-     * @param operationName  操作名称,用于日志记录。
-     * @param <T>            操作结果的类型。
-     * @return 操作成功的结果,如果所有重试都失败则返回 null。
+     * @param action
+     * @param actionName
+     * @param <T>
+     * @return
      */
-    public static <T> T retry(Supplier<T> operation, Predicate<T> successPredicate, int maxRetries, String operationName) {
-        T result = null;
-        int attempts = 0;
+    public static <T> T executeWithRetry(Supplier<T> action, String actionName) {
+        return executeWithRetry(action, MAX_RETRIES, actionName);
+    }
 
-        while (attempts <= maxRetries) {
-            attempts++;
+    /**
+     * 重试
+     *
+     * @param action
+     * @param maxRetries
+     * @param actionName
+     * @param <T>
+     * @return
+     */
+    public static <T> T executeWithRetry(Supplier<T> action, Integer maxRetries, String actionName) {
+        for (int i = 1; i <= maxRetries; i++) {
             try {
-                result = operation.get();
-                if (successPredicate.test(result)) {
-                    return result;
-                } else {
-                    if (attempts <= maxRetries) {
-                        // 可以添加适当的重试间隔,例如 Thread.sleep(1000);
-                    }
-                }
+                return action.get();
             } catch (Exception e) {
-                if (attempts <= maxRetries) {
-                    // 可以添加适当的重试间隔,例如 Thread.sleep(1000);
+                log.error("{} failed on attempt {}: {}", actionName, i, e.getMessage(), e);
+                if (i < MAX_RETRIES) {
+                    log.info("{} will be retried (attempt {}/{})", actionName, i + 1, MAX_RETRIES);
+                } else {
+                    log.error("{} failed after {} retries due to non-IO error.", actionName, MAX_RETRIES);
+                    throw new RuntimeException(actionName + " failed after multiple retries due to error.", e);
                 }
             }
         }
-
-        return null; // 返回 null 表示所有重试都失败
-    }
-    public static void main(String[] args) {
-        // 封装带重试的插入操作
-        RetryUtil.retry(
-                () -> 1,
-                insertCount -> insertCount > 0, // 判断插入影响行数是否大于 0
-                3,
-                "插入 CuTask"
-        );
+        return null;
     }
 }