Browse Source

Update video duplicate-detection logic

ehlxr 1 year ago
parent
commit
24aca517f7

+ 7 - 0
etl-core/src/main/java/com/tzld/crawler/etl/dao/mapper/ext/CrawlerVideoExtMapper.java

@@ -0,0 +1,7 @@
+package com.tzld.crawler.etl.dao.mapper.ext;
+
+import com.tzld.crawler.etl.model.po.CrawlerVideo;
+
+public interface CrawlerVideoExtMapper { // MyBatis mapper interface; its statement is defined in mapper/ext/CrawlerVideoExtMapper.xml
+    int insertSelectiveReturnId(CrawlerVideo record); // inserts only the non-null properties of record; the generated key is written back to record's id. Return value is presumably the affected-row count (TODO confirm)
+}

+ 45 - 49
etl-core/src/main/java/com/tzld/crawler/etl/service/impl/EtlServiceImpl.java

@@ -33,11 +33,11 @@ import com.tzld.crawler.etl.common.base.Constant;
 import com.tzld.crawler.etl.common.enums.ExceptionEnum;
 import com.tzld.crawler.etl.common.exception.CommonException;
 import com.tzld.crawler.etl.dao.mapper.CrawlerVideoMapper;
+import com.tzld.crawler.etl.dao.mapper.ext.CrawlerVideoExtMapper;
 import com.tzld.crawler.etl.model.dto.StrategyDataDto;
 import com.tzld.crawler.etl.model.dto.VideoInfoDto;
 import com.tzld.crawler.etl.model.param.CrawlerVideoSendParam;
 import com.tzld.crawler.etl.model.po.CrawlerVideo;
-import com.tzld.crawler.etl.model.po.CrawlerVideoExample;
 import com.tzld.crawler.etl.model.vo.CrawlerVideoVO;
 import com.tzld.crawler.etl.model.vo.WxVideoVO;
 import com.tzld.crawler.etl.service.EtlService;
