Explorar o código

implementing retry strategies for core business logic

ehlxr hai 1 ano
pai
achega
8e6020d2f3

+ 1 - 2
etl-core/src/main/java/com/tzld/crawler/etl/mq/EtlMQConsumer.java

@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.PostConstruct;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -76,7 +75,7 @@ public class EtlMQConsumer {
         this.etlService = etlService;
     }
 
-    @PostConstruct
+    // @PostConstruct
     public void init() {
         mqClient = new MQClient(httpEndpoint, accessKey, secretKey);
         priorityPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60L,

+ 77 - 72
etl-core/src/main/java/com/tzld/crawler/etl/service/impl/EtlServiceImpl.java

@@ -52,6 +52,7 @@ import com.tzld.crawler.etl.util.FeishuUtils;
 import com.tzld.crawler.etl.util.FileUtils;
 import com.tzld.crawler.etl.util.MD5Util;
 import com.tzld.crawler.etl.util.VideoUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
@@ -71,6 +72,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 /**
  * @author ehlxr
@@ -89,7 +92,7 @@ public class EtlServiceImpl implements EtlService {
 
     @Value("${download.file.path:/data/crawler/videos}")
     private String downloadPath;
-    @Value("${aliyun.oss.video.bucket:art-pubbucket}")
+    @Value("${aliyun.oss.video.bucket:art-}")
     private String ossBucket;
     @Value("${env:}")
     private String env;
@@ -142,6 +145,9 @@ public class EtlServiceImpl implements EtlService {
 
             // 保存数据库(去重校验)
             id = save2db(crawlerVideo);
+            if (id <= 0) {
+                return;
+            }
 
             // 策略应用
             // TODO: 获取用户选择策略
@@ -161,24 +167,11 @@ public class EtlServiceImpl implements EtlService {
             long videoId = videoSend(data);
 
             // 更新数据库
-            int retry = 0;
-            int saveVideo2dbRetry = failRetryTimes.getOrDefault("video2db", 3);
-
-            while (retry < saveVideo2dbRetry) {
-                retry++;
-                try {
-                    crawlerVideo.setVideoId(videoId);
-                    crawlerVideo.setTitleScore(data.getTitleScore());
-                    if (crawlerVideoMapper.updateByPrimaryKeySelective(crawlerVideo) > 0) {
-                        break;
-                    }
-                } catch (Exception e) {
-                    log.error("update video info to db failed retry {} times. {}", retry, crawlerVideo, e);
-                    if (retry >= saveVideo2dbRetry) {
-                        throw new CommonException(ExceptionEnum.SYSTEM_ERROR, "update video info to db failed." + e.getMessage());
-                    }
-                }
-            }
+            retrySupplier(() -> {
+                crawlerVideo.setVideoId(videoId);
+                crawlerVideo.setTitleScore(data.getTitleScore());
+                return crawlerVideoMapper.updateByPrimaryKeySelective(crawlerVideo) > 0;
+            }, "synthesis", String.format("update video info %s to db error", crawlerVideo));
 
             // 视频写入飞书
             async2Feishu(data, videoId);
@@ -193,29 +186,18 @@ public class EtlServiceImpl implements EtlService {
     }
 
     private long save2db(CrawlerVideo crawlerVideo) {
-        long id = 0;
-        int retry = 0;
-        int saveVideo2dbRetry = failRetryTimes.getOrDefault("video2db", 3);
-        while (retry < saveVideo2dbRetry) {
-            retry++;
+        return retrySupplierR(() -> { // Inserts the row via the retry helper; only exceptions other than DuplicateKeyException escape the lambda and trigger another attempt.
             try {
                 crawlerVideoExtMapper.insertSelectiveReturnId(crawlerVideo);
                 slsService.log("message", "视频信息写入数据库成功", "crawler", crawlerVideo.getPlatform(), "mode", crawlerVideo.getStrategy());
-                id = crawlerVideo.getId();
-                break;
+                return Pair.of(true, crawlerVideo.getId()); // left=true ends the retry loop; right is the id returned to the caller
             } catch (DuplicateKeyException e) {
                 // Deduplicate via the unique-constraint key on the off-site video ID
                 log.info("out video id {} of platform {} strategy {} has exist!", crawlerVideo.getOutVideoId(),
                         crawlerVideo.getPlatform(), crawlerVideo.getStrategy());
-                throw new CommonException(ExceptionEnum.DATA_ERROR);
-            } catch (Exception e) {
-                log.error("save video info to db failed retry {} times. {}", retry, crawlerVideo, e);
-                if (retry >= saveVideo2dbRetry) {
-                    throw new CommonException(ExceptionEnum.SYSTEM_ERROR, "save video info to db failed." + e.getMessage());
-                }
+                return Pair.of(true, -1L); // duplicate row: finish without retrying and hand back -1 instead of throwing
             }
-        }
-        return id;
+        }, "video2db", String.format("save video info %s to db failed", crawlerVideo)); // "video2db" is the key used to look up the retry count
     }
 
     private long videoSend(StrategyDataDto data) {
@@ -231,9 +213,15 @@ public class EtlServiceImpl implements EtlService {
         request.setViewStatus(1);
         request.setCrawlerSrcId(data.getOutVideoId());
         request.setCrawlerSrcCode(platform.toUpperCase());
-        LocalDateTime localDateTime =
-                LocalDateTime.parse(data.getPublishTime(), DateTimeFormatter.ofPattern(Constant.STANDARD_FORMAT));
-        request.setCrawlerSrcPublishTimestamp(localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli());
+
+        String publishTime = data.getPublishTime();
+        if (Strings.isNullOrEmpty(publishTime)) {
+            request.setCrawlerSrcPublishTimestamp(System.currentTimeMillis());
+        } else {
+            LocalDateTime localDateTime =
+                    LocalDateTime.parse(publishTime, DateTimeFormatter.ofPattern(Constant.STANDARD_FORMAT));
+            request.setCrawlerSrcPublishTimestamp(localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli());
+        }
         request.setCrawlerTaskTimestamp(System.currentTimeMillis());
         CommonResponse<WxVideoVO> response = longVideoFeign.crawlerVideoSend(request);
         log.info("crawler send video request: {}, response: {}", request, response);
@@ -318,21 +306,15 @@ public class EtlServiceImpl implements EtlService {
         // 音、视频合成
         if (!Strings.isNullOrEmpty(audioUrl)) {
             String audioPath = urlDownload(data.getAudioUrl(), "longvideo/crawler_local/audio", title);
-            int retry = 0;
-            int synthesisRetry = failRetryTimes.getOrDefault("synthesis", 3);
-            while (retry < synthesisRetry) {
-                retry++;
+            retryFunc((t) -> {
                 try {
-                    VideoUtils.videoSynthesis(ffmpegPath, downloadPath + File.separator + videoPath,
-                            downloadPath + File.separator + audioPath, downloadPath + File.separator + videoPath + "_comp.mp4");
-                    break;
+                    VideoUtils.videoSynthesis(ffmpegPath, downloadPath + File.separator + t,
+                            downloadPath + File.separator + audioPath, downloadPath + File.separator + t + "_comp.mp4");
+                    return true;
                 } catch (Exception e) {
-                    log.error("video audio synthesis error retry {} times. {},{}", retry, videoPath, audioPath);
-                    if (retry >= synthesisRetry) {
-                        throw new CommonException(ExceptionEnum.DATA_ERROR, "video synthesis error");
-                    }
+                    throw new RuntimeException(e);
                 }
-            }
+            }, videoPath, "synthesis", String.format("video %s audio %s synthesis error", videoPath, audioPath));
 
             // 清理合成音频之前的文件
             Files.deleteIfExists(Paths.get(new File(downloadPath + File.separator + videoPath).getPath()));
@@ -379,31 +361,26 @@ public class EtlServiceImpl implements EtlService {
         File localFile = new File(localFilePath);
         if (!localFile.exists()) {
             if (!localFile.mkdirs()) {
-                // throw new CommonException(ExceptionEnum.SYSTEM_ERROR, "mkdir " + localFilePath + " error!");
                 log.warn("mkdir {} failed", localFilePath);
             }
         }
 
-        // 下载文件
-        int retry = 0;
-        int downloadRetry = failRetryTimes.getOrDefault("download", 3);
-        while (retry < downloadRetry) {
-            retry++;
+        retrySupplier(() -> {
             try {
+                // 下载文件
                 FileUtils.download(fileUrl, localFilePath + File.separator + titleMmd5);
-                break;
-            } catch (CommonException e) {
-                if (e.getCode() == ExceptionEnum.URL_FORBIDDEN.getCode()) {
-                    log.error("access to the url {} of remote server is prohibited.", fileUrl);
-                    break;
-                }
+                return true;
             } catch (Exception e) {
-                log.error("download file failed retry {} times. from url {} to file {}. ", fileUrl, filePath, retry);
-                if (retry >= downloadRetry) {
-                    throw new CommonException(ExceptionEnum.SYSTEM_ERROR, "download file " + fileUrl + " failed!");
+                if (e instanceof CommonException) {
+                    CommonException ce = (CommonException) e;
+                    if (ce.getCode() == ExceptionEnum.URL_FORBIDDEN.getCode()) {
+                        log.error("access to the url {} of remote server is prohibited.", fileUrl);
+                        return true;
+                    }
                 }
+                throw new RuntimeException(e);
             }
-        }
+        }, "download", String.format("download file from %s to %s", fileUrl, filePath));
 
         return videoFilePath + File.separator + titleMmd5;
     }
@@ -413,20 +390,48 @@ public class EtlServiceImpl implements EtlService {
         slsService.log("message", "开始上传视频... ", "crawler", crawler, "mode", mode);
         log.info("begin upload {} to oss key {}", localFile, ossBucketKey);
 
+        retryFunc((t) -> {
+            try {
+                aliyunOssManager.putObject(ossBucket, ossBucketKey, Files.newInputStream(Paths.get(localFile)));
+                return true;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }, "", "2oss", String.format("upload file %s to oss buckekey %s", localFile, ossBucketKey));
+    }
+
+    private void retrySupplier(Supplier<Boolean> supplier, String type, String errorMsg) { // Retry wrapper for operations that take no input; the supplier's Boolean result is passed to retryFunc unchanged.
+        retryFunc((c) -> supplier.get(), null, type, errorMsg); // the input is null and the lambda ignores it
+    }
+
+    private <R> R retrySupplierR(Supplier<Pair<Boolean, R>> supplier, String type, String errorMsg) { // Retry wrapper for input-less operations that also produce a value: the pair is (done flag, result).
+        return retryFuncR((c) -> supplier.get(), null, type, errorMsg); // the input is null and the lambda ignores it
+    }
+
+    /**
+     * Applies {@code func} to {@code t} repeatedly until it reports success or the retry budget is used up.
+     *
+     * @param func     operation to run; returns a pair of (done flag, result value)
+     * @param t        input handed to {@code func} on every attempt
+     * @param type     key used to look up the attempt limit in {@code failRetryTimes} (3 when absent)
+     * @param errorMsg description of the operation, used in the log line and in the final exception
+     * @return the right-hand value of the last pair {@code func} returned, or {@code null} if it never returned one
+     * @throws CommonException with {@code SYSTEM_ERROR} when the last allowed attempt ends in an exception
+     */
+    private <T, R> R retryFuncR(Function<T, Pair<Boolean, R>> func, T t, String type, String errorMsg) {
         int retry = 0;
-        int file2ossRetry = failRetryTimes.getOrDefault("2oss", 3);
-        while (retry < file2ossRetry) {
+        int retryTimes = failRetryTimes.getOrDefault(type, 3);
+        R r = null;
+        while (retry < retryTimes) {
             retry++;
             try {
-                aliyunOssManager.putObject(ossBucket, ossBucketKey, Files.newInputStream(Paths.get(localFile)));
-                break;
+                Pair<Boolean, R> apply = func.apply(t);
+                r = apply.getRight();
+                // Only an explicit TRUE stops the loop; FALSE or null goes on to the next attempt.
+                if (Boolean.TRUE.equals(apply.getLeft())) {
+                    break;
+                }
             } catch (Exception e) {
-                log.error("upload file to oss failed retry {} times. from url {} to {}. ", retry, localFile, ossBucketKey);
-                if (retry >= file2ossRetry) {
-                    throw new CommonException(ExceptionEnum.SYSTEM_ERROR, "upload file " + localFile + " to oss failed!");
+                log.error("the operation [{}] has failed on the {}th retry.", errorMsg, retry, e);
+                if (retry >= retryTimes) {
+                    // No format arguments are passed to the exception, so a "{}" placeholder would be thrown literally; splice errorMsg in directly.
+                    throw new CommonException(ExceptionEnum.SYSTEM_ERROR, "the operation [" + errorMsg + "] has failed after " + retry + " times retry.");
                 }
             }
         }
+        return r;
+    }
+
+    private <T> void retryFunc(Function<T, Boolean> func, T t, String type, String errorMsg) { // Retry wrapper for operations that only report success or failure and produce no value.
+        retryFuncR((t1) -> Pair.of(func.apply(t1), null), t, type, errorMsg); // wraps the Boolean as the done flag with a null result
     }
 
     @PostConstruct

+ 3 - 4
etl-core/src/main/java/com/tzld/crawler/etl/util/FileUtils.java

@@ -67,7 +67,7 @@ public class FileUtils {
 
     public static void download(String fileUrl, String filePath) throws Exception {
         // try {
-        log.info("begin download file url {} local path {}", fileUrl, filePath);
+        log.info("begin download {} to {}", fileUrl, filePath);
         URL url = new URL(fileUrl);
         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
         if (conn.getResponseCode() == HttpURLConnection.HTTP_FORBIDDEN) {
@@ -75,8 +75,7 @@ public class FileUtils {
         }
         conn.setConnectTimeout(5000);
         conn.setReadTimeout(5000);
-        log.info("file size {} url {}, local path {} ",
-                formatFileSize(conn.getContentLength()), fileUrl, filePath);
+        log.info("size is {} of url {}", formatFileSize(conn.getContentLength()), fileUrl);
 
         InputStream inputStream = conn.getInputStream();
         FileOutputStream outputStream = new FileOutputStream(filePath);
@@ -87,7 +86,7 @@ public class FileUtils {
         }
         inputStream.close();
         outputStream.close();
-        log.info("downloaded successfully file url {}, local path {} ", fileUrl, filePath);
+        log.info("downloaded successfully {} to {}", fileUrl, filePath);
         // return true;
         // } catch (Exception e) {
         //     log.error("downloaded error file url: {} local path: {} ", fileUrl, filePath, e);