| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- from typing import Tuple
class ProduceBaseData:
    """Builders for the ODPS SQL scripts that produce the base mapping tables.

    Every public method only *returns* SQL text; nothing is executed here.
    """

    @staticmethod
    def _render_sql_in_list(account_names) -> str:
        """Render account names as a parenthesised, quoted SQL list.

        Interpolating a Python tuple directly relies on ``repr``, which emits
        ``('a',)`` for a single element and ``()`` for an empty tuple; neither
        is valid inside ``IN``. This helper produces ``('a')`` /
        ``('a', 'b')`` instead.

        Args:
            account_names: A single name or an iterable of names.

        Returns:
            The list in the form ``('name1', 'name2')``.

        Raises:
            ValueError: If no account name is supplied.
        """
        # A bare string would otherwise be iterated character by character.
        if isinstance(account_names, str):
            account_names = (account_names,)
        names = tuple(account_names)
        if not names:
            raise ValueError(
                "account_name_tuple must contain at least one account name"
            )
        quoted = []
        for name in names:
            # Backslash-escape so a quote inside a name cannot end the literal.
            escaped = str(name).replace("\\", "\\\\").replace("'", "\\'")
            quoted.append(f"'{escaped}'")
        return "(" + ", ".join(quoted) + ")"

    @staticmethod
    def article_unionid_mapper(account_name_tuple: Tuple[str, ...]) -> str:
        """Build the SQL that refreshes ``article_union_id_mapper``.

        The script creates ``article_union_id_mapper`` and
        ``title_union_id_mapper`` if they do not exist, then overwrites
        ``article_union_id_mapper`` with the distinct rows obtained by joining:

        * articles of the given accounts (``itemindex = 1``, latest partition
          of ``loghubods.official_articles_v2_hour``), one row per exploded
          ``root_source_id``;
        * ``loghubods.ods_user_active_log_info_day`` rows with
          ``user_share_depth = 0`` whose ``rootsourceid`` and ``dt`` match the
          article's ``root_source_id`` and publish date;
        * the latest partition of ``loghubods.user_wechat_identity_info_ha``
          to translate ``open_id`` into ``union_id``.

        Args:
            account_name_tuple: Account names used in the ``accountname IN``
                filter. Must not be empty.

        Returns:
            The SQL script as a string.

        Raises:
            ValueError: If ``account_name_tuple`` is empty.
        """
        account_names_sql = ProduceBaseData._render_sql_in_list(account_name_tuple)
        # Not a raw string on purpose: "\\[" below reaches ODPS as "\[".
        odps_query = f"""
            -- wx_sn粒度表
            CREATE TABLE IF NOT EXISTS article_union_id_mapper
            (
                publish_date STRING COMMENT '发布日期 yyyymmdd'
                ,accountname STRING COMMENT '账号名'
                ,show_view_count BIGINT COMMENT '展示/阅读量'
                ,title STRING COMMENT '文章标题'
                ,wx_sn STRING COMMENT '微信序列号'
                ,root_source_id STRING COMMENT 'root_source_id'
                ,union_id STRING COMMENT '微信 union_id'
            )
            COMMENT '文章与 union_id 映射表'
            ;

            -- title 粒度表
            CREATE TABLE IF NOT EXISTS title_union_id_mapper
            (
                title STRING COMMENT '文章标题'
                ,union_id STRING COMMENT '微信 union_id'
            )
            COMMENT '标题与 union_id 映射表'
            ;

            -- 文章基础信息表
            WITH article_table AS
            (
                SELECT TO_CHAR(FROM_UNIXTIME(publish_timestamp),'yyyymmdd') AS publish_date
                       ,accountname
                       ,show_view_count
                       ,title
                       ,wx_sn
                       ,root_source_id
                FROM loghubods.official_articles_v2_hour
                LATERAL VIEW EXPLODE(SPLIT(REGEXP_REPLACE(root_source_id_list,'\\[|\\]|"',''),',')) tmp AS root_source_id
                WHERE accountname IN {account_names_sql}
                AND itemindex = 1
                AND dt = MAX_PT('loghubods.official_articles_v2_hour')
            ) -- 文章对应的 root_source_id + publish_date,用于 JOIN 替代 IN,并做分区裁剪
            ,article_root_sources AS
            (
                SELECT root_source_id
                       ,publish_date
                FROM article_table
                GROUP BY root_source_id
                         ,publish_date
            ) -- openid 和 root_source_id 的映射表(用 JOIN 替代 IN,只扫文章相关分区)
            ,first_level_openid_table AS
            (
                SELECT SUBSTRING_INDEX(t.machinecode,'weixin_openid_',-1) AS openid
                       ,t.rootsourceid
                       ,t.dt AS publish_date
                FROM loghubods.ods_user_active_log_info_day t
                INNER JOIN article_root_sources ars
                ON t.rootsourceid = ars.root_source_id
                AND t.dt = ars.publish_date
                WHERE t.user_share_depth = 0
            ) -- openid_unionid 映射表
            ,open_union_id_mapper AS
            (
                SELECT open_id
                       ,union_id
                FROM loghubods.user_wechat_identity_info_ha
                WHERE dt = MAX_PT('loghubods.user_wechat_identity_info_ha')
            )
            ,final_table AS
            (
                SELECT t1.publish_date
                       ,t1.accountname
                       ,t1.show_view_count
                       ,t1.title
                       ,t1.wx_sn
                       ,t1.root_source_id
                       ,t2.openid
                       ,t3.union_id
                FROM article_table t1
                INNER JOIN first_level_openid_table t2
                ON t1.root_source_id = t2.rootsourceid
                AND t1.publish_date = t2.publish_date
                INNER JOIN open_union_id_mapper t3
                ON t2.openid = t3.open_id
            )
            INSERT OVERWRITE TABLE article_union_id_mapper
            SELECT DISTINCT publish_date
                   ,accountname
                   ,show_view_count
                   ,title
                   ,CAST(wx_sn AS STRING)
                   ,root_source_id
                   ,union_id
            FROM final_table
            ;
        """
        return odps_query

    @staticmethod
    def title_unionid_mapper() -> str:
        """Build the SQL that refreshes ``title_union_id_mapper``.

        The statement overwrites ``title_union_id_mapper`` with the distinct
        ``(title, union_id)`` pairs found in ``article_union_id_mapper``.

        Returns:
            The SQL statement as a string.
        """
        # NOTE(review): the table is written without a project prefix here but
        # read as ``loghubods.title_union_id_mapper`` in ``i2i_mapper`` --
        # confirm both resolve to the same table.
        odps_query = """
            INSERT OVERWRITE TABLE title_union_id_mapper
            SELECT DISTINCT title
                   ,union_id
            FROM article_union_id_mapper
            ;
        """
        return odps_query

    @staticmethod
    def i2i_mapper(dt) -> str:
        """Build the SQL that writes one partition of ``i2i_table``.

        The script creates ``i2i_table`` (partitioned by ``dt``) if needed and
        overwrites partition ``dt`` with title-to-title associations derived
        from ``loghubods.title_union_id_mapper``:

        * ``association_count``: number of joined rows in which a source title
          and a different title share a ``union_id``;
        * ``associated_title_uid_count``: distinct ``union_id`` count of the
          associated title (0 when missing);
        * ``uid_coverage_rate``: ``association_count`` divided by
          ``associated_title_uid_count + 10000``.

        Args:
            dt: Partition value, inserted verbatim between single quotes.
                Presumably a ``yyyymmdd`` string -- confirm with the caller.

        Returns:
            The SQL script as a string.
        """
        # NOTE(review): the constant 10000 in the denominator looks like a
        # smoothing term against small uid counts -- confirm the intent.
        odps_query = f"""
            CREATE TABLE IF NOT EXISTS i2i_table
            (
                source_title STRING
                ,associated_title STRING
                ,association_count BIGINT
                ,associated_title_uid_count BIGINT
                ,uid_coverage_rate DOUBLE
            )
            PARTITIONED BY (dt STRING)
            ;
            INSERT OVERWRITE TABLE i2i_table PARTITION (dt = '{dt}')
            WITH src_title_user AS
            (
                SELECT DISTINCT title AS 源标题
                       ,union_id
                FROM loghubods.title_union_id_mapper
            )
            ,co_occur AS
            (
                SELECT s.源标题
                       ,t.title AS 联想标题
                       ,COUNT(*) AS 联想次数
                FROM src_title_user s
                JOIN loghubods.title_union_id_mapper t
                ON s.union_id = t.union_id
                WHERE s.源标题 != t.title
                GROUP BY s.源标题
                         ,t.title
            )
            ,title_uid_cnt AS
            (
                SELECT title
                       ,COUNT(DISTINCT union_id) AS 联想标题_uid数量
                FROM loghubods.title_union_id_mapper
                GROUP BY title
            )
            SELECT c.源标题 AS source_title
                   ,c.联想标题 AS associated_title
                   ,c.联想次数 AS association_count
                   ,COALESCE(u.联想标题_uid数量, 0) AS associated_title_uid_count
                   ,CAST(c.联想次数 AS DOUBLE) / (COALESCE(u.联想标题_uid数量, 0) + 10000) AS uid_coverage_rate
            FROM co_occur c
            LEFT JOIN title_uid_cnt u
            ON c.联想标题 = u.title
            ;
        """
        return odps_query
|