Bläddra i källkod

implementing retry strategies for core business logic

ehlxr 1 år sedan
förälder
incheckning
3c9959bb73

+ 78 - 0
etl-core/src/main/java/com/tzld/crawler/etl/common/CustomValidator.java

@@ -0,0 +1,78 @@
+/*
+ * The MIT License (MIT)
+ *
+ * Copyright © 2023 xrv <xrv@live.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+package com.tzld.crawler.etl.common;
+
+
+import org.springframework.validation.BeanPropertyBindingResult;
+import org.springframework.validation.Errors;
+import org.springframework.validation.Validator;
+
+import javax.validation.ConstraintViolation;
+import javax.validation.Validation;
+import javax.validation.ValidatorFactory;
+import java.util.Set;
+import java.util.StringJoiner;
+
+/**
+ * @author ehlxr
+ * @since 2023-07-14 15:18.
+ */
+public class CustomValidator implements Validator {
+    private final javax.validation.Validator validator;
+
+    public CustomValidator() {
+        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
+        validator = factory.getValidator();
+    }
+
+    public static String validate(Object param) {
+        Validator validator = new CustomValidator();
+        BeanPropertyBindingResult errors = new BeanPropertyBindingResult(param, "param");
+        validator.validate(param, errors);
+        if (errors.hasErrors()) {
+            StringJoiner errorMessage = new StringJoiner("; ", "[", "]");
+            errors.getAllErrors().forEach(e -> errorMessage.add(e.getDefaultMessage()));
+            return errorMessage.toString();
+
+        }
+
+        return "";
+    }
+
+    @Override
+    public boolean supports(Class<?> clazz) {
+        return (this.validator != null);
+    }
+
+    @Override
+    public void validate(Object target, Errors errors) {
+        Set<ConstraintViolation<Object>> constraintViolations = validator.validate(target);
+        for (ConstraintViolation<Object> violation : constraintViolations) {
+            String field = violation.getPropertyPath().toString();
+            String message = violation.getMessage();
+            errors.rejectValue(field, "", message);
+        }
+    }
+}

+ 1 - 0
etl-core/src/main/java/com/tzld/crawler/etl/common/enums/ExceptionEnum.java

