Explorar el Código

Merge branch 'cooperation_video_candidate_pool_improved_lld_0509' into test

刘立冬 hace 1 semana
padre
commit
8b42e8348e
Se han modificado 15 ficheros con 390 adiciones y 125 borrados
  1. 22 0
      api-module/src/main/java/com/tzld/piaoquan/api/common/enums/ApiChannelEnum.java
  2. 71 6
      api-module/src/main/java/com/tzld/piaoquan/api/component/DeepSeekApiService.java
  3. 1 1
      api-module/src/main/java/com/tzld/piaoquan/api/dao/mapper/contentplatform/ext/ContentPlatformPlanMapperExt.java
  4. 1 1
      api-module/src/main/java/com/tzld/piaoquan/api/job/GzhReplyVideoRefreshJob.java
  5. 37 3
      api-module/src/main/java/com/tzld/piaoquan/api/job/contentplatform/ContentPlatformVideoJob.java
  6. 22 0
      api-module/src/main/java/com/tzld/piaoquan/api/model/po/contentplatform/ContentPlatformVideo.java
  7. 1 1
      api-module/src/main/java/com/tzld/piaoquan/api/service/contentplatform/impl/ContentPlatformApiServiceImpl.java
  8. 1 1
      api-module/src/main/java/com/tzld/piaoquan/api/service/contentplatform/impl/ContentPlatformPlanServiceImpl.java
  9. 39 22
      api-module/src/main/java/com/tzld/piaoquan/api/service/strategy/impl/BuckStrategyV1.java
  10. 35 25
      api-module/src/main/java/com/tzld/piaoquan/api/service/strategy/impl/ThirdPartyPushMessageStrategyV1.java
  11. 89 53
      api-module/src/main/java/com/tzld/piaoquan/api/service/strategy/impl/WeComPushMessageStrategyV1.java
  12. 37 7
      api-module/src/main/resources/mapper/contentplatform/ContentPlatformVideoMapper.xml
  13. 7 4
      api-module/src/main/resources/mapper/contentplatform/ext/ContentPlatformPlanMapperExt.xml
  14. 1 1
      api-module/src/test/java/com/tzld/piaoquan/api/WeComThirdPartTest.java
  15. 26 0
      common-module/src/main/java/com/tzld/piaoquan/growth/common/utils/RedisUtils.java

+ 22 - 0
api-module/src/main/java/com/tzld/piaoquan/api/common/enums/ApiChannelEnum.java

@@ -0,0 +1,22 @@
+package com.tzld.piaoquan.api.common.enums;
+
+public enum ApiChannelEnum {
+    OFFICIAL("official", "DeepSeek 官方 API"),
+    VOLCENGINE("volcengine", "火山引擎方舟 API");
+
+    private final String code;
+    private final String desc;
+
+    ApiChannelEnum(String code, String desc) {
+        this.code = code;
+        this.desc = desc;
+    }
+
+    public String getCode() {
+        return code;
+    }
+
+    public String getDesc() {
+        return desc;
+    }
+}

+ 71 - 6
api-module/src/main/java/com/tzld/piaoquan/api/component/DeepSeekApiService.java

@@ -2,6 +2,7 @@ package com.tzld.piaoquan.api.component;
 
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
+import com.tzld.piaoquan.api.common.enums.ApiChannelEnum;
 import com.tzld.piaoquan.api.model.dto.AIOfficialApiResponse;
 import com.tzld.piaoquan.api.model.dto.AIResult;
 import com.tzld.piaoquan.growth.common.utils.MapBuilder;
@@ -9,6 +10,7 @@ import lombok.extern.slf4j.Slf4j;
 import okhttp3.*;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.http.util.TextUtils;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
