|
@@ -72,7 +72,9 @@ import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.concurrent.Executor;
|
|
import java.util.concurrent.Executor;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Executors;
|
|
|
|
+import java.util.function.BooleanSupplier;
|
|
import java.util.function.Function;
|
|
import java.util.function.Function;
|
|
|
|
+import java.util.function.Predicate;
|
|
import java.util.function.Supplier;
|
|
import java.util.function.Supplier;
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -81,7 +83,7 @@ import java.util.function.Supplier;
|
|
*/
|
|
*/
|
|
@Service
|
|
@Service
|
|
public class EtlServiceImpl implements EtlService {
|
|
public class EtlServiceImpl implements EtlService {
|
|
- private final static Logger log = LoggerFactory.getLogger(EtlServiceImpl.class);
|
|
|
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(EtlServiceImpl.class);
|
|
private final StrategyHandlerService strategyHandlerService;
|
|
private final StrategyHandlerService strategyHandlerService;
|
|
private final AliyunOssManager aliyunOssManager;
|
|
private final AliyunOssManager aliyunOssManager;
|
|
private final LongVideoFeign longVideoFeign;
|
|
private final LongVideoFeign longVideoFeign;
|
|
@@ -119,8 +121,8 @@ public class EtlServiceImpl implements EtlService {
|
|
private Executor pool;
|
|
private Executor pool;
|
|
|
|
|
|
public EtlServiceImpl(StrategyHandlerService strategyHandlerService, AliyunOssManager aliyunOssManager,
|
|
public EtlServiceImpl(StrategyHandlerService strategyHandlerService, AliyunOssManager aliyunOssManager,
|
|
- LongVideoFeign longVideoFeign, CrawlerVideoMapper crawlerVideoMapper, SlsService slsService,
|
|
|
|
- CrawlerVideoExtMapper crawlerVideoExtMapper, CrawlerUserV3Mapper crawlerUserV3Mapper) {
|
|
|
|
|
|
+ LongVideoFeign longVideoFeign, CrawlerVideoMapper crawlerVideoMapper, SlsService slsService,
|
|
|
|
+ CrawlerVideoExtMapper crawlerVideoExtMapper, CrawlerUserV3Mapper crawlerUserV3Mapper) {
|
|
this.strategyHandlerService = strategyHandlerService;
|
|
this.strategyHandlerService = strategyHandlerService;
|
|
this.aliyunOssManager = aliyunOssManager;
|
|
this.aliyunOssManager = aliyunOssManager;
|
|
this.longVideoFeign = longVideoFeign;
|
|
this.longVideoFeign = longVideoFeign;
|
|
@@ -186,18 +188,20 @@ public class EtlServiceImpl implements EtlService {
|
|
}
|
|
}
|
|
|
|
|
|
private long save2db(CrawlerVideo crawlerVideo) {
|
|
private long save2db(CrawlerVideo crawlerVideo) {
|
|
- return retrySupplierR(() -> {
|
|
|
|
|
|
+ Long id = retrySupplierR(() -> {
|
|
try {
|
|
try {
|
|
crawlerVideoExtMapper.insertSelectiveReturnId(crawlerVideo);
|
|
crawlerVideoExtMapper.insertSelectiveReturnId(crawlerVideo);
|
|
- slsService.log("message", "视频信息写入数据库成功", "crawler", crawlerVideo.getPlatform(), "mode", crawlerVideo.getStrategy());
|
|
|
|
- return Pair.of(true, crawlerVideo.getId());
|
|
|
|
|
|
+ slsService.log("message", "视频信息写入数据库成功", "crawler", crawlerVideo.getPlatform(), "mode",
|
|
|
|
+ crawlerVideo.getStrategy());
|
|
|
|
+ return Pair.of(false, crawlerVideo.getId());
|
|
} catch (DuplicateKeyException e) {
|
|
} catch (DuplicateKeyException e) {
|
|
// 根据站外视频 ID 唯一约束 key 做去重校验
|
|
// 根据站外视频 ID 唯一约束 key 做去重校验
|
|
log.info("out video id {} of platform {} strategy {} has exist!", crawlerVideo.getOutVideoId(),
|
|
log.info("out video id {} of platform {} strategy {} has exist!", crawlerVideo.getOutVideoId(),
|
|
crawlerVideo.getPlatform(), crawlerVideo.getStrategy());
|
|
crawlerVideo.getPlatform(), crawlerVideo.getStrategy());
|
|
- return Pair.of(true, -1L);
|
|
|
|
|
|
+ return Pair.of(false, 0L);
|
|
}
|
|
}
|
|
}, "video2db", String.format("save video info [%s] to db", crawlerVideo));
|
|
}, "video2db", String.format("save video info [%s] to db", crawlerVideo));
|
|
|
|
+ return id == null ? 0 : id;
|
|
}
|
|
}
|
|
|
|
|
|
private long videoSend(StrategyDataDto data) {
|
|
private long videoSend(StrategyDataDto data) {
|
|
@@ -250,7 +254,8 @@ public class EtlServiceImpl implements EtlService {
|
|
}
|
|
}
|
|
String range = feishuRangeMap.get(platform + strategyType);
|
|
String range = feishuRangeMap.get(platform + strategyType);
|
|
|
|
|
|
- log.info("platform [{}] strategy [{}] sheetToken is [{}], sheetId is [{}], range is [{}]", platform, strategy, sheetToken, sheetId, range);
|
|
|
|
|
|
+ log.info("platform [{}] strategy [{}] sheetToken is [{}], sheetId is [{}], range is [{}]", platform,
|
|
|
|
+ strategy, sheetToken, sheetId, range);
|
|
String fsResp = FeishuUtils.insertRows(feishuAppid, feishuAppsecret, sheetToken, sheetId, 1, 2);
|
|
String fsResp = FeishuUtils.insertRows(feishuAppid, feishuAppsecret, sheetToken, sheetId, 1, 2);
|
|
log.debug("insert columns to feishu sheet response is [{}]", fsResp);
|
|
log.debug("insert columns to feishu sheet response is [{}]", fsResp);
|
|
|
|
|
|
@@ -261,7 +266,7 @@ public class EtlServiceImpl implements EtlService {
|
|
CrawlerUserV3Example example = new CrawlerUserV3Example();
|
|
CrawlerUserV3Example example = new CrawlerUserV3Example();
|
|
example.createCriteria().andUidEqualTo(data.getUserId());
|
|
example.createCriteria().andUidEqualTo(data.getUserId());
|
|
List<CrawlerUserV3> crawlerUserV3s = crawlerUserV3Mapper.selectByExample(example);
|
|
List<CrawlerUserV3> crawlerUserV3s = crawlerUserV3Mapper.selectByExample(example);
|
|
- value.add(crawlerUserV3s.size() > 0 ? crawlerUserV3s.get(0).getLink() : "");
|
|
|
|
|
|
+ value.add(!crawlerUserV3s.isEmpty() ? crawlerUserV3s.get(0).getLink() : "");
|
|
}
|
|
}
|
|
|
|
|
|
value.add(data.getTitleScore());
|
|
value.add(data.getTitleScore());
|
|
@@ -306,11 +311,11 @@ public class EtlServiceImpl implements EtlService {
|
|
// 音、视频合成
|
|
// 音、视频合成
|
|
if (!Strings.isNullOrEmpty(audioUrl)) {
|
|
if (!Strings.isNullOrEmpty(audioUrl)) {
|
|
String audioPath = urlDownload(data.getAudioUrl(), "longvideo/crawler_local/audio", title);
|
|
String audioPath = urlDownload(data.getAudioUrl(), "longvideo/crawler_local/audio", title);
|
|
- retryFunc((t) -> {
|
|
|
|
|
|
+ retryFunc(t -> {
|
|
try {
|
|
try {
|
|
VideoUtils.videoSynthesis(ffmpegPath, downloadPath + File.separator + t,
|
|
VideoUtils.videoSynthesis(ffmpegPath, downloadPath + File.separator + t,
|
|
downloadPath + File.separator + audioPath, downloadPath + File.separator + t + "_comp.mp4");
|
|
downloadPath + File.separator + audioPath, downloadPath + File.separator + t + "_comp.mp4");
|
|
- return true;
|
|
|
|
|
|
+ return false;
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
throw new RuntimeException(e);
|
|
throw new RuntimeException(e);
|
|
}
|
|
}
|
|
@@ -359,26 +364,23 @@ public class EtlServiceImpl implements EtlService {
|
|
String localFilePath = downloadPath + File.separator + videoFilePath;
|
|
String localFilePath = downloadPath + File.separator + videoFilePath;
|
|
|
|
|
|
File localFile = new File(localFilePath);
|
|
File localFile = new File(localFilePath);
|
|
- if (!localFile.exists()) {
|
|
|
|
- if (!localFile.mkdirs()) {
|
|
|
|
- log.warn("mkdir dir [{}] failed!", localFilePath);
|
|
|
|
- }
|
|
|
|
|
|
+ if (!localFile.exists() && (!localFile.mkdirs())) {
|
|
|
|
+ log.warn("mkdir dir [{}] failed!", localFilePath);
|
|
}
|
|
}
|
|
|
|
|
|
retrySupplier(() -> {
|
|
retrySupplier(() -> {
|
|
try {
|
|
try {
|
|
// 下载文件
|
|
// 下载文件
|
|
FileUtils.download(fileUrl, localFilePath + File.separator + titleMmd5);
|
|
FileUtils.download(fileUrl, localFilePath + File.separator + titleMmd5);
|
|
- return true;
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- if (e instanceof CommonException) {
|
|
|
|
- CommonException ce = (CommonException) e;
|
|
|
|
- if (ce.getCode() == ExceptionEnum.URL_FORBIDDEN.getCode()) {
|
|
|
|
- log.error("access to the url [{}] of remote server is prohibited.", fileUrl);
|
|
|
|
- return true;
|
|
|
|
- }
|
|
|
|
|
|
+ return false;
|
|
|
|
+ } catch (CommonException e) {
|
|
|
|
+ if (e.getCode() == ExceptionEnum.URL_FORBIDDEN.getCode()) {
|
|
|
|
+ log.error("access to the url [{}] of remote server is prohibited.", fileUrl);
|
|
|
|
+ return false;
|
|
}
|
|
}
|
|
throw new RuntimeException(e);
|
|
throw new RuntimeException(e);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ throw new RuntimeException(e);
|
|
}
|
|
}
|
|
}, "download", String.format("download file from [%s] to [%s]", fileUrl, filePath));
|
|
}, "download", String.format("download file from [%s] to [%s]", fileUrl, filePath));
|
|
|
|
|
|
@@ -390,22 +392,22 @@ public class EtlServiceImpl implements EtlService {
|
|
slsService.log("message", "开始上传视频... ", "crawler", crawler, "mode", mode);
|
|
slsService.log("message", "开始上传视频... ", "crawler", crawler, "mode", mode);
|
|
log.info("begin upload {} to oss key {}", localFile, ossBucketKey);
|
|
log.info("begin upload {} to oss key {}", localFile, ossBucketKey);
|
|
|
|
|
|
- retryFunc((t) -> {
|
|
|
|
|
|
+ retrySupplier(() -> {
|
|
try {
|
|
try {
|
|
aliyunOssManager.putObject(ossBucket, ossBucketKey, Files.newInputStream(Paths.get(localFile)));
|
|
aliyunOssManager.putObject(ossBucket, ossBucketKey, Files.newInputStream(Paths.get(localFile)));
|
|
- return true;
|
|
|
|
|
|
+ return false;
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
throw new RuntimeException(e);
|
|
throw new RuntimeException(e);
|
|
}
|
|
}
|
|
- }, "", "2oss", String.format("upload file [%s] to oss [%s]", localFile, ossBucketKey));
|
|
|
|
|
|
+ }, "2oss", String.format("upload file [%s] to oss [%s]", localFile, ossBucketKey));
|
|
}
|
|
}
|
|
|
|
|
|
- private void retrySupplier(Supplier<Boolean> supplier, String type, String errorMsg) {
|
|
|
|
- retryFunc((c) -> supplier.get(), null, type, errorMsg);
|
|
|
|
|
|
+ private void retrySupplier(BooleanSupplier supplier, String type, String errorMsg) {
|
|
|
|
+ retryFunc(c -> supplier.getAsBoolean(), null, type, errorMsg);
|
|
}
|
|
}
|
|
|
|
|
|
private <R> R retrySupplierR(Supplier<Pair<Boolean, R>> supplier, String type, String errorMsg) {
|
|
private <R> R retrySupplierR(Supplier<Pair<Boolean, R>> supplier, String type, String errorMsg) {
|
|
- return retryFuncR((c) -> supplier.get(), null, type, errorMsg);
|
|
|
|
|
|
+ return retryFuncR(c -> supplier.get(), null, type, errorMsg);
|
|
}
|
|
}
|
|
|
|
|
|
private <T, R> R retryFuncR(Function<T, Pair<Boolean, R>> func, T t, String type, String errorMsg) {
|
|
private <T, R> R retryFuncR(Function<T, Pair<Boolean, R>> func, T t, String type, String errorMsg) {
|
|
@@ -417,21 +419,22 @@ public class EtlServiceImpl implements EtlService {
|
|
try {
|
|
try {
|
|
Pair<Boolean, R> apply = func.apply(t);
|
|
Pair<Boolean, R> apply = func.apply(t);
|
|
r = apply.getRight();
|
|
r = apply.getRight();
|
|
- if (Boolean.TRUE.equals(apply.getLeft())) {
|
|
|
|
|
|
+ if (Boolean.FALSE.equals(apply.getLeft())) {
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
log.error("the operation '{}' has failed on the {}th retry.", errorMsg, retry, e);
|
|
log.error("the operation '{}' has failed on the {}th retry.", errorMsg, retry, e);
|
|
if (retry >= retryTimes) {
|
|
if (retry >= retryTimes) {
|
|
- throw new CommonException(ExceptionEnum.SYSTEM_ERROR, "the operation '" + errorMsg + "' has failed after " + retry + " times retry.");
|
|
|
|
|
|
+ throw new CommonException(ExceptionEnum.SYSTEM_ERROR,
|
|
|
|
+ "the operation '" + errorMsg + "' has failed after " + retry + " times retry.");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return r;
|
|
return r;
|
|
}
|
|
}
|
|
|
|
|
|
- private <T> void retryFunc(Function<T, Boolean> func, T t, String type, String errorMsg) {
|
|
|
|
- retryFuncR((t1) -> Pair.of(func.apply(t1), null), t, type, errorMsg);
|
|
|
|
|
|
+ private <T> void retryFunc(Predicate<T> func, T t, String type, String errorMsg) {
|
|
|
|
+ retryFuncR(c -> Pair.of(func.test(c), null), t, type, errorMsg);
|
|
}
|
|
}
|
|
|
|
|
|
@PostConstruct
|
|
@PostConstruct
|