@@ -12,6 +12,7 @@ public enum ExceptionEnum {
     DATA_ERROR(1001, "数据异常,请联系管理员"),
     PARAM_ERROR(1002, "参数不对"),
     INVOKE_VIDEOAPI_ERROR(1003, "调用 longvideo api 接口服务失败"),
+    URL_FORBIDDEN(1004, "don't have permission to access the url on remote server."),
 
     ;
 

+ 10 - 1
etl-core/src/main/java/com/tzld/crawler/etl/model/vo/CrawlerVideoVO.java

@@ -1,5 +1,7 @@
 package com.tzld.crawler.etl.model.vo;
 
+import javax.validation.constraints.NotNull;
+
 /**
  * @author ehlxr
  */
@@ -7,6 +9,7 @@ public class CrawlerVideoVO {
     /**
      * 站内用户ID
      */
+    @NotNull(message = "userId not allow null")
     private Long userId;
 
     /**
@@ -25,31 +28,37 @@ public class CrawlerVideoVO {
     /**
      * 平台:youtube,wechat,小年糕,好看视频
      */
+    @NotNull(message = "platform not allow null")
     private String platform;
 
     /**
      * 策略:定向爬虫策略,热榜爬虫策略,小时榜爬虫策略,推荐榜爬虫策略
      */
+    @NotNull(message = "strategy not allow null")
     private String strategy;
 
     /**
      * 站外视频ID
      */
+    @NotNull(message = "outVideoId not allow null")
     private String outVideoId;
 
     /**
      * 视频标题
      */
+    @NotNull(message = "videoTitle not allow null")
     private String videoTitle;
 
     /**
      * 视频封面
      */
+    @NotNull(message = "coverUrl not allow null")
     private String coverUrl;
 
     /**
      * 视频播放地址
      */
+    @NotNull(message = "videoUrl not allow null")
     private String videoUrl;
 
     /**
@@ -90,7 +99,7 @@ public class CrawlerVideoVO {
     /**
      * 抓取时条件:{'play_cnt': 0, 'comment_cnt': 0, 'like_cnt': 0, 'duration': 60, 'publish_time': 10, 'video_width': 720, 'video_height': 720}
      */
-    private String crawlerRule;
+    private String crawlerRule = "{}";
 
     /**
      * 宽

+ 202 - 125
etl-core/src/main/java/com/tzld/crawler/etl/service/impl/EtlServiceImpl.java

@@ -28,6 +28,7 @@ import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.tzld.commons.aliyun.oss.AliyunOssManager;
+import com.tzld.crawler.etl.common.CustomValidator;
 import com.tzld.crawler.etl.common.base.CommonResponse;
 import com.tzld.crawler.etl.common.base.Constant;
 import com.tzld.crawler.etl.common.enums.ExceptionEnum;
@@ -109,6 +110,9 @@ public class EtlServiceImpl implements EtlService {
     @Value("${ffprobe.path:ffmpeg}")
     private String ffmpegPath;
 
+    @ApolloJsonValue("${fail.retry.times:{}}")
+    private Map<String, Integer> failRetryTimes;
+
     private Executor pool;
 
     public EtlServiceImpl(StrategyHandlerService strategyHandlerService, AliyunOssManager aliyunOssManager,
@@ -125,29 +129,19 @@ public class EtlServiceImpl implements EtlService {
 
     @Override
     public void deal(CrawlerVideoVO param) {
-        String title = param.getVideoTitle();
-        String platform = param.getPlatform();
-        String strategy = param.getStrategy();
+        CrawlerVideo crawlerVideo = new CrawlerVideo();
+        BeanUtils.copyProperties(param, crawlerVideo);
         long id = 0L;
         try {
-            // 保存数据库
-            // alter table crawler_video drop key unq_video_id;
-            CrawlerVideo crawlerVideo = new CrawlerVideo();
-            BeanUtils.copyProperties(param, crawlerVideo);
-            String insertSql =
-                    "insert into crawler_video(user_id, out_user_id, platform, strategy, out_video_id, video_title,"
-                            + " cover_url, video_url, duration, publish_time, play_cnt, crawler_rule, width, height) values("
-                            + crawlerVideo.getUserId() + "," + crawlerVideo.getOutUserId() + ","
-                            + crawlerVideo.getPlatform() + "," + crawlerVideo.getStrategy() + ","
-                            + crawlerVideo.getOutVideoId() + "," + crawlerVideo.getVideoTitle() + ","
-                            + crawlerVideo.getCoverUrl() + "," + crawlerVideo.getVideoUrl() + ","
-                            + crawlerVideo.getDuration() + "," + crawlerVideo.getPublishTime() + ","
-                            + crawlerVideo.getPlayCnt() + "," + crawlerVideo.getCrawlerRule() + ","
-                            + crawlerVideo.getWidth() + "," + crawlerVideo.getHeight() + ")";
-            slsService.log("message", "insert_sql: " + insertSql, "crawler", platform, "mode", strategy);
-            crawlerVideoExtMapper.insertSelectiveReturnId(crawlerVideo);
-            id = crawlerVideo.getId();
-            slsService.log("message", "视频信息写入数据库成功", "crawler", platform, "mode", strategy);
+            // 参数校验
+            String errorMessage = CustomValidator.validate(param);
+            if (!Strings.isNullOrEmpty(errorMessage)) {
+                log.error("param validate failed. {}", errorMessage);
+                return;
+            }
+
+            // 保存数据库(去重校验)
+            id = save2db(crawlerVideo);
 
             // 策略应用
             // TODO: 获取用户选择策略
@@ -164,107 +158,156 @@ public class EtlServiceImpl implements EtlService {
             processVideo(data);
 
             // 视频发布
-            CrawlerVideoSendParam request = new CrawlerVideoSendParam();
-            request.setLoginUid(data.getUserId());
-            request.setAppType(888888);
-            request.setTitle(title);
-            request.setVideoPath(data.getVideoOssPath());
-            request.setCoverImgPath(data.getCoverOssPath());
-            request.setTotalTime(data.getDuration());
-            request.setVersionCode(1);
-            request.setViewStatus(1);
-            request.setCrawlerSrcId(data.getOutVideoId());
-            request.setCrawlerSrcCode(platform.toUpperCase());
-            LocalDateTime localDateTime =
-                    LocalDateTime.parse(data.getPublishTime(), DateTimeFormatter.ofPattern(Constant.STANDARD_FORMAT));
-            request.setCrawlerSrcPublishTimestamp(localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli());
-            request.setCrawlerTaskTimestamp(System.currentTimeMillis());
-            CommonResponse<WxVideoVO> response = longVideoFeign.crawlerVideoSend(request);
-            log.info("crawler data send request: {}, response: {}", request, response);
-            if (!response.isSuccess()) {
-                throw new CommonException(ExceptionEnum.INVOKE_VIDEOAPI_ERROR,
-                        "invoke crawler data send failed!" + response);
-            }
+            long videoId = videoSend(data);
 
             // 更新数据库
-            crawlerVideo.setVideoId(response.getData().getId());
-            crawlerVideo.setTitleScore(data.getTitleScore());
-            crawlerVideoMapper.updateByPrimaryKeySelective(crawlerVideo);
+            int retry = 0;
+            int saveVideo2dbRetry = failRetryTimes.getOrDefault("video2db", 3);
 
-            // 视频写入飞书
-            pool.execute(() -> {
+            while (retry < saveVideo2dbRetry) {
+                retry++;
                 try {
-                    String sheetToken = feishuSheetTokenMap.get(platform);
-                    if (Strings.isNullOrEmpty(sheetToken)) {
-                        log.error("feishu.sheet.token.map config of {} is null", platform);
-                        return;
-                    }
-                    String strategyType = Strings.nullToEmpty(param.getStrategyType());
-                    String sheetId = feishuSheetIdMap.get(platform + strategyType);
-                    if (Strings.isNullOrEmpty(sheetToken)) {
-                        log.error("feishu.sheet.id.map config of {} is null", platform);
-                        return;
+                    crawlerVideo.setVideoId(videoId);
+                    crawlerVideo.setTitleScore(data.getTitleScore());
+                    if (crawlerVideoMapper.updateByPrimaryKeySelective(crawlerVideo) > 0) {
+                        break;
                     }
-                    String range = feishuRangeMap.get(platform + strategyType);
-
-                    log.info("{} {} sheetToken {} sheetId {} range {}", platform, strategy, sheetToken, sheetId, range);
-                    String fsResp = FeishuUtils.insertRows(feishuAppid, feishuAppsecret, sheetToken, sheetId, 1, 2);
-                    log.debug("insert columns to feishu sheet response is {}", fsResp);
-
-                    List<List<Object>> values = new ArrayList<>();
-                    List<Object> value = new ArrayList<>();
-                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Constant.STANDARD_FORMAT);
-                    if ("search".equals(strategy)) {
-                        CrawlerUserV3Example example = new CrawlerUserV3Example();
-                        example.createCriteria().andUidEqualTo(crawlerVideo.getUserId());
-                        List<CrawlerUserV3> crawlerUserV3s = crawlerUserV3Mapper.selectByExample(example);
-                        value.add(crawlerUserV3s.size() > 0 ? crawlerUserV3s.get(0).getLink() : "");
-                    }
-
-                    value.add(data.getTitleScore());
-                    value.add(crawlerVideo.getVideoId());
-                    value.add(formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(System.currentTimeMillis()),
-                            ZoneId.systemDefault())));
-                    value.add(strategy);
-                    value.add(data.getOutVideoId());
-                    value.add(data.getVideoTitle());
-                    value.add(String.format(adminCmsUrl, crawlerVideo.getVideoId()));
-                    value.add(data.getPlayCnt());
-                    value.add(data.getCommentCnt());
-                    value.add(data.getLikeCnt());
-                    value.add(data.getShareCnt());
-                    value.add(data.getDuration());
-                    value.add(data.getWidth() + "*" + data.getHeight());
-                    value.add(data.getPublishTime());
-                    value.add(data.getUserName());
-                    value.add(data.getOutUserId());
-                    value.add(data.getAvatarUrl());
-                    value.add(data.getCoverUrl());
-                    value.add(data.getVideoUrl());
-                    values.add(value);
-
-                    fsResp = FeishuUtils.updateValues(feishuAppid, feishuAppsecret, sheetToken, range, values);
-                    log.debug("update feishu sheet value response is {}", fsResp);
-
-                    slsService.log("message", "视频已保存至云文档", "crawler", platform, "mode", strategy);
                 } catch (Exception e) {
-                    // 保存飞书失败不回滚数据
-                    log.error("save data to feishu sheet error. platform {}, strategy {}", platform, strategy, e);
+                    log.error("update video info to db failed retry {} times. {}", retry, crawlerVideo, e);
+                    if (retry >= saveVideo2dbRetry) {
+                        throw new CommonException(ExceptionEnum.SYSTEM_ERROR, "update video info to db failed." + e.getMessage());
+                    }
                 }
-            });
-        } catch (DuplicateKeyException e) {
-            slsService.log("message", "out video id " + param.getOutVideoId() + " has exist", "crawler", platform,
-                    "mode", strategy);
-            log.info("out video id {} of platform {} strategy {} has exist!", param.getOutVideoId(),
-                    param.getPlatform(), param.getStrategy());
+            }
+
+            // 视频写入飞书
+            async2Feishu(data, videoId);
         } catch (Exception e) {
             log.error("etl server deal {} failed.", param, e);
             // 回滚数据
-            crawlerVideoMapper.deleteByPrimaryKey(id);
+            if (id > 0) {
+                crawlerVideoMapper.deleteByPrimaryKey(id);
+            }
             throw new CommonException(ExceptionEnum.SYSTEM_ERROR, "etl server deal error: " + e.getMessage());
         }
     }
 
+    private long save2db(CrawlerVideo crawlerVideo) {
+        long id = 0;
+        int retry = 0;
+        int saveVideo2dbRetry = failRetryTimes.getOrDefault("video2db", 3);
+        while (retry < saveVideo2dbRetry) {
+            retry++;
+            try {
+                crawlerVideoExtMapper.insertSelectiveReturnId(crawlerVideo);
+                slsService.log("message", "视频信息写入数据库成功", "crawler", crawlerVideo.getPlatform(), "mode", crawlerVideo.getStrategy());
+                id = crawlerVideo.getId();
+                break;
+            } catch (DuplicateKeyException e) {
+                // 根据站外视频 ID 唯一约束 key 做去重校验
+                log.info("out video id {} of platform {} strategy {} has exist!", crawlerVideo.getOutVideoId(),
+                        crawlerVideo.getPlatform(), crawlerVideo.getStrategy());
+                throw new CommonException(ExceptionEnum.DATA_ERROR);
+            } catch (Exception e) {
+                log.error("save video info to db failed retry {} times. {}", retry, crawlerVideo, e);
+                if (retry >= saveVideo2dbRetry) {
+                    throw new CommonException(ExceptionEnum.SYSTEM_ERROR, "save video info to db failed." + e.getMessage());
+                }
+            }
+        }
+        return id;
+    }
+
+    private long videoSend(StrategyDataDto data) {
+        String platform = data.getPlatform();
+        CrawlerVideoSendParam request = new CrawlerVideoSendParam();
+        request.setLoginUid(data.getUserId());
+        request.setAppType(888888);
+        request.setTitle(data.getVideoTitle());
+        request.setVideoPath(data.getVideoOssPath());
+        request.setCoverImgPath(data.getCoverOssPath());
+        request.setTotalTime(data.getDuration());
+        request.setVersionCode(1);
+        request.setViewStatus(1);
+        request.setCrawlerSrcId(data.getOutVideoId());
+        request.setCrawlerSrcCode(platform.toUpperCase());
+        LocalDateTime localDateTime =
+                LocalDateTime.parse(data.getPublishTime(), DateTimeFormatter.ofPattern(Constant.STANDARD_FORMAT));
+        request.setCrawlerSrcPublishTimestamp(localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli());
+        request.setCrawlerTaskTimestamp(System.currentTimeMillis());
+        CommonResponse<WxVideoVO> response = longVideoFeign.crawlerVideoSend(request);
+        log.info("crawler send video request: {}, response: {}", request, response);
+        if (!response.isSuccess() || response.getData() == null || response.getData().getId() == null) {
+            throw new CommonException(ExceptionEnum.INVOKE_VIDEOAPI_ERROR,
+                    "invoke crawler send video failed!" + response);
+        }
+        return response.getData().getId();
+    }
+
+    private void async2Feishu(StrategyDataDto data, long videoId) {
+        String platform = data.getPlatform();
+        String strategy = data.getStrategy();
+        pool.execute(() -> {
+            try {
+                String sheetToken = feishuSheetTokenMap.get(platform);
+                if (Strings.isNullOrEmpty(sheetToken)) {
+                    log.error("feishu.sheet.token.map config of {} is null", platform);
+                    return;
+                }
+                String strategyType = Strings.nullToEmpty(data.getStrategyType());
+                String sheetId = feishuSheetIdMap.get(platform + strategyType);
+                if (Strings.isNullOrEmpty(sheetToken)) {
+                    log.error("feishu.sheet.id.map config of {} is null", platform);
+                    return;
+                }
+                String range = feishuRangeMap.get(platform + strategyType);
+
+                log.info("{} {} sheetToken {} sheetId {} range {}", platform, strategy, sheetToken, sheetId, range);
+                String fsResp = FeishuUtils.insertRows(feishuAppid, feishuAppsecret, sheetToken, sheetId, 1, 2);
+                log.debug("insert columns to feishu sheet response is {}", fsResp);
+
+                List<List<Object>> values = new ArrayList<>();
+                List<Object> value = new ArrayList<>();
+                DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Constant.STANDARD_FORMAT);
+                if ("search".equals(strategy)) {
+                    CrawlerUserV3Example example = new CrawlerUserV3Example();
+                    example.createCriteria().andUidEqualTo(data.getUserId());
+                    List<CrawlerUserV3> crawlerUserV3s = crawlerUserV3Mapper.selectByExample(example);
+                    value.add(crawlerUserV3s.size() > 0 ? crawlerUserV3s.get(0).getLink() : "");
+                }
+
+                value.add(data.getTitleScore());
+                value.add(videoId);
+                value.add(formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(System.currentTimeMillis()),
+                        ZoneId.systemDefault())));
+                value.add(strategy);
+                value.add(data.getOutVideoId());
+                value.add(data.getVideoTitle());
+                value.add(String.format(adminCmsUrl, videoId));
+                value.add(data.getPlayCnt());
+                value.add(data.getCommentCnt());
+                value.add(data.getLikeCnt());
+                value.add(data.getShareCnt());
+                value.add(data.getDuration());
+                value.add(data.getWidth() + "*" + data.getHeight());
+                value.add(data.getPublishTime());
+                value.add(data.getUserName());
+                value.add(data.getOutUserId());
+                value.add(data.getAvatarUrl());
+                value.add(data.getCoverUrl());
+                value.add(data.getVideoUrl());
+                values.add(value);
+
+                fsResp = FeishuUtils.updateValues(feishuAppid, feishuAppsecret, sheetToken, range, values);
+                log.debug("update feishu sheet value response is {}", fsResp);
+
+                slsService.log("message", "视频已保存至云文档", "crawler", platform, "mode", strategy);
+            } catch (Exception e) {
+                log.error("save data to feishu sheet error. platform {}, strategy {}", platform, strategy, e);
+            }
+        });
+    }
+
     private void processVideo(StrategyDataDto data) throws IOException {
         String title = data.getVideoTitle();
         String platform = data.getPlatform();
@@ -274,12 +317,26 @@ public class EtlServiceImpl implements EtlService {
         String videoPath = urlDownload(data.getVideoUrl(), "longvideo/crawler_local/video", title);
         // 音、视频合成
         if (!Strings.isNullOrEmpty(audioUrl)) {
-            String auditPath = urlDownload(data.getAudioUrl(), "longvideo/crawler_local/audit", title);
-            VideoUtils.videoCompose(ffmpegPath, downloadPath + File.separator + videoPath,
-                    downloadPath + File.separator + auditPath, downloadPath + File.separator + videoPath + "_comp.mp4");
+            String audioPath = urlDownload(data.getAudioUrl(), "longvideo/crawler_local/audio", title);
+            int retry = 0;
+            int synthesisRetry = failRetryTimes.getOrDefault("synthesis", 3);
+            while (retry < synthesisRetry) {
+                retry++;
+                try {
+                    VideoUtils.videoSynthesis(ffmpegPath, downloadPath + File.separator + videoPath,
+                            downloadPath + File.separator + audioPath, downloadPath + File.separator + videoPath + "_comp.mp4");
+                    break;
+                } catch (Exception e) {
+                    log.error("video audio synthesis error retry {} times. {},{}", retry, videoPath, audioPath);
+                    if (retry >= synthesisRetry) {
+                        throw new CommonException(ExceptionEnum.DATA_ERROR, "video synthesis error");
+                    }
+                }
+            }
+
             // 清理合成音频之前的文件
             Files.deleteIfExists(Paths.get(new File(downloadPath + File.separator + videoPath).getPath()));
-            Files.deleteIfExists(Paths.get(new File(downloadPath + File.separator + auditPath).getPath()));
+            Files.deleteIfExists(Paths.get(new File(downloadPath + File.separator + audioPath).getPath()));
             videoPath += "_comp.mp4";
         }
 
@@ -299,7 +356,7 @@ public class EtlServiceImpl implements EtlService {
         }
 
         // 文件清理
-        log.info("begin delete: {}", tempFilePath);
+        log.info("begin delete {}", tempFilePath);
         Files.deleteIfExists(Paths.get(new File(tempFilePath).getPath()));
 
         // 视频封面下载、上传 OSS
@@ -309,7 +366,7 @@ public class EtlServiceImpl implements EtlService {
         tempFilePath = downloadPath + File.separator + videoPath;
 
         // 文件清理
-        log.info("begin delete: {}", tempFilePath);
+        log.info("begin delete {}", tempFilePath);
         Files.deleteIfExists(Paths.get(new File(tempFilePath).getPath()));
     }
 
@@ -329,27 +386,47 @@ public class EtlServiceImpl implements EtlService {
 
         // 下载文件
         int retry = 0;
-        boolean downloadFlag = false;
-        while (retry < 1) {
+        int downloadRetry = failRetryTimes.getOrDefault("download", 3);
+        while (retry < downloadRetry) {
             retry++;
-            downloadFlag = FileUtils.download(fileUrl, localFilePath + File.separator + titleMmd5);
-            if (downloadFlag) {
+            try {
+                FileUtils.download(fileUrl, localFilePath + File.separator + titleMmd5);
                 break;
+            } catch (CommonException e) {
+                if (e.getCode() == ExceptionEnum.URL_FORBIDDEN.getCode()) {
+                    log.error("access to the url {} of remote server is prohibited.", fileUrl);
+                    break;
+                }
+            } catch (Exception e) {
+                log.error("download file failed retry {} times. from url {} to file {}. ", fileUrl, filePath, retry);
+                if (retry >= downloadRetry) {
+                    throw new CommonException(ExceptionEnum.SYSTEM_ERROR, "download file " + fileUrl + " failed!");
+                }
             }
-            log.warn("download file failed. retry times: {}, from file {} to {}. ", fileUrl, filePath, retry);
-        }
-        if (!downloadFlag) {
-            throw new CommonException(ExceptionEnum.SYSTEM_ERROR, "download file " + fileUrl + " failed!");
         }
 
         return videoFilePath + File.separator + titleMmd5;
     }
 
-    private void file2oss(String localFile, String ossBucketKey, String crawler, String mode) throws IOException {
+    private void file2oss(String localFile, String ossBucketKey, String crawler, String mode) {
         // 文件上传 OSS
         slsService.log("message", "开始上传视频... ", "crawler", crawler, "mode", mode);
-        log.info("begin upload: {} to oss key: {}", localFile, ossBucketKey);
-        aliyunOssManager.putObject(ossBucket, ossBucketKey, Files.newInputStream(Paths.get(localFile)));
+        log.info("begin upload {} to oss key {}", localFile, ossBucketKey);
+
+        int retry = 0;
+        int file2ossRetry = failRetryTimes.getOrDefault("2oss", 3);
+        while (retry < file2ossRetry) {
+            retry++;
+            try {
+                aliyunOssManager.putObject(ossBucket, ossBucketKey, Files.newInputStream(Paths.get(localFile)));
+                break;
+            } catch (Exception e) {
+                log.error("upload file to oss failed retry {} times. from url {} to {}. ", retry, localFile, ossBucketKey);
+                if (retry >= file2ossRetry) {
+                    throw new CommonException(ExceptionEnum.SYSTEM_ERROR, "upload file " + localFile + " to oss failed!");
+                }
+            }
+        }
     }
 
     @PostConstruct

+ 15 - 17
etl-core/src/main/java/com/tzld/crawler/etl/service/strategy/handler/TitleScoreHandler.java

@@ -29,12 +29,12 @@ import com.huaban.analysis.jieba.JiebaSegmenter;
 import com.huaban.analysis.jieba.SegToken;
 import com.tzld.crawler.etl.model.dto.StrategyDataDto;
 import com.tzld.crawler.etl.model.vo.CrawlerVideoVO;
-import com.tzld.crawler.etl.service.SlsService;
 import com.tzld.crawler.etl.service.strategy.StrategyAbstractHandler;
 import com.tzld.crawler.etl.util.FeishuUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
+import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
@@ -54,7 +54,6 @@ import java.util.concurrent.TimeUnit;
 @Component
 public class TitleScoreHandler extends StrategyAbstractHandler {
     private static final Logger log = LoggerFactory.getLogger(TitleScoreHandler.class);
-    private final SlsService slsService;
 
     @Value("${feishu.appid:}")
     private String feishuAppid;
@@ -76,31 +75,30 @@ public class TitleScoreHandler extends StrategyAbstractHandler {
     private Map<String, Double> scoreWordMap = new HashMap<>();
     private List<String> stopWorlds = new ArrayList<>();
 
-    public TitleScoreHandler(SlsService slsService) {
-        this.slsService = slsService;
-    }
-
     @Override
     public StrategyDataDto execute(CrawlerVideoVO param) {
         double score = 0;
-        JiebaSegmenter segmenter = new JiebaSegmenter();
-        List<SegToken> process = segmenter.process(param.getVideoTitle(), JiebaSegmenter.SegMode.SEARCH);
-        for (SegToken segToken : process) {
-            String word = segToken.word;
-            if (stopWorlds.contains(word)) {
-                continue;
+        StrategyDataDto result = new StrategyDataDto();
+        BeanUtils.copyProperties(param, result);
+        try {
+            JiebaSegmenter segmenter = new JiebaSegmenter();
+            List<SegToken> process = segmenter.process(param.getVideoTitle(), JiebaSegmenter.SegMode.SEARCH);
+            for (SegToken segToken : process) {
+                String word = segToken.word;
+                if (stopWorlds.contains(word)) {
+                    continue;
+                }
+                score += scoreWordMap.getOrDefault(word, 0.0);
             }
-            score += scoreWordMap.getOrDefault(word, 0.0);
-        }
 // workaround:低于0.3先不过滤,写入飞书后,运营人工处理
 //        if (score <= 0.3) {
 //            log.warn("title score is less than 0.3 {}", param);
 //            slsService.log("mode", param.getStrategy(), "crawler", param.getPlatform(), "message", "权重分:" + score + "<=0.3");
 //            return null;
 //        }
-
-        StrategyDataDto result = new StrategyDataDto();
-        BeanUtils.copyProperties(param, result);
+        } catch (BeansException e) {
+            log.error("failed to calculate title weight score! title [{}]", param.getVideoTitle(), e);
+        }
 
         result.setTitleScore(score);
         return result;

+ 45 - 23
etl-core/src/main/java/com/tzld/crawler/etl/util/FileUtils.java

@@ -25,6 +25,8 @@
 package com.tzld.crawler.etl.util;
 
 
+import com.tzld.crawler.etl.common.enums.ExceptionEnum;
+import com.tzld.crawler.etl.common.exception.CommonException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,32 +46,52 @@ public class FileUtils {
         String fileUrl = "https://v2.kwaicdn.com/upic/2023/06/05/13/BMjAyMzA2MDUxMzA1MDNfNDU4Mjk4ODc5XzEwNDgwODM0NzMyNF8xXzM=_b_B4c86f13e02feb5462f484f95626229f6.mp4?pkey=AAVsuVfgud8EyKY8wn5cED5iBHD_2CCfZLuih27xJbjMwhqxIU-IHLkRhoDV0RxWUBjRvYtWWswKPL_n6u3csHgYD-euPHV1phmxa0r3ndOom3mRtowdHKCs5C9mr-PlXT8&tag=1-1686310919-unknown-0-5o0hj64qdg-0ee9b724883fdad8&clientCacheKey=3xz7hyd9c8h4zwa_b.mp4&di=ab7f961c&bp=14944&tt=b&ss=vp";
 
         String filePath = "/Users/ehlxr/Workspaces/tzld/crawler-etl/video1.mp4";
-        download(fileUrl, filePath);
+        try {
+            download(fileUrl, filePath);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
-    public static boolean download(String fileUrl, String filePath) {
-        try {
-            log.info("begin download file url: {} local path: {}", fileUrl, filePath);
-            URL url = new URL(fileUrl);
-            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-            conn.setConnectTimeout(5000);
-            conn.setReadTimeout(5000);
-            log.info("File url: {} local path: {} size: {}MB", fileUrl, filePath, conn.getContentLength() / 1024 / 1024);
+    public static String formatFileSize(long sizeInBytes) {
+        if (sizeInBytes < 1024) {
+            return sizeInBytes + " B";
+        } else if (sizeInBytes < 1024 * 1024) {
+            double sizeInKB = sizeInBytes / 1024.0;
+            return String.format("%.2f KB", sizeInKB);
+        } else {
+            double sizeInMB = sizeInBytes / (1024.0 * 1024.0);
+            return String.format("%.2f MB", sizeInMB);
+        }
+    }
 
-            InputStream inputStream = conn.getInputStream();
-            FileOutputStream outputStream = new FileOutputStream(filePath);
-            byte[] buffer = new byte[4096];
-            int len;
-            while ((len = inputStream.read(buffer)) != -1) {
-                outputStream.write(buffer, 0, len);
-            }
-            inputStream.close();
-            outputStream.close();
-            log.info("downloaded successfully file url: {} local path: {} ", fileUrl, filePath);
-            return true;
-        } catch (Exception e) {
-            log.error("downloaded error file url: {} local path: {} ", fileUrl, filePath, e);
-            return false;
+    public static void download(String fileUrl, String filePath) throws Exception {
+        // try {
+        log.info("begin download file url {} local path {}", fileUrl, filePath);
+        URL url = new URL(fileUrl);
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        if (conn.getResponseCode() == HttpURLConnection.HTTP_FORBIDDEN) {
+            throw new CommonException(ExceptionEnum.URL_FORBIDDEN);
+        }
+        conn.setConnectTimeout(5000);
+        conn.setReadTimeout(5000);
+        log.info("file size {} url {}, local path {} ",
+                formatFileSize(conn.getContentLength()), fileUrl, filePath);
+
+        InputStream inputStream = conn.getInputStream();
+        FileOutputStream outputStream = new FileOutputStream(filePath);
+        byte[] buffer = new byte[4096];
+        int len;
+        while ((len = inputStream.read(buffer)) != -1) {
+            outputStream.write(buffer, 0, len);
         }
+        inputStream.close();
+        outputStream.close();
+        log.info("downloaded successfully file url {}, local path {} ", fileUrl, filePath);
+        // return true;
+        // } catch (Exception e) {
+        //     log.error("downloaded error file url: {} local path: {} ", fileUrl, filePath, e);
+        //     return false;
+        // }
     }
 }

+ 28 - 30
etl-core/src/main/java/com/tzld/crawler/etl/util/VideoUtils.java

@@ -24,8 +24,6 @@
 
 package com.tzld.crawler.etl.util;
 
-import com.tzld.crawler.etl.common.enums.ExceptionEnum;
-import com.tzld.crawler.etl.common.exception.CommonException;
 import com.tzld.crawler.etl.model.dto.VideoInfoDto;
 import net.bramp.ffmpeg.FFprobe;
 import net.bramp.ffmpeg.probe.FFmpegProbeResult;
@@ -65,37 +63,37 @@ public class VideoUtils {
         }
     }
 
-    public static void videoCompose(String ffmpegPath, String videoPath, String audioPath, String outPath) {
-        try {
-            // FFmpeg fFmpeg = new FFmpeg("ffmpeg");
-            // FFprobe fFprobe = new FFprobe("ffprobe");
-            //
-            // FFmpegBuilder builder = new FFmpegBuilder()
-            //         .addInput(videoPath)
-            //         // .addExtraArgs("-i", videoPath)  // 然后添加音频文件作为参数
-            //         .addExtraArgs("-i", audioPath)  // 然后添加音频文件作为参数
-            //         .addOutput(outPath)   // 输出文件
-            //         .addExtraArgs("-c:v", "copy")  // 使用原视频的编码
-            //         .addExtraArgs("-c:a", "aac")  // 音频用aac编码
-            //         .addExtraArgs("-strict", "experimental")
-            //         .addExtraArgs("-map", "0:v:0")  //  映射视频流
-            //         .addExtraArgs("-map", "1:a:0")  //  映射音频流
-            //         .done();
-            // FFmpegExecutor executor = new FFmpegExecutor(fFmpeg, fFprobe);
-            // executor.createJob(builder).run(); // 执行 ffmpeg 命令
-            ProcessBuilder processBuilder = new ProcessBuilder(ffmpegPath, "-i", videoPath, "-i", audioPath,
-                    "-c:v", "copy", "-c:a", "aac", "-strict", "experimental", "-map", "0:v:0", "-map", "1:a:0", outPath);
-            Process process = processBuilder.start();
-            process.waitFor();
-        } catch (Exception e) {
-            log.error("video {} audio {} compose error.", videoPath, audioPath, e);
-            throw new CommonException(ExceptionEnum.DATA_ERROR, "video compose error");
-        }
+    public static void videoSynthesis(String ffmpegPath, String videoPath, String audioPath, String outPath) throws Exception {
+        // try {
+        // FFmpeg fFmpeg = new FFmpeg("ffmpeg");
+        // FFprobe fFprobe = new FFprobe("ffprobe");
+        //
+        // FFmpegBuilder builder = new FFmpegBuilder()
+        //         .addInput(videoPath)
+        //         // .addExtraArgs("-i", videoPath)  // 然后添加音频文件作为参数
+        //         .addExtraArgs("-i", audioPath)  // 然后添加音频文件作为参数
+        //         .addOutput(outPath)   // 输出文件
+        //         .addExtraArgs("-c:v", "copy")  // 使用原视频的编码
+        //         .addExtraArgs("-c:a", "aac")  // 音频用aac编码
+        //         .addExtraArgs("-strict", "experimental")
+        //         .addExtraArgs("-map", "0:v:0")  //  映射视频流
+        //         .addExtraArgs("-map", "1:a:0")  //  映射音频流
+        //         .done();
+        // FFmpegExecutor executor = new FFmpegExecutor(fFmpeg, fFprobe);
+        // executor.createJob(builder).run(); // 执行 ffmpeg 命令
+        ProcessBuilder processBuilder = new ProcessBuilder(ffmpegPath, "-i", videoPath, "-i", audioPath,
+                "-c:v", "copy", "-c:a", "aac", "-strict", "experimental", "-map", "0:v:0", "-map", "1:a:0", outPath);
+        Process process = processBuilder.start();
+        process.waitFor();
+        // } catch (Exception e) {
+        //     log.error("video {} audio {} synthesis error.", videoPath, audioPath, e);
+        //     throw new CommonException(ExceptionEnum.DATA_ERROR, "video synthesis error");
+        // }
     }
 
 
-    public static void main(String[] args) {
-        videoCompose("ffmpeg", "/datalog/crawler/videos/longvideo/crawler_local/video/dev/20230628/7628fedb3f1abf460733da698c7fc169",
+    public static void main(String[] args) throws Exception {
+        videoSynthesis("ffmpeg", "/datalog/crawler/videos/longvideo/crawler_local/video/dev/20230628/7628fedb3f1abf460733da698c7fc169",
                 "/datalog/crawler/videos/longvideo/crawler_local/audit/dev/20230628/7628fedb3f1abf460733da698c7fc169",
                 "/datalog/crawler/videos/longvideo/crawler_local/video/dev/20230628/34b192c574330bbcaf008b3c90e565b9_comp111100000");
     }

+ 9 - 0
pom.xml

@@ -154,6 +154,15 @@
             <version>0.7.0</version>
         </dependency>
 
+        <dependency>
+            <groupId>javax.validation</groupId>
+            <artifactId>validation-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.hibernate.validator</groupId>
+            <artifactId>hibernate-validator</artifactId>
+        </dependency>
+
     </dependencies>
 
 </project>