--@exclude_input=loghubods.video_action_log_flow_new --@exclude_input=loghubods.user_share_log_flow --********************* -- alg_recsys_rank_labelmatch_20260206 -- 在 20250108 基础上新增 B/C 多跳回流列 --********************* -- drop table if exists loghubods.dwd_recsys_alg_exposure_base_20260206; CREATE TABLE IF NOT EXISTS loghubods.dwd_recsys_alg_exposure_base_20260206 ( apptype STRING ,uid STRING ,mid STRING ,vid STRING ,sessionid STRING ,subsessionid STRING ,pagesource STRING ,page STRING ,recommendlogvo STRING COMMENT '推荐算法的返回结果日志存在这个字段中' ,abcode STRING COMMENT '推荐算法的ab分组:ab0' ,recommendpagetype STRING COMMENT '用于区分pagesource相同时某些场景的。三种回流头部;两种下滑-沉浸页下滑和feed下滑。 -pages/user-videos-share-recommend-detail 是沉浸页。' ,recomtraceid STRING COMMENT '在后端调取推荐服务之前生成。前端降级会空;后端也可能为空。' ,headvideoid STRING ,rootsourceid STRING COMMENT '区分touliu等流量,咨询产品。' ,hotsencetype STRING ,flowpool STRING COMMENT '非流量池,是空字符串。没有null值。' ,level STRING COMMENT '非流量池,是null。' ,clientip STRING ,machineinfo_brand STRING ,machineinfo_model STRING ,machineinfo_system STRING ,machineinfo_wechatversion STRING ,machineinfo_sdkversion STRING ,province STRING ,city STRING ,ts STRING ,is_share STRING ,share_cnt STRING ,is_return_1 STRING ,return_1_pv STRING ,return_1_uv STRING ,return_1_mids STRING ,is_return_n STRING ,return_n_pv STRING ,return_n_uv STRING ,return_n_mids STRING ,is_return_noself STRING ,return_1_uv_noself STRING ,return_1_mids_noself STRING ,is_return_n_noself STRING ,return_n_uv_noself STRING ,return_n_mids_noself STRING ,new_exposure_cnt STRING ,b STRING COMMENT '直接回流去重人数(B)' ,c_1 STRING COMMENT '1跳回流SUM(B)' ,c_2 STRING COMMENT '2跳回流SUM(B)' ,c_3 STRING COMMENT '3跳回流SUM(B)' ,d_1 STRING COMMENT 'D链1跳: 同subsession后续曝光的B之和' ,d_2 STRING COMMENT 'D链2跳: d1回流用户session内曝光的B之和' ,d_3 STRING COMMENT 'D链3跳: d2回流用户session内曝光的B之和(去环)' ,b_mids STRING COMMENT 'B对应的回流mid列表' ,c_1_mids STRING COMMENT 'C_1对应的回流mid列表' ,c_2_mids STRING COMMENT 'C_2对应的回流mid列表' ,c_3_mids STRING COMMENT 'C_3对应的回流mid列表' ,d_1_mids STRING COMMENT 'D链1跳对应的回流mid列表' ,d_2_mids STRING COMMENT 'D链2跳对应的回流mid列表' ,d_3_mids STRING COMMENT 'D链3跳对应的回流mid列表' ,extend STRING ) PARTITIONED BY ( dt STRING COMMENT '日期:20240105' ,hh STRING COMMENT '小时:04' ) STORED AS ALIORC TBLPROPERTIES ('comment' = '推荐算法-labelmatch表-20260206更新-含多跳B/C/D') LIFECYCLE 3650 ; SET hive.exec.dynamic.partition = true ; SET hive.exec.dynamic.partition.mode = nonstrict ; SET odps.stage.mapper.split.size = 1024 ; INSERT OVERWRITE TABLE loghubods.dwd_recsys_alg_exposure_base_20260206 PARTITION (dt,hh) WITH t_return AS ( SELECT * ,CONCAT(dthh,":",shareid,":",vid,":",dthh_id) AS id FROM ( SELECT CONCAT(year,month,day,hour) AS dthh ,apptype ,machinecode AS mid ,clickobjectid AS vid ,sessionid ,subsessionid -- 注意这是回流对应的subsessionid,每次回流点击会重置,可以通过这个字段找到回流的曝光。 ,shareid ,rootshareid ,CAST(clienttimestamp / 1000 AS BIGINT) AS ts ,ROW_NUMBER() OVER (PARTITION BY CONCAT(year,month,day,hour),apptype,machinecode,clickobjectid,sessionid,subsessionid,shareid,rootshareid ORDER BY clienttimestamp DESC ) AS rn ,ROW_NUMBER() OVER (PARTITION BY CONCAT(year,month,day,hour),shareid,clickobjectid ORDER BY clienttimestamp ) AS dthh_id FROM loghubods.user_share_log_flow -- 回流行为,理应subsessionid只有一条,但有脏数据,去重。 WHERE CONCAT(year,month,day,hour) BETWEEN TO_CHAR(FROM_UNIXTIME(UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - 3600 * 25),'YYYYMMDDHH') AND TO_CHAR(FROM_UNIXTIME(UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - 3600 * 1),'YYYYMMDDHH') --WHERE CONCAT(year,month,day,hour) = TO_CHAR(FROM_UNIXTIME(UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - 3600 * 25),'YYYYMMDDHH') AND __topic__ = 'click' AND apptype IS NOT NULL AND apptype NOT IN ('12') -- 12的pagesoucre是h5-share和h5-detail 暂时过滤掉 不做处理 AND machinecode IS NOT NULL AND clickobjectid IS NOT NULL AND pagesource REGEXP "-pages/user-videos-share$" -- 存在脏数据 vlog-gzh /mine/mine-info$ 结尾的,都过滤掉。 ) WHERE rn = 1 ) ,t_share_from_sharelog AS ( SELECT * FROM ( SELECT CONCAT(year,month,day,hour) AS dthh ,apptype ,machinecode AS mid ,shareobjectid AS vid ,sessionid ,subsessionid ,pagesource ,shareid ,CAST(clienttimestamp / 1000 AS BIGINT) AS ts ,ROW_NUMBER() OVER (PARTITION BY CONCAT(year,month,day,hour),apptype,machinecode,shareobjectid,sessionid,subsessionid,pagesource,shareid ORDER BY clienttimestamp DESC ) AS rn FROM loghubods.user_share_log_flow WHERE CONCAT(year,month,day,hour) BETWEEN TO_CHAR(FROM_UNIXTIME(UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - 3600 * 25),'YYYYMMDDHH') AND TO_CHAR(FROM_UNIXTIME(UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - 3600 * 1),'YYYYMMDDHH') --WHERE CONCAT(year,month,day,hour) = TO_CHAR(FROM_UNIXTIME(UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - 3600 * 25),'YYYYMMDDHH') AND __topic__ = 'share' AND apptype IS NOT NULL AND apptype NOT IN ('12') AND machinecode IS NOT NULL AND shareobjectid IS NOT NULL ) WHERE rn = 1 ) ,t_exposure AS ( SELECT dthh_id ,dthh ,apptype ,uid ,mid ,vid ,sessionid ,subsessionid ,rootsessionid_new ,pagesource ,recommendlogvo ,abcode ,recommendpagetype ,recomtraceid ,headvideoid ,rootsourceid ,hotsencetype ,animationscenetype ,JSON_PARSE(IF(JSON_VALID(extparams),extparams,"{}")) AS extParams ,flowpool ,level ,clientip ,machineinfo_brand ,machineinfo_model ,machineinfo_system ,machineinfo_wechatversion ,machineinfo_sdkversion ,province ,city ,versioncode ,ts ,rn ,id ,dt ,hh FROM loghubods.dwd_recsys_alg_exposure_base_view_20250402 WHERE CONCAT(dt,hh) BETWEEN TO_CHAR(FROM_UNIXTIME(UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - 3600 * 25),'YYYYMMDDHH') AND TO_CHAR(FROM_UNIXTIME(UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - 3600 * 1),'YYYYMMDDHH') ) ,t_exposure_recommend AS ( SELECT * FROM t_exposure WHERE pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' ) ,t_return_exposure_1 AS -- 曝光关联回流,用于计算viewh24 ( SELECT * FROM ( SELECT t1.id AS exposure_id ,t1.mid AS mid ,t1.vid AS vid ,t1.subsessionid AS subsessionid ,t1.sessionid AS sessionid ,t1.headvideoid AS headvideoid ,t1.dthh ,t2.id AS return_id ,ROW_NUMBER() OVER (PARTITION BY t1.id ORDER BY t2.ts DESC ) AS rn FROM t_exposure_recommend t1 LEFT JOIN t_return t2 ON t1.mid = t2.mid AND t1.headvideoid = t2.vid AND t1.subsessionid = t2.subsessionid ) WHERE rn = 1 ) ,t_return_exposure_2 AS -- 曝光关联回流,用于计算viewh24 ( SELECT * FROM ( SELECT t1.exposure_id AS exposure_id ,t1.mid AS mid ,t1.vid AS vid ,t1.subsessionid AS subsessionid ,t1.sessionid AS sessionid ,t1.headvideoid AS headvideoid ,t1.dthh ,t2.id AS return_id ,ROW_NUMBER() OVER (PARTITION BY t1.exposure_id ORDER BY t2.ts DESC ) AS rn FROM ( SELECT * FROM t_return_exposure_1 WHERE return_id IS NULL ) t1 LEFT JOIN t_return t2 ON t1.mid = t2.mid AND t1.headvideoid = t2.vid AND t1.sessionid = t2.sessionid ) WHERE rn = 1 ) ,t_return_exposure_3 AS -- 曝光关联回流,用于计算viewh24 ( SELECT * FROM ( SELECT t1.exposure_id AS exposure_id ,t1.mid AS mid ,t1.vid AS vid ,t1.subsessionid AS subsessionid ,t1.sessionid AS sessionid ,t1.headvideoid AS headvideoid ,t1.dthh ,t2.id AS return_id ,ROW_NUMBER() OVER (PARTITION BY t1.exposure_id ORDER BY t2.ts DESC ) AS rn FROM ( SELECT * FROM t_return_exposure_2 WHERE return_id IS NULL ) t1 LEFT JOIN t_return t2 ON t1.mid = t2.mid AND t1.subsessionid = t2.subsessionid ) WHERE rn = 1 ) ,t_return_exposure_4 AS -- 曝光关联回流,用于计算viewh24 ( SELECT * FROM ( SELECT t1.exposure_id AS exposure_id ,t1.mid AS mid ,t1.vid AS vid ,t1.subsessionid AS subsessionid ,t1.sessionid AS sessionid ,t1.headvideoid AS headvideoid ,t1.dthh ,t2.id AS return_id ,ROW_NUMBER() OVER (PARTITION BY t1.exposure_id ORDER BY t2.ts DESC ) AS rn FROM ( SELECT * FROM t_return_exposure_3 WHERE return_id IS NULL ) t1 LEFT JOIN t_return t2 ON t1.mid = t2.mid AND t1.sessionid = t2.sessionid ) WHERE rn = 1 ) ,t_return_exposure AS ( SELECT a.* ,b.exposure_cnt AS new_exposure_cnt FROM t_return a LEFT JOIN ( SELECT return_id ,COUNT(1) AS exposure_cnt FROM ( SELECT * FROM t_return_exposure_1 WHERE return_id IS NOT NULL UNION ALL SELECT * FROM t_return_exposure_2 WHERE return_id IS NOT NULL UNION ALL SELECT * FROM t_return_exposure_3 WHERE return_id IS NOT NULL UNION ALL SELECT * FROM t_return_exposure_4 WHERE return_id IS NOT NULL ) GROUP BY return_id ) b ON a.id = b.return_id ) ,t_normal_share_exposure_1 AS -- 开始处理常规的分享与曝光关联 ( SELECT * FROM ( SELECT t1.dthh ,t1.apptype ,t1.mid ,t1.vid ,t1.sessionid ,t1.subsessionid ,t1.pagesource ,t1.shareid ,t1.ts ,t2.id AS exposure_id ,t2.ts AS exposure_ts ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn FROM t_share_from_sharelog t1 LEFT JOIN t_exposure t2 ON t1.apptype = t2.apptype AND t1.mid = t2.mid AND t1.vid = t2.vid AND t1.subsessionid = t2.subsessionid AND t1.pagesource = t2.pagesource AND t1.ts >= t2.ts WHERE t1.pagesource NOT REGEXP "pages/detail-user-videos-share-recommend$" ) WHERE rn = 1 ) ,t_normal_share_exposure_2 AS ( SELECT * FROM ( SELECT t1.dthh ,t1.apptype ,t1.mid ,t1.vid ,t1.sessionid ,t1.subsessionid ,t1.pagesource ,t1.shareid ,t1.ts ,t2.id AS exposure_id ,t2.ts AS exposure_ts ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn FROM ( SELECT * FROM t_normal_share_exposure_1 WHERE exposure_id IS NULL ) t1 LEFT JOIN t_exposure t2 ON t1.apptype = t2.apptype AND t1.mid = t2.mid AND t1.vid = t2.vid AND t1.sessionid = t2.sessionid AND t1.pagesource = t2.pagesource AND t1.ts >= t2.ts ) WHERE rn = 1 ) ,t_normal_share_exposure_3 AS ( SELECT * FROM ( SELECT t1.dthh ,t1.apptype ,t1.mid ,t1.vid ,t1.sessionid ,t1.subsessionid ,t1.pagesource ,t1.shareid ,t1.ts ,t2.id AS exposure_id ,t2.ts AS exposure_ts ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn FROM ( SELECT * FROM t_normal_share_exposure_2 WHERE exposure_id IS NULL ) t1 LEFT JOIN t_exposure t2 ON t1.apptype = t2.apptype AND t1.mid = t2.mid AND t1.vid = t2.vid AND t1.subsessionid = t2.subsessionid AND t1.pagesource = t2.pagesource ) WHERE rn = 1 ) ,t_normal_share_exposure_4 AS ( SELECT * FROM ( SELECT t1.dthh ,t1.apptype ,t1.mid ,t1.vid ,t1.sessionid ,t1.subsessionid ,t1.pagesource ,t1.shareid ,t1.ts ,t2.id AS exposure_id ,t2.ts AS exposure_ts ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn FROM ( SELECT * FROM t_normal_share_exposure_3 WHERE exposure_id IS NULL ) t1 LEFT JOIN t_exposure t2 ON t1.apptype = t2.apptype AND t1.mid = t2.mid AND t1.vid = t2.vid AND t1.sessionid = t2.sessionid AND t1.pagesource = t2.pagesource ) WHERE rn = 1 ) ,t_normal_share_exposure_5 AS ( SELECT * FROM ( SELECT t1.dthh ,t1.apptype ,t1.mid ,t1.vid ,t1.sessionid ,t1.subsessionid ,t1.pagesource ,t1.shareid ,t1.ts ,t2.id AS exposure_id ,t2.ts AS exposure_ts ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn FROM ( SELECT * FROM t_normal_share_exposure_4 WHERE exposure_id IS NULL ) t1 LEFT JOIN t_exposure t2 ON t1.apptype = t2.apptype AND t1.mid = t2.mid AND t1.vid = t2.vid AND t1.subsessionid = t2.subsessionid ) WHERE rn = 1 ) ,t_normal_share_exposure_6 AS ( SELECT * FROM ( SELECT t1.dthh ,t1.apptype ,t1.mid ,t1.vid ,t1.sessionid ,t1.subsessionid ,t1.pagesource ,t1.shareid ,t1.ts ,t2.id AS exposure_id ,t2.ts AS exposure_ts ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn FROM ( SELECT * FROM t_normal_share_exposure_5 WHERE exposure_id IS NULL ) t1 LEFT JOIN t_exposure t2 ON t1.apptype = t2.apptype AND t1.mid = t2.mid AND t1.vid = t2.vid AND t1.sessionid = t2.sessionid ) WHERE rn = 1 ) ,t_exposure_detail AS ( SELECT * FROM t_exposure WHERE pagesource REGEXP "-pages/user-videos-detail$|pages/detail-recommend$" ) ,t_no_normal_share_exposure_1 AS -- 开始处理非常规的分享与曝光关联 ( SELECT * FROM ( SELECT t1.dthh ,t1.apptype ,t1.mid ,t1.vid ,t1.sessionid ,t1.subsessionid ,t1.pagesource ,t1.shareid ,t1.ts ,t2.id AS exposure_id ,t2.ts AS exposure_ts ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn FROM t_share_from_sharelog t1 LEFT JOIN t_exposure_detail t2 ON t1.apptype = t2.apptype AND t1.mid = t2.mid AND t1.vid = t2.vid AND t1.subsessionid = t2.subsessionid AND t1.ts >= t2.ts WHERE t1.pagesource REGEXP "pages/detail-user-videos-share-recommend$" ) WHERE rn = 1 ) ,t_no_normal_share_exposure_2 AS ( SELECT * FROM ( SELECT t1.dthh ,t1.apptype ,t1.mid ,t1.vid ,t1.sessionid ,t1.subsessionid ,t1.pagesource ,t1.shareid ,t1.ts ,t2.id AS exposure_id ,t2.ts AS exposure_ts ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn FROM ( SELECT * FROM t_no_normal_share_exposure_1 WHERE exposure_id IS NULL ) t1 LEFT JOIN t_exposure_detail t2 ON t1.apptype = t2.apptype AND t1.mid = t2.mid AND t1.vid = t2.vid AND t1.sessionid = t2.sessionid AND t1.ts >= t2.ts ) WHERE rn = 1 ) ,t_no_normal_share_exposure_3 AS ( SELECT * FROM ( SELECT t1.dthh ,t1.apptype ,t1.mid ,t1.vid ,t1.sessionid ,t1.subsessionid ,t1.pagesource ,t1.shareid ,t1.ts ,t2.id AS exposure_id ,t2.ts AS exposure_ts ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn FROM ( SELECT * FROM t_no_normal_share_exposure_2 WHERE exposure_id IS NULL ) t1 LEFT JOIN t_exposure_detail t2 ON t1.apptype = t2.apptype AND t1.mid = t2.mid AND t1.vid = t2.vid AND t1.subsessionid = t2.subsessionid ) WHERE rn = 1 ) ,t_no_normal_share_exposure_4 AS ( SELECT * FROM ( SELECT t1.dthh ,t1.apptype ,t1.mid ,t1.vid ,t1.sessionid ,t1.subsessionid ,t1.pagesource ,t1.shareid ,t1.ts ,t2.id AS exposure_id ,t2.ts AS exposure_ts ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn FROM ( SELECT * FROM t_no_normal_share_exposure_3 WHERE exposure_id IS NULL ) t1 LEFT JOIN t_exposure_detail t2 ON t1.apptype = t2.apptype AND t1.mid = t2.mid AND t1.vid = t2.vid AND t1.sessionid = t2.sessionid ) WHERE rn = 1 ) ,t_share_exposure AS ( SELECT * FROM t_normal_share_exposure_1 WHERE exposure_id IS NOT NULL UNION ALL SELECT * FROM t_normal_share_exposure_2 WHERE exposure_id IS NOT NULL UNION ALL SELECT * FROM t_normal_share_exposure_3 WHERE exposure_id IS NOT NULL UNION ALL SELECT * FROM t_normal_share_exposure_4 WHERE exposure_id IS NOT NULL UNION ALL SELECT * FROM t_normal_share_exposure_5 WHERE exposure_id IS NOT NULL UNION ALL SELECT * FROM t_normal_share_exposure_6 UNION ALL SELECT * FROM t_no_normal_share_exposure_1 WHERE exposure_id IS NOT NULL UNION ALL SELECT * FROM t_no_normal_share_exposure_2 WHERE exposure_id IS NOT NULL UNION ALL SELECT * FROM t_no_normal_share_exposure_3 WHERE exposure_id IS NOT NULL UNION ALL SELECT * FROM t_no_normal_share_exposure_4 ) --======================================== -- 多跳 B/C 计算 (BFS frontier + anti-join 去环, 仅 24h) --======================================== ,t_share_return AS ( SELECT se.exposure_id ,se.shareid ,se.vid ,se.apptype ,se.subsessionid ,r.subsessionid AS return_subsessionid ,r.mid AS return_mid FROM t_share_exposure se JOIN t_return r ON se.shareid = r.rootshareid AND se.vid = r.vid AND se.apptype = r.apptype ) ,t_exposure_bn AS ( SELECT exposure_id ,COUNT(DISTINCT return_mid) AS B ,COLLECT_SET(return_mid) AS B_mids FROM t_share_return GROUP BY exposure_id ) -- BFS frontier 1: 直达回流的 subsessionid 集合 ,t_frontier_1 AS ( SELECT DISTINCT exposure_id AS source_id, return_subsessionid AS reached_sub FROM t_share_return ) -- C_1: frontier_1 中曝光的 B 之和 ,t_c1 AS ( SELECT f.source_id AS exposure_id, SUM(bn.B) AS C_1 FROM t_frontier_1 f JOIN t_exposure e ON f.reached_sub = e.subsessionid JOIN t_exposure_bn bn ON e.id = bn.exposure_id GROUP BY f.source_id ) ,t_c1_mids AS ( SELECT f.source_id AS exposure_id ,COLLECT_SET(sr.return_mid) AS C_1_mids FROM t_frontier_1 f JOIN t_exposure e ON f.reached_sub = e.subsessionid JOIN t_exposure_bn bn ON e.id = bn.exposure_id JOIN t_share_return sr ON bn.exposure_id = sr.exposure_id GROUP BY f.source_id ) -- BFS frontier 2: frontier_1 延伸, anti-join 排除 frontier_1 ,t_frontier_2 AS ( SELECT DISTINCT f1.source_id, sr2.return_subsessionid AS reached_sub FROM t_frontier_1 f1 JOIN t_exposure e1 ON f1.reached_sub = e1.subsessionid JOIN t_exposure_bn bn1 ON e1.id = bn1.exposure_id JOIN t_share_return sr2 ON bn1.exposure_id = sr2.exposure_id LEFT JOIN t_frontier_1 v1 ON f1.source_id = v1.source_id AND sr2.return_subsessionid = v1.reached_sub WHERE v1.source_id IS NULL ) ,t_c2 AS ( SELECT f.source_id AS exposure_id, SUM(bn.B) AS C_2 FROM t_frontier_2 f JOIN t_exposure e ON f.reached_sub = e.subsessionid JOIN t_exposure_bn bn ON e.id = bn.exposure_id GROUP BY f.source_id ) ,t_c2_mids AS ( SELECT f.source_id AS exposure_id ,COLLECT_SET(sr.return_mid) AS C_2_mids FROM t_frontier_2 f JOIN t_exposure e ON f.reached_sub = e.subsessionid JOIN t_exposure_bn bn ON e.id = bn.exposure_id JOIN t_share_return sr ON bn.exposure_id = sr.exposure_id GROUP BY f.source_id ) -- BFS frontier 3: frontier_2 延伸, anti-join 排除 frontier_1 + frontier_2 ,t_frontier_3 AS ( SELECT DISTINCT f2.source_id, sr3.return_subsessionid AS reached_sub FROM t_frontier_2 f2 JOIN t_exposure e2 ON f2.reached_sub = e2.subsessionid JOIN t_exposure_bn bn2 ON e2.id = bn2.exposure_id JOIN t_share_return sr3 ON bn2.exposure_id = sr3.exposure_id LEFT JOIN t_frontier_1 v1 ON f2.source_id = v1.source_id AND sr3.return_subsessionid = v1.reached_sub LEFT JOIN t_frontier_2 v2 ON f2.source_id = v2.source_id AND sr3.return_subsessionid = v2.reached_sub WHERE v1.source_id IS NULL AND v2.source_id IS NULL ) ,t_c3 AS ( SELECT f.source_id AS exposure_id, SUM(bn.B) AS C_3 FROM t_frontier_3 f JOIN t_exposure e ON f.reached_sub = e.subsessionid JOIN t_exposure_bn bn ON e.id = bn.exposure_id GROUP BY f.source_id ) ,t_c3_mids AS ( SELECT f.source_id AS exposure_id ,COLLECT_SET(sr.return_mid) AS C_3_mids FROM t_frontier_3 f JOIN t_exposure e ON f.reached_sub = e.subsessionid JOIN t_exposure_bn bn ON e.id = bn.exposure_id JOIN t_share_return sr ON bn.exposure_id = sr.exposure_id GROUP BY f.source_id ) --======================================== -- D 链: session 内后续曝光传播 (BFS 去环) --======================================== ,t_d1 AS ( SELECT e1.id AS exposure_id ,SUM(bn2.B) AS D_1 FROM t_exposure e1 JOIN t_exposure e2 ON e1.subsessionid = e2.subsessionid AND CAST(e2.ts AS BIGINT) > CAST(e1.ts AS BIGINT) JOIN t_exposure_bn bn2 ON e2.id = bn2.exposure_id GROUP BY e1.id ) ,t_d1_mids AS ( SELECT e1.id AS exposure_id ,COLLECT_SET(sr.return_mid) AS D_1_mids FROM t_exposure e1 JOIN t_exposure e2 ON e1.subsessionid = e2.subsessionid AND CAST(e2.ts AS BIGINT) > CAST(e1.ts AS BIGINT) JOIN t_share_return sr ON e2.id = sr.exposure_id GROUP BY e1.id ) ,t_d1_frontier AS ( SELECT DISTINCT e1.id AS source_id ,sr.return_subsessionid AS reached_sub FROM t_exposure e1 JOIN t_exposure e2 ON e1.subsessionid = e2.subsessionid AND CAST(e2.ts AS BIGINT) > CAST(e1.ts AS BIGINT) JOIN t_share_return sr ON e2.id = sr.exposure_id ) ,t_d2 AS ( SELECT f.source_id AS exposure_id, SUM(bn.B) AS D_2 FROM t_d1_frontier f JOIN t_exposure e ON f.reached_sub = e.subsessionid JOIN t_exposure_bn bn ON e.id = bn.exposure_id GROUP BY f.source_id ) ,t_d2_mids AS ( SELECT f.source_id AS exposure_id ,COLLECT_SET(sr.return_mid) AS D_2_mids FROM t_d1_frontier f JOIN t_exposure e ON f.reached_sub = e.subsessionid JOIN t_exposure_bn bn ON e.id = bn.exposure_id JOIN t_share_return sr ON bn.exposure_id = sr.exposure_id GROUP BY f.source_id ) ,t_d2_frontier AS ( SELECT DISTINCT f1.source_id, sr2.return_subsessionid AS reached_sub FROM t_d1_frontier f1 JOIN t_exposure e1 ON f1.reached_sub = e1.subsessionid JOIN t_exposure_bn bn1 ON e1.id = bn1.exposure_id JOIN t_share_return sr2 ON bn1.exposure_id = sr2.exposure_id LEFT JOIN t_d1_frontier v1 ON f1.source_id = v1.source_id AND sr2.return_subsessionid = v1.reached_sub WHERE v1.source_id IS NULL ) ,t_d3 AS ( SELECT f.source_id AS exposure_id, SUM(bn.B) AS D_3 FROM t_d2_frontier f JOIN t_exposure e ON f.reached_sub = e.subsessionid JOIN t_exposure_bn bn ON e.id = bn.exposure_id GROUP BY f.source_id ) ,t_d3_mids AS ( SELECT f.source_id AS exposure_id ,COLLECT_SET(sr.return_mid) AS D_3_mids FROM t_d2_frontier f JOIN t_exposure e ON f.reached_sub = e.subsessionid JOIN t_exposure_bn bn ON e.id = bn.exposure_id JOIN t_share_return sr ON bn.exposure_id = sr.exposure_id GROUP BY f.source_id ) --======================================== -- 以下为原有 CTE 继续 --======================================== ,t_share_with_label AS ( SELECT a.dthh ,a.apptype -- join 条件 ,a.mid ,a.vid -- join 条件 ,a.sessionid ,a.subsessionid ,a.pagesource ,a.shareid -- join 条件 ,a.ts ,a.exposure_id ,COALESCE(b.return_1_pv,0) AS return_1_pv ,COALESCE(b.return_1_uv,0) AS return_1_uv ,b.return_1_mids AS return_1_mids -- 可能为null,再决策是否提前处理。 ,COALESCE(c.return_n_pv,0) AS return_n_pv ,COALESCE(c.return_n_uv,0) AS return_n_uv ,c.return_n_mids AS return_n_mids -- 可能为null,再决策是否提前处理。 ,COALESCE(c.new_exposure_cnt,0) AS new_exposure_cnt FROM t_share_exposure a LEFT JOIN ( SELECT shareid ,vid ,apptype ,COUNT(1) AS return_1_pv ,COUNT(DISTINCT mid) AS return_1_uv ,CONCAT_WS(',',COLLECT_SET(mid)) AS return_1_mids FROM t_return GROUP BY shareid ,vid ,apptype ) b ON a.shareid = b.shareid AND a.vid = b.vid AND a.apptype = b.apptype LEFT JOIN ( SELECT rootshareid ,vid ,apptype ,COUNT(1) AS return_n_pv ,COUNT(DISTINCT mid) AS return_n_uv ,CONCAT_WS(',',COLLECT_SET(mid)) AS return_n_mids ,SUM(new_exposure_cnt) AS new_exposure_cnt FROM t_return_exposure GROUP BY rootshareid ,vid ,apptype ) c ON a.shareid = c.rootshareid AND a.vid = c.vid AND a.apptype = c.apptype ) ,t_share_with_label_group AS ( SELECT exposure_id ,COUNT(1) AS share_cnt ,SUM(return_1_pv) AS return_1_pv ,COALESCE(SIZE(SPLIT(DEDUPLICATION4LIST(CONCAT_WS(',',COLLECT_LIST(return_1_mids))),",")),0) AS return_1_uv ,DEDUPLICATION4LIST(CONCAT_WS(',',COLLECT_LIST(return_1_mids))) AS return_1_mids -- 可能是null ,SUM(return_n_pv) AS return_n_pv ,COALESCE(SIZE(SPLIT(DEDUPLICATION4LIST(CONCAT_WS(',',COLLECT_LIST(return_n_mids))),",")),0) AS return_n_uv ,DEDUPLICATION4LIST(CONCAT_WS(',',COLLECT_LIST(return_n_mids))) AS return_n_mids -- 可能是null ,SUM(new_exposure_cnt) AS new_exposure_cnt FROM t_share_with_label GROUP BY exposure_id ) ,t_root_source_id_group_name AS ( SELECT * FROM ( SELECT root_source_id ,group_name ,ROW_NUMBER() OVER (PARTITION BY root_source_id ) AS rn FROM loghubods.changwen_rootsourceid_group_hour WHERE dt = MAX_PT('loghubods.changwen_rootsourceid_group_hour') ) WHERE rn = 1 ) ,t_exposure_share_return AS ( SELECT apptype ,uid ,mid ,vid ,sessionid ,subsessionid ,pagesource ,CASE WHEN pagesource REGEXP 'pages/user-videos-share-recommend$' THEN '回流后沉浸页&内页feed' WHEN pagesource REGEXP 'pages/detail-recommend$' THEN '详情后沉浸页' WHEN pagesource REGEXP 'pages/user-videos-share$' THEN '回流页' WHEN pagesource REGEXP 'pages/user-videos-detail$' THEN '详情页' WHEN pagesource REGEXP 'pages/category$' THEN '首页feed' ELSE '其他' END AS pagesource_new ,recommendlogvo -- 推荐算法的返回结果日志存在这个字段中 ,abcode -- 推荐算法的ab分组 ,recommendpagetype -- 三种回流头部;两种下滑-沉浸页下滑和feed下滑 ,recomtraceid ,headvideoid ,rootsourceid ,hotsencetype ,flowpool -- 14#68#3#1735262438476#2 ,level ,clientip ,machineinfo_brand ,machineinfo_model ,machineinfo_system ,machineinfo_wechatversion ,machineinfo_sdkversion ,province ,city ,ts ,IF(COALESCE(share_cnt,0) > 0,1,0) AS is_share ,COALESCE(share_cnt,0) AS share_cnt ,IF(COALESCE(return_1_uv,0) > 0,1,0) AS is_return_1 ,COALESCE(return_1_pv,0) AS return_1_pv ,COALESCE(return_1_uv,0) AS return_1_uv ,return_1_mids -- 可能是null ,IF(COALESCE(return_n_pv,0) > 0,1,0) AS is_return_n ,COALESCE(return_n_pv,0) AS return_n_pv ,COALESCE(return_n_uv,0) AS return_n_uv ,return_n_mids -- 可能是null ,IF(COALESCE(COALESCE(SIZE(ARRAY_REMOVE(SPLIT(return_1_mids,","),mid)),0),0) > 0,1,0) AS is_return_noself ,COALESCE(SIZE(ARRAY_REMOVE(SPLIT(return_1_mids,","),mid)),0) AS return_1_uv_noself ,ARRAY_JOIN(ARRAY_REMOVE(SPLIT(return_1_mids,","),mid),",") AS return_1_mids_noself ,IF(COALESCE(COALESCE(SIZE(ARRAY_REMOVE(SPLIT(return_n_mids,","),mid)),0),0) > 0,1,0) AS is_return_n_noself ,COALESCE(SIZE(ARRAY_REMOVE(SPLIT(return_n_mids,","),mid)),0) AS return_n_uv_noself ,ARRAY_JOIN(ARRAY_REMOVE(SPLIT(return_n_mids,","),mid),",") AS return_n_mids_noself ,COALESCE(new_exposure_cnt) AS new_exposure_cnt ,COALESCE(bn_hop.B, 0) AS b ,COALESCE(c1_hop.C_1, 0) AS c_1 ,COALESCE(c2_hop.C_2, 0) AS c_2 ,COALESCE(c3_hop.C_3, 0) AS c_3 ,COALESCE(d1_hop.D_1, 0) AS d_1 ,COALESCE(d2_hop.D_2, 0) AS d_2 ,COALESCE(d3_hop.D_3, 0) AS d_3 ,CONCAT_WS(',', bn_hop.B_mids) AS b_mids ,CONCAT_WS(',', c1m_hop.C_1_mids) AS c_1_mids ,CONCAT_WS(',', c2m_hop.C_2_mids) AS c_2_mids ,CONCAT_WS(',', c3m_hop.C_3_mids) AS c_3_mids ,CONCAT_WS(',', d1m_hop.D_1_mids) AS d_1_mids ,CONCAT_WS(',', d2m_hop.D_2_mids) AS d_2_mids ,CONCAT_WS(',', d3m_hop.D_3_mids) AS d_3_mids ,JSON_FORMAT( JSON_OBJECT("animationSceneType",animationSceneType,"extParams",extParams,"rootsessionid",rootsessionid_new,"versioncode",versioncode,"group_name",tc.group_name) ) AS extend ,SUBSTR(dthh,1,8) AS dt ,SUBSTR(dthh,9,2) AS hh FROM t_exposure ta LEFT JOIN t_share_with_label_group tb ON ta.id = tb.exposure_id LEFT JOIN t_root_source_id_group_name tc ON ta.rootsourceid = tc.root_source_id LEFT JOIN t_exposure_bn bn_hop ON ta.id = bn_hop.exposure_id LEFT JOIN t_c1 c1_hop ON ta.id = c1_hop.exposure_id LEFT JOIN t_c1_mids c1m_hop ON ta.id = c1m_hop.exposure_id LEFT JOIN t_c2 c2_hop ON ta.id = c2_hop.exposure_id LEFT JOIN t_c2_mids c2m_hop ON ta.id = c2m_hop.exposure_id LEFT JOIN t_c3 c3_hop ON ta.id = c3_hop.exposure_id LEFT JOIN t_c3_mids c3m_hop ON ta.id = c3m_hop.exposure_id LEFT JOIN t_d1 d1_hop ON ta.id = d1_hop.exposure_id LEFT JOIN t_d1_mids d1m_hop ON ta.id = d1m_hop.exposure_id LEFT JOIN t_d2 d2_hop ON ta.id = d2_hop.exposure_id LEFT JOIN t_d2_mids d2m_hop ON ta.id = d2m_hop.exposure_id LEFT JOIN t_d3 d3_hop ON ta.id = d3_hop.exposure_id LEFT JOIN t_d3_mids d3m_hop ON ta.id = d3m_hop.exposure_id )SELECT * FROM t_exposure_share_return ;