--@exclude_input=loghubods.video_action_log_flow_new
--@exclude_input=loghubods.user_share_log_flow
-- =====================================================================
-- Exposure / return-flow base table (row level: one row = one exposure)
-- Version: 20260209 (B/C/D chains reworked on top of 20260206: + sharedepth dimension + exp/pv/uv/mids)
-- Version history: 20250108 -> 20260206 (+B/C/D) -> 20260209 (+sharedepth +exp/pv/uv/mids)
-- =====================================================================
--
-- Data sources (3 log tables):
--   video_action_log_flow_new        -> exposure events (businesstype = videoView)
--   user_share_log_flow              -> share events (topic = share) + return clicks (topic = click)
--   changwen_rootsourceid_group_hour -> rootsourceid group-name mapping
--
-- Data flow:
--   exposure dedup --> share-to-exposure match --> return-to-share match --> B chain (direct returns)
--     |                  |                                                |
--     |                  +-- return-to-exposure match --                  +--> C chain (secondary spread, BFS 3 hops)
--     |                      1st-degree / nth-degree returns              +--> D chain (spread via later exposures in the session, BFS 3 hops)
--     +-- final LEFT JOIN assembly -> row-level detail output
--
-- CTE pipeline:
--   t_return                        return-click dedup (user_share_log_flow topic=click, ROW_NUMBER dedup)
--   t_share_from_sharelog           share-event dedup (user_share_log_flow topic=share)
--   t_exposure_raw / t_exposure     exposure dedup (video_action_log_flow_new, share / non-share branches UNION ALL)
--   t_return_exposure_1~4           return-to-exposure match: 4 rounds of progressively relaxed JOIN conditions
--                                     1) subsessionid + headvideoid
--                                     2) sessionid + headvideoid
--                                     3) subsessionid (any vid)
--                                     4) sessionid (any vid)
--   t_normal_share_exposure_1~6     regular share-to-exposure match: 6 rounds of progressive relaxation
--                                     1) subsessionid + pagesource + vid, ts>=
--                                     2) sessionid + pagesource + vid, ts>=
--                                     3) subsessionid + pagesource + vid (no ts)
--                                     4) sessionid + pagesource + vid (no ts)
--                                     5) subsessionid + vid
--                                     6) sessionid + vid
--   t_no_normal_share_exposure_1~4  irregular (detail page) share-to-exposure match: 4 rounds
--   t_share_return                  bridge: share exposure x return click (rootshareid + vid + apptype)
--   B chain                         t_exposure_bn (pv/uv/mids) + t_b_exp (exp), split by sharedepth
--   C chain, BFS 3 hops             frontier_N -> t_c_hopN (pv/uv/mids) + t_c_hopN_exp (exp)
--   D chain, BFS 3 hops             t_d0 (cost) -> t_d_hopN + t_d_hopN_exp, frontier anti-join for cycle removal
--   t_share_with_label/_group       share label aggregation (1st-degree / nth-degree pv/uv/mids)
--   t_exposure_share_return         final SELECT: LEFT JOIN assembly of all fields
--
-- Key design decisions:
--   Cycle removal: session-level anti-join only (frontier_N LEFT JOIN excludes already visited subsessionid).
--                  No user-level cycle removal (it would lose C in A->B->A->C).
--   sharedepth:    taken from the click topic of user_share_log_flow, CAST(sharedepth AS BIGINT).
--   COLLECT_SET + CASE WHEN: a NULL is fed in when the condition is not met, so SIZE needs a COALESCE fallback.
-- =====================================================================
-- drop table if exists loghubods.dwd_recsys_alg_exposure_base_20260209;
CREATE TABLE IF NOT EXISTS loghubods.dwd_recsys_alg_exposure_base_20260209
(
    apptype STRING
    ,uid STRING
    ,mid STRING
    ,vid STRING
    ,sessionid STRING
    ,subsessionid STRING
    ,pagesource STRING
    ,page STRING
    ,recommendlogvo STRING COMMENT '推荐算法的返回结果日志存在这个字段中'
    ,abcode STRING COMMENT '推荐算法的ab分组:ab0'
    ,recommendpagetype STRING COMMENT '用于区分pagesource相同时某些场景的。三种回流头部;两种下滑-沉浸页下滑和feed下滑。 -pages/user-videos-share-recommend-detail 是沉浸页。'
    ,recomtraceid STRING COMMENT '在后端调取推荐服务之前生成。前端降级会空;后端也可能为空。'
    ,headvideoid STRING
    ,rootsourceid STRING COMMENT '区分touliu等流量,咨询产品。'
    ,hotsencetype STRING
    ,flowpool STRING COMMENT '非流量池,是空字符串。没有null值。'
    ,level STRING COMMENT '非流量池,是null。'
    ,clientip STRING
    ,machineinfo_brand STRING
    ,machineinfo_model STRING
    ,machineinfo_system STRING
    ,machineinfo_wechatversion STRING
    ,machineinfo_sdkversion STRING
    ,province STRING
    ,city STRING
    ,ts STRING
    ,is_share STRING
    ,share_cnt STRING
    ,is_return_1 STRING
    ,return_1_pv STRING
    ,return_1_uv STRING
    ,is_return_n STRING
    ,return_n_pv STRING
    ,return_n_uv STRING
    ,is_return_noself STRING
    ,return_1_uv_noself STRING
    ,is_return_n_noself STRING
    ,return_n_uv_noself STRING
    ,new_exposure_cnt STRING
    -- ========== B chain (4 depth x 3 metric = 12 columns) ==========
    ,bn_exp STRING COMMENT 'B链全量: 回流用户session曝光数'
    ,bn_pv STRING COMMENT 'B链全量: 回流点击次数'
    ,bn_uv STRING COMMENT 'B链全量: 回流去重人数'
    ,b1_exp STRING COMMENT 'B链depth=1: 回流用户session曝光数'
    ,b1_pv STRING COMMENT 'B链depth=1: 回流点击次数'
    ,b1_uv STRING COMMENT 'B链depth=1: 回流去重人数'
    ,b2_exp STRING COMMENT 'B链depth=2: 回流用户session曝光数'
    ,b2_pv STRING COMMENT 'B链depth=2: 回流点击次数'
    ,b2_uv STRING COMMENT 'B链depth=2: 回流去重人数'
    ,b3_exp STRING COMMENT 'B链depth=3: 回流用户session曝光数'
    ,b3_pv STRING COMMENT 'B链depth=3: 回流点击次数'
    ,b3_uv STRING COMMENT 'B链depth=3: 回流去重人数'
    -- ========== C chain (4 depth x 3 hop x 3 metric = 36 columns) ==========
    ,cn_1_exp STRING COMMENT 'C链全量hop1: 回流用户session曝光数'
    ,cn_1_pv STRING COMMENT 'C链全量hop1: 回流点击次数'
    ,cn_1_uv STRING COMMENT 'C链全量hop1: 回流去重人数'
    ,c1_1_exp STRING COMMENT 'C链depth=1 hop1: 回流用户session曝光数'
    ,c1_1_pv STRING COMMENT 'C链depth=1 hop1: 回流点击次数'
    ,c1_1_uv STRING COMMENT 'C链depth=1 hop1: 回流去重人数'
    ,c2_1_exp STRING COMMENT 'C链depth=2 hop1: 回流用户session曝光数'
    ,c2_1_pv STRING COMMENT 'C链depth=2 hop1: 回流点击次数'
    ,c2_1_uv STRING COMMENT 'C链depth=2 hop1: 回流去重人数'
    ,c3_1_exp STRING COMMENT 'C链depth=3 hop1: 回流用户session曝光数'
    ,c3_1_pv STRING COMMENT 'C链depth=3 hop1: 回流点击次数'
    ,c3_1_uv STRING COMMENT 'C链depth=3 hop1: 回流去重人数'
    ,cn_2_exp STRING COMMENT 'C链全量hop2: 回流用户session曝光数'
    ,cn_2_pv STRING COMMENT 'C链全量hop2: 回流点击次数'
    ,cn_2_uv STRING COMMENT 'C链全量hop2: 回流去重人数'
    ,c1_2_exp STRING COMMENT 'C链depth=1 hop2: 回流用户session曝光数'
    ,c1_2_pv STRING COMMENT 'C链depth=1 hop2: 回流点击次数'
    ,c1_2_uv STRING COMMENT 'C链depth=1 hop2: 回流去重人数'
    ,c2_2_exp STRING COMMENT 'C链depth=2 hop2: 回流用户session曝光数'
    ,c2_2_pv STRING COMMENT 'C链depth=2 hop2: 回流点击次数'
    ,c2_2_uv STRING COMMENT 'C链depth=2 hop2: 回流去重人数'
    ,c3_2_exp STRING COMMENT 'C链depth=3 hop2: 回流用户session曝光数'
    ,c3_2_pv STRING COMMENT 'C链depth=3 hop2: 回流点击次数'
    ,c3_2_uv STRING COMMENT 'C链depth=3 hop2: 回流去重人数'
    ,cn_3_exp STRING COMMENT 'C链全量hop3: 回流用户session曝光数'
    ,cn_3_pv STRING COMMENT 'C链全量hop3: 回流点击次数'
    ,cn_3_uv STRING COMMENT 'C链全量hop3: 回流去重人数'
    ,c1_3_exp STRING COMMENT 'C链depth=1 hop3: 回流用户session曝光数'
    ,c1_3_pv STRING COMMENT 'C链depth=1 hop3: 回流点击次数'
    ,c1_3_uv STRING COMMENT 'C链depth=1 hop3: 回流去重人数'
    ,c2_3_exp STRING COMMENT 'C链depth=2 hop3: 回流用户session曝光数'
    ,c2_3_pv STRING COMMENT 'C链depth=2 hop3: 回流点击次数'
    ,c2_3_uv STRING COMMENT 'C链depth=2 hop3: 回流去重人数'
    ,c3_3_exp STRING COMMENT 'C链depth=3 hop3: 回流用户session曝光数'
    ,c3_3_pv STRING COMMENT 'C链depth=3 hop3: 回流点击次数'
    ,c3_3_uv STRING COMMENT 'C链depth=3 hop3: 回流去重人数'
    -- ========== D chain (d0 + 4 depth x 3 hop x 3 metric = 37 columns) ==========
    ,d0 STRING COMMENT 'D链初始成本: session内后续曝光数'
    ,dn_1_exp STRING COMMENT 'D链全量hop1: 回流用户session曝光数'
    ,dn_1_pv STRING COMMENT 'D链全量hop1: 回流点击次数'
    ,dn_1_uv STRING COMMENT 'D链全量hop1: 回流去重人数'
    ,d1_1_exp STRING COMMENT 'D链depth=1 hop1: 回流用户session曝光数'
    ,d1_1_pv STRING COMMENT 'D链depth=1 hop1: 回流点击次数'
    ,d1_1_uv STRING COMMENT 'D链depth=1 hop1: 回流去重人数'
    ,d2_1_exp STRING COMMENT 'D链depth=2 hop1: 回流用户session曝光数'
    ,d2_1_pv STRING COMMENT 'D链depth=2 hop1: 回流点击次数'
    ,d2_1_uv STRING COMMENT 'D链depth=2 hop1: 回流去重人数'
    ,d3_1_exp STRING COMMENT 'D链depth=3 hop1: 回流用户session曝光数'
    ,d3_1_pv STRING COMMENT 'D链depth=3 hop1: 回流点击次数'
    ,d3_1_uv STRING COMMENT 'D链depth=3 hop1: 回流去重人数'
    ,dn_2_exp STRING COMMENT 'D链全量hop2: 回流用户session曝光数'
    ,dn_2_pv STRING COMMENT 'D链全量hop2: 回流点击次数'
    ,dn_2_uv STRING COMMENT 'D链全量hop2: 回流去重人数'
    ,d1_2_exp STRING COMMENT 'D链depth=1 hop2: 回流用户session曝光数'
    ,d1_2_pv STRING COMMENT 'D链depth=1 hop2: 回流点击次数'
    ,d1_2_uv STRING COMMENT 'D链depth=1 hop2: 回流去重人数'
    ,d2_2_exp STRING COMMENT 'D链depth=2 hop2: 回流用户session曝光数'
    ,d2_2_pv STRING COMMENT 'D链depth=2 hop2: 回流点击次数'
    ,d2_2_uv STRING COMMENT 'D链depth=2 hop2: 回流去重人数'
    ,d3_2_exp STRING COMMENT 'D链depth=3 hop2: 回流用户session曝光数'
    ,d3_2_pv STRING COMMENT 'D链depth=3 hop2: 回流点击次数'
    ,d3_2_uv STRING COMMENT 'D链depth=3 hop2: 回流去重人数'
    ,dn_3_exp STRING COMMENT 'D链全量hop3: 回流用户session曝光数'
    ,dn_3_pv STRING COMMENT 'D链全量hop3: 回流点击次数'
    ,dn_3_uv STRING COMMENT 'D链全量hop3: 回流去重人数'
    ,d1_3_exp STRING COMMENT 'D链depth=1 hop3: 回流用户session曝光数'
    ,d1_3_pv STRING COMMENT 'D链depth=1 hop3: 回流点击次数'
    ,d1_3_uv STRING COMMENT 'D链depth=1 hop3: 回流去重人数'
    ,d2_3_exp STRING COMMENT 'D链depth=2 hop3: 回流用户session曝光数'
    ,d2_3_pv STRING COMMENT 'D链depth=2 hop3: 回流点击次数'
    ,d2_3_uv STRING COMMENT 'D链depth=2 hop3: 回流去重人数'
    ,d3_3_exp STRING COMMENT 'D链depth=3 hop3: 回流用户session曝光数'
    ,d3_3_pv STRING COMMENT 'D链depth=3 hop3: 回流点击次数'
    ,d3_3_uv STRING COMMENT 'D链depth=3 hop3: 回流去重人数'
    ,extend STRING
    -- ========== mids list columns (variable length, all placed at the end) ==========
    ,return_1_mids STRING
    ,return_n_mids STRING
    ,return_1_mids_noself STRING
    ,return_n_mids_noself STRING
    ,bn_mids STRING COMMENT 'B链全量: 回流mid列表'
    ,b1_mids STRING COMMENT 'B链depth=1: 回流mid列表'
    ,b2_mids STRING COMMENT 'B链depth=2: 回流mid列表'
    ,b3_mids STRING COMMENT 'B链depth=3: 回流mid列表'
    ,cn_1_mids STRING COMMENT 'C链全量hop1: 回流mid列表'
    ,c1_1_mids STRING COMMENT 'C链depth=1 hop1: 回流mid列表'
    ,c2_1_mids STRING COMMENT 'C链depth=2 hop1: 回流mid列表'
    ,c3_1_mids STRING COMMENT 'C链depth=3 hop1: 回流mid列表'
    ,cn_2_mids STRING COMMENT 'C链全量hop2: 回流mid列表'
    ,c1_2_mids STRING COMMENT 'C链depth=1 hop2: 回流mid列表'
    ,c2_2_mids STRING COMMENT 'C链depth=2 hop2: 回流mid列表'
    ,c3_2_mids STRING COMMENT 'C链depth=3 hop2: 回流mid列表'
    ,cn_3_mids STRING COMMENT 'C链全量hop3: 回流mid列表'
    ,c1_3_mids STRING COMMENT 'C链depth=1 hop3: 回流mid列表'
    ,c2_3_mids STRING COMMENT 'C链depth=2 hop3: 回流mid列表'
    ,c3_3_mids STRING COMMENT 'C链depth=3 hop3: 回流mid列表'
    ,dn_1_mids STRING COMMENT 'D链全量hop1: 回流mid列表'
    ,d1_1_mids STRING COMMENT 'D链depth=1 hop1: 回流mid列表'
    ,d2_1_mids STRING COMMENT 'D链depth=2 hop1: 回流mid列表'
    ,d3_1_mids STRING COMMENT 'D链depth=3 hop1: 回流mid列表'
    ,dn_2_mids STRING COMMENT 'D链全量hop2: 回流mid列表'
    ,d1_2_mids STRING COMMENT 'D链depth=1 hop2: 回流mid列表'
    ,d2_2_mids STRING COMMENT 'D链depth=2 hop2: 回流mid列表'
    ,d3_2_mids STRING COMMENT 'D链depth=3 hop2: 回流mid列表'
    ,dn_3_mids STRING COMMENT 'D链全量hop3: 回流mid列表'
    ,d1_3_mids STRING COMMENT 'D链depth=1 hop3: 回流mid列表'
    ,d2_3_mids STRING COMMENT 'D链depth=2 hop3: 回流mid列表'
    ,d3_3_mids STRING COMMENT 'D链depth=3 hop3: 回流mid列表'
)
PARTITIONED BY
(
    dt STRING COMMENT '日期:20240105'
    ,hh STRING COMMENT '小时:04'
)
STORED AS ALIORC
TBLPROPERTIES ('comment' = '推荐算法-labelmatch表-20260209更新-含多跳B/C/D-sharedepth维度')
LIFECYCLE 3650
;

