浏览代码

修改查询更新逻辑 只更新当前公众号

xueyiming 9 月之前
父节点
当前提交
bb820146c6

+ 2 - 0
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/mapper/crawler/AlgGhAutoreplyVideoRankDataMapper.java

@@ -31,4 +31,6 @@ public interface AlgGhAutoreplyVideoRankDataMapper {
     int updateByPrimaryKey(AlgGhAutoreplyVideoRankData row);
 
     String selectLatestDtVersionByStrategyKey(String strategyKey);
+
+    String selectLatestDtVersionByStrategyKeyAndGhId(@Param("strategyKey")String strategyKey, @Param("ghId")String ghId);
 }

+ 2 - 2
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/mq/MessageCallbackProducer.java

@@ -43,8 +43,8 @@ public class MessageCallbackProducer {
         message.setTag(TAG);
         message.setBody(JSON.toJSONString(param).getBytes(StandardCharsets.UTF_8));
         try {
-            SendResult sendResult = producer.send(message);
-            log.info("sendResult = {}", sendResult);
+            log.info("MessageCallbackProducer sendMessage = {}", message);
+            producer.send(message);
         } catch (Exception e) {
             // TODO: 写阿里云日志
             log.error("MessageCallbackProducer send param = {} error", param, e);

+ 20 - 13
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/strategy/reply/impl/PushMessageStrategyV1.java

@@ -17,6 +17,7 @@ import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
+import org.springframework.util.StringUtils;
 
 import java.util.*;
 import java.util.stream.Collectors;
@@ -62,12 +63,12 @@ public class PushMessageStrategyV1 implements ReplyStrategyService {
         // 1 处理文章--算法引擎--排序文章数据
 //        getWenzhangData();
         // 2 处理小程序--读取离线数据表--获取策略排序小程序数据
-        List<CgiReplyBucketData> smallDataCgiReplyList = readStrategyOrderSmallData(keyedSet);
+        List<CgiReplyBucketData> smallDataCgiReplyList = readStrategyOrderSmallData(keyedSet, bucketDataParam);
         // 2.1 获取小程序落地页地址 http调用
         smallDataCgiReplyList = setSmallPageUrl(smallDataCgiReplyList);
         log.info(JSON.toJSONString(smallDataCgiReplyList));
         // 3 入库读表
-        insertSmallData(smallDataCgiReplyList, keyedSet);
+        insertSmallData(smallDataCgiReplyList, keyedSet, bucketDataParam);
         // 4 组装分桶数据
         return getReplyBucketData(bucketStrategyConfigJsonObject, keyedSet, bucketDataParam.getGhId());
     }
@@ -116,7 +117,7 @@ public class PushMessageStrategyV1 implements ReplyStrategyService {
     }
 
 
-    private void insertSmallData(List<CgiReplyBucketData> smallDataCgiReplyList, Set<String> keyedSet) {
+    private void insertSmallData(List<CgiReplyBucketData> smallDataCgiReplyList, Set<String> keyedSet, BucketDataParam bucketDataParam) {
         if (CollectionUtils.isEmpty(smallDataCgiReplyList)) {
             return;
         }
@@ -124,14 +125,17 @@ public class PushMessageStrategyV1 implements ReplyStrategyService {
             if ("base".equals(key)) {
                 continue;
             }
-            List<CgiReplyBucketData> collect = smallDataCgiReplyList.stream().filter(x -> x.getStrategy().equals(key)).collect(Collectors.toList());
+            List<CgiReplyBucketData> collect = smallDataCgiReplyList.stream()
+                    .filter(x -> x.getStrategy().equals(key))
+                    .filter(x -> x.getGhId().equals(bucketDataParam.getGhId()))
+                    .collect(Collectors.toList());
             if (CollectionUtils.isEmpty(collect)) {
-                log.error("insertSmallData 算法排序数据异常,data:" + JSON.toJSONString(smallDataCgiReplyList));
+                log.error("PushMessageStrategyV1 insertSmallData 算法排序数据异常,data:" + JSON.toJSONString(smallDataCgiReplyList));
                 continue;
             }
             // 清上个版本的策略数据
             CgiReplyBucketDataExample cgiReplyBucketDataExample = new CgiReplyBucketDataExample();
-            cgiReplyBucketDataExample.createCriteria().andIsDeleteEqualTo(0).andMsgTypeEqualTo(1).andStrategyEqualTo(key);
+            cgiReplyBucketDataExample.createCriteria().andIsDeleteEqualTo(0).andMsgTypeEqualTo(1).andStrategyEqualTo(key).andGhIdEqualTo(bucketDataParam.getGhId());
             List<CgiReplyBucketData> cgiReplyBucketData1 = cgiReplyBucketDataMapper.selectByExample(cgiReplyBucketDataExample);
             for (CgiReplyBucketData cgiReplyBucketData : cgiReplyBucketData1) {
                 cgiReplyBucketData.setIsDelete(1);
@@ -193,26 +197,29 @@ public class PushMessageStrategyV1 implements ReplyStrategyService {
         return smallDataCgiReplyList;
     }
 
-    private List<CgiReplyBucketData> readStrategyOrderSmallData(Set<String> keyedSet) {
+    private List<CgiReplyBucketData> readStrategyOrderSmallData(Set<String> keyedSet, BucketDataParam bucketDataParam) {
         List<CgiReplyBucketData> result = new ArrayList<>();
-        // TODO: 只更新对应公众号的数据
         for (String key : keyedSet) {
             if ("base".equals(key)) {
                 // base作为人工控制
                 continue;
             }
             // 获取最新dt的策略
-            String dtVersion = algGhAutoreplyVideoRankDataMapper.selectLatestDtVersionByStrategyKey(key);
+            String dtVersion = algGhAutoreplyVideoRankDataMapper.selectLatestDtVersionByStrategyKeyAndGhId(key, bucketDataParam.getGhId());
+            if (StringUtils.isEmpty(dtVersion)) {
+                bucketDataParam.setGhId("default");
+                dtVersion = algGhAutoreplyVideoRankDataMapper.selectLatestDtVersionByStrategyKeyAndGhId(key, bucketDataParam.getGhId());
+            }
             // 判断当前的dtVersion是否已经处理过了
             CgiReplyBucketDataExample cgiReplyBucketDataExample = new CgiReplyBucketDataExample();
-            cgiReplyBucketDataExample.createCriteria().andIsDeleteEqualTo(0).andStrategyDtEqualTo(dtVersion).andStrategyEqualTo(key);
+            cgiReplyBucketDataExample.createCriteria().andIsDeleteEqualTo(0).andStrategyDtEqualTo(dtVersion).andStrategyEqualTo(key).andGhIdEqualTo(bucketDataParam.getGhId());
             long count = cgiReplyBucketDataMapper.countByExample(cgiReplyBucketDataExample);
             if (count != 0) {
                 // 说明已处理过该dtVersion数据
                 continue;
             }
             // 获取最新dt数据
-            List<AlgGhAutoreplyVideoRankData> dtVserSionStrategyData = getDtVersionStrategyData(key, dtVersion);
+            List<AlgGhAutoreplyVideoRankData> dtVserSionStrategyData = getDtVersionStrategyData(key, dtVersion, bucketDataParam.getGhId());
             result.addAll(dtVserSionStrategyData.stream().map(x -> {
                 CgiReplyBucketData cgiReplyBucketData = new CgiReplyBucketData();
                 cgiReplyBucketData.setStrategy(key);
@@ -231,9 +238,9 @@ public class PushMessageStrategyV1 implements ReplyStrategyService {
         return CollectionUtils.isEmpty(result) ? null : result;
     }
 
-    private List<AlgGhAutoreplyVideoRankData> getDtVersionStrategyData(String key, String dtVersion) {
+    private List<AlgGhAutoreplyVideoRankData> getDtVersionStrategyData(String key, String dtVersion, String ghId) {
         AlgGhAutoreplyVideoRankDataExample algGhAutoreplyVideoRankDataExample = new AlgGhAutoreplyVideoRankDataExample();
-        algGhAutoreplyVideoRankDataExample.createCriteria().andIsDeleteEqualTo(0).andDtVersionEqualTo(dtVersion).andStrategyKeyEqualTo(key);
+        algGhAutoreplyVideoRankDataExample.createCriteria().andIsDeleteEqualTo(0).andDtVersionEqualTo(dtVersion).andStrategyKeyEqualTo(key).andGhIdEqualTo(ghId);
         return algGhAutoreplyVideoRankDataMapper.selectByExample(algGhAutoreplyVideoRankDataExample);
     }
 

+ 6 - 0
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/web/HealthCheckController.java

@@ -15,4 +15,10 @@ public class HealthCheckController {
     public String ok() {
         return "ok";
     }
+
+    @GetMapping("/test")
+    public String test() {
+        log.error("test");
+        return "ok";
+    }
 }

+ 9 - 0
long-article-recommend-service/src/main/resources/mapper/crawler/AlgGhAutoreplyVideoRankDataMapper.xml

@@ -312,4 +312,13 @@
     ORDER BY dt_version DESC
       LIMIT 1
   </select>
+
+  <select id="selectLatestDtVersionByStrategyKeyAndGhId" resultType="string" >
+    SELECT dt_version
+    FROM alg_gh_autoreply_video_rank_data
+    WHERE strategy_key = #{strategyKey} and gh_id = #{ghId}
+    GROUP BY dt_version
+    ORDER BY dt_version DESC
+      LIMIT 1
+  </select>
 </mapper>