|
@@ -25,6 +25,7 @@
|
|
|
package com.tzld.crawler.etl.service.impl;
|
|
|
|
|
|
import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
|
|
|
+import com.google.common.base.Stopwatch;
|
|
|
import com.google.common.base.Strings;
|
|
|
import com.google.common.collect.Lists;
|
|
|
import com.tzld.commons.aliyun.oss.AliyunOssManager;
|
|
@@ -36,13 +37,15 @@ import com.tzld.crawler.etl.common.exception.CommonException;
|
|
|
import com.tzld.crawler.etl.dao.mapper.CrawlerUserV3Mapper;
|
|
|
import com.tzld.crawler.etl.dao.mapper.CrawlerVideoMapper;
|
|
|
import com.tzld.crawler.etl.dao.mapper.ext.CrawlerVideoExtMapper;
|
|
|
+import com.tzld.crawler.etl.enums.MetricTypeEnum;
|
|
|
+import com.tzld.crawler.etl.model.dto.MetricLogDto;
|
|
|
import com.tzld.crawler.etl.model.dto.StrategyDataDto;
|
|
|
import com.tzld.crawler.etl.model.dto.VideoInfoDto;
|
|
|
import com.tzld.crawler.etl.model.param.CrawlerVideoSendParam;
|
|
|
import com.tzld.crawler.etl.model.po.CrawlerUserV3;
|
|
|
import com.tzld.crawler.etl.model.po.CrawlerUserV3Example;
|
|
|
import com.tzld.crawler.etl.model.po.CrawlerVideo;
|
|
|
-import com.tzld.crawler.etl.model.vo.CrawlerVideoVO;
|
|
|
+import com.tzld.crawler.etl.model.vo.CrawlerEtlParam;
|
|
|
import com.tzld.crawler.etl.model.vo.WxVideoVO;
|
|
|
import com.tzld.crawler.etl.service.EtlService;
|
|
|
import com.tzld.crawler.etl.service.SlsService;
|
|
@@ -114,15 +117,16 @@ public class EtlServiceImpl implements EtlService {
|
|
|
private String ffprobePath;
|
|
|
@Value("${ffprobe.path:ffmpeg}")
|
|
|
private String ffmpegPath;
|
|
|
-
|
|
|
@ApolloJsonValue("${fail.retry.times:{}}")
|
|
|
private Map<String, Integer> failRetryTimes;
|
|
|
|
|
|
private Executor pool;
|
|
|
|
|
|
+ private final ThreadLocal<CrawlerEtlParam> tlParam = new ThreadLocal<>();
|
|
|
+
|
|
|
public EtlServiceImpl(StrategyHandlerService strategyHandlerService, AliyunOssManager aliyunOssManager,
|
|
|
- LongVideoFeign longVideoFeign, CrawlerVideoMapper crawlerVideoMapper, SlsService slsService,
|
|
|
- CrawlerVideoExtMapper crawlerVideoExtMapper, CrawlerUserV3Mapper crawlerUserV3Mapper) {
|
|
|
+ LongVideoFeign longVideoFeign, CrawlerVideoMapper crawlerVideoMapper, SlsService slsService,
|
|
|
+ CrawlerVideoExtMapper crawlerVideoExtMapper, CrawlerUserV3Mapper crawlerUserV3Mapper) {
|
|
|
this.strategyHandlerService = strategyHandlerService;
|
|
|
this.aliyunOssManager = aliyunOssManager;
|
|
|
this.longVideoFeign = longVideoFeign;
|
|
@@ -133,7 +137,9 @@ public class EtlServiceImpl implements EtlService {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void deal(CrawlerVideoVO param) {
|
|
|
+ public void deal(CrawlerEtlParam param) {
|
|
|
+ tlParam.set(param);
|
|
|
+ Stopwatch sw = Stopwatch.createStarted();
|
|
|
CrawlerVideo crawlerVideo = new CrawlerVideo();
|
|
|
BeanUtils.copyProperties(param, crawlerVideo);
|
|
|
long id = 0L;
|
|
@@ -141,14 +147,13 @@ public class EtlServiceImpl implements EtlService {
|
|
|
// 参数校验
|
|
|
String errorMessage = CustomValidator.validate(param);
|
|
|
if (!Strings.isNullOrEmpty(errorMessage)) {
|
|
|
- log.error("param validate failed. {}", errorMessage);
|
|
|
- return;
|
|
|
+ throw new CommonException(ExceptionEnum.PARAM_ERROR, "param validate failed." + errorMessage);
|
|
|
}
|
|
|
|
|
|
// 保存数据库(去重校验)
|
|
|
id = save2db(crawlerVideo);
|
|
|
if (id <= 0) {
|
|
|
- return;
|
|
|
+ throw new CommonException(ExceptionEnum.DATA_ERROR, "save2db failed.");
|
|
|
}
|
|
|
|
|
|
// 策略应用
|
|
@@ -157,8 +162,7 @@ public class EtlServiceImpl implements EtlService {
|
|
|
|
|
|
StrategyDataDto data = strategyHandlerService.execute(strategies, param);
|
|
|
if (data == null) {
|
|
|
- log.info("{} filter by strategies {}", param, strategies);
|
|
|
- return;
|
|
|
+ throw new CommonException(ExceptionEnum.DATA_ERROR, param + " filter by strategies " + strategies);
|
|
|
}
|
|
|
|
|
|
// 音频、视频文件下载、合成,上传 OSS、清理视频信息
|
|
@@ -177,6 +181,8 @@ public class EtlServiceImpl implements EtlService {
|
|
|
|
|
|
// 视频写入飞书
|
|
|
async2Feishu(data, videoId);
|
|
|
+
|
|
|
+ metric(MetricTypeEnum.SUCCESS);
|
|
|
} catch (Exception e) {
|
|
|
log.error("etl server deal {} failed.", param, e);
|
|
|
// 回滚数据
|
|
@@ -184,6 +190,9 @@ public class EtlServiceImpl implements EtlService {
|
|
|
crawlerVideoMapper.deleteByPrimaryKey(id);
|
|
|
}
|
|
|
throw new CommonException(ExceptionEnum.SYSTEM_ERROR, "etl server deal error: " + e.getMessage());
|
|
|
+ } finally {
|
|
|
+ metric(MetricTypeEnum.DURATION, sw.stop());
|
|
|
+ tlParam.remove();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -198,6 +207,7 @@ public class EtlServiceImpl implements EtlService {
|
|
|
// 根据站外视频 ID 唯一约束 key 做去重校验
|
|
|
log.info("out video id {} of platform {} strategy {} has exist!", crawlerVideo.getOutVideoId(),
|
|
|
crawlerVideo.getPlatform(), crawlerVideo.getStrategy());
|
|
|
+ metric(MetricTypeEnum.DUPLICATED);
|
|
|
return Pair.of(false, 0L);
|
|
|
}
|
|
|
}, "video2db", String.format("save video info [%s] to db", crawlerVideo));
|
|
@@ -437,6 +447,20 @@ public class EtlServiceImpl implements EtlService {
|
|
|
retryFuncR(c -> Pair.of(func.test(c), null), t, type, errorMsg);
|
|
|
}
|
|
|
|
|
|
+ private void metric(MetricTypeEnum typeEnum, Object data) {
|
|
|
+ CrawlerEtlParam param = tlParam.get();
|
|
|
+ slsService.metric(MetricLogDto.newBuiler()
|
|
|
+ .platform(param.getPlatform())
|
|
|
+ .strategy(param.getStrategy())
|
|
|
+ .type(typeEnum)
|
|
|
+ .data(data)
|
|
|
+ .msg(param.toString()).build());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void metric(MetricTypeEnum typeEnum) {
|
|
|
+ metric(typeEnum, "");
|
|
|
+ }
|
|
|
+
|
|
|
@PostConstruct
|
|
|
public void init() {
|
|
|
pool = Executors.newFixedThreadPool(1);
|