-- Session flags: dynamic partitioning (the INSERT below writes PARTITION (dt,hh) dynamically)
-- and the mapper split size.
SET hive.exec.dynamic.partition = true ;
SET hive.exec.dynamic.partition.mode = nonstrict ;
SET odps.stage.mapper.split.size = 1024 ;

INSERT OVERWRITE TABLE loghubods.dwd_recsys_alg_exposure_base_20260209 PARTITION (dt,hh)
-- t_return: deduplicated return clicks; id = dthh:shareid:vid:dthh_id.
WITH t_return AS
(
    SELECT  *
            ,CONCAT(dthh,":",shareid,":",vid,":",dthh_id) AS id
    FROM    (
                SELECT  CONCAT(year,month,day,hour) AS dthh
,apptype
,machinecode AS mid
,clickobjectid AS vid
,sessionid
-- NOTE: this is the subsessionid of the return visit itself. Per the original author it is reset on
-- every return click, so it can be used to find the exposures belonging to that return.
,subsessionid
,shareid
,rootshareid
-- clienttimestamp / 1000: presumably milliseconds -> seconds (TODO confirm the source unit).
,CAST(clienttimestamp / 1000 AS BIGINT) AS ts
,CAST(sharedepth AS BIGINT) AS sharedepth
-- rn: dedup rank; the outer "rn = 1" keeps the row with the largest clienttimestamp per partition key.
,ROW_NUMBER() OVER (PARTITION BY CONCAT(year,month,day,hour),apptype,machinecode,clickobjectid,sessionid,subsessionid,shareid,rootshareid ORDER BY clienttimestamp DESC ) AS rn
-- dthh_id: running number within (hour, shareid, clickobjectid); last component of t_return.id.
,ROW_NUMBER() OVER (PARTITION BY CONCAT(year,month,day,hour),shareid,clickobjectid ORDER BY clienttimestamp ) AS dthh_id
                FROM    loghubods.user_share_log_flow
                -- Return clicks: there should be one row per subsessionid, but dirty data exists, hence the dedup.
                -- Window: hourly partitions from (${dt}${hh} - 25h) through (${dt}${hh} - 1h), both ends inclusive.
                WHERE   CONCAT(year,month,day,hour) BETWEEN TO_CHAR(FROM_UNIXTIME(UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - 3600 * 25),'YYYYMMDDHH') AND TO_CHAR(FROM_UNIXTIME(UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - 3600 * 1),'YYYYMMDDHH')
                --WHERE CONCAT(year,month,day,hour) = TO_CHAR(FROM_UNIXTIME(UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - 3600 * 25),'YYYYMMDDHH')
                AND     __topic__ = 'click'
                AND     apptype IS NOT NULL
                -- apptype 12 has pagesource h5-share / h5-detail; filtered out for now, not handled.
                AND     apptype NOT IN ('12')
                AND     machinecode IS NOT NULL
                AND     clickobjectid IS NOT NULL
                -- Dirty rows exist whose pagesource ends with e.g. vlog-gzh /mine/mine-info; keep only the share landing page.
                AND     pagesource REGEXP "-pages/user-videos-share$"
            )
    WHERE   rn = 1
)
-- t_share_from_sharelog: deduplicated share events (topic = share) over the same 25h..1h window.
-- One row is kept per (hour, apptype, machinecode, shareobjectid, sessionid, subsessionid, pagesource, shareid),
-- the one with the largest clienttimestamp.
,t_share_from_sharelog AS
(
    SELECT  *
    FROM    (
                SELECT  CONCAT(year,month,day,hour) AS dthh
                        ,apptype
                        ,machinecode AS mid
                        ,shareobjectid AS vid
                        ,sessionid
                        ,subsessionid
                        ,pagesource
                        ,shareid
                        ,CAST(clienttimestamp / 1000 AS BIGINT) AS ts
                        ,ROW_NUMBER() OVER (PARTITION BY CONCAT(year,month,day,hour),apptype,machinecode,shareobjectid,sessionid,subsessionid,pagesource,shareid ORDER BY clienttimestamp DESC ) AS rn
                FROM    loghubods.user_share_log_flow
                WHERE   CONCAT(year,month,day,hour) BETWEEN TO_CHAR(FROM_UNIXTIME(UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - 3600 * 25),'YYYYMMDDHH') AND TO_CHAR(FROM_UNIXTIME(UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - 3600 * 1),'YYYYMMDDHH')
                --WHERE CONCAT(year,month,day,hour) = TO_CHAR(FROM_UNIXTIME(UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - 3600 * 25),'YYYYMMDDHH')
                AND     __topic__ = 'share'
                AND     apptype IS NOT NULL
                AND     apptype NOT IN ('12')
                AND     machinecode IS NOT NULL
                AND     shareobjectid IS NOT NULL
            )
    WHERE   rn = 1
)
-- t_exposure_raw: videoView events over the same window, with fields extracted from extparams.
-- No dedup is applied here; rn and dthh_id are only computed, and t_exposure applies the filter.
,t_exposure_raw AS
(
    -- dthh_id: running number within (hour, subsessionid), newest clienttimestamp first; used to build the exposure id.
    SELECT  ROW_NUMBER() OVER (PARTITION BY CONCAT(year,month,day,hour),subsessionid ORDER BY clienttimestamp DESC ) AS dthh_id
            ,CONCAT(year,month,day,hour) AS dthh
            ,apptype
            ,uid
            ,mid
            ,videoid AS vid
            ,sessionid
            ,subsessionid
            ,rootsessionid_new
            ,pagesource
            ,recommendlogvo
            ,COALESCE(GET_JSON_OBJECT(extparams,'$.eventInfos.ab_test003'),"unknown") AS abcode
            ,GET_JSON_OBJECT(extparams,'$.recommendPageType') AS recommendpagetype
            ,GET_JSON_OBJECT(extparams,'$.recomTraceId') AS recomtraceid
            -- The head video id is read from either key spelling: head_videoid first, then head_videoId.
            ,CASE WHEN GET_JSON_OBJECT(extParams,'$.head_videoid') IS NOT NULL THEN GET_JSON_OBJECT(extParams,'$.head_videoid') ELSE GET_JSON_OBJECT(extParams,'$.head_videoId') END AS headvideoid
            ,GET_JSON_OBJECT(extParams,'$.rootSourceId') AS rootsourceid
            ,COALESCE(hotsencetype,sencetype,"other") AS hotsencetype
            ,GET_JSON_OBJECT(extParams,'$.animationSceneType') AS animationscenetype
            -- Invalid JSON is replaced by an empty object before parsing.
            ,JSON_PARSE(IF(JSON_VALID(extparams),extparams,"{}")) AS extParams
            ,flowpool
            -- level: third '#'-separated component of flowpool.
            ,SPLIT(flowpool,'#')[2] AS level
            ,clientip
            ,machineinfo_brand
            ,machineinfo_model
            ,machineinfo_system
            ,machineinfo_wechatversion
            ,machineinfo_sdkversion
            ,ANALYSISIP(clientip,"region") AS province
            ,ANALYSISIP(clientip,"city") AS city
            ,versioncode
            ,CAST(logtimestamp / 1000 AS BIGINT) AS ts
            -- rn: dedup rank, earliest logtimestamp first; t_exposure keeps rn = 1 for non-share pages only.
            ,ROW_NUMBER() OVER (PARTITION BY CONCAT(year,month,day,hour),apptype,uid,mid,videoid,sessionid,subsessionid,pagesource ORDER BY logtimestamp ) AS rn
    FROM    loghubods.video_action_log_flow_new
    WHERE   CONCAT(year,month,day,hour) BETWEEN TO_CHAR(FROM_UNIXTIME(UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - 3600 * 25),'YYYYMMDDHH') AND TO_CHAR(FROM_UNIXTIME(UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - 3600 * 1),'YYYYMMDDHH')
    AND     businesstype IN ('videoView')
    AND
apptype IS NOT NULL
    AND     apptype NOT IN ('12')
    AND     mid IS NOT NULL
    AND     videoid IS NOT NULL
)
-- t_exposure: exposure rows with their id (dthh:subsessionid:dthh_id).
--   branch 1: pages other than the share landing page, deduplicated (rn = 1);
--   branch 2: the share landing page (-pages/user-videos-share), kept without dedup.
,t_exposure AS
(
    SELECT  dthh_id
            ,dthh
            ,apptype
            ,uid
            ,mid
            ,vid
            ,sessionid
            ,subsessionid
            ,rootsessionid_new
            ,pagesource
            ,recommendlogvo
            ,abcode
            ,recommendpagetype
            ,recomtraceid
            ,headvideoid
            ,rootsourceid
            ,hotsencetype
            ,animationscenetype
            ,extParams
            ,flowpool
            ,level
            ,clientip
            ,machineinfo_brand
            ,machineinfo_model
            ,machineinfo_system
            ,machineinfo_wechatversion
            ,machineinfo_sdkversion
            ,province
            ,city
            ,versioncode
            ,ts
            ,rn
            ,CONCAT(dthh,":",subsessionid,":",dthh_id) AS id
    FROM    t_exposure_raw
    WHERE   pagesource NOT REGEXP "-pages/user-videos-share$"
    AND     rn = 1
    UNION ALL
    SELECT  dthh_id
            ,dthh
            ,apptype
            ,uid
            ,mid
            ,vid
            ,sessionid
            ,subsessionid
            ,rootsessionid_new
            ,pagesource
            ,recommendlogvo
            ,abcode
            ,recommendpagetype
            ,recomtraceid
            ,headvideoid
            ,rootsourceid
            ,hotsencetype
            ,animationscenetype
            ,extParams
            ,flowpool
            ,level
            ,clientip
            ,machineinfo_brand
            ,machineinfo_model
            ,machineinfo_system
            ,machineinfo_wechatversion
            ,machineinfo_sdkversion
            ,province
            ,city
            ,versioncode
            ,ts
            ,rn
            ,CONCAT(dthh,":",subsessionid,":",dthh_id) AS id
    FROM    t_exposure_raw
    WHERE   pagesource REGEXP "-pages/user-videos-share$"
)
-- t_exposure_recommend: exposures whose pagesource ends with category / recommend / -pages/user-videos-detail.
,t_exposure_recommend AS
(
    SELECT  *
    FROM    t_exposure
    WHERE   pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
)
-- t_return_exposure_1..4: attach each recommend exposure to a return click (used to compute viewh24).
-- Each round only processes exposures the previous round left unmatched (return_id IS NULL) and keeps,
-- per exposure, the matching return with the largest ts.
-- Round 1: mid + headvideoid = return vid + subsessionid.
,t_return_exposure_1 AS
(
    SELECT  *
    FROM    (
                SELECT  t1.id AS exposure_id
                        ,t1.mid AS mid
                        ,t1.vid AS vid
                        ,t1.subsessionid AS subsessionid
                        ,t1.sessionid AS sessionid
                        ,t1.headvideoid AS headvideoid
                        ,t1.dthh
                        ,t2.id AS return_id
                        ,ROW_NUMBER() OVER (PARTITION BY t1.id ORDER BY t2.ts DESC ) AS rn
                FROM    t_exposure_recommend t1
                LEFT JOIN t_return t2
                ON      t1.mid = t2.mid
                AND     t1.headvideoid = t2.vid
                AND     t1.subsessionid = t2.subsessionid
            )
    WHERE   rn = 1
)
-- Round 2: mid + headvideoid = return vid + sessionid.
,t_return_exposure_2 AS
(
    SELECT  *
    FROM    (
                SELECT  t1.exposure_id AS exposure_id
                        ,t1.mid AS mid
                        ,t1.vid AS vid
                        ,t1.subsessionid AS subsessionid
                        ,t1.sessionid AS sessionid
                        ,t1.headvideoid AS headvideoid
                        ,t1.dthh
                        ,t2.id AS return_id
                        ,ROW_NUMBER() OVER (PARTITION BY t1.exposure_id ORDER BY t2.ts DESC ) AS rn
                FROM    (
                            SELECT  *
                            FROM    t_return_exposure_1
                            WHERE   return_id IS NULL
                        ) t1
                LEFT JOIN t_return t2
                ON      t1.mid = t2.mid
                AND     t1.headvideoid = t2.vid
                AND     t1.sessionid = t2.sessionid
            )
    WHERE   rn = 1
)
-- Round 3: mid + subsessionid (any vid).
,t_return_exposure_3 AS
(
    SELECT  *
    FROM    (
                SELECT  t1.exposure_id AS exposure_id
                        ,t1.mid AS mid
                        ,t1.vid AS vid
                        ,t1.subsessionid AS subsessionid
                        ,t1.sessionid AS sessionid
                        ,t1.headvideoid AS headvideoid
                        ,t1.dthh
                        ,t2.id AS return_id
                        ,ROW_NUMBER() OVER (PARTITION BY t1.exposure_id ORDER BY t2.ts DESC ) AS rn
                FROM    (
                            SELECT  *
                            FROM    t_return_exposure_2
                            WHERE   return_id IS NULL
                        ) t1
                LEFT JOIN t_return t2
                ON      t1.mid = t2.mid
                AND     t1.subsessionid = t2.subsessionid
            )
    WHERE   rn = 1
)
-- Round 4: mid + sessionid (any vid).
,t_return_exposure_4 AS
(
    SELECT  *
    FROM    (
                SELECT  t1.exposure_id AS exposure_id
                        ,t1.mid AS mid
                        ,t1.vid AS vid
                        ,t1.subsessionid AS subsessionid
                        ,t1.sessionid AS sessionid
                        ,t1.headvideoid AS headvideoid
                        ,t1.dthh
                        ,t2.id AS return_id
                        ,ROW_NUMBER() OVER (PARTITION BY t1.exposure_id ORDER BY t2.ts DESC ) AS rn
                FROM    (
                            SELECT  *
                            FROM    t_return_exposure_3
                            WHERE   return_id IS NULL
                        ) t1
                LEFT JOIN t_return t2
                ON      t1.mid = t2.mid
                AND     t1.sessionid = t2.sessionid
            )
    WHERE   rn = 1
)
-- t_return_exposure: every return click plus new_exposure_cnt, the number of exposures attached to it
-- across the four rounds (NULL when no exposure was attached).
,t_return_exposure AS
(
    SELECT  a.*
            ,b.exposure_cnt AS new_exposure_cnt
    FROM    t_return a
    LEFT JOIN   (
                    SELECT  return_id
                            ,COUNT(1) AS exposure_cnt
                    FROM    (
                                SELECT  *
                                FROM    t_return_exposure_1
                                WHERE   return_id IS NOT NULL
                                UNION ALL
                                SELECT  *
                                FROM    t_return_exposure_2
                                WHERE   return_id IS NOT NULL
                                UNION ALL
                                SELECT  *
                                FROM    t_return_exposure_3
                                WHERE   return_id IS NOT NULL
                                UNION ALL
                                SELECT  *
                                FROM    t_return_exposure_4
                                WHERE   return_id IS NOT NULL
                            )
                    GROUP BY return_id
                ) b
    ON      a.id = b.return_id
)
-- Start of the regular share-to-exposure matching (shares not made from the
-- pages/detail-user-videos-share-recommend page). Round 1: subsessionid + pagesource + vid, exposure ts <= share ts.
,t_normal_share_exposure_1 AS
(
    SELECT  *
    FROM    (
                SELECT  t1.dthh
                        ,t1.apptype
                        ,t1.mid
                        ,t1.vid
                        ,t1.sessionid
                        ,t1.subsessionid
                        ,t1.pagesource
                        ,t1.shareid
                        ,t1.ts
                        ,t2.id AS exposure_id
                        ,t2.ts AS exposure_ts
                        ,ROW_NUMBER() OVER (PARTITION BY
t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn
                FROM    t_share_from_sharelog t1
                LEFT JOIN t_exposure t2
                ON      t1.apptype = t2.apptype
                AND     t1.mid = t2.mid
                AND     t1.vid = t2.vid
                AND     t1.subsessionid = t2.subsessionid
                AND     t1.pagesource = t2.pagesource
                AND     t1.ts >= t2.ts
                WHERE   t1.pagesource NOT REGEXP "pages/detail-user-videos-share-recommend$"
            )
    WHERE   rn = 1
)
-- Regular share-to-exposure cascade, rounds 2..6. Every round only sees the shares the previous
-- round left unmatched (exposure_id IS NULL) and keeps, per share, the matching exposure with the
-- largest ts. The join key is relaxed step by step:
--   1) subsessionid + pagesource + vid, exposure ts <= share ts
--   2) sessionid    + pagesource + vid, exposure ts <= share ts
--   3) subsessionid + pagesource + vid, no ts constraint
--   4) sessionid    + pagesource + vid, no ts constraint
--   5) subsessionid + vid
--   6) sessionid    + vid
-- Round 2: sessionid + pagesource + vid, exposure ts <= share ts.
,t_normal_share_exposure_2 AS
(
    SELECT  *
    FROM    (
                SELECT  t1.dthh
                        ,t1.apptype
                        ,t1.mid
                        ,t1.vid
                        ,t1.sessionid
                        ,t1.subsessionid
                        ,t1.pagesource
                        ,t1.shareid
                        ,t1.ts
                        ,t2.id AS exposure_id
                        ,t2.ts AS exposure_ts
                        ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn
                FROM    (
                            SELECT  *
                            FROM    t_normal_share_exposure_1
                            WHERE   exposure_id IS NULL
                        ) t1
                LEFT JOIN t_exposure t2
                ON      t1.apptype = t2.apptype
                AND     t1.mid = t2.mid
                AND     t1.vid = t2.vid
                AND     t1.sessionid = t2.sessionid
                AND     t1.pagesource = t2.pagesource
                AND     t1.ts >= t2.ts
            )
    WHERE   rn = 1
)
-- Round 3: subsessionid + pagesource + vid, WITHOUT the ts constraint.
-- FIX: this round previously repeated "AND t1.ts >= t2.ts", making it identical to round 1. Since it
-- only receives shares that round 1 could not match, it could never match anything. The ts predicate
-- is dropped so the round performs the intended relaxation.
,t_normal_share_exposure_3 AS
(
    SELECT  *
    FROM    (
                SELECT  t1.dthh
                        ,t1.apptype
                        ,t1.mid
                        ,t1.vid
                        ,t1.sessionid
                        ,t1.subsessionid
                        ,t1.pagesource
                        ,t1.shareid
                        ,t1.ts
                        ,t2.id AS exposure_id
                        ,t2.ts AS exposure_ts
                        ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn
                FROM    (
                            SELECT  *
                            FROM    t_normal_share_exposure_2
                            WHERE   exposure_id IS NULL
                        ) t1
                LEFT JOIN t_exposure t2
                ON      t1.apptype = t2.apptype
                AND     t1.mid = t2.mid
                AND     t1.vid = t2.vid
                AND     t1.subsessionid = t2.subsessionid
                AND     t1.pagesource = t2.pagesource
            )
    WHERE   rn = 1
)
-- Round 4: sessionid + pagesource + vid, WITHOUT the ts constraint.
-- FIX: same copy-paste defect as round 3 (it duplicated round 2); the ts predicate is dropped.
,t_normal_share_exposure_4 AS
(
    SELECT  *
    FROM    (
                SELECT  t1.dthh
                        ,t1.apptype
                        ,t1.mid
                        ,t1.vid
                        ,t1.sessionid
                        ,t1.subsessionid
                        ,t1.pagesource
                        ,t1.shareid
                        ,t1.ts
                        ,t2.id AS exposure_id
                        ,t2.ts AS exposure_ts
                        ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn
                FROM    (
                            SELECT  *
                            FROM    t_normal_share_exposure_3
                            WHERE   exposure_id IS NULL
                        ) t1
                LEFT JOIN t_exposure t2
                ON      t1.apptype = t2.apptype
                AND     t1.mid = t2.mid
                AND     t1.vid = t2.vid
                AND     t1.sessionid = t2.sessionid
                AND     t1.pagesource = t2.pagesource
            )
    WHERE   rn = 1
)
-- Round 5: subsessionid + vid (pagesource no longer required).
,t_normal_share_exposure_5 AS
(
    SELECT  *
    FROM    (
                SELECT  t1.dthh
                        ,t1.apptype
                        ,t1.mid
                        ,t1.vid
                        ,t1.sessionid
                        ,t1.subsessionid
                        ,t1.pagesource
                        ,t1.shareid
                        ,t1.ts
                        ,t2.id AS exposure_id
                        ,t2.ts AS exposure_ts
                        ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn
                FROM    (
                            SELECT  *
                            FROM    t_normal_share_exposure_4
                            WHERE   exposure_id IS NULL
                        ) t1
                LEFT JOIN t_exposure t2
                ON      t1.apptype = t2.apptype
                AND     t1.mid = t2.mid
                AND     t1.vid = t2.vid
                AND     t1.subsessionid = t2.subsessionid
            )
    WHERE   rn = 1
)
-- Round 6: sessionid + vid. Shares still unmatched here keep exposure_id = NULL.
,t_normal_share_exposure_6 AS
(
    SELECT  *
    FROM    (
                SELECT  t1.dthh
                        ,t1.apptype
                        ,t1.mid
                        ,t1.vid
                        ,t1.sessionid
                        ,t1.subsessionid
                        ,t1.pagesource
                        ,t1.shareid
                        ,t1.ts
                        ,t2.id AS exposure_id
                        ,t2.ts AS exposure_ts
                        ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn
                FROM    (
                            SELECT  *
                            FROM    t_normal_share_exposure_5
                            WHERE   exposure_id IS NULL
                        ) t1
                LEFT JOIN t_exposure t2
                ON      t1.apptype = t2.apptype
                AND     t1.mid = t2.mid
                AND     t1.vid = t2.vid
                AND     t1.sessionid = t2.sessionid
            )
    WHERE   rn = 1
)
-- t_exposure_detail: exposures on detail pages, the candidate set for the irregular share matching below.
,t_exposure_detail AS
(
    SELECT  *
    FROM    t_exposure
    WHERE   pagesource REGEXP "-pages/user-videos-detail$|pages/detail-recommend$"
)
-- Start of the irregular share-to-exposure matching (shares made from the
-- pages/detail-user-videos-share-recommend page). Round 1: subsessionid + vid, exposure ts <= share ts.
,t_no_normal_share_exposure_1 AS
(
    SELECT  *
    FROM    (
                SELECT  t1.dthh
                        ,t1.apptype
                        ,t1.mid
                        ,t1.vid
                        ,t1.sessionid
                        ,t1.subsessionid
                        ,t1.pagesource
                        ,t1.shareid
                        ,t1.ts
                        ,t2.id AS exposure_id
                        ,t2.ts AS exposure_ts
                        ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn
                FROM    t_share_from_sharelog t1
                LEFT JOIN t_exposure_detail t2
                ON      t1.apptype =
t2.apptype
                AND     t1.mid = t2.mid
                AND     t1.vid = t2.vid
                AND     t1.subsessionid = t2.subsessionid
                AND     t1.ts >= t2.ts
                WHERE   t1.pagesource REGEXP "pages/detail-user-videos-share-recommend$"
            )
    WHERE   rn = 1
)
-- Irregular share cascade, rounds 2..4. Each round only sees shares the previous round left unmatched
-- (exposure_id IS NULL) and keeps, per share, the matching detail-page exposure with the largest ts.
-- Round 2: sessionid + vid, exposure ts <= share ts.
,t_no_normal_share_exposure_2 AS
(
    SELECT  *
    FROM    (
                SELECT  t1.dthh
                        ,t1.apptype
                        ,t1.mid
                        ,t1.vid
                        ,t1.sessionid
                        ,t1.subsessionid
                        ,t1.pagesource
                        ,t1.shareid
                        ,t1.ts
                        ,t2.id AS exposure_id
                        ,t2.ts AS exposure_ts
                        ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn
                FROM    (
                            SELECT  *
                            FROM    t_no_normal_share_exposure_1
                            WHERE   exposure_id IS NULL
                        ) t1
                LEFT JOIN t_exposure_detail t2
                ON      t1.apptype = t2.apptype
                AND     t1.mid = t2.mid
                AND     t1.vid = t2.vid
                AND     t1.sessionid = t2.sessionid
                AND     t1.ts >= t2.ts
            )
    WHERE   rn = 1
)
-- Round 3: subsessionid + vid, no ts constraint.
,t_no_normal_share_exposure_3 AS
(
    SELECT  *
    FROM    (
                SELECT  t1.dthh
                        ,t1.apptype
                        ,t1.mid
                        ,t1.vid
                        ,t1.sessionid
                        ,t1.subsessionid
                        ,t1.pagesource
                        ,t1.shareid
                        ,t1.ts
                        ,t2.id AS exposure_id
                        ,t2.ts AS exposure_ts
                        ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn
                FROM    (
                            SELECT  *
                            FROM    t_no_normal_share_exposure_2
                            WHERE   exposure_id IS NULL
                        ) t1
                LEFT JOIN t_exposure_detail t2
                ON      t1.apptype = t2.apptype
                AND     t1.mid = t2.mid
                AND     t1.vid = t2.vid
                AND     t1.subsessionid = t2.subsessionid
            )
    WHERE   rn = 1
)
-- Round 4: sessionid + vid, no ts constraint. Shares still unmatched here keep exposure_id = NULL.
,t_no_normal_share_exposure_4 AS
(
    SELECT  *
    FROM    (
                SELECT  t1.dthh
                        ,t1.apptype
                        ,t1.mid
                        ,t1.vid
                        ,t1.sessionid
                        ,t1.subsessionid
                        ,t1.pagesource
                        ,t1.shareid
                        ,t1.ts
                        ,t2.id AS exposure_id
                        ,t2.ts AS exposure_ts
                        ,ROW_NUMBER() OVER (PARTITION BY t1.dthh,t1.apptype,t1.mid,t1.vid,t1.sessionid,t1.subsessionid,t1.pagesource,t1.shareid ORDER BY t2.ts DESC ) AS rn
                FROM    (
                            SELECT  *
                            FROM    t_no_normal_share_exposure_3
                            WHERE   exposure_id IS NULL
                        ) t1
                LEFT JOIN t_exposure_detail t2
                ON      t1.apptype = t2.apptype
                AND     t1.mid = t2.mid
                AND     t1.vid = t2.vid
                AND     t1.sessionid = t2.sessionid
            )
    WHERE   rn = 1
)
-- t_share_exposure: union of all cascade rounds. Intermediate rounds contribute only their matched
-- rows; the last round of each cascade (normal round 6, irregular round 4) is taken unfiltered, so
-- shares that never matched are kept with exposure_id = NULL.
,t_share_exposure AS
(
    SELECT  *
    FROM    t_normal_share_exposure_1
    WHERE   exposure_id IS NOT NULL
    UNION ALL
    SELECT  *
    FROM    t_normal_share_exposure_2
    WHERE   exposure_id IS NOT NULL
    UNION ALL
    SELECT  *
    FROM    t_normal_share_exposure_3
    WHERE   exposure_id IS NOT NULL
    UNION ALL
    SELECT  *
    FROM    t_normal_share_exposure_4
    WHERE   exposure_id IS NOT NULL
    UNION ALL
    SELECT  *
    FROM    t_normal_share_exposure_5
    WHERE   exposure_id IS NOT NULL
    UNION ALL
    SELECT  *
    FROM    t_normal_share_exposure_6
    UNION ALL
    SELECT  *
    FROM    t_no_normal_share_exposure_1
    WHERE   exposure_id IS NOT NULL
    UNION ALL
    SELECT  *
    FROM    t_no_normal_share_exposure_2
    WHERE   exposure_id IS NOT NULL
    UNION ALL
    SELECT  *
    FROM    t_no_normal_share_exposure_3
    WHERE   exposure_id IS NOT NULL
    UNION ALL
    SELECT  *
    FROM    t_no_normal_share_exposure_4
)
--========================================
-- Multi-hop B/C/D computation (BFS frontier + anti-join cycle removal, 24h only)
--========================================
-- t_share_return: bridge between a share (with its matched exposure) and the return clicks it produced,
-- joined on share.shareid = return.rootshareid + vid + apptype.
-- NOTE(review): rows with exposure_id = NULL from t_share_exposure also flow through this join and end
-- up in a NULL group downstream; confirm whether they should be filtered out here.
,t_share_return AS
(
    SELECT  se.exposure_id
            ,se.shareid
            ,se.vid
            ,se.apptype
            ,se.subsessionid
            ,r.subsessionid AS return_subsessionid
            ,r.mid AS return_mid
            ,r.sharedepth
    FROM    t_share_exposure se
    JOIN    t_return r
    ON      se.shareid = r.rootshareid
    AND     se.vid = r.vid
    AND     se.apptype = r.apptype
)
--========================================
-- B chain: pv / mids per exposure (total and split by sharedepth 1/2/3)
--========================================
-- *_pv counts return rows with a non-NULL return_mid; *_mids collects the distinct return mids.
,t_exposure_bn AS
(
    SELECT  exposure_id
            ,COUNT(return_mid) AS bn_pv
            ,COLLECT_SET(return_mid) AS bn_mids
            ,COUNT(CASE WHEN sharedepth = 1 THEN return_mid END) AS b1_pv
            ,COLLECT_SET(CASE WHEN sharedepth = 1 THEN return_mid END) AS b1_mids
            ,COUNT(CASE WHEN sharedepth = 2 THEN return_mid END) AS b2_pv
            ,COLLECT_SET(CASE WHEN sharedepth = 2 THEN return_mid END) AS b2_mids
            ,COUNT(CASE WHEN sharedepth = 3 THEN return_mid END) AS b3_pv
            ,COLLECT_SET(CASE WHEN sharedepth = 3 THEN return_mid END) AS b3_mids
    FROM    t_share_return
    GROUP BY exposure_id
)
--========================================
-- B chain: exp (number of exposures inside the return visitor's subsession, split by sharedepth)
--========================================
,t_b_exp AS
(
    SELECT  sr.exposure_id
,COUNT(e.id) AS bn_exp
            ,COUNT(CASE WHEN sr.sharedepth = 1 THEN e.id END) AS b1_exp
            ,COUNT(CASE WHEN sr.sharedepth = 2 THEN e.id END) AS b2_exp
            ,COUNT(CASE WHEN sr.sharedepth = 3 THEN e.id END) AS b3_exp
    -- Returns are first reduced to distinct (exposure, return subsession, sharedepth) so that a return
    -- subsession's exposures are counted once per source exposure and depth.
    FROM    (
                SELECT  DISTINCT exposure_id, return_subsessionid, sharedepth
                FROM    t_share_return
            ) sr
    INNER JOIN t_exposure e
    ON      sr.return_subsessionid = e.subsessionid
    GROUP BY sr.exposure_id
)
-- BFS frontier 1: for every source exposure, the set of return subsessions reached directly.
,t_frontier_1 AS
(
    SELECT  DISTINCT
            exposure_id AS source_id
            ,return_subsessionid AS reached_sub
    FROM    t_share_return
)
--========================================
-- C chain hop 1: pv / mids (total and split by sharedepth)
--========================================
-- Path: source exposure -> frontier-1 subsession -> exposures in that subsession -> their returns.
,t_c_hop1 AS
(
    SELECT  fr.source_id AS exposure_id
            ,COUNT(ret.return_mid) AS cn_1_pv
            ,COLLECT_SET(ret.return_mid) AS cn_1_mids
            ,COUNT(CASE WHEN ret.sharedepth = 1 THEN ret.return_mid END) AS c1_1_pv
            ,COLLECT_SET(CASE WHEN ret.sharedepth = 1 THEN ret.return_mid END) AS c1_1_mids
            ,COUNT(CASE WHEN ret.sharedepth = 2 THEN ret.return_mid END) AS c2_1_pv
            ,COLLECT_SET(CASE WHEN ret.sharedepth = 2 THEN ret.return_mid END) AS c2_1_mids
            ,COUNT(CASE WHEN ret.sharedepth = 3 THEN ret.return_mid END) AS c3_1_pv
            ,COLLECT_SET(CASE WHEN ret.sharedepth = 3 THEN ret.return_mid END) AS c3_1_mids
    FROM    t_frontier_1 fr
    INNER JOIN t_exposure ex
    ON      fr.reached_sub = ex.subsessionid
    INNER JOIN t_share_return ret
    ON      ex.id = ret.exposure_id
    GROUP BY fr.source_id
)
--========================================
-- C chain hop 1: exp (exposures inside the return visitor's subsession)
--========================================
,t_c_hop1_exp AS
(
    SELECT  fr.source_id AS exposure_id
            ,COUNT(ret_ex.id) AS cn_1_exp
            ,COUNT(CASE WHEN ret.sharedepth = 1 THEN ret_ex.id END) AS c1_1_exp
            ,COUNT(CASE WHEN ret.sharedepth = 2 THEN ret_ex.id END) AS c2_1_exp
            ,COUNT(CASE WHEN ret.sharedepth = 3 THEN ret_ex.id END) AS c3_1_exp
    FROM    t_frontier_1 fr
    INNER JOIN t_exposure ex
    ON      fr.reached_sub = ex.subsessionid
    INNER JOIN  (
                    SELECT  DISTINCT exposure_id, return_subsessionid, sharedepth
                    FROM    t_share_return
                ) ret
    ON      ex.id = ret.exposure_id
    INNER JOIN t_exposure ret_ex
    ON      ret.return_subsessionid = ret_ex.subsessionid
    GROUP BY fr.source_id
)
-- BFS frontier 2: extend frontier 1 by one hop; the anti-join drops subsessions already in frontier 1.
,t_frontier_2 AS
(
    SELECT  DISTINCT
            cur.source_id
            ,nxt.return_subsessionid AS reached_sub
    FROM    t_frontier_1 cur
    INNER JOIN t_exposure ex
    ON      cur.reached_sub = ex.subsessionid
    INNER JOIN t_share_return nxt
    ON      ex.id = nxt.exposure_id
    LEFT JOIN t_frontier_1 seen1
    ON      cur.source_id = seen1.source_id
    AND     nxt.return_subsessionid = seen1.reached_sub
    WHERE   seen1.source_id IS NULL
)
--========================================
-- C chain hop 2: pv / mids (total and split by sharedepth)
--========================================
,t_c_hop2 AS
(
    SELECT  fr.source_id AS exposure_id
            ,COUNT(ret.return_mid) AS cn_2_pv
            ,COLLECT_SET(ret.return_mid) AS cn_2_mids
            ,COUNT(CASE WHEN ret.sharedepth = 1 THEN ret.return_mid END) AS c1_2_pv
            ,COLLECT_SET(CASE WHEN ret.sharedepth = 1 THEN ret.return_mid END) AS c1_2_mids
            ,COUNT(CASE WHEN ret.sharedepth = 2 THEN ret.return_mid END) AS c2_2_pv
            ,COLLECT_SET(CASE WHEN ret.sharedepth = 2 THEN ret.return_mid END) AS c2_2_mids
            ,COUNT(CASE WHEN ret.sharedepth = 3 THEN ret.return_mid END) AS c3_2_pv
            ,COLLECT_SET(CASE WHEN ret.sharedepth = 3 THEN ret.return_mid END) AS c3_2_mids
    FROM    t_frontier_2 fr
    INNER JOIN t_exposure ex
    ON      fr.reached_sub = ex.subsessionid
    INNER JOIN t_share_return ret
    ON      ex.id = ret.exposure_id
    GROUP BY fr.source_id
)
--========================================
-- C chain hop 2: exp
--========================================
,t_c_hop2_exp AS
(
    SELECT  fr.source_id AS exposure_id
            ,COUNT(ret_ex.id) AS cn_2_exp
            ,COUNT(CASE WHEN ret.sharedepth = 1 THEN ret_ex.id END) AS c1_2_exp
            ,COUNT(CASE WHEN ret.sharedepth = 2 THEN ret_ex.id END) AS c2_2_exp
            ,COUNT(CASE WHEN ret.sharedepth = 3 THEN ret_ex.id END) AS c3_2_exp
    FROM    t_frontier_2 fr
    INNER JOIN t_exposure ex
    ON      fr.reached_sub = ex.subsessionid
    INNER JOIN  (
                    SELECT  DISTINCT exposure_id, return_subsessionid, sharedepth
                    FROM    t_share_return
                ) ret
    ON      ex.id = ret.exposure_id
    INNER JOIN t_exposure ret_ex
    ON      ret.return_subsessionid = ret_ex.subsessionid
    GROUP BY fr.source_id
)
-- BFS frontier 3: extend frontier 2 by one hop; the anti-joins drop subsessions already in frontier 1 or frontier 2.
,t_frontier_3 AS
(
    SELECT  DISTINCT
            cur.source_id
            ,nxt.return_subsessionid AS reached_sub
    FROM    t_frontier_2 cur
    INNER JOIN t_exposure ex
    ON      cur.reached_sub = ex.subsessionid
    INNER JOIN t_share_return nxt
    ON      ex.id = nxt.exposure_id
    LEFT JOIN t_frontier_1 seen1
    ON      cur.source_id = seen1.source_id
    AND     nxt.return_subsessionid = seen1.reached_sub
    LEFT JOIN t_frontier_2 seen2
    ON      cur.source_id = seen2.source_id
    AND     nxt.return_subsessionid = seen2.reached_sub
    WHERE   seen1.source_id IS NULL
    AND     seen2.source_id IS NULL
)
--========================================
-- C chain hop 3: pv / mids (total and split by sharedepth)
--========================================
,t_c_hop3 AS
(
    SELECT  fr.source_id AS exposure_id
            ,COUNT(ret.return_mid) AS cn_3_pv
            ,COLLECT_SET(ret.return_mid) AS cn_3_mids
            ,COUNT(CASE WHEN ret.sharedepth = 1 THEN ret.return_mid END) AS c1_3_pv
            ,COLLECT_SET(CASE WHEN ret.sharedepth = 1 THEN ret.return_mid END) AS c1_3_mids
            ,COUNT(CASE WHEN ret.sharedepth = 2 THEN ret.return_mid END) AS c2_3_pv
            ,COLLECT_SET(CASE WHEN ret.sharedepth = 2 THEN ret.return_mid END) AS c2_3_mids
            ,COUNT(CASE WHEN ret.sharedepth = 3 THEN ret.return_mid END) AS c3_3_pv
            ,COLLECT_SET(CASE WHEN ret.sharedepth = 3 THEN ret.return_mid END) AS c3_3_mids
    FROM    t_frontier_3 fr
    INNER JOIN t_exposure ex
    ON      fr.reached_sub = ex.subsessionid
    INNER JOIN t_share_return ret
    ON      ex.id = ret.exposure_id
    GROUP BY fr.source_id
)
--========================================
-- C chain hop 3: exp
--========================================
,t_c_hop3_exp AS
(
    SELECT  f.source_id AS exposure_id
            ,COUNT(e_ret.id) AS cn_3_exp
            ,COUNT(CASE WHEN sr.sharedepth = 1 THEN e_ret.id END) AS c1_3_exp
            ,COUNT(CASE WHEN sr.sharedepth = 2 THEN e_ret.id END) AS c2_3_exp
            ,COUNT(CASE WHEN sr.sharedepth = 3 THEN e_ret.id END) AS c3_3_exp
    FROM    t_frontier_3 f
    INNER JOIN t_exposure e
    ON      f.reached_sub = e.subsessionid
    INNER JOIN  (
                    SELECT  DISTINCT exposure_id, return_subsessionid, sharedepth
                    FROM    t_share_return
                ) sr
    ON      e.id = sr.exposure_id
    INNER JOIN t_exposure e_ret
    ON      sr.return_subsessionid =
e_ret.subsessionid
    GROUP BY f.source_id
)
--========================================
-- D chain: spread through later exposures of the same subsession (BFS with cycle removal)
--========================================
-- D0: number of later exposures in the same subsession (initial cost of the D chain).
,t_d0 AS
(
    SELECT  src.id AS exposure_id
            ,COUNT(later.id) AS d0
    FROM    t_exposure src
    INNER JOIN t_exposure later
    ON      src.subsessionid = later.subsessionid
    AND     CAST(later.ts AS BIGINT) > CAST(src.ts AS BIGINT)
    GROUP BY src.id
)
-- D chain hop 1: pv / mids. Returns produced by exposures that come later in the same subsession.
,t_d_hop1 AS
(
    SELECT  src.id AS exposure_id
            ,COUNT(ret.return_mid) AS dn_1_pv
            ,COLLECT_SET(ret.return_mid) AS dn_1_mids
            ,COUNT(CASE WHEN ret.sharedepth = 1 THEN ret.return_mid END) AS d1_1_pv
            ,COLLECT_SET(CASE WHEN ret.sharedepth = 1 THEN ret.return_mid END) AS d1_1_mids
            ,COUNT(CASE WHEN ret.sharedepth = 2 THEN ret.return_mid END) AS d2_1_pv
            ,COLLECT_SET(CASE WHEN ret.sharedepth = 2 THEN ret.return_mid END) AS d2_1_mids
            ,COUNT(CASE WHEN ret.sharedepth = 3 THEN ret.return_mid END) AS d3_1_pv
            ,COLLECT_SET(CASE WHEN ret.sharedepth = 3 THEN ret.return_mid END) AS d3_1_mids
    FROM    t_exposure src
    INNER JOIN t_exposure later
    ON      src.subsessionid = later.subsessionid
    AND     CAST(later.ts AS BIGINT) > CAST(src.ts AS BIGINT)
    INNER JOIN t_share_return ret
    ON      later.id = ret.exposure_id
    GROUP BY src.id
)
-- D chain hop 1: exp (exposures inside the return visitor's subsession).
,t_d_hop1_exp AS
(
    SELECT  src.id AS exposure_id
            ,COUNT(ret_ex.id) AS dn_1_exp
            ,COUNT(CASE WHEN ret.sharedepth = 1 THEN ret_ex.id END) AS d1_1_exp
            ,COUNT(CASE WHEN ret.sharedepth = 2 THEN ret_ex.id END) AS d2_1_exp
            ,COUNT(CASE WHEN ret.sharedepth = 3 THEN ret_ex.id END) AS d3_1_exp
    FROM    t_exposure src
    INNER JOIN t_exposure later
    ON      src.subsessionid = later.subsessionid
    AND     CAST(later.ts AS BIGINT) > CAST(src.ts AS BIGINT)
    INNER JOIN  (
                    SELECT  DISTINCT exposure_id, return_subsessionid, sharedepth
                    FROM    t_share_return
                ) ret
    ON      later.id = ret.exposure_id
    INNER JOIN t_exposure ret_ex
    ON      ret.return_subsessionid = ret_ex.subsessionid
    GROUP BY src.id
)
-- D chain frontier 1: return subsessions reached at hop 1, per source exposure.
,t_d1_frontier AS
(
    SELECT  DISTINCT
            src.id AS source_id
            ,ret.return_subsessionid AS reached_sub
    FROM    t_exposure src
    INNER JOIN t_exposure later
    ON      src.subsessionid = later.subsessionid
    AND     CAST(later.ts AS BIGINT) > CAST(src.ts AS BIGINT)
    INNER JOIN t_share_return ret
    ON      later.id = ret.exposure_id
)
-- D chain hop 2: pv / mids.
,t_d_hop2 AS
(
    SELECT  fr.source_id AS exposure_id
            ,COUNT(ret.return_mid) AS dn_2_pv
            ,COLLECT_SET(ret.return_mid) AS dn_2_mids
            ,COUNT(CASE WHEN ret.sharedepth = 1 THEN ret.return_mid END) AS d1_2_pv
            ,COLLECT_SET(CASE WHEN ret.sharedepth = 1 THEN ret.return_mid END) AS d1_2_mids
            ,COUNT(CASE WHEN ret.sharedepth = 2 THEN ret.return_mid END) AS d2_2_pv
            ,COLLECT_SET(CASE WHEN ret.sharedepth = 2 THEN ret.return_mid END) AS d2_2_mids
            ,COUNT(CASE WHEN ret.sharedepth = 3 THEN ret.return_mid END) AS d3_2_pv
            ,COLLECT_SET(CASE WHEN ret.sharedepth = 3 THEN ret.return_mid END) AS d3_2_mids
    FROM    t_d1_frontier fr
    INNER JOIN t_exposure ex
    ON      fr.reached_sub = ex.subsessionid
    INNER JOIN t_share_return ret
    ON      ex.id = ret.exposure_id
    GROUP BY fr.source_id
)
-- D chain hop 2: exp.
,t_d_hop2_exp AS
(
    SELECT  fr.source_id AS exposure_id
            ,COUNT(ret_ex.id) AS dn_2_exp
            ,COUNT(CASE WHEN ret.sharedepth = 1 THEN ret_ex.id END) AS d1_2_exp
            ,COUNT(CASE WHEN ret.sharedepth = 2 THEN ret_ex.id END) AS d2_2_exp
            ,COUNT(CASE WHEN ret.sharedepth = 3 THEN ret_ex.id END) AS d3_2_exp
    FROM    t_d1_frontier fr
    INNER JOIN t_exposure ex
    ON      fr.reached_sub = ex.subsessionid
    INNER JOIN  (
                    SELECT  DISTINCT exposure_id, return_subsessionid, sharedepth
                    FROM    t_share_return
                ) ret
    ON      ex.id = ret.exposure_id
    INNER JOIN t_exposure ret_ex
    ON      ret.return_subsessionid = ret_ex.subsessionid
    GROUP BY fr.source_id
)
-- D chain frontier 2: extend frontier 1 by one hop; the anti-join drops subsessions already in t_d1_frontier.
,t_d2_frontier AS
(
    SELECT  DISTINCT
            cur.source_id
            ,nxt.return_subsessionid AS reached_sub
    FROM    t_d1_frontier cur
    INNER JOIN t_exposure ex
    ON      cur.reached_sub = ex.subsessionid
    INNER JOIN t_share_return nxt
    ON      ex.id = nxt.exposure_id
    LEFT JOIN t_d1_frontier seen1
    ON      cur.source_id = seen1.source_id
    AND     nxt.return_subsessionid = seen1.reached_sub
    WHERE   seen1.source_id IS NULL
)
-- D chain hop 3: pv / mids.
,t_d_hop3 AS
(
    SELECT  fr.source_id AS exposure_id
            ,COUNT(ret.return_mid) AS dn_3_pv
            ,COLLECT_SET(ret.return_mid) AS dn_3_mids
            ,COUNT(CASE WHEN ret.sharedepth = 1 THEN ret.return_mid END) AS d1_3_pv
            ,COLLECT_SET(CASE WHEN ret.sharedepth = 1 THEN ret.return_mid END) AS d1_3_mids
            ,COUNT(CASE WHEN ret.sharedepth = 2 THEN ret.return_mid END) AS d2_3_pv
            ,COLLECT_SET(CASE WHEN ret.sharedepth = 2 THEN ret.return_mid END) AS d2_3_mids
            ,COUNT(CASE WHEN ret.sharedepth = 3 THEN ret.return_mid END) AS d3_3_pv
            ,COLLECT_SET(CASE WHEN ret.sharedepth = 3 THEN ret.return_mid END) AS d3_3_mids
    FROM    t_d2_frontier fr
    INNER JOIN t_exposure ex
    ON      fr.reached_sub = ex.subsessionid
    INNER JOIN t_share_return ret
    ON      ex.id = ret.exposure_id
    GROUP BY fr.source_id
)
-- D chain hop 3: exp.
,t_d_hop3_exp AS
(
    SELECT  fr.source_id AS exposure_id
            ,COUNT(ret_ex.id) AS dn_3_exp
            ,COUNT(CASE WHEN ret.sharedepth = 1 THEN ret_ex.id END) AS d1_3_exp
            ,COUNT(CASE WHEN ret.sharedepth = 2 THEN ret_ex.id END) AS d2_3_exp
            ,COUNT(CASE WHEN ret.sharedepth = 3 THEN ret_ex.id END) AS d3_3_exp
    FROM    t_d2_frontier fr
    INNER JOIN t_exposure ex
    ON      fr.reached_sub = ex.subsessionid
    INNER JOIN  (
                    SELECT  DISTINCT exposure_id, return_subsessionid, sharedepth
                    FROM    t_share_return
                ) ret
    ON      ex.id = ret.exposure_id
    INNER JOIN t_exposure ret_ex
    ON      ret.return_subsessionid = ret_ex.subsessionid
    GROUP BY fr.source_id
)
--========================================
-- Pre-existing CTEs continue below
--========================================
-- t_share_with_label: each share (with its matched exposure) plus its return labels.
-- Subquery b aggregates returns whose shareid equals the share's own shareid (pv, distinct-mid uv, mid list).
,t_share_with_label AS
(
    SELECT  a.dthh
            ,a.apptype -- join key
            ,a.mid
            ,a.vid -- join key
            ,a.sessionid
            ,a.subsessionid
            ,a.pagesource
            ,a.shareid -- join key
            ,a.ts
            ,a.exposure_id
            ,COALESCE(b.return_1_pv,0) AS return_1_pv
            ,COALESCE(b.return_1_uv,0) AS return_1_uv
            ,b.return_1_mids AS return_1_mids -- may be NULL; whether to handle it earlier is still to be decided
            ,COALESCE(c.return_n_pv,0) AS return_n_pv
            ,COALESCE(c.return_n_uv,0) AS return_n_uv
            ,c.return_n_mids AS return_n_mids -- may be NULL; whether to handle it earlier is still to be decided
            ,COALESCE(c.new_exposure_cnt,0) AS new_exposure_cnt
    FROM    t_share_exposure a
    LEFT JOIN   (
                    SELECT  shareid
                            ,vid
                            ,apptype
                            ,COUNT(1) AS return_1_pv
                            ,COUNT(DISTINCT mid) AS return_1_uv
                            ,CONCAT_WS(',',COLLECT_SET(mid)) AS return_1_mids
                    FROM    t_return
                    GROUP BY shareid
                             ,vid
                             ,apptype
                ) b
    ON      a.shareid =
b.shareid AND a.vid = b.vid AND a.apptype = b.apptype LEFT JOIN ( SELECT rootshareid ,vid ,apptype ,COUNT(1) AS return_n_pv ,COUNT(DISTINCT mid) AS return_n_uv ,CONCAT_WS(',',COLLECT_SET(mid)) AS return_n_mids ,SUM(new_exposure_cnt) AS new_exposure_cnt FROM t_return_exposure GROUP BY rootshareid ,vid ,apptype ) c ON a.shareid = c.rootshareid AND a.vid = c.vid AND a.apptype = c.apptype ) ,t_share_with_label_group AS ( SELECT exposure_id ,COUNT(1) AS share_cnt ,SUM(return_1_pv) AS return_1_pv ,COALESCE(SIZE(SPLIT(DEDUPLICATION4LIST(CONCAT_WS(',',COLLECT_LIST(return_1_mids))),",")),0) AS return_1_uv ,DEDUPLICATION4LIST(CONCAT_WS(',',COLLECT_LIST(return_1_mids))) AS return_1_mids -- 可能是null ,SUM(return_n_pv) AS return_n_pv ,COALESCE(SIZE(SPLIT(DEDUPLICATION4LIST(CONCAT_WS(',',COLLECT_LIST(return_n_mids))),",")),0) AS return_n_uv ,DEDUPLICATION4LIST(CONCAT_WS(',',COLLECT_LIST(return_n_mids))) AS return_n_mids -- 可能是null ,SUM(new_exposure_cnt) AS new_exposure_cnt FROM t_share_with_label GROUP BY exposure_id ) ,t_root_source_id_group_name AS ( SELECT * FROM ( SELECT root_source_id ,group_name ,ROW_NUMBER() OVER (PARTITION BY root_source_id ) AS rn FROM loghubods.changwen_rootsourceid_group_hour WHERE dt = MAX_PT('loghubods.changwen_rootsourceid_group_hour') ) WHERE rn = 1 ) ,t_exposure_share_return AS ( SELECT apptype ,uid ,mid ,vid ,sessionid ,subsessionid ,pagesource ,CASE WHEN pagesource REGEXP 'pages/user-videos-share-recommend$' THEN '回流后沉浸页&内页feed' WHEN pagesource REGEXP 'pages/detail-recommend$' THEN '详情后沉浸页' WHEN pagesource REGEXP 'pages/user-videos-share$' THEN '回流页' WHEN pagesource REGEXP 'pages/user-videos-detail$' THEN '详情页' WHEN pagesource REGEXP 'pages/category$' THEN '首页feed' ELSE '其他' END AS pagesource_new ,recommendlogvo -- 推荐算法的返回结果日志存在这个字段中 ,abcode -- 推荐算法的ab分组 ,recommendpagetype -- 三种回流头部;两种下滑-沉浸页下滑和feed下滑 ,recomtraceid ,headvideoid ,rootsourceid ,hotsencetype ,flowpool -- 14#68#3#1735262438476#2 ,level ,clientip ,machineinfo_brand ,machineinfo_model 
,machineinfo_system ,machineinfo_wechatversion ,machineinfo_sdkversion ,province ,city ,ts ,IF(COALESCE(share_cnt,0) > 0,1,0) AS is_share ,COALESCE(share_cnt,0) AS share_cnt ,IF(COALESCE(return_1_uv,0) > 0,1,0) AS is_return_1 ,COALESCE(return_1_pv,0) AS return_1_pv ,COALESCE(return_1_uv,0) AS return_1_uv ,IF(COALESCE(return_n_pv,0) > 0,1,0) AS is_return_n ,COALESCE(return_n_pv,0) AS return_n_pv ,COALESCE(return_n_uv,0) AS return_n_uv ,IF(COALESCE(COALESCE(SIZE(ARRAY_REMOVE(SPLIT(return_1_mids,","),mid)),0),0) > 0,1,0) AS is_return_noself ,COALESCE(SIZE(ARRAY_REMOVE(SPLIT(return_1_mids,","),mid)),0) AS return_1_uv_noself ,IF(COALESCE(COALESCE(SIZE(ARRAY_REMOVE(SPLIT(return_n_mids,","),mid)),0),0) > 0,1,0) AS is_return_n_noself ,COALESCE(SIZE(ARRAY_REMOVE(SPLIT(return_n_mids,","),mid)),0) AS return_n_uv_noself ,COALESCE(new_exposure_cnt) AS new_exposure_cnt -- ========== B 链 ========== ,COALESCE(b_exp.bn_exp, 0) AS bn_exp ,COALESCE(bn_hop.bn_pv, 0) AS bn_pv ,COALESCE(SIZE(bn_hop.bn_mids), 0) AS bn_uv ,COALESCE(b_exp.b1_exp, 0) AS b1_exp ,COALESCE(bn_hop.b1_pv, 0) AS b1_pv ,COALESCE(SIZE(bn_hop.b1_mids), 0) AS b1_uv ,COALESCE(b_exp.b2_exp, 0) AS b2_exp ,COALESCE(bn_hop.b2_pv, 0) AS b2_pv ,COALESCE(SIZE(bn_hop.b2_mids), 0) AS b2_uv ,COALESCE(b_exp.b3_exp, 0) AS b3_exp ,COALESCE(bn_hop.b3_pv, 0) AS b3_pv ,COALESCE(SIZE(bn_hop.b3_mids), 0) AS b3_uv -- ========== C 链 hop1 ========== ,COALESCE(c_hop1_exp.cn_1_exp, 0) AS cn_1_exp ,COALESCE(c_hop1.cn_1_pv, 0) AS cn_1_pv ,COALESCE(SIZE(c_hop1.cn_1_mids), 0) AS cn_1_uv ,COALESCE(c_hop1_exp.c1_1_exp, 0) AS c1_1_exp ,COALESCE(c_hop1.c1_1_pv, 0) AS c1_1_pv ,COALESCE(SIZE(c_hop1.c1_1_mids), 0) AS c1_1_uv ,COALESCE(c_hop1_exp.c2_1_exp, 0) AS c2_1_exp ,COALESCE(c_hop1.c2_1_pv, 0) AS c2_1_pv ,COALESCE(SIZE(c_hop1.c2_1_mids), 0) AS c2_1_uv ,COALESCE(c_hop1_exp.c3_1_exp, 0) AS c3_1_exp ,COALESCE(c_hop1.c3_1_pv, 0) AS c3_1_pv ,COALESCE(SIZE(c_hop1.c3_1_mids), 0) AS c3_1_uv -- ========== C 链 hop2 ========== ,COALESCE(c_hop2_exp.cn_2_exp, 
0) AS cn_2_exp ,COALESCE(c_hop2.cn_2_pv, 0) AS cn_2_pv ,COALESCE(SIZE(c_hop2.cn_2_mids), 0) AS cn_2_uv ,COALESCE(c_hop2_exp.c1_2_exp, 0) AS c1_2_exp ,COALESCE(c_hop2.c1_2_pv, 0) AS c1_2_pv ,COALESCE(SIZE(c_hop2.c1_2_mids), 0) AS c1_2_uv ,COALESCE(c_hop2_exp.c2_2_exp, 0) AS c2_2_exp ,COALESCE(c_hop2.c2_2_pv, 0) AS c2_2_pv ,COALESCE(SIZE(c_hop2.c2_2_mids), 0) AS c2_2_uv ,COALESCE(c_hop2_exp.c3_2_exp, 0) AS c3_2_exp ,COALESCE(c_hop2.c3_2_pv, 0) AS c3_2_pv ,COALESCE(SIZE(c_hop2.c3_2_mids), 0) AS c3_2_uv -- ========== C 链 hop3 ========== ,COALESCE(c_hop3_exp.cn_3_exp, 0) AS cn_3_exp ,COALESCE(c_hop3.cn_3_pv, 0) AS cn_3_pv ,COALESCE(SIZE(c_hop3.cn_3_mids), 0) AS cn_3_uv ,COALESCE(c_hop3_exp.c1_3_exp, 0) AS c1_3_exp ,COALESCE(c_hop3.c1_3_pv, 0) AS c1_3_pv ,COALESCE(SIZE(c_hop3.c1_3_mids), 0) AS c1_3_uv ,COALESCE(c_hop3_exp.c2_3_exp, 0) AS c2_3_exp ,COALESCE(c_hop3.c2_3_pv, 0) AS c2_3_pv ,COALESCE(SIZE(c_hop3.c2_3_mids), 0) AS c2_3_uv ,COALESCE(c_hop3_exp.c3_3_exp, 0) AS c3_3_exp ,COALESCE(c_hop3.c3_3_pv, 0) AS c3_3_pv ,COALESCE(SIZE(c_hop3.c3_3_mids), 0) AS c3_3_uv -- ========== D 链 ========== ,COALESCE(d0_hop.d0, 0) AS d0 -- D hop1 ,COALESCE(d_hop1_exp.dn_1_exp, 0) AS dn_1_exp ,COALESCE(d_hop1.dn_1_pv, 0) AS dn_1_pv ,COALESCE(SIZE(d_hop1.dn_1_mids), 0) AS dn_1_uv ,COALESCE(d_hop1_exp.d1_1_exp, 0) AS d1_1_exp ,COALESCE(d_hop1.d1_1_pv, 0) AS d1_1_pv ,COALESCE(SIZE(d_hop1.d1_1_mids), 0) AS d1_1_uv ,COALESCE(d_hop1_exp.d2_1_exp, 0) AS d2_1_exp ,COALESCE(d_hop1.d2_1_pv, 0) AS d2_1_pv ,COALESCE(SIZE(d_hop1.d2_1_mids), 0) AS d2_1_uv ,COALESCE(d_hop1_exp.d3_1_exp, 0) AS d3_1_exp ,COALESCE(d_hop1.d3_1_pv, 0) AS d3_1_pv ,COALESCE(SIZE(d_hop1.d3_1_mids), 0) AS d3_1_uv -- D hop2 ,COALESCE(d_hop2_exp.dn_2_exp, 0) AS dn_2_exp ,COALESCE(d_hop2.dn_2_pv, 0) AS dn_2_pv ,COALESCE(SIZE(d_hop2.dn_2_mids), 0) AS dn_2_uv ,COALESCE(d_hop2_exp.d1_2_exp, 0) AS d1_2_exp ,COALESCE(d_hop2.d1_2_pv, 0) AS d1_2_pv ,COALESCE(SIZE(d_hop2.d1_2_mids), 0) AS d1_2_uv ,COALESCE(d_hop2_exp.d2_2_exp, 0) AS 
d2_2_exp ,COALESCE(d_hop2.d2_2_pv, 0) AS d2_2_pv ,COALESCE(SIZE(d_hop2.d2_2_mids), 0) AS d2_2_uv ,COALESCE(d_hop2_exp.d3_2_exp, 0) AS d3_2_exp ,COALESCE(d_hop2.d3_2_pv, 0) AS d3_2_pv ,COALESCE(SIZE(d_hop2.d3_2_mids), 0) AS d3_2_uv -- D hop3 ,COALESCE(d_hop3_exp.dn_3_exp, 0) AS dn_3_exp ,COALESCE(d_hop3.dn_3_pv, 0) AS dn_3_pv ,COALESCE(SIZE(d_hop3.dn_3_mids), 0) AS dn_3_uv ,COALESCE(d_hop3_exp.d1_3_exp, 0) AS d1_3_exp ,COALESCE(d_hop3.d1_3_pv, 0) AS d1_3_pv ,COALESCE(SIZE(d_hop3.d1_3_mids), 0) AS d1_3_uv ,COALESCE(d_hop3_exp.d2_3_exp, 0) AS d2_3_exp ,COALESCE(d_hop3.d2_3_pv, 0) AS d2_3_pv ,COALESCE(SIZE(d_hop3.d2_3_mids), 0) AS d2_3_uv ,COALESCE(d_hop3_exp.d3_3_exp, 0) AS d3_3_exp ,COALESCE(d_hop3.d3_3_pv, 0) AS d3_3_pv ,COALESCE(SIZE(d_hop3.d3_3_mids), 0) AS d3_3_uv ,JSON_FORMAT( JSON_OBJECT("animationSceneType",animationSceneType,"extParams",extParams,"rootsessionid",rootsessionid_new,"versioncode",versioncode,"group_name",tc.group_name) ) AS extend -- ========== mids 列表字段 (变长, 统一放末尾) ========== ,return_1_mids ,return_n_mids ,ARRAY_JOIN(ARRAY_REMOVE(SPLIT(return_1_mids,","),mid),",") AS return_1_mids_noself ,ARRAY_JOIN(ARRAY_REMOVE(SPLIT(return_n_mids,","),mid),",") AS return_n_mids_noself ,CONCAT_WS(',', bn_hop.bn_mids) AS bn_mids ,CONCAT_WS(',', bn_hop.b1_mids) AS b1_mids ,CONCAT_WS(',', bn_hop.b2_mids) AS b2_mids ,CONCAT_WS(',', bn_hop.b3_mids) AS b3_mids ,CONCAT_WS(',', c_hop1.cn_1_mids) AS cn_1_mids ,CONCAT_WS(',', c_hop1.c1_1_mids) AS c1_1_mids ,CONCAT_WS(',', c_hop1.c2_1_mids) AS c2_1_mids ,CONCAT_WS(',', c_hop1.c3_1_mids) AS c3_1_mids ,CONCAT_WS(',', c_hop2.cn_2_mids) AS cn_2_mids ,CONCAT_WS(',', c_hop2.c1_2_mids) AS c1_2_mids ,CONCAT_WS(',', c_hop2.c2_2_mids) AS c2_2_mids ,CONCAT_WS(',', c_hop2.c3_2_mids) AS c3_2_mids ,CONCAT_WS(',', c_hop3.cn_3_mids) AS cn_3_mids ,CONCAT_WS(',', c_hop3.c1_3_mids) AS c1_3_mids ,CONCAT_WS(',', c_hop3.c2_3_mids) AS c2_3_mids ,CONCAT_WS(',', c_hop3.c3_3_mids) AS c3_3_mids ,CONCAT_WS(',', d_hop1.dn_1_mids) AS dn_1_mids 
,CONCAT_WS(',', d_hop1.d1_1_mids) AS d1_1_mids ,CONCAT_WS(',', d_hop1.d2_1_mids) AS d2_1_mids ,CONCAT_WS(',', d_hop1.d3_1_mids) AS d3_1_mids ,CONCAT_WS(',', d_hop2.dn_2_mids) AS dn_2_mids ,CONCAT_WS(',', d_hop2.d1_2_mids) AS d1_2_mids ,CONCAT_WS(',', d_hop2.d2_2_mids) AS d2_2_mids ,CONCAT_WS(',', d_hop2.d3_2_mids) AS d3_2_mids ,CONCAT_WS(',', d_hop3.dn_3_mids) AS dn_3_mids ,CONCAT_WS(',', d_hop3.d1_3_mids) AS d1_3_mids ,CONCAT_WS(',', d_hop3.d2_3_mids) AS d2_3_mids ,CONCAT_WS(',', d_hop3.d3_3_mids) AS d3_3_mids ,SUBSTR(dthh,1,8) AS dt ,SUBSTR(dthh,9,2) AS hh FROM t_exposure ta LEFT JOIN t_share_with_label_group tb ON ta.id = tb.exposure_id LEFT JOIN t_root_source_id_group_name tc ON ta.rootsourceid = tc.root_source_id LEFT JOIN t_exposure_bn bn_hop ON ta.id = bn_hop.exposure_id LEFT JOIN t_b_exp b_exp ON ta.id = b_exp.exposure_id LEFT JOIN t_d0 d0_hop ON ta.id = d0_hop.exposure_id LEFT JOIN t_c_hop1 c_hop1 ON ta.id = c_hop1.exposure_id LEFT JOIN t_c_hop1_exp c_hop1_exp ON ta.id = c_hop1_exp.exposure_id LEFT JOIN t_c_hop2 c_hop2 ON ta.id = c_hop2.exposure_id LEFT JOIN t_c_hop2_exp c_hop2_exp ON ta.id = c_hop2_exp.exposure_id LEFT JOIN t_c_hop3 c_hop3 ON ta.id = c_hop3.exposure_id LEFT JOIN t_c_hop3_exp c_hop3_exp ON ta.id = c_hop3_exp.exposure_id LEFT JOIN t_d_hop1 d_hop1 ON ta.id = d_hop1.exposure_id LEFT JOIN t_d_hop1_exp d_hop1_exp ON ta.id = d_hop1_exp.exposure_id LEFT JOIN t_d_hop2 d_hop2 ON ta.id = d_hop2.exposure_id LEFT JOIN t_d_hop2_exp d_hop2_exp ON ta.id = d_hop2_exp.exposure_id LEFT JOIN t_d_hop3 d_hop3 ON ta.id = d_hop3.exposure_id LEFT JOIN t_d_hop3_exp d_hop3_exp ON ta.id = d_hop3_exp.exposure_id )SELECT * FROM t_exposure_share_return ;