produce_data.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. from typing import Tuple
  2. class ProduceBaseData:
  3. @staticmethod
  4. def article_unionid_mapper(account_name_tuple: Tuple[str]) -> str:
  5. odps_query = f"""
  6. -- wx_sn粒度表
  7. CREATE TABLE IF NOT EXISTS article_union_id_mapper
  8. (
  9. publish_date STRING COMMENT '发布日期 yyyymmdd'
  10. ,accountname STRING COMMENT '账号名'
  11. ,show_view_count BIGINT COMMENT '展示/阅读量'
  12. ,title STRING COMMENT '文章标题'
  13. ,wx_sn STRING COMMENT '微信序列号'
  14. ,root_source_id STRING COMMENT 'root_source_id'
  15. ,union_id STRING COMMENT '微信 union_id'
  16. )
  17. COMMENT '文章与 union_id 映射表'
  18. ;
  19. -- title 粒度表
  20. CREATE TABLE IF NOT EXISTS title_union_id_mapper
  21. (
  22. title STRING COMMENT '文章标题'
  23. ,union_id STRING COMMENT '微信 union_id'
  24. )
  25. COMMENT '标题与 union_id 映射表'
  26. ;
  27. -- 文章基础信息表
  28. WITH article_table AS
  29. (
  30. SELECT TO_CHAR(FROM_UNIXTIME(publish_timestamp),'yyyymmdd') AS publish_date
  31. ,accountname
  32. ,show_view_count
  33. ,title
  34. ,wx_sn
  35. ,root_source_id
  36. FROM loghubods.official_articles_v2_hour
  37. LATERAL VIEW EXPLODE(SPLIT(REGEXP_REPLACE(root_source_id_list,'\\[|\\]|"',''),',')) tmp AS root_source_id
  38. WHERE accountname IN {account_name_tuple}
  39. AND itemindex = 1
  40. AND dt = MAX_PT('loghubods.official_articles_v2_hour')
  41. ) -- 文章对应的 root_source_id + publish_date,用于 JOIN 替代 IN,并做分区裁剪
  42. ,article_root_sources AS
  43. (
  44. SELECT root_source_id
  45. ,publish_date
  46. FROM article_table
  47. GROUP BY root_source_id
  48. ,publish_date
  49. ) -- openid 和 root_source_id 的映射表(用 JOIN 替代 IN,只扫文章相关分区)
  50. ,first_level_openid_table AS
  51. (
  52. SELECT SUBSTRING_INDEX(t.machinecode,'weixin_openid_',-1) AS openid
  53. ,t.rootsourceid
  54. ,t.dt AS publish_date
  55. FROM loghubods.ods_user_active_log_info_day t
  56. INNER JOIN article_root_sources ars
  57. ON t.rootsourceid = ars.root_source_id
  58. AND t.dt = ars.publish_date
  59. WHERE t.user_share_depth = 0
  60. ) -- openid_unionid 映射表
  61. ,open_union_id_mapper AS
  62. (
  63. SELECT open_id
  64. ,union_id
  65. FROM loghubods.user_wechat_identity_info_ha
  66. WHERE dt = MAX_PT('loghubods.user_wechat_identity_info_ha')
  67. )
  68. ,final_table AS
  69. (
  70. SELECT t1.publish_date
  71. ,t1.accountname
  72. ,t1.show_view_count
  73. ,t1.title
  74. ,t1.wx_sn
  75. ,t1.root_source_id
  76. ,t2.openid
  77. ,t3.union_id
  78. FROM article_table t1
  79. INNER JOIN first_level_openid_table t2
  80. ON t1.root_source_id = t2.rootsourceid
  81. AND t1.publish_date = t2.publish_date
  82. INNER JOIN open_union_id_mapper t3
  83. ON t2.openid = t3.open_id
  84. )
  85. INSERT OVERWRITE TABLE article_union_id_mapper
  86. SELECT DISTINCT publish_date
  87. ,accountname
  88. ,show_view_count
  89. ,title
  90. ,CAST(wx_sn AS STRING)
  91. ,root_source_id
  92. ,union_id
  93. FROM final_table
  94. ;
  95. """
  96. return odps_query
  97. @staticmethod
  98. def title_unionid_mapper():
  99. odps_query = """
  100. INSERT OVERWRITE TABLE title_union_id_mapper
  101. SELECT DISTINCT title
  102. ,union_id
  103. FROM article_union_id_mapper
  104. ;
  105. """
  106. return odps_query
  107. @staticmethod
  108. def i2i_mapper(dt):
  109. odps_query = f"""
  110. CREATE TABLE IF NOT EXISTS i2i_table
  111. (
  112. source_title STRING
  113. ,associated_title STRING
  114. ,association_count BIGINT
  115. ,associated_title_uid_count BIGINT
  116. ,uid_coverage_rate DOUBLE
  117. )
  118. PARTITIONED BY (dt STRING)
  119. ;
  120. INSERT OVERWRITE TABLE i2i_table PARTITION (dt = '{dt}')
  121. WITH src_title_user AS
  122. (
  123. SELECT DISTINCT title AS 源标题
  124. ,union_id
  125. FROM loghubods.title_union_id_mapper
  126. )
  127. ,co_occur AS
  128. (
  129. SELECT s.源标题
  130. ,t.title AS 联想标题
  131. ,COUNT(*) AS 联想次数
  132. FROM src_title_user s
  133. JOIN loghubods.title_union_id_mapper t
  134. ON s.union_id = t.union_id
  135. WHERE s.源标题 != t.title
  136. GROUP BY s.源标题
  137. ,t.title
  138. )
  139. ,title_uid_cnt AS
  140. (
  141. SELECT title
  142. ,COUNT(DISTINCT union_id) AS 联想标题_uid数量
  143. FROM loghubods.title_union_id_mapper
  144. GROUP BY title
  145. )
  146. SELECT c.源标题 AS source_title
  147. ,c.联想标题 AS associated_title
  148. ,c.联想次数 AS association_count
  149. ,COALESCE(u.联想标题_uid数量, 0) AS associated_title_uid_count
  150. ,CAST(c.联想次数 AS DOUBLE) / (COALESCE(u.联想标题_uid数量, 0) + 10000) AS uid_coverage_rate
  151. FROM co_occur c
  152. LEFT JOIN title_uid_cnt u
  153. ON c.联想标题 = u.title
  154. ;
  155. """
  156. return odps_query