from typing import Tuple


class ProduceBaseData:
    """Builders for the ODPS (MaxCompute) SQL that produces the base mapping tables.

    Each method only *renders* a SQL string; nothing is executed here.
    Pipeline: article_unionid_mapper -> title_unionid_mapper -> i2i_mapper.
    """

    @staticmethod
    def article_unionid_mapper(account_name_tuple: Tuple[str, ...]) -> str:
        """Render the SQL that (re)creates and fills ``article_union_id_mapper``.

        Joins published articles (exploded per ``root_source_id``) to first-level
        reader open_ids and then to their union_ids.

        :param account_name_tuple: account names to restrict the article scan to.
        :return: a multi-statement ODPS SQL script (DDL + INSERT OVERWRITE).
        """
        # Build the IN (...) list explicitly. Interpolating the tuple with
        # str()/repr() renders a single-element tuple as "('name',)" — the
        # trailing comma is invalid SQL. Single quotes inside names are doubled
        # so they stay valid SQL string literals.
        account_filter = (
            "('" + "', '".join(n.replace("'", "''") for n in account_name_tuple) + "')"
        )
        odps_query = f"""
-- wx_sn粒度表
CREATE TABLE IF NOT EXISTS article_union_id_mapper
(
    publish_date STRING COMMENT '发布日期 yyyymmdd'
    ,accountname STRING COMMENT '账号名'
    ,show_view_count BIGINT COMMENT '展示/阅读量'
    ,title STRING COMMENT '文章标题'
    ,wx_sn STRING COMMENT '微信序列号'
    ,root_source_id STRING COMMENT 'root_source_id'
    ,union_id STRING COMMENT '微信 union_id'
)
COMMENT '文章与 union_id 映射表'
;

-- title 粒度表
CREATE TABLE IF NOT EXISTS title_union_id_mapper
(
    title STRING COMMENT '文章标题'
    ,union_id STRING COMMENT '微信 union_id'
)
COMMENT '标题与 union_id 映射表'
;

-- 文章基础信息表
WITH article_table AS (
    SELECT  TO_CHAR(FROM_UNIXTIME(publish_timestamp),'yyyymmdd') AS publish_date
            ,accountname
            ,show_view_count
            ,title
            ,wx_sn
            ,root_source_id
    FROM    loghubods.official_articles_v2_hour
    LATERAL VIEW EXPLODE(SPLIT(REGEXP_REPLACE(root_source_id_list,'\\[|\\]|"',''),',')) tmp AS root_source_id
    WHERE   accountname IN {account_filter}
    AND     itemindex = 1
    AND     dt = MAX_PT('loghubods.official_articles_v2_hour')
)
-- 文章对应的 root_source_id + publish_date,用于 JOIN 替代 IN,并做分区裁剪
,article_root_sources AS (
    SELECT  root_source_id
            ,publish_date
    FROM    article_table
    GROUP BY root_source_id
            ,publish_date
)
-- openid 和 root_source_id 的映射表(用 JOIN 替代 IN,只扫文章相关分区)
,first_level_openid_table AS (
    SELECT  SUBSTRING_INDEX(t.machinecode,'weixin_openid_',-1) AS openid
            ,t.rootsourceid
            ,t.dt AS publish_date
    FROM    loghubods.ods_user_active_log_info_day t
    INNER JOIN article_root_sources ars
    ON      t.rootsourceid = ars.root_source_id
    AND     t.dt = ars.publish_date
    WHERE   t.user_share_depth = 0
)
-- openid_unionid 映射表
,open_union_id_mapper AS (
    SELECT  open_id
            ,union_id
    FROM    loghubods.user_wechat_identity_info_ha
    WHERE   dt = MAX_PT('loghubods.user_wechat_identity_info_ha')
)
,final_table AS (
    SELECT  t1.publish_date
            ,t1.accountname
            ,t1.show_view_count
            ,t1.title
            ,t1.wx_sn
            ,t1.root_source_id
            ,t2.openid
            ,t3.union_id
    FROM    article_table t1
    INNER JOIN first_level_openid_table t2
    ON      t1.root_source_id = t2.rootsourceid
    AND     t1.publish_date = t2.publish_date
    INNER JOIN open_union_id_mapper t3
    ON      t2.openid = t3.open_id
)
INSERT OVERWRITE TABLE article_union_id_mapper
SELECT  DISTINCT publish_date
        ,accountname
        ,show_view_count
        ,title
        ,CAST(wx_sn AS STRING)
        ,root_source_id
        ,union_id
FROM    final_table
;
"""
        return odps_query

    @staticmethod
    def title_unionid_mapper() -> str:
        """Render the SQL that collapses ``article_union_id_mapper`` to title granularity.

        :return: an INSERT OVERWRITE statement for ``title_union_id_mapper``.
        """
        odps_query = """
INSERT OVERWRITE TABLE title_union_id_mapper
SELECT  DISTINCT title
        ,union_id
FROM    article_union_id_mapper
;
"""
        return odps_query

    @staticmethod
    def i2i_mapper(dt: str) -> str:
        """Render the SQL that builds the title-to-title co-occurrence (i2i) table.

        Two titles are associated when the same ``union_id`` read both; the
        coverage rate is smoothed with a +10000 denominator (dampens rare titles).

        :param dt: partition value (date string) to overwrite, e.g. ``'20240101'``.
        :return: a multi-statement ODPS SQL script (DDL + partitioned INSERT).
        """
        # NOTE(review): `dt` is interpolated directly into the partition spec;
        # callers must pass a trusted date string, not external input.
        odps_query = f"""
CREATE TABLE IF NOT EXISTS i2i_table
(
    source_title STRING
    ,associated_title STRING
    ,association_count BIGINT
    ,associated_title_uid_count BIGINT
    ,uid_coverage_rate DOUBLE
)
PARTITIONED BY (dt STRING)
;

INSERT OVERWRITE TABLE i2i_table PARTITION (dt = '{dt}')
WITH src_title_user AS (
    SELECT  DISTINCT title AS 源标题
            ,union_id
    FROM    loghubods.title_union_id_mapper
)
,co_occur AS (
    SELECT  s.源标题
            ,t.title AS 联想标题
            ,COUNT(*) AS 联想次数
    FROM    src_title_user s
    JOIN    loghubods.title_union_id_mapper t
    ON      s.union_id = t.union_id
    WHERE   s.源标题 != t.title
    GROUP BY s.源标题
            ,t.title
)
,title_uid_cnt AS (
    SELECT  title
            ,COUNT(DISTINCT union_id) AS 联想标题_uid数量
    FROM    loghubods.title_union_id_mapper
    GROUP BY title
)
SELECT  c.源标题 AS source_title
        ,c.联想标题 AS associated_title
        ,c.联想次数 AS association_count
        ,COALESCE(u.联想标题_uid数量, 0) AS associated_title_uid_count
        ,CAST(c.联想次数 AS DOUBLE) / (COALESCE(u.联想标题_uid数量, 0) + 10000) AS uid_coverage_rate
FROM    co_occur c
LEFT JOIN title_uid_cnt u
ON      c.联想标题 = u.title
;
"""
        return odps_query