-- Task: 02_用户连续分享CF ID: 1017832518 Type: ODPS_SQL --@exclude_input=loghubods.user_share_log_flow --odps sql --********************************************************************-- --author:张博 --create time:2024-06-06 13:27:20 --********************************************************************-- -- select * from loghubods.alg_vid_feature_cfshare where dt = "20240606" and hh = "16"; CREATE TABLE IF NOT EXISTS loghubods.alg_vid_feature_cfshare ( vid STRING COMMENT '视频id' ,feature JSON COMMENT 'json格式的特征组合' ) COMMENT '视频特征-用户连续分享CF' PARTITIONED BY ( dt STRING COMMENT '天' ,hh STRING COMMENT '小时' ) LIFECYCLE 30 ; INSERT OVERWRITE TABLE loghubods.alg_vid_feature_cfshare PARTITION (dt = '${dt}',hh = '${hh}') WITH t_origin AS ( -- 一次曝光的多次分享,只保留最早的一次。 SELECT apptype ,mid ,vid ,pagesource ,subsessionid ,shareid ,MIN(clienttimestamp) AS ts ,UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) AS ts_now ,UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - MIN(clienttimestamp) / 1000 AS ts_diff FROM ( SELECT __topic__ ,eventinfos ,apptype ,clickobjectid ,shareobjectid AS vid ,machinecode AS mid ,clienttimestamp ,pagesource ,parentpagesource ,parentrootpagesource ,shareid ,rootshareid ,subsessionid FROM loghubods.user_share_log_flow WHERE CONCAT(year,month,day,hour) BETWEEN TO_CHAR(FROM_UNIXTIME(UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - 3600 * ${hours_early}),'YYYYMMDDHH') AND TO_CHAR(FROM_UNIXTIME(UNIX_TIMESTAMP(TO_DATE('${dt}${hh}','YYYYMMDDHH')) - 3600 * 1),'YYYYMMDDHH') AND __topic__ IN ('share') AND apptype NOT IN ('12') AND apptype IS NOT NULL AND shareobjectid IS NOT NULL AND machinecode IS NOT NULL AND machinecode != "" AND clienttimestamp IS NOT NULL ) GROUP BY apptype ,mid ,vid ,pagesource ,subsessionid ,shareid ) -- SELECT * -- FROM t_share_ -- limit 999 -- ; -- SELECT vid2vid_cf_py("14824141:1,21175605:2,21175605:3,21256335:0") ,t_cf AS ( SELECT loghubods.vid2vid_cf_py(vid_ts) AS (cf_a,cf_b) FROM ( SELECT mid ,CONCAT_WS(',',COLLECT_SET(CONCAT(vid,":",CAST(ts / 1000 AS BIGINT)))) AS vid_ts ,COUNT(1) AS cnt FROM ( SELECT mid ,vid ,ts FROM t_origin WHERE mid IS NOT NULL AND mid <> "" AND vid IS NOT NULL AND vid <> "0" AND vid <> "" AND ts IS NOT NULL ) GROUP BY mid HAVING COUNT(1) > 1 AND SIZE(COLLECT_SET(vid)) > 1 ) ) -- SELECT * -- FROM t_cf -- ; ,t_score AS ( SELECT a.cf_a ,a.cf_b ,a.cnt AS cnt_ab ,b.cnt AS cnt_a ,ROUND(a.cnt / b.cnt,6) AS score ,ROW_NUMBER() OVER (PARTITION BY a.cf_a ORDER BY a.cnt / b.cnt DESC ) AS rank FROM ( SELECT cf_a ,cf_b ,COUNT(1) AS cnt FROM t_cf GROUP BY cf_a ,cf_b ) a JOIN ( SELECT cf_a ,COUNT(1) AS cnt FROM t_cf GROUP BY cf_a ORDER BY COUNT(1) DESC LIMIT ${key_limit} ) b ON a.cf_a = b.cf_a ) ,t_recall AS ( SELECT cf_a AS vid -- ,CONCAT_WS(',',COLLECT_LIST(cf_b)) AS videoid_arr -- ,CONCAT_WS(',',COLLECT_LIST(CAST(score AS STRING))) AS score_arr -- ,CONCAT_WS(',',COLLECT_LIST(CAST(cnt_ab AS STRING))) AS cnt_ab_arr -- ,CONCAT_WS(',',COLLECT_LIST(CAST(cnt_a AS STRING))) AS cnt_a_arr ,JSON_OBJECT("videoid_arr",CONCAT_WS(',',COLLECT_LIST(cf_b)),"score_arr",CONCAT_WS(',',COLLECT_LIST(CAST(score AS STRING))),"cnt_ab_arr",CONCAT_WS(',',COLLECT_LIST(CAST(cnt_ab AS STRING))),"cnt_a_arr",CONCAT_WS(',',COLLECT_LIST(CAST(cnt_a AS STRING)))) AS feature FROM ( SELECT cf_a ,cf_b ,cnt_ab ,cnt_a ,score ,rank FROM t_score WHERE rank <= ${rank_limit} ORDER BY rank ASC,CAST(cf_b AS BIGINT) DESC ) GROUP BY cf_a ORDER BY SUM(cnt_a) DESC ) SELECT * FROM t_recall