@@ -79,6 +79,7 @@ public class EtlServiceImpl implements EtlService {
     private final LongVideoFeign longVideoFeign;
     private final CrawlerVideoMapper crawlerVideoMapper;
     private final SlsService slsService;
+    private final CrawlerVideoExtMapper crawlerVideoExtMapper;
 
     @Value("${download.file.path:/data/crawler/videos}")
     private String downloadPath;
@@ -106,12 +107,14 @@ public class EtlServiceImpl implements EtlService {
     private Executor pool;
 
     public EtlServiceImpl(StrategyHandlerService strategyHandlerService, AliyunOssManager aliyunOssManager,
-                          LongVideoFeign longVideoFeign, CrawlerVideoMapper crawlerVideoMapper, SlsService slsService) {
+            LongVideoFeign longVideoFeign, CrawlerVideoMapper crawlerVideoMapper, SlsService slsService,
+            CrawlerVideoExtMapper crawlerVideoExtMapper) { // constructor injection: each argument is stored in the field of the same name
         this.strategyHandlerService = strategyHandlerService;
         this.aliyunOssManager = aliyunOssManager;
         this.longVideoFeign = longVideoFeign;
         this.crawlerVideoMapper = crawlerVideoMapper;
         this.slsService = slsService;
+        this.crawlerVideoExtMapper = crawlerVideoExtMapper; // extension mapper that provides insertSelectiveReturnId
     }
 
     @Override
@@ -119,31 +122,42 @@ public class EtlServiceImpl implements EtlService {
         String title = param.getVideoTitle();
         String platform = param.getPlatform();
         String strategy = param.getStrategy();
+        long id = 0l;
         try {
-            // 视频去重
-            CrawlerVideoExample example = new CrawlerVideoExample();
-            example.createCriteria().andOutVideoIdEqualTo(param.getOutVideoId()).andPlatformEqualTo(platform);
-            long count = crawlerVideoMapper.countByExample(example);
-            if (count > 1) {
-                slsService.log("message", "video" + param.getOutVideoId() + " has exist", "crawler", platform, "mode", strategy);
-                log.info("video {} platform {} strategy {}  has exist.", param.getOutVideoId(), param.getPlatform(), param.getStrategy());
-                return;
-            }
+            // 保存数据库
+            // TODO: alter table crawler_video drop key unq_video_id;
+            CrawlerVideo crawlerVideo = new CrawlerVideo();
+            BeanUtils.copyProperties(param, crawlerVideo);
+            String insertSql =
+                    "insert into crawler_video(user_id, out_user_id, platform, strategy, out_video_id, video_title,"
+                            + " cover_url, video_url, duration, publish_time, play_cnt, crawler_rule, width, height) values("
+                            + crawlerVideo.getUserId() + "," + crawlerVideo.getOutUserId() + ","
+                            + crawlerVideo.getPlatform() + "," + crawlerVideo.getStrategy() + ","
+                            + crawlerVideo.getOutVideoId() + "," + crawlerVideo.getVideoTitle() + ","
+                            + crawlerVideo.getCoverUrl() + "," + crawlerVideo.getVideoUrl() + ","
+                            + crawlerVideo.getDuration() + "," + crawlerVideo.getPublishTime() + ","
+                            + crawlerVideo.getPlayCnt() + "," + crawlerVideo.getCrawlerRule() + ","
+                            + crawlerVideo.getWidth() + "," + crawlerVideo.getHeight() + ")";
+            slsService.log("message", "insert_sql: " + insertSql, "crawler", platform, "mode", strategy);
+            crawlerVideoExtMapper.insertSelectiveReturnId(crawlerVideo);
+            id = crawlerVideo.getId();
+            slsService.log("message", "视频信息写入数据库成功", "crawler", platform, "mode", strategy);
 
-            // 1.策略应用
-            // 获取用户选择策略
+            // 策略应用
+            // TODO: 获取用户选择策略
             List<String> stategies = Lists.newArrayList("titleScore");
 
             StrategyDataDto data = strategyHandlerService.execute(stategies, param);
             if (data == null) {
+                log.info("{} filter by stategies {}", param, stategies);
                 return;
             }
 
-            // 2.音频、视频文件下载、合成,上传 OSS、清理视频信息
-            // 3.视频封面下载、上传 OSS、清理视频信息
+            // 音频、视频文件下载、合成,上传 OSS、清理视频信息
+            // 视频封面下载、上传 OSS、清理视频信息
             processVideo(data);
 
-            // 4.视频发布
+            // 视频发布
             CrawlerVideoSendParam request = new CrawlerVideoSendParam();
             request.setLoginUid(data.getUserId());
             request.setAppType(888888);
@@ -155,43 +169,23 @@ public class EtlServiceImpl implements EtlService {
             request.setViewStatus(1);
             request.setCrawlerSrcId(data.getOutVideoId());
             request.setCrawlerSrcCode(platform.toUpperCase());
-            LocalDateTime localDateTime = LocalDateTime.parse(data.getPublishTime(), DateTimeFormatter.ofPattern(Constant.STANDARD_FORMAT));
+            LocalDateTime localDateTime =
+                    LocalDateTime.parse(data.getPublishTime(), DateTimeFormatter.ofPattern(Constant.STANDARD_FORMAT));
             request.setCrawlerSrcPublishTimestamp(localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli());
             request.setCrawlerTaskTimestamp(System.currentTimeMillis());
             CommonResponse<WxVideoVO> response = longVideoFeign.crawlerVideoSend(request);
             log.info("crawler data send request: {}, response: {}", request, response);
             if (!response.isSuccess()) {
-                throw new CommonException(ExceptionEnum.INVOKE_VIDEOAPI_ERROR, "invoke crawler data send failed!" + response);
+                throw new CommonException(ExceptionEnum.INVOKE_VIDEOAPI_ERROR,
+                        "invoke crawler data send failed!" + response);
             }
 
-            // 5.视频元信息更新
-
-            // 6.保存信息到数据库
-            CrawlerVideo crawlerVideo = new CrawlerVideo();
-            BeanUtils.copyProperties(data, crawlerVideo);
+            // 更新数据库
             crawlerVideo.setVideoId(response.getData().getId());
-            String insertSql = "insert into crawler_video(video_id, user_id, out_user_id, platform, strategy, out_video_id, video_title," +
-                    " cover_url, video_url, duration, publish_time, play_cnt, crawler_rule, width, height, title_score) values(" + crawlerVideo.getVideoId() + "," +
-                    crawlerVideo.getUserId() + "," +
-                    crawlerVideo.getOutUserId() + "," +
-                    crawlerVideo.getPlatform() + "," +
-                    crawlerVideo.getStrategy() + "," +
-                    crawlerVideo.getOutVideoId() + "," +
-                    crawlerVideo.getVideoTitle() + "," +
-                    crawlerVideo.getCoverUrl() + "," +
-                    crawlerVideo.getVideoUrl() + "," +
-                    crawlerVideo.getDuration() + "," +
-                    crawlerVideo.getPublishTime() + "," +
-                    crawlerVideo.getPlayCnt() + "," +
-                    crawlerVideo.getCrawlerRule() + "," +
-                    crawlerVideo.getWidth() + "," +
-                    crawlerVideo.getHeight() + "," +
-                    crawlerVideo.getTitleScore() + ")";
-            slsService.log("message", "insert_sql: " + insertSql, "crawler", platform, "mode", strategy);
-            crawlerVideoMapper.insertSelective(crawlerVideo);
-            slsService.log("message", "视频信息写入数据库成功", "crawler", platform, "mode", strategy);
+            crawlerVideo.setTitleScore(data.getTitleScore());
+            crawlerVideoMapper.updateByPrimaryKeySelective(crawlerVideo);
 
-            // 7.视频写入飞书
+            // 视频写入飞书
             pool.execute(() -> {
                 try {
                     String sheetToken = feishuSheetTokenMap.get(platform);
@@ -208,8 +202,7 @@ public class EtlServiceImpl implements EtlService {
                     String range = feishuRangeMap.get(platform + strategyType);
 
                     log.info("{} {} sheetToken {} sheetId {} range {}", platform, strategy, sheetToken, sheetId, range);
-                    String fsResp = FeishuUtils.insertRows(feishuAppid, feishuAppsecret, sheetToken,
-                            sheetId, 1, 2);
+                    String fsResp = FeishuUtils.insertRows(feishuAppid, feishuAppsecret, sheetToken, sheetId, 1, 2);
                     log.debug("insert columns to feishu sheet response is {}", fsResp);
 
                     List<List<Object>> values = new ArrayList<>();
@@ -217,7 +210,8 @@ public class EtlServiceImpl implements EtlService {
                     DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Constant.STANDARD_FORMAT);
                     value.add(data.getTitleScore());
                     value.add(crawlerVideo.getVideoId());
-                    value.add(formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(System.currentTimeMillis()), ZoneId.systemDefault())));
+                    value.add(formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(System.currentTimeMillis()),
+                            ZoneId.systemDefault())));
                     value.add(strategy);
                     value.add(data.getOutVideoId());
                     value.add(data.getVideoTitle());
@@ -241,11 +235,14 @@ public class EtlServiceImpl implements EtlService {
 
                     slsService.log("message", "视频已保存至云文档", "crawler", platform, "mode", strategy);
                 } catch (Exception e) {
+                    // 保存飞书失败不回滚数据
                     log.error("save data to feishu sheet error. platform {}, strategy {}", platform, strategy, e);
                 }
             });
         } catch (Exception e) {
             log.error("etl server deal {} failed.", param, e);
+            // 回滚数据
+            crawlerVideoMapper.deleteByPrimaryKey(id);
             throw new CommonException(ExceptionEnum.SYSTEM_ERROR, "etl server deal error: " + e.getMessage());
         }
     }
@@ -272,8 +269,7 @@ public class EtlServiceImpl implements EtlService {
         String tempFilePath = downloadPath + File.separator + videoPath;
         file2oss(tempFilePath, videoPath, platform, strategy);
         data.setVideoOssPath(videoPath);
-        if (data.getDuration() == null || data.getDuration() <= 0
-                || data.getWidth() == null || data.getWidth() <= 0
+        if (data.getDuration() == null || data.getDuration() <= 0 || data.getWidth() == null || data.getWidth() <= 0
                 || data.getHeight() == null || data.getHeight() <= 0) {
             // 获取视频时长、宽、高
             VideoInfoDto videoInfo = VideoUtils.getVideoInfo(tempFilePath, ffprobePath);

+ 150 - 0
etl-core/src/main/resources/mapper/ext/CrawlerVideoExtMapper.xml

@@ -0,0 +1,150 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.tzld.crawler.etl.dao.mapper.ext.CrawlerVideoExtMapper">
+<insert id="insertSelectiveReturnId" parameterType="com.tzld.crawler.etl.model.po.CrawlerVideo" keyProperty="id"
+            useGeneratedKeys="true"> <!-- selective insert into crawler_video; useGeneratedKeys with keyProperty="id" sets the generated key on the parameter object's id property -->
+    insert into crawler_video
+    <trim prefix="(" suffix=")" suffixOverrides=","> <!-- column list: one column per non-null property; the trailing comma is trimmed -->
+      <if test="id != null">
+        id,
+      </if>
+      <if test="videoId != null">
+        video_id,
+      </if>
+      <if test="userId != null">
+        user_id,
+      </if>
+      <if test="outUserId != null">
+        out_user_id,
+      </if>
+      <if test="platform != null">
+        platform,
+      </if>
+      <if test="strategy != null">
+        strategy,
+      </if>
+      <if test="outVideoId != null">
+        out_video_id,
+      </if>
+      <if test="videoTitle != null">
+        video_title,
+      </if>
+      <if test="coverUrl != null">
+        cover_url,
+      </if>
+      <if test="videoUrl != null">
+        video_url,
+      </if>
+      <if test="duration != null">
+        duration,
+      </if>
+      <if test="publishTime != null">
+        publish_time,
+      </if>
+      <if test="playCnt != null">
+        play_cnt,
+      </if>
+      <if test="likeCnt != null">
+        like_cnt,
+      </if>
+      <if test="shareCnt != null">
+        share_cnt,
+      </if>
+      <if test="collectionCnt != null">
+        collection_cnt,
+      </if>
+      <if test="commentCnt != null">
+        comment_cnt,
+      </if>
+      <if test="crawlerRule != null">
+        crawler_rule,
+      </if>
+      <if test="width != null">
+        width,
+      </if>
+      <if test="height != null">
+        height,
+      </if>
+      <if test="titleScore != null">
+        title_score,
+      </if>
+      <if test="createTime != null">
+        create_time,
+      </if>
+      <if test="updateTime != null">
+        update_time,
+      </if>
+    </trim>
+    <trim prefix="values (" suffix=")" suffixOverrides=","> <!-- value list: the same null tests in the same order, so each value lines up with its column above -->
+      <if test="id != null">
+        #{id,jdbcType=BIGINT},
+      </if>
+      <if test="videoId != null">
+        #{videoId,jdbcType=BIGINT},
+      </if>
+      <if test="userId != null">
+        #{userId,jdbcType=BIGINT},
+      </if>
+      <if test="outUserId != null">
+        #{outUserId,jdbcType=VARCHAR},
+      </if>
+      <if test="platform != null">
+        #{platform,jdbcType=VARCHAR},
+      </if>
+      <if test="strategy != null">
+        #{strategy,jdbcType=VARCHAR},
+      </if>
+      <if test="outVideoId != null">
+        #{outVideoId,jdbcType=VARCHAR},
+      </if>
+      <if test="videoTitle != null">
+        #{videoTitle,jdbcType=VARCHAR},
+      </if>
+      <if test="coverUrl != null">
+        #{coverUrl,jdbcType=VARCHAR},
+      </if>
+      <if test="videoUrl != null">
+        #{videoUrl,jdbcType=VARCHAR},
+      </if>
+      <if test="duration != null">
+        #{duration,jdbcType=BIGINT},
+      </if>
+      <if test="publishTime != null">
+        #{publishTime,jdbcType=VARCHAR},
+      </if>
+      <if test="playCnt != null">
+        #{playCnt,jdbcType=INTEGER},
+      </if>
+      <if test="likeCnt != null">
+        #{likeCnt,jdbcType=INTEGER},
+      </if>
+      <if test="shareCnt != null">
+        #{shareCnt,jdbcType=INTEGER},
+      </if>
+      <if test="collectionCnt != null">
+        #{collectionCnt,jdbcType=INTEGER},
+      </if>
+      <if test="commentCnt != null">
+        #{commentCnt,jdbcType=INTEGER},
+      </if>
+      <if test="crawlerRule != null">
+        #{crawlerRule,jdbcType=CHAR},
+      </if>
+      <if test="width != null">
+        #{width,jdbcType=INTEGER},
+      </if>
+      <if test="height != null">
+        #{height,jdbcType=INTEGER},
+      </if>
+      <if test="titleScore != null">
+        #{titleScore,jdbcType=DOUBLE},
+      </if>
+      <if test="createTime != null">
+        #{createTime,jdbcType=TIMESTAMP},
+      </if>
+      <if test="updateTime != null">
+        #{updateTime,jdbcType=TIMESTAMP},
+      </if>
+    </trim>
+  </insert>
+</mapper>

+ 1 - 1
etl-server/src/main/resources/application.yml

@@ -5,7 +5,7 @@ spring:
     name: crawler-etl
 
   jackson:
-    default-property-inclusion: non_null
+    default-property-inclusion: NON_NULL
 
   datasource:
     driver-class-name: com.mysql.jdbc.Driver