@@ -22,6 +24,24 @@ public class DeepSeekApiService {
 
     private OkHttpClient client;
 
+    @Value("${deepseek.default.model:deepseek-v4-flash}")
+    private String defaultModel;
+
+    @Value("${deepseek.official.url:https://api.deepseek.com/chat/completions}")
+    private String officialUrl;
+
+    @Value("${deepseek.official.apiKey:sk-62d7b2c37f824735aa4985852c919c1f}")
+    private String officialApiKey;
+
+    @Value("${deepseek.volcengine.url:https://ark.cn-beijing.volces.com/api/v3/chat/completions}")
+    private String volcengineUrl;
+
+    @Value("${deepseek.volcengine.apiKey:ark-b5d6fcbb-14f9-4f70-a92d-605ec6a72c8d-40883}")
+    private String volcengineApiKey;
+
+    @Value("${deepseek.volcengine.model:ep-20250717193758-8gvmz}")
+    private String volcengineModel;
+
     @PostConstruct
     public void init() {
         client = new OkHttpClient().newBuilder()
@@ -31,7 +51,17 @@ public class DeepSeekApiService {
                 .build();
     }
 
-    public AIResult requestOfficialApi(String prompt, String model, Double temperature, Boolean isJSON) {
+    /**
+     * 默认调用火山引擎 API
+     */
+    public AIResult request(String prompt, String model, Double temperature, Boolean isJSON) {
+        return request(prompt, model, temperature, isJSON, ApiChannelEnum.VOLCENGINE);
+    }
+
+    /**
+     * 支持指定渠道的 API 调用
+     */
+    public AIResult request(String prompt, String model, Double temperature, Boolean isJSON, ApiChannelEnum channel) {
         AIResult result = new AIResult();
         result.setSuccess(false);
         if (TextUtils.isBlank(prompt) || TextUtils.isBlank(prompt.trim())) {
@@ -39,6 +69,28 @@ public class DeepSeekApiService {
             return result;
         }
 
+        String url;
+        String apiKey;
+        String defaultChannelModel;
+        String channelName;
+
+        if (channel == ApiChannelEnum.OFFICIAL) {
+            url = officialUrl;
+            apiKey = officialApiKey;
+            defaultChannelModel = defaultModel;
+            channelName = "official";
+        } else {
+            url = volcengineUrl;
+            apiKey = volcengineApiKey;
+            defaultChannelModel = volcengineModel;
+            channelName = "volcengine";
+        }
+
+        if (TextUtils.isBlank(apiKey)) {
+            result.setFailReason(channelName + " apiKey is not configured");
+            return result;
+        }
+
         try {
             JSONArray jsonArray = new JSONArray();
             JSONObject message = new JSONObject();
@@ -46,9 +98,15 @@ public class DeepSeekApiService {
             message.put("content", prompt);
             jsonArray.add(message);
 
+            String useModel = Optional.ofNullable(model).orElse(defaultChannelModel);
+            if (TextUtils.isBlank(useModel)) {
+                result.setFailReason("model is empty");
+                return result;
+            }
+
             Map<Object, Object> bodyParam = MapBuilder
                     .builder()
-                    .put("model", Optional.ofNullable(model).orElse("deepseek-chat"))
+                    .put("model", useModel)
                     .put("temperature", Optional.ofNullable(temperature).orElse(0.3))
                     .put("messages", jsonArray)
                     .build();
@@ -61,17 +119,17 @@ public class DeepSeekApiService {
             MediaType mediaType = MediaType.parse("application/json");
             RequestBody body = RequestBody.create(mediaType, JSONObject.toJSONString(bodyParam));
             Request request = new Request.Builder()
-                    .url("https://api.deepseek.com/chat/completions")
+                    .url(url)
                     .method("POST", body)
                     .addHeader("Content-Type", "application/json")
                     .addHeader("Accept", "application/json")
-                    .addHeader("Authorization", "Bearer sk-62d7b2c37f824735aa4985852c919c1f")
+                    .addHeader("Authorization", "Bearer " + apiKey)
                     .build();
             Response response = client.newCall(request).execute();
 
             String responseContent = response.body().string();
             result.setResponseStr(responseContent);
-            log.info("deepseek api responseContent = {}", responseContent);
+            log.info("deepseek {} api responseContent = {}", channelName, responseContent);
             if (response.isSuccessful()) {
                 AIOfficialApiResponse obj = JSONObject.parseObject(responseContent, AIOfficialApiResponse.class);
                 if (CollectionUtils.isNotEmpty(obj.getChoices())) {
@@ -85,9 +143,16 @@ public class DeepSeekApiService {
                 result.setFailReason("request error code:" + response.code() + " message:" + json.getString("error"));
             }
         } catch (Exception e) {
-            log.error("deepseek official api fail: " + e.getMessage());
+            log.error("deepseek {} api fail: {}", channelName, e.getMessage());
             result.setFailReason(e.getMessage());
         }
         return result;
     }
+
+    /**
+     * 调用 DeepSeek 官方 API(向后兼容)
+     */
+    public AIResult requestOfficialApi(String prompt, String model, Double temperature, Boolean isJSON) {
+        return request(prompt, model, temperature, isJSON, ApiChannelEnum.OFFICIAL);
+    }
 }

+ 1 - 1
api-module/src/main/java/com/tzld/piaoquan/api/dao/mapper/contentplatform/ext/ContentPlatformPlanMapperExt.java

@@ -94,7 +94,7 @@ public interface ContentPlatformPlanMapperExt {
                                            @Param("oldStatus") Integer oldStatus,
                                            @Param("now") Long now);
 
-    List<ContentPlatformVideoAgg> getVideoAggList(@Param("dtList") List<String> dtList);
+    List<ContentPlatformVideoAgg> getVideoAggList(@Param("aggDt") String aggDt, @Param("dtList") List<String> dtList);
 
     List<ContentPlatformVideo> getVideoListByIds(@Param("videoIds") List<Long> videoIds);
 

+ 1 - 1
api-module/src/main/java/com/tzld/piaoquan/api/job/GzhReplyVideoRefreshJob.java

@@ -148,7 +148,7 @@ public class GzhReplyVideoRefreshJob {
         for (JSONObject obj : sortedList) {
             String text = obj.getString("title");
             String keywordPrompt = getKeyWordPrompt(text);
-            AIResult aiResult = deepSeekApiService.requestOfficialApi(keywordPrompt, null, null, false);
+            AIResult aiResult = deepSeekApiService.request(keywordPrompt, null, null, false);
             if (aiResult.isSuccess()) {
                 List<String> keywords = JSONObject.parseArray(aiResult.getResponse().getChoices().get(0).getMessage().getContent(), String.class);
                 log.info("GzhReplyVideoRefreshJob accountName:{} text:{} keywords:{}", accountName, text, keywords);

+ 37 - 3
api-module/src/main/java/com/tzld/piaoquan/api/job/contentplatform/ContentPlatformVideoJob.java

@@ -49,7 +49,7 @@ public class ContentPlatformVideoJob {
     @Autowired
     private ContentPlatformAccountMapper accountMapper;
 
-    @Value("${video.agg.days:3}")
+    @Value("${video.agg.days:30}")
     private Integer videoAggDays;
 
     @ApolloJsonValue("${video.recommend.score.config:{}}")
@@ -61,7 +61,7 @@ public class ContentPlatformVideoJob {
         if (StringUtils.hasText(param)) {
             aggDt = param;
         }
-        List<String> dtList = DateUtil.getBeforeDays(aggDt, null, videoAggDays);
+        List<String> dtList = DateUtil.getBeforeDays(aggDt, aggDt, videoAggDays - 1);
         // 轮询查询大数据获取最近 videoAggDays 天视频
         for (String dt : dtList) {
             String sql = String.format("SELECT * FROM loghubods.wecom_cooperation_video_candidate_pool WHERE dt=%s;", dt);
@@ -81,12 +81,16 @@ public class ContentPlatformVideoJob {
                     String title = (String) record.get(2);
                     String videoUrl = (String) record.get(3);
                     Double score = Double.parseDouble((String) record.get(4));
+                    Long exposure = parseLongSafe(record.get(5));
+                    Double rovn = parseDoubleSafe(record.get(6));
                     item.setDt(dt);
                     item.setVideoId(videoId);
                     item.setCategory(category);
                     item.setTitle(title);
                     item.setVideo(videoUrl);
                     item.setScore(score);
+                    item.setExposure(exposure);
+                    item.setRovn(rovn);
                     item.setCreateTimestamp(now);
                     saveList.add(item);
                 }
@@ -157,7 +161,7 @@ public class ContentPlatformVideoJob {
             planMapperExt.deleteContentPlatformVideoAgg(aggDt);
         }
         // 聚合最近14天视频
-        List<ContentPlatformVideoAgg> saveAggList = planMapperExt.getVideoAggList(dtList);
+        List<ContentPlatformVideoAgg> saveAggList = planMapperExt.getVideoAggList(aggDt, dtList);
         if (CollectionUtils.isNotEmpty(saveAggList)) {
             Long now = System.currentTimeMillis();
             for (ContentPlatformVideoAgg item : saveAggList) {
@@ -438,6 +442,36 @@ public class ContentPlatformVideoJob {
         return bdScore.setScale(2, RoundingMode.HALF_UP).doubleValue();
     }
 
+    private Long parseLongSafe(Object v) {
+        if (v == null) {
+            return 0L;
+        }
+        try {
+            String s = v.toString().trim();
+            if (s.isEmpty() || "null".equalsIgnoreCase(s) || "NaN".equalsIgnoreCase(s)) {
+                return 0L;
+            }
+            return (long) Double.parseDouble(s);
+        } catch (NumberFormatException e) {
+            return 0L;
+        }
+    }
+
+    private Double parseDoubleSafe(Object v) {
+        if (v == null) {
+            return 0.0;
+        }
+        try {
+            String s = v.toString().trim();
+            if (s.isEmpty() || "null".equalsIgnoreCase(s) || "NaN".equalsIgnoreCase(s)) {
+                return 0.0;
+            }
+            double d = Double.parseDouble(s);
+            return Double.isNaN(d) ? 0.0 : d;
+        } catch (NumberFormatException e) {
+            return 0.0;
+        }
+    }
 
     private List<ContentPlatformAccount> getAllContentPlatformAccount() {
         ContentPlatformAccountExample example = new ContentPlatformAccountExample();

+ 22 - 0
api-module/src/main/java/com/tzld/piaoquan/api/model/po/contentplatform/ContentPlatformVideo.java

@@ -17,6 +17,10 @@ public class ContentPlatformVideo {
 
     private Double score;
 
+    private Double rovn;
+
+    private Long exposure;
+
     private Integer status;
 
     private Long createTimestamp;
@@ -87,6 +91,22 @@ public class ContentPlatformVideo {
         this.score = score;
     }
 
+    public Double getRovn() {
+        return rovn;
+    }
+
+    public void setRovn(Double rovn) {
+        this.rovn = rovn;
+    }
+
+    public Long getExposure() {
+        return exposure;
+    }
+
+    public void setExposure(Long exposure) {
+        this.exposure = exposure;
+    }
+
     public Integer getStatus() {
         return status;
     }
@@ -125,6 +145,8 @@ public class ContentPlatformVideo {
         sb.append(", cover=").append(cover);
         sb.append(", video=").append(video);
         sb.append(", score=").append(score);
+        sb.append(", rovn=").append(rovn);
+        sb.append(", exposure=").append(exposure);
         sb.append(", status=").append(status);
         sb.append(", createTimestamp=").append(createTimestamp);
         sb.append(", updateTimestamp=").append(updateTimestamp);

+ 1 - 1
api-module/src/main/java/com/tzld/piaoquan/api/service/contentplatform/impl/ContentPlatformApiServiceImpl.java

@@ -52,7 +52,7 @@ public class ContentPlatformApiServiceImpl implements ContentPlatformApiService
     @Autowired
     private RedisUtils redisUtils;
 
-    @Value("${video.min.score:0.6}")
+    @Value("${video.min.score:0.5}")
     private Double videoMinScore;
 
     @Override

+ 1 - 1
api-module/src/main/java/com/tzld/piaoquan/api/service/contentplatform/impl/ContentPlatformPlanServiceImpl.java

@@ -105,7 +105,7 @@ public class ContentPlatformPlanServiceImpl implements ContentPlatformPlanServic
     @Value("${vlog.share.appType:11}")
     private String shareAppType;
 
-    @Value("${video.min.score:0.6}")
+    @Value("${video.min.score:0.5}")
     private Double videoMinScore;
 
     @Value("${video.title.search.max.count:500}")

+ 39 - 22
api-module/src/main/java/com/tzld/piaoquan/api/service/strategy/impl/BuckStrategyV1.java

@@ -94,15 +94,26 @@ public class BuckStrategyV1 implements ReplyStrategyService {
         // 0 获取策略key
         JSONObject bucketStrategyConfigJsonObject = getStrategyConfig(bucketDataParam);
         Set<String> keyedSet = bucketStrategyConfigJsonObject.keySet();
-        // 1 处理文章--算法引擎--排序文章数据
-//        getWenzhangData();
-        // 2 处理小程序--读取离线数据表--获取策略排序小程序数据
-        List<CgiReplyBucketData> smallDataCgiReplyList = readStrategyOrderSmallData(keyedSet, bucketDataParam);
-        // 2.1 获取小程序落地页地址 http调用
-        smallDataCgiReplyList = setSmallPageUrl(smallDataCgiReplyList);
-        log.info(JSON.toJSONString(smallDataCgiReplyList));
-        // 3 入库读表
-        insertSmallData(smallDataCgiReplyList, keyedSet, bucketDataParam);
+        // 加锁,防止并发请求重复触发数据生成(readStrategyOrderSmallData + setSmallPageUrl + insertSmallData)
+        String LOCK_KEY = "getResult:" + bucketDataParam.getGhId();
+        boolean locked = redisUtils.tryLockWithRetry(LOCK_KEY, "1", 60, 10, 1000);
+        if (!locked) {
+            log.warn("BuckStrategyV1 getResult 获取锁失败,直接读取已有数据,ghId:" + bucketDataParam.getGhId());
+            return getReplyBucketData(bucketStrategyConfigJsonObject, keyedSet, bucketDataParam);
+        }
+        try {
+            // 1 处理文章--算法引擎--排序文章数据
+//            getWenzhangData();
+            // 2 处理小程序--读取离线数据表--获取策略排序小程序数据
+            List<CgiReplyBucketData> smallDataCgiReplyList = readStrategyOrderSmallData(keyedSet, bucketDataParam);
+            // 2.1 获取小程序落地页地址 http调用
+            smallDataCgiReplyList = setSmallPageUrl(smallDataCgiReplyList);
+            log.info(JSON.toJSONString(smallDataCgiReplyList));
+            // 3 入库读表
+            insertSmallData(smallDataCgiReplyList, keyedSet, bucketDataParam);
+        } finally {
+            redisUtils.del(LOCK_KEY);
+        }
         // 4 组装分桶数据
         return getReplyBucketData(bucketStrategyConfigJsonObject, keyedSet, bucketDataParam);
     }
@@ -246,20 +257,13 @@ public class BuckStrategyV1 implements ReplyStrategyService {
         if (CollectionUtils.isEmpty(smallDataCgiReplyList)) {
             return;
         }
-        // 循环获取锁,锁5s,每获取一次锁,未获取到锁,等待1s,最多等待10次
-        int tryCount = 0;
+        // 循环获取锁,锁30s,每次重试间隔 1s,最多重试10次
         String LOCK_KEY = "insertSmallData:" + bucketDataParam.getGhId();
-        while (tryCount < 10) {
-            if (redisUtils.tryLock(LOCK_KEY, "1", 5)) {
-                break;
-            } else {
-                try {
-                    Thread.sleep(1000);
-                } catch (Exception e) {
-                    log.error("BuckStrategyV1 insertSmallData 尝试获取锁失败,ghId:" + bucketDataParam.getGhId() + ",tryCount:" + tryCount, e);
-                }
-                tryCount++;
-            }
+        boolean locked = redisUtils.tryLockWithRetry(LOCK_KEY, "1", 30, 10, 1000);
+        // 未获取到锁,直接返回,避免重复入库
+        if (!locked) {
+            log.error("BuckStrategyV1 insertSmallData 获取锁失败,放弃入库,ghId:" + bucketDataParam.getGhId());
+            return;
         }
         try {
             for (String key : keyedSet) {
@@ -280,6 +284,19 @@ public class BuckStrategyV1 implements ReplyStrategyService {
                 List<CgiReplyBucketData> cgiReplyBucketData1 = cgiReplyBucketDataMapper.selectByExample(cgiReplyBucketDataExample);
                 Map<Long, CgiReplyBucketData> cgiReplyBucketDataMap = cgiReplyBucketData1.stream()
                         .collect(Collectors.toMap(CgiReplyBucketData::getMiniVideoId, x -> x, (a, b) -> b));
+                // 锁内二次校验:若已存在相同视频ID的 is_delete=0 记录,说明并发请求已处理,跳过本次入库
+                if (!CollectionUtils.isEmpty(cgiReplyBucketData1) && cgiReplyBucketData1.size() == collect.size()) {
+                    Set<Long> existVideoIds = cgiReplyBucketDataMap.keySet();
+                    Set<Long> toInsertVideoIds = collect.stream()
+                            .map(x -> x.getMiniVideoId() != null ? x.getMiniVideoId() : MessageUtil.getVideoId(x.getMiniPagePath()))
+                            .filter(Objects::nonNull)
+                            .collect(Collectors.toSet());
+                    if (existVideoIds.equals(toInsertVideoIds)) {
+                        log.info("BuckStrategyV1 insertSmallData 锁内二次校验:数据已被并发请求处理,跳过,key:{},ghId:{}",
+                                key, bucketDataParam.getGhId());
+                        continue;
+                    }
+                }
                 // 清上个版本的策略数据
                 cgiReplyBucketDataMapperExt.updateDeleteStatus(bucketDataParam.getGhId(), key, 1, 1);
                 List<GhDetailExt> ghDetailExtList = ghDetailService.getGhDetailExtList(bucketDataParam.getGhId(), GhTypeEnum.GH.type);

+ 35 - 25
api-module/src/main/java/com/tzld/piaoquan/api/service/strategy/impl/ThirdPartyPushMessageStrategyV1.java

@@ -90,15 +90,26 @@ public class ThirdPartyPushMessageStrategyV1 implements ReplyStrategyService {
             configJsonObject = JSON.parseObject(bucketStrategyConfig);
         }
         Set<String> keyedSet = configJsonObject.keySet();
-        // 1 处理文章--算法引擎--排序文章数据
-//        getWenzhangData();
-        // 2 处理小程序--读取离线数据表--获取策略排序小程序数据
-        List<CgiReplyBucketData> smallDataCgiReplyList = readStrategyOrderSmallData(keyedSet, bucketDataParam);
-        // 2.1 获取小程序落地页地址 http调用
-        smallDataCgiReplyList = setSmallPageUrl(smallDataCgiReplyList, bucketDataParam.getChannel());
-        log.info(JSON.toJSONString(smallDataCgiReplyList));
-        // 3 入库读表
-        insertSmallData(smallDataCgiReplyList, keyedSet, bucketDataParam);
+        // 加锁,防止并发请求重复触发数据生成(readStrategyOrderSmallData + setSmallPageUrl + insertSmallData)
+        String LOCK_KEY = "getResult:" + bucketDataParam.getGhId();
+        boolean locked = redisUtils.tryLockWithRetry(LOCK_KEY, "1", 60, 10, 1000);
+        if (!locked) {
+            log.warn("ThirdPartyPushMessageStrategyV1 getResult 获取锁失败,直接读取已有数据,ghId:" + bucketDataParam.getGhId());
+            return getReplyBucketData(configJsonObject, keyedSet, bucketDataParam.getGhId());
+        }
+        try {
+            // 1 处理文章--算法引擎--排序文章数据
+//            getWenzhangData();
+            // 2 处理小程序--读取离线数据表--获取策略排序小程序数据
+            List<CgiReplyBucketData> smallDataCgiReplyList = readStrategyOrderSmallData(keyedSet, bucketDataParam);
+            // 2.1 获取小程序落地页地址 http调用
+            smallDataCgiReplyList = setSmallPageUrl(smallDataCgiReplyList, bucketDataParam.getChannel());
+            log.info(JSON.toJSONString(smallDataCgiReplyList));
+            // 3 入库读表
+            insertSmallData(smallDataCgiReplyList, keyedSet, bucketDataParam);
+        } finally {
+            redisUtils.del(LOCK_KEY);
+        }
         // 4 组装分桶数据
         return getReplyBucketData(configJsonObject, keyedSet, bucketDataParam.getGhId());
     }
@@ -151,23 +162,9 @@ public class ThirdPartyPushMessageStrategyV1 implements ReplyStrategyService {
         if (CollectionUtils.isEmpty(smallDataCgiReplyList)) {
             return;
         }
-        // 循环获取锁,锁5s,每获取一次锁,未获取到锁,等待1s,最多等待10次
-        int tryCount = 0;
+        // 循环获取锁,锁30s,每次重试间隔 1s,最多重试10次
         String LOCK_KEY = "insertSmallData:" + bucketDataParam.getGhId();
-        boolean locked = false;
-        while (tryCount < 10) {
-            if (redisUtils.tryLock(LOCK_KEY, "1", 5)) {
-                locked = true;
-                break;
-            } else {
-                try {
-                    Thread.sleep(1000);
-                } catch (Exception e) {
-                    log.error("ThirdPartyPushMessageStrategyV1 insertSmallData 尝试获取锁失败,ghId:" + bucketDataParam.getGhId() + ",tryCount:" + tryCount, e);
-                }
-                tryCount++;
-            }
-        }
+        boolean locked = redisUtils.tryLockWithRetry(LOCK_KEY, "1", 30, 10, 1000);
         // 未获取到锁,直接返回,避免重复入库
         if (!locked) {
             log.error("ThirdPartyPushMessageStrategyV1 insertSmallData 获取锁失败,放弃入库,ghId:" + bucketDataParam.getGhId());
@@ -192,6 +189,19 @@ public class ThirdPartyPushMessageStrategyV1 implements ReplyStrategyService {
                 List<CgiReplyBucketData> cgiReplyBucketData1 = cgiReplyBucketDataMapper.selectByExample(cgiReplyBucketDataExample);
                 Map<Long, CgiReplyBucketData> cgiReplyBucketDataMap = cgiReplyBucketData1.stream()
                         .collect(Collectors.toMap(CgiReplyBucketData::getMiniVideoId, x -> x, (a, b) -> b));
+                // 锁内二次校验:若已存在相同视频ID的 is_delete=0 记录,说明并发请求已处理,跳过本次入库
+                if (!CollectionUtils.isEmpty(cgiReplyBucketData1) && cgiReplyBucketData1.size() == collect.size()) {
+                    Set<Long> existVideoIds = cgiReplyBucketDataMap.keySet();
+                    Set<Long> toInsertVideoIds = collect.stream()
+                            .map(x -> x.getMiniVideoId() != null ? x.getMiniVideoId() : MessageUtil.getVideoId(x.getMiniPagePath()))
+                            .filter(Objects::nonNull)
+                            .collect(Collectors.toSet());
+                    if (existVideoIds.equals(toInsertVideoIds)) {
+                        log.info("ThirdPartyPushMessageStrategyV1 insertSmallData 锁内二次校验:数据已被并发请求处理,跳过,key:{},ghId:{}",
+                                key, bucketDataParam.getGhId());
+                        continue;
+                    }
+                }
                 // 清上个版本的策略数据
                 cgiReplyBucketDataMapperExt.updateDeleteStatus(bucketDataParam.getGhId(), key, 1, 1);
                 List<ContentPlatformGzhPlanVideo> gzhPlanVideoList = contentPlatformPlanService.getGzhPlanVideoListByCooperateAccountId(bucketDataParam.getGhId());

+ 89 - 53
api-module/src/main/java/com/tzld/piaoquan/api/service/strategy/impl/WeComPushMessageStrategyV1.java

@@ -73,7 +73,7 @@ public class WeComPushMessageStrategyV1 implements ReplyStrategyService {
 
     @Override
     public ReplyBucketData getResult(BucketDataParam bucketDataParam) {
-        log.info("ThirdPartyPushMessageStrategyV1 start");
+        log.info("WeComPushMessageStrategyV1 start");
         // 0 获取策略key
         JSONObject configJsonObject;
         if (Objects.equals(StrategyStatusEnum.DEFAULT.status, bucketDataParam.getStrategyStatus())) {
@@ -82,15 +82,26 @@ public class WeComPushMessageStrategyV1 implements ReplyStrategyService {
             configJsonObject = JSON.parseObject(bucketStrategyConfig);
         }
         Set<String> keyedSet = configJsonObject.keySet();
-        // 1 处理文章--算法引擎--排序文章数据
-//        getWenzhangData();
-        // 2 处理小程序--读取离线数据表--获取策略排序小程序数据
-        List<CgiReplyBucketData> smallDataCgiReplyList = readStrategyOrderSmallData(keyedSet, bucketDataParam);
-        // 2.1 获取小程序落地页地址 http调用
-        smallDataCgiReplyList = setSmallPageUrl(smallDataCgiReplyList);
-        log.info(JSON.toJSONString(smallDataCgiReplyList));
-        // 3 入库读表
-        insertSmallData(smallDataCgiReplyList, keyedSet, bucketDataParam);
+        // 加锁,防止并发请求重复触发数据生成(readStrategyOrderSmallData + setSmallPageUrl + insertSmallData)
+        String LOCK_KEY = "getResult:" + bucketDataParam.getGhId();
+        boolean locked = redisUtils.tryLockWithRetry(LOCK_KEY, "1", 60, 10, 1000);
+        if (!locked) {
+            log.warn("WeComPushMessageStrategyV1 getResult 获取锁失败,直接读取已有数据,ghId:" + bucketDataParam.getGhId());
+            return getReplyBucketData(configJsonObject, keyedSet, bucketDataParam.getGhId());
+        }
+        try {
+            // 1 处理文章--算法引擎--排序文章数据
+//            getWenzhangData();
+            // 2 处理小程序--读取离线数据表--获取策略排序小程序数据
+            List<CgiReplyBucketData> smallDataCgiReplyList = readStrategyOrderSmallData(keyedSet, bucketDataParam);
+            // 2.1 获取小程序落地页地址 http调用
+            smallDataCgiReplyList = setSmallPageUrl(smallDataCgiReplyList);
+            log.info(JSON.toJSONString(smallDataCgiReplyList));
+            // 3 入库读表
+            insertSmallData(smallDataCgiReplyList, keyedSet, bucketDataParam);
+        } finally {
+            redisUtils.del(LOCK_KEY);
+        }
         // 4 组装分桶数据
         return getReplyBucketData(configJsonObject, keyedSet, bucketDataParam.getGhId());
     }
@@ -143,54 +154,79 @@ public class WeComPushMessageStrategyV1 implements ReplyStrategyService {
         if (CollectionUtils.isEmpty(smallDataCgiReplyList)) {
             return;
         }
-        for (String key : keyedSet) {
-            if ("base".equals(key)) {
-                continue;
-            }
-            List<CgiReplyBucketData> collect = smallDataCgiReplyList.stream()
-                    .filter(x -> x.getStrategy().equals(key))
-                    .filter(x -> x.getGhId().equals(bucketDataParam.getGhId()))
-                    .collect(Collectors.toList());
-            if (CollectionUtils.isEmpty(collect)) {
-                log.info("WeComPushMessageStrategyV1 insertSmallData 数据未变更跳过,key:{},ghId:{},data:{}",
-                        key, bucketDataParam.getGhId(), JSON.toJSONString(smallDataCgiReplyList));
-                continue;
-            }
-            CgiReplyBucketDataExample cgiReplyBucketDataExample = new CgiReplyBucketDataExample();
-            cgiReplyBucketDataExample.createCriteria().andIsDeleteEqualTo(0).andMsgTypeEqualTo(1).andStrategyEqualTo(key).andGhIdEqualTo(bucketDataParam.getGhId());
-            List<CgiReplyBucketData> cgiReplyBucketData1 = cgiReplyBucketDataMapper.selectByExample(cgiReplyBucketDataExample);
-            Map<Long, CgiReplyBucketData> cgiReplyBucketDataMap = cgiReplyBucketData1.stream()
-                    .collect(Collectors.toMap(CgiReplyBucketData::getMiniVideoId, x -> x, (a, b) -> b));
-            // 清上个版本的策略数据
-            cgiReplyBucketDataMapperExt.updateDeleteStatus(bucketDataParam.getGhId(), key, 1, 1);
-            // 入库
-            for (CgiReplyBucketData cgiReplyBucketData : collect) {
-                CgiReplyBucketData oldData = cgiReplyBucketDataMap.get(cgiReplyBucketData.getMiniVideoId());
-                if (Objects.isNull(oldData)) {
-                    oldData = cgiReplyBucketDataMapperExt.getOldCgiReplyData(cgiReplyBucketData.getMiniVideoId());
+        // 循环获取锁,锁30s,每次重试间隔 1s,最多重试10次
+        String LOCK_KEY = "insertSmallData:" + bucketDataParam.getGhId();
+        boolean locked = redisUtils.tryLockWithRetry(LOCK_KEY, "1", 30, 10, 1000);
+        // 未获取到锁,直接返回,避免重复入库
+        if (!locked) {
+            log.error("WeComPushMessageStrategyV1 insertSmallData 获取锁失败,放弃入库,ghId:" + bucketDataParam.getGhId());
+            return;
+        }
+        try {
+            for (String key : keyedSet) {
+                if ("base".equals(key)) {
+                    continue;
+                }
+                List<CgiReplyBucketData> collect = smallDataCgiReplyList.stream()
+                        .filter(x -> x.getStrategy().equals(key))
+                        .filter(x -> x.getGhId().equals(bucketDataParam.getGhId()))
+                        .collect(Collectors.toList());
+                if (CollectionUtils.isEmpty(collect)) {
+                    log.info("WeComPushMessageStrategyV1 insertSmallData 数据未变更跳过,key:{},ghId:{},data:{}",
+                            key, bucketDataParam.getGhId(), JSON.toJSONString(smallDataCgiReplyList));
+                    continue;
+                }
+                CgiReplyBucketDataExample cgiReplyBucketDataExample = new CgiReplyBucketDataExample();
+                cgiReplyBucketDataExample.createCriteria().andIsDeleteEqualTo(0).andMsgTypeEqualTo(1).andStrategyEqualTo(key).andGhIdEqualTo(bucketDataParam.getGhId());
+                List<CgiReplyBucketData> cgiReplyBucketData1 = cgiReplyBucketDataMapper.selectByExample(cgiReplyBucketDataExample);
+                Map<Long, CgiReplyBucketData> cgiReplyBucketDataMap = cgiReplyBucketData1.stream()
+                        .collect(Collectors.toMap(CgiReplyBucketData::getMiniVideoId, x -> x, (a, b) -> b));
+                // 锁内二次校验:若已存在相同视频ID的 is_delete=0 记录,说明并发请求已处理,跳过本次入库
+                if (!CollectionUtils.isEmpty(cgiReplyBucketData1) && cgiReplyBucketData1.size() == collect.size()) {
+                    Set<Long> existVideoIds = cgiReplyBucketDataMap.keySet();
+                    Set<Long> toInsertVideoIds = collect.stream()
+                            .map(x -> x.getMiniVideoId() != null ? x.getMiniVideoId() : MessageUtil.getVideoId(x.getMiniPagePath()))
+                            .filter(Objects::nonNull)
+                            .collect(Collectors.toSet());
+                    if (existVideoIds.equals(toInsertVideoIds)) {
+                        log.info("WeComPushMessageStrategyV1 insertSmallData 锁内二次校验:数据已被并发请求处理,跳过,key:{},ghId:{}",
+                                key, bucketDataParam.getGhId());
+                        continue;
+                    }
                 }
-                if (Objects.nonNull(oldData)) {
-                    if (StringUtils.isEmpty(cgiReplyBucketData.getTitle())) {
-                        cgiReplyBucketData.setTitle(oldData.getTitle());
+                // 清上个版本的策略数据
+                cgiReplyBucketDataMapperExt.updateDeleteStatus(bucketDataParam.getGhId(), key, 1, 1);
+                // 入库
+                for (CgiReplyBucketData cgiReplyBucketData : collect) {
+                    CgiReplyBucketData oldData = cgiReplyBucketDataMap.get(cgiReplyBucketData.getMiniVideoId());
+                    if (Objects.isNull(oldData)) {
+                        oldData = cgiReplyBucketDataMapperExt.getOldCgiReplyData(cgiReplyBucketData.getMiniVideoId());
                     }
-                    if (StringUtils.isEmpty(cgiReplyBucketData.getCoverUrl())) {
-                        cgiReplyBucketData.setCoverUrl(oldData.getCoverUrl());
+                    if (Objects.nonNull(oldData)) {
+                        if (StringUtils.isEmpty(cgiReplyBucketData.getTitle())) {
+                            cgiReplyBucketData.setTitle(oldData.getTitle());
+                        }
+                        if (StringUtils.isEmpty(cgiReplyBucketData.getCoverUrl())) {
+                            cgiReplyBucketData.setCoverUrl(oldData.getCoverUrl());
+                        }
                     }
+                    String pageUrl = videoMultiService.setVideoMultiTitleCoverPagePath(cgiReplyBucketData.getMiniVideoId(),
+                            cgiReplyBucketData.getMiniPagePath(), cgiReplyBucketData.getTitle(), cgiReplyBucketData.getCoverUrl());
+                    cgiReplyBucketData.setMiniPagePath(pageUrl);
+                    cgiReplyBucketData.setRootSourceId(MessageUtil.getRootSourceId(cgiReplyBucketData.getMiniPagePath()));
+                    cgiReplyBucketDataMapper.insertSelective(cgiReplyBucketData);
+                    String redisKey = "auto_reply_video_detail_" + cgiReplyBucketData.getRootSourceId();
+                    VideoCharacteristicVO vo = new VideoCharacteristicVO();
+                    vo.setGhId(cgiReplyBucketData.getGhId());
+                    vo.setName(bucketDataParam.getAccountName());
+                    vo.setVideoId(cgiReplyBucketData.getMiniVideoId());
+                    vo.setTitle(cgiReplyBucketData.getTitle());
+                    vo.setCover(cgiReplyBucketData.getCoverUrl());
+                    redisUtils.set(redisKey, JSONObject.toJSONString(vo), 3L * 24 * 60 * 60);
                 }
-                String pageUrl = videoMultiService.setVideoMultiTitleCoverPagePath(cgiReplyBucketData.getMiniVideoId(),
-                        cgiReplyBucketData.getMiniPagePath(), cgiReplyBucketData.getTitle(), cgiReplyBucketData.getCoverUrl());
-                cgiReplyBucketData.setMiniPagePath(pageUrl);
-                cgiReplyBucketData.setRootSourceId(MessageUtil.getRootSourceId(cgiReplyBucketData.getMiniPagePath()));
-                cgiReplyBucketDataMapper.insertSelective(cgiReplyBucketData);
-                String redisKey = "auto_reply_video_detail_" + cgiReplyBucketData.getRootSourceId();
-                VideoCharacteristicVO vo = new VideoCharacteristicVO();
-                vo.setGhId(cgiReplyBucketData.getGhId());
-                vo.setName(bucketDataParam.getAccountName());
-                vo.setVideoId(cgiReplyBucketData.getMiniVideoId());
-                vo.setTitle(cgiReplyBucketData.getTitle());
-                vo.setCover(cgiReplyBucketData.getCoverUrl());
-                redisUtils.set(redisKey, JSONObject.toJSONString(vo), 3L * 24 * 60 * 60);
             }
+        } finally {
+            redisUtils.del(LOCK_KEY);
         }
     }
 

+ 37 - 7
api-module/src/main/resources/mapper/contentplatform/ContentPlatformVideoMapper.xml

@@ -10,6 +10,8 @@
     <result column="cover" jdbcType="VARCHAR" property="cover" />
     <result column="video" jdbcType="VARCHAR" property="video" />
     <result column="score" jdbcType="DOUBLE" property="score" />
+    <result column="rovn" jdbcType="DOUBLE" property="rovn" />
+    <result column="exposure" jdbcType="BIGINT" property="exposure" />
     <result column="status" jdbcType="INTEGER" property="status" />
     <result column="create_timestamp" jdbcType="BIGINT" property="createTimestamp" />
     <result column="update_timestamp" jdbcType="BIGINT" property="updateTimestamp" />
@@ -73,7 +75,7 @@
     </where>
   </sql>
   <sql id="Base_Column_List">
-    id, dt, video_id, category, title, cover, video, score, `status`, create_timestamp, 
+    id, dt, video_id, category, title, cover, video, score, rovn, exposure, `status`, create_timestamp,
     update_timestamp
   </sql>
   <select id="selectByExample" parameterType="com.tzld.piaoquan.api.model.po.contentplatform.ContentPlatformVideoExample" resultMap="BaseResultMap">
@@ -110,13 +112,13 @@
     </if>
   </delete>
   <insert id="insert" parameterType="com.tzld.piaoquan.api.model.po.contentplatform.ContentPlatformVideo">
-    insert into content_platform_video (id, dt, video_id, 
-      category, title, cover, 
-      video, score, `status`, 
+    insert into content_platform_video (id, dt, video_id,
+      category, title, cover,
+      video, score, rovn, exposure, `status`,
       create_timestamp, update_timestamp)
-    values (#{id,jdbcType=BIGINT}, #{dt,jdbcType=VARCHAR}, #{videoId,jdbcType=BIGINT}, 
-      #{category,jdbcType=VARCHAR}, #{title,jdbcType=VARCHAR}, #{cover,jdbcType=VARCHAR}, 
-      #{video,jdbcType=VARCHAR}, #{score,jdbcType=DOUBLE}, #{status,jdbcType=INTEGER}, 
+    values (#{id,jdbcType=BIGINT}, #{dt,jdbcType=VARCHAR}, #{videoId,jdbcType=BIGINT},
+      #{category,jdbcType=VARCHAR}, #{title,jdbcType=VARCHAR}, #{cover,jdbcType=VARCHAR},
+      #{video,jdbcType=VARCHAR}, #{score,jdbcType=DOUBLE}, #{rovn,jdbcType=DOUBLE}, #{exposure,jdbcType=BIGINT}, #{status,jdbcType=INTEGER},
       #{createTimestamp,jdbcType=BIGINT}, #{updateTimestamp,jdbcType=BIGINT})
   </insert>
   <insert id="insertSelective" parameterType="com.tzld.piaoquan.api.model.po.contentplatform.ContentPlatformVideo">
@@ -146,6 +148,12 @@
       <if test="score != null">
         score,
       </if>
+      <if test="rovn != null">
+        rovn,
+      </if>
+      <if test="exposure != null">
+        exposure,
+      </if>
       <if test="status != null">
         `status`,
       </if>
@@ -181,6 +189,12 @@
       <if test="score != null">
         #{score,jdbcType=DOUBLE},
       </if>
+      <if test="rovn != null">
+        #{rovn,jdbcType=DOUBLE},
+      </if>
+      <if test="exposure != null">
+        #{exposure,jdbcType=BIGINT},
+      </if>
       <if test="status != null">
         #{status,jdbcType=INTEGER},
       </if>
@@ -225,6 +239,12 @@
       <if test="record.score != null">
         score = #{record.score,jdbcType=DOUBLE},
       </if>
+      <if test="record.rovn != null">
+        rovn = #{record.rovn,jdbcType=DOUBLE},
+      </if>
+      <if test="record.exposure != null">
+        exposure = #{record.exposure,jdbcType=BIGINT},
+      </if>
       <if test="record.status != null">
         `status` = #{record.status,jdbcType=INTEGER},
       </if>
@@ -249,6 +269,8 @@
       cover = #{record.cover,jdbcType=VARCHAR},
       video = #{record.video,jdbcType=VARCHAR},
       score = #{record.score,jdbcType=DOUBLE},
+      rovn = #{record.rovn,jdbcType=DOUBLE},
+      exposure = #{record.exposure,jdbcType=BIGINT},
       `status` = #{record.status,jdbcType=INTEGER},
       create_timestamp = #{record.createTimestamp,jdbcType=BIGINT},
       update_timestamp = #{record.updateTimestamp,jdbcType=BIGINT}
@@ -280,6 +302,12 @@
       <if test="score != null">
         score = #{score,jdbcType=DOUBLE},
       </if>
+      <if test="rovn != null">
+        rovn = #{rovn,jdbcType=DOUBLE},
+      </if>
+      <if test="exposure != null">
+        exposure = #{exposure,jdbcType=BIGINT},
+      </if>
       <if test="status != null">
         `status` = #{status,jdbcType=INTEGER},
       </if>
@@ -301,6 +329,8 @@
       cover = #{cover,jdbcType=VARCHAR},
       video = #{video,jdbcType=VARCHAR},
       score = #{score,jdbcType=DOUBLE},
+      rovn = #{rovn,jdbcType=DOUBLE},
+      exposure = #{exposure,jdbcType=BIGINT},
       `status` = #{status,jdbcType=INTEGER},
       create_timestamp = #{createTimestamp,jdbcType=BIGINT},
       update_timestamp = #{updateTimestamp,jdbcType=BIGINT}

+ 7 - 4
api-module/src/main/resources/mapper/contentplatform/ext/ContentPlatformPlanMapperExt.xml

@@ -163,11 +163,11 @@
     </select>
 
     <insert id="batchInsertContentPlatformVideo">
-        insert into content_platform_video (dt, video_id, category, title, cover, video, score, create_timestamp)
+        insert into content_platform_video (dt, video_id, category, title, cover, video, score, rovn, exposure, create_timestamp)
         values
         <foreach collection="records" item="item" separator=",">
             (#{item.dt}, #{item.videoId}, #{item.category}, #{item.title}, #{item.cover}, #{item.video}, #{item.score},
-            #{item.createTimestamp})
+            #{item.rovn}, #{item.exposure}, #{item.createTimestamp})
         </foreach>
     </insert>
 
@@ -260,9 +260,12 @@
 
     <select id="getVideoAggList"
             resultType="com.tzld.piaoquan.api.model.po.contentplatform.ContentPlatformVideoAgg">
-        SELECT t.video_id, t.category, t.title, t.cover, t.video, round(t.avg_score, 3) as score
+        SELECT t.video_id, t.category, t.title, t.cover, t.video, round(t.weighted_score, 3) as score
         FROM (
-        SELECT video_id, category, title, cover, video, AVG(score) OVER (PARTITION BY video_id) AS avg_score,
+        SELECT video_id, category, title, cover, video,
+            SUM(IFNULL(rovn, 0) * POW(GREATEST(IFNULL(exposure, 0) / 5000.0, 1.0), 0.5)
+                * POWER(0.9, DATEDIFF(STR_TO_DATE(#{aggDt}, '%Y%m%d'), STR_TO_DATE(dt, '%Y%m%d'))))
+                OVER (PARTITION BY video_id) AS weighted_score,
             ROW_NUMBER() OVER (PARTITION BY video_id ORDER BY dt DESC) AS rn
         FROM content_platform_video
         where dt in

+ 1 - 1
api-module/src/test/java/com/tzld/piaoquan/api/WeComThirdPartTest.java

@@ -193,7 +193,7 @@ public class WeComThirdPartTest {
                             "请基于上述规则,输出最终的JSON:";
             keywordPrompt = keywordPrompt.replace("text", title);
             // 调用API
-            AIResult result = deepSeekApiService.requestOfficialApi(keywordPrompt, null, null, true);
+            AIResult result = deepSeekApiService.request(keywordPrompt, null, null, true);
             List<String> keywords = JSONObject.parseArray(result.getResponse().getChoices().get(0).getMessage().getContent(), String.class);
 
             System.out.println(String.format("title: %s, keywords: %s", title, keywords));

+ 26 - 0
common-module/src/main/java/com/tzld/piaoquan/growth/common/utils/RedisUtils.java

@@ -191,6 +191,32 @@ public class RedisUtils {
         return false;
     }
 
+    /**
+     * 带重试的尝试获取分布式锁
+     *
+     * @param key              锁key
+     * @param value            锁value
+     * @param expireSeconds    锁超时时间(秒)
+     * @param maxRetry         最大重试次数
+     * @param retryIntervalMs  每次重试间隔(毫秒)
+     * @return true:获取锁成功,false:达到最大重试次数仍未获取到锁
+     */
+    public boolean tryLockWithRetry(String key, String value, long expireSeconds, int maxRetry, long retryIntervalMs) {
+        for (int i = 0; i < maxRetry; i++) {
+            if (tryLock(key, value, expireSeconds)) {
+                return true;
+            }
+            try {
+                Thread.sleep(retryIntervalMs);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.warn("tryLockWithRetry 被中断,放弃获取锁,key:{}", key);
+                return false;
+            }
+        }
+        return false;
+    }
+
     public void listLeftPush(String key, String value) {
         redisTemplate.opsForList().leftPush(key, value);