Kaynağa Gözat

feat: V839 引入 YearValidPlayDkElements 召回, 替换 YearShareDkElements

V839 实验差异点 (vs V562): 把"分享行为源 -> dk_elements"召回替换为
"有效播放行为源 -> dk_elements"召回, 形成单变量纯净 AB 对照, 验证有效播放
信号在 dk_elements 召回链路上是否优于分享信号.

- YearValidPlayDkElementsRecallStrategy (新建): 克隆 YearShareDkElements
  架构, 触发源改读 userNetworkSeqFeature.rp_vid 序列 (上游已过滤为有效播放
  vid, 无需 a_t_s 类型过滤), 复用 elements_rovn_recall Redis 倒排.
- RecommendService.allVids 加入 rp_vid 序列, 让 userNetworkSeqVideoInfoMap
  含这批 vid 的 dk_elements; 上游未填充时返回空, 对其他实验零影响.
- V839 个性化白名单的 YearShareDkElements 替换为 YearValidPlay, 仍 7 路.
- RecallService gate: V839 解绑 YearShare, 独立挂 YearValidPlay; YearShare
  退回 562/536 共用.
yangxiaohui 2 hafta önce
ebeveyn
işleme
a763ccd592

+ 5 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java

@@ -561,8 +561,12 @@ public class RecommendService {
         Map<String, String> userNetworkSeqFeature = unionIdFeature.getOrDefault("alg_user_network_seq_feature", new HashMap<>());
         List<String> actVidSeq = FeatureUtils.extractVidsFromUserNetworkSeqFeature(userNetworkSeqFeature, "a_v_s");
         List<String> netVidSeq = FeatureUtils.extractVidsFromUserNetworkSeqFeature(userNetworkSeqFeature, "n_v_s");
+        // rp_vid: 用户近期有效播放 vid 序列, YearValidPlayDkElementsRecallStrategy 需要拿到这批 vid
+        // 的 dk_elements -> 必须进 userNetworkSeqVideoInfoMap, 否则 dk_elements lookup 全 miss.
+        // 上游未填充时返回空, 对其他实验零影响.
+        List<String> rpVidSeq = FeatureUtils.extractVidsFromUserNetworkSeqFeature(userNetworkSeqFeature, "rp_vid");
 
-        List<String> allVids = Stream.of(actVidSeq, netVidSeq)
+        List<String> allVids = Stream.of(actVidSeq, netVidSeq, rpVidSeq)
                 .flatMap(Collection::stream)
                 .distinct()
                 .filter(StringUtils::isNotBlank)

+ 4 - 3
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV839.java

@@ -40,9 +40,10 @@ public class RankStrategy4RegionMergeModelV839 extends RankStrategy4RegionMergeM
     private FeatureService featureService;
 
     /**
-     * V839 个性化召回白名单 (7 路: V566 基础 6 路 + 1 路 dk_elements 行为路实验):召回 key 含 mid/uid,
+     * V839 个性化召回白名单 (7 路: V566 基础 6 路 + 1 路 valid_play dk_elements 行为路实验):召回 key 含 mid/uid,
      * 依赖该用户行为信号。
-     * V839 实验路径: YearShareDkElements (用户近期 share 行为 join dk_elements)
+     * V839 vs V562 唯一差异: 把 YearShareDkElements (分享行为) 替换为
+     * YearValidPlayDkElements (有效播放行为) — 形成"分享 vs 有效播放"双行为源 dk_elements 召回的纯净 AB 对照。
      * 注:YearReturnCate2 因线上效果不佳, 2026-06-04 起移到非个性化白名单。
      */
     private static final Set<String> PERSONAL_RECALL_PUSH_FROMS = new HashSet<>(Arrays.asList(
@@ -52,7 +53,7 @@ public class RankStrategy4RegionMergeModelV839 extends RankStrategy4RegionMergeM
             Return1Cate2StrRecallStrategy.PUSH_FORM,
             YearShareCate1RecallStrategy.PUSH_FROM,
             YearShareCate2RecallStrategy.PUSH_FROM,
-            YearShareDkElementsRecallStrategy.PUSH_FROM
+            YearValidPlayDkElementsRecallStrategy.PUSH_FROM
     ));
 
     /**

+ 6 - 2
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallService.java

@@ -182,15 +182,19 @@ public class RecallService implements ApplicationContextAware {
         // V536/V562/V565/V569 各承载一个 dk_elements 实验, 互相隔离便于独立归因:
         //   V562 → YearShareDkElements:    用户近期 share 行为 join dk_elements -> elements_rovn_recall 倒排
         //   V536 → YearShareDkElements:    同 V562, 但在 V536 rank 类中归非个性化白名单 (配额归属差异)
-        //   V839 → YearShareDkElements:    V839 已克隆 V562 架构, 白名单引用 YearShareDkElements, 不挂 gate 则该路永远空
+        //   V839 → YearValidPlayDkElements: V839 用 valid_play 行为源替换 V562 的 share 行为源,
+        //                                    形成"分享 vs 有效播放"双行为 dk_elements 召回 AB 对照.
         //   V565 → UserProfileDkElements:  用户元素画像 (s_z_y_s/zt_gyf)         -> elements_rovn_recall 倒排
         //   V569 → YearReturnDkElements:   用户近期 click 回流行为 join dk_elements -> elements_rovn_recall 倒排
         boolean isHit562Exp = experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "562");
         boolean isHit536Exp = experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "536");
         boolean isHit839Exp = experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "839");
-        if (isHit562Exp || isHit536Exp || isHit839Exp) {
+        if (isHit562Exp || isHit536Exp) {
             strategies.add(strategyMap.get(YearShareDkElementsRecallStrategy.class.getSimpleName()));
         }
+        if (isHit839Exp) {
+            strategies.add(strategyMap.get(YearValidPlayDkElementsRecallStrategy.class.getSimpleName()));
+        }
         boolean isHit565Exp = experimentService.judgeHitAlgoExp(param.getAppType(), param.getRootSessionId(), abExpCodes, "565");
         if (isHit565Exp) {
             strategies.add(strategyMap.get(UserProfileDkElementsRecallStrategy.class.getSimpleName()));

+ 198 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/YearValidPlayDkElementsRecallStrategy.java

@@ -0,0 +1,198 @@
+package com.tzld.piaoquan.recommend.server.service.recall.strategy;
+
+import com.tzld.piaoquan.recommend.server.model.Video;
+import com.tzld.piaoquan.recommend.server.service.filter.FilterParam;
+import com.tzld.piaoquan.recommend.server.service.filter.FilterResult;
+import com.tzld.piaoquan.recommend.server.service.filter.FilterService;
+import com.tzld.piaoquan.recommend.server.service.recall.FilterParamFactory;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallStrategy;
+import com.tzld.piaoquan.recommend.server.util.DkElementsUtils;
+import com.tzld.piaoquan.recommend.server.util.FeatureUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * 视频解构 实质元素 rovn 召回 (用户近期 有效播放 行为 -> dk_elements)
+ *   触发源相对 YearShareDkElementsRecallStrategy 改为有效播放行为, 其余 Redis 倒排、参数完全一致.
+ *
+ *   数据通道差异: share/click 走 a_v_s + a_t_s 配对序列+类型过滤; 有效播放走独立 rp_vid 序列,
+ *   该序列上游已只保留"有效播放" vid, 无需 type 过滤.
+ *
+ *   挑 kw 逻辑: 按 rp_vid 时间序遍历, 每个 vid 摊平多个 element, distinct 取前 topN (30) 个
+ *               -> 一次 multiGet elements_rovn_recall:{kw} 倒排.
+ *
+ *   上游 ODPS: alg_recsys_recall_elements_rovn (原始元素 -> top-50 vid + rovn 得分)
+ *   Redis key: elements_rovn_recall:{原始元素}
+ *   value: vid1,vid2,...\tscore1,score2,...
+ */
+@Slf4j
+@Component
+public class YearValidPlayDkElementsRecallStrategy implements RecallStrategy {
+
+    @Autowired
+    @Qualifier("redisTemplate")
+    private RedisTemplate<String, String> redisTemplate;
+
+    @Autowired
+    private FilterService filterService;
+
+    private final String CLASS_NAME = this.getClass().getSimpleName();
+
+    public static final String PUSH_FROM = "recall_user_year_valid_play_dk_elements";
+    public static final String redisKeyPrefix = "elements_rovn_recall";
+
+    /** 用户近期有效播放 vid 序列 key, 上游已只保留有效播放, 无需 type 过滤 */
+    public static final String VID_SEQ_KEY = "rp_vid";
+
+    /** 元素贡献分过滤阈值 (parse 时丢弃 c < 0.8 的 element, 噪声元素不进召回) */
+    public static final double MIN_CONTRIB_SCORE = 0.8;
+    /** 摊平后按时间序 distinct 取前 N 个 element 进 Redis 倒排查询 */
+    public static final int topN = 30;
+
+    @Override
+    public List<Video> recall(RecallParam param) {
+
+        List<Video> videosResult = new ArrayList<>();
+        try {
+
+            if (MapUtils.isEmpty(param.getUserNetworkSeqVideoInfoMap())) {
+                return videosResult;
+            }
+
+            List<Pair<Long, String>> userValidPlayVideoElement = this.parseUserValidPlayVideoAndElements(param.getUserNetworkSeqFeature(), param.getUserNetworkSeqVideoInfoMap());
+            if (CollectionUtils.isEmpty(userValidPlayVideoElement)) {
+                return videosResult;
+            }
+            // 按用户近期 有效播放 行为时间序遍历, distinct 取前 topN 个高贡献分 element
+            // (贡献分过滤已在 parseUserValidPlayVideoAndElements 内 c >= MIN_CONTRIB_SCORE 完成)
+            List<String> allElements = userValidPlayVideoElement.stream()
+                    .map(Pair::getValue)
+                    .filter(StringUtils::isNotBlank)
+                    .distinct()
+                    .limit(topN)
+                    .collect(Collectors.toList());
+
+            List<String> keys = this.getRedisKey(allElements);
+            List<String> values = redisTemplate.opsForValue().multiGet(keys);
+
+            // 保留 Redis 倒排的真实 rovn 分 (而非位置分): scoresMap 的 score 会写到 Video.rovScore,
+            // 粗排截断 coarseMap miss 的 vid 会 fallback 用 Video.rovScore 排序, 真实分更有信号.
+            Map<Long, Double> scoresMap = recall(param.getVideoId(), values);
+            List<Long> ids = scoresMap.entrySet().stream()
+                    .sorted(Comparator.comparingDouble((Map.Entry<Long, Double> e) -> e.getValue()).reversed())
+                    .map(Map.Entry::getKey)
+                    .collect(Collectors.toList());
+
+            FilterParam filterParam = FilterParamFactory.create(param, ids, pushFrom(), scoresMap);
+            FilterResult filterResult = filterService.filter(filterParam);
+            if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
+                for (Long vid : filterResult.getVideoIds()) {
+                    Video video = new Video();
+                    video.setVideoId(vid);
+                    video.setRovScore(scoresMap.getOrDefault(vid, 0.0));
+                    video.setPushFrom(pushFrom());
+                    videosResult.add(video);
+                }
+            }
+        } catch (Exception e) {
+            log.error("recall is wrong in {}, error={}", CLASS_NAME, e);
+        }
+
+        return videosResult;
+    }
+
+    /**
+     * 摊平: 每个有效播放 vid 一般有多个 dk_element, 输出 (vid, element) pair 序列, 按 vid 时间序保留.
+     * rp_vid 序列上游已是"有效播放过滤后"的 vid 列表, 不再做 type 过滤.
+     */
+    private List<Pair<Long, String>> parseUserValidPlayVideoAndElements(Map<String, String> userNetworkSeqFeature, Map<Long, Map<String, String>> userNetworkSeqVideoInfoMap) {
+        List<Pair<Long, String>> result = new ArrayList<>();
+        List<String> rpVidSeq = FeatureUtils.extractVidsFromUserNetworkSeqFeature(userNetworkSeqFeature, VID_SEQ_KEY);
+        if (CollectionUtils.isEmpty(rpVidSeq)) {
+            return result;
+        }
+
+        for (String vidStr : rpVidSeq) {
+            long videoIdL = NumberUtils.toLong(vidStr, -1);
+            if (videoIdL <= 0) {
+                continue;
+            }
+
+            Map<String, String> videoBaseInfo = userNetworkSeqVideoInfoMap.getOrDefault(videoIdL, new HashMap<>());
+            String dkElementsStr = videoBaseInfo.get("dk_elements");
+            if (StringUtils.isBlank(dkElementsStr)) {
+                continue;
+            }
+            List<String> kws = DkElementsUtils.parseElementKws(dkElementsStr, MIN_CONTRIB_SCORE);
+            for (String kw : kws) {
+                result.add(Pair.of(videoIdL, kw));
+            }
+        }
+        return result;
+    }
+
+    private List<String> getRedisKey(List<String> elementList) {
+        List<String> keys = new ArrayList<>();
+        for (String element : elementList) {
+            keys.add(String.format("%s:%s", redisKeyPrefix, element));
+        }
+        return keys;
+    }
+
+    /**
+     * 解析 multiGet 拿到的 N 个 Redis value, 拼成 vid -> 真实 score map.
+     * value 格式: vid1,vid2,...\tscore1,score2,...  (rovn 真实分)
+     * 同 vid 在多个 element 倒排里出现时, 取 max score (跟 AbstractRedisRecallStrategy 一致).
+     */
+    private Map<Long, Double> recall(Long headVid, List<String> values) {
+        Map<Long, Double> scoresMap = new HashMap<>();
+        if (CollectionUtils.isEmpty(values)) {
+            return scoresMap;
+        }
+        for (String value : values) {
+            if (StringUtils.isBlank(value)) {
+                continue;
+            }
+            String[] cells = value.split("\t");
+            if (cells.length != 2) {
+                continue;
+            }
+            List<Long> ids;
+            List<Double> scores;
+            try {
+                ids = Arrays.stream(cells[0].split(",")).map(Long::valueOf).collect(Collectors.toList());
+                scores = Arrays.stream(cells[1].split(",")).map(Double::valueOf).collect(Collectors.toList());
+            } catch (NumberFormatException nfe) {
+                continue;
+            }
+            if (ids.isEmpty() || ids.size() != scores.size()) {
+                continue;
+            }
+            for (int i = 0; i < ids.size(); i++) {
+                long id = ids.get(i);
+                if (headVid != null && headVid == id) {
+                    continue;
+                }
+                scoresMap.merge(id, scores.get(i), Math::max);
+            }
+        }
+        return scoresMap;
+    }
+
+    @Override
+    public String pushFrom() {
+        return PUSH_FROM;
+    }
+}