data_query_tools.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
import json
import os
from datetime import date, timedelta
from pathlib import Path

from odps import ODPS
from odps.errors import ODPSError

from agent import tool
  7. def get_odps_data(sql):
  8. # 配置信息
  9. access_id = 'LTAI9EBa0bd5PrDa'
  10. access_key = 'vAalxds7YxhfOA2yVv8GziCg3Y87v5'
  11. project = 'loghubods'
  12. endpoint = 'http://service.odps.aliyun.com/api'
  13. # 1. 初始化 ODPS 入口
  14. o = ODPS(access_id, access_key, project, endpoint=endpoint)
  15. try:
  16. # 2. 执行 SQL 并获取结果
  17. # execute_sql 会等待任务完成,使用 open_reader 读取数据
  18. with o.execute_sql(sql).open_reader() as reader:
  19. # reader 类似于 Java 中的 List<Record>
  20. # 我们可以直接将其转换为 Python 的 list
  21. records = [record for record in reader]
  22. return records
  23. except ODPSError as e:
  24. print(f"ODPS 错误: {e}")
  25. return None
  26. def get_rov_by_merge_leve2_and_video_ids(merge_leve2, video_ids):
  27. merge_level_in_clause = f"'{merge_leve2}'"
  28. video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in video_ids])
  29. end_date = (date.today() - timedelta(days=1)).strftime("%Y%m%d")
  30. start_date = (date.today() - timedelta(days=14)).strftime("%Y%m%d")
  31. sql_query = f'''
  32. SELECT
  33. v.videoid,
  34. CASE
  35. WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
  36. ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
  37. END AS avg_rov_t0
  38. FROM
  39. (
  40. SELECT
  41. t2.videoid,
  42. t2.merge_leve2
  43. FROM videoods.content_profile t1
  44. JOIN loghubods.video_merge_tag t2
  45. ON t1.content_id = t2.videoid
  46. WHERE
  47. t1.status = 3
  48. AND t1.is_deleted = 0
  49. AND t2.merge_leve2 IN ({merge_level_in_clause})
  50. ) v
  51. LEFT JOIN loghubods.video_dimension_detail_add_column t3
  52. ON v.videoid = t3.视频id
  53. AND t3.dt >= '{start_date}'
  54. AND t3.dt <= '{end_date}'
  55. WHERE v.videoid in ({video_ids_in_clause})
  56. GROUP BY
  57. v.videoid
  58. ;
  59. '''
  60. data = get_odps_data(sql_query)
  61. result_dict = {}
  62. if data:
  63. result_dict = {r[0]: r[1] for r in data}
  64. return result_dict
  65. def get_changwen_weight(account_name):
  66. bizdatemax_date = date.today() - timedelta(days=1)
  67. bizdatemin_date = bizdatemax_date - timedelta(days=30)
  68. bizdatemax = bizdatemax_date.strftime("%Y%m%d")
  69. bizdatemin = bizdatemin_date.strftime("%Y%m%d")
  70. sql_query = f'''
  71. SELECT 公众号名
  72. ,videoid
  73. ,一级品类
  74. ,二级品类
  75. ,头部曝光
  76. ,头部realplay
  77. ,头部分享
  78. ,头部回流人数 AS 头部回流数
  79. ,推荐曝光数
  80. ,推荐realplay
  81. ,推荐分享数
  82. ,推荐回流数
  83. ,当日回流进入分发曝光次数 AS vov分子
  84. FROM (
  85. SELECT DISTINCT a.公众号名
  86. ,a.videoid
  87. ,e.merge_leve1 AS 一级品类
  88. ,e.merge_leve2 AS 二级品类
  89. ,a.title
  90. ,a.进入分发人数
  91. ,头部曝光pv AS 头部曝光
  92. ,头部realplay_pv AS 头部realplay
  93. ,头部分享pv AS 头部分享
  94. ,a.当日分发曝光pv AS 推荐曝光数
  95. ,a.当日分发播放pv
  96. ,分发realplay_pv AS 推荐realplay
  97. ,分发realplay_pv / a.当日分发播放pv AS 真实播放率pv
  98. ,当日分发播放uv
  99. ,c.realplay_uv AS 分发真实播uv
  100. ,c.realplay_uv / a.当日分发播放uv AS 真实播放率uv
  101. ,a.当日分发分享pv AS 推荐分享数
  102. ,a.当日分发分享pv / a.当日分发曝光pv AS str
  103. ,NVL(b.当日分发回流人数,0) AS 推荐回流数
  104. ,NVL(b.当日回流进入分发人数,0) AS 当日回流进入分发人数
  105. ,NVL(b.当日回流进入分发曝光次数,0) AS 当日回流进入分发曝光次数
  106. ,NVL(b.当日回流进入分发曝光次数,0) / a.当日分发曝光pv AS vov分子
  107. ,d.头部回流人数
  108. FROM (
  109. SELECT account_name AS 公众号名
  110. ,videoid
  111. ,title
  112. ,COUNT(DISTINCT mid) AS 进入分发人数
  113. ,COUNT(
  114. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
  115. ) AS 当日分发曝光pv
  116. ,COUNT(
  117. CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
  118. ) AS 头部曝光pv
  119. ,COUNT(
  120. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
  121. ) AS 当日分发播放pv
  122. ,COUNT(DISTINCT
  123. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
  124. ) AS 当日分发播放uv
  125. ,COUNT(
  126. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
  127. ) AS 当日分发分享pv
  128. ,COUNT(
  129. CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
  130. ) AS 头部分享pv
  131. FROM (
  132. SELECT DISTINCT a.mid
  133. ,a.videoid
  134. ,a.businesstype
  135. ,a.pagesource
  136. ,a.subsessionid
  137. ,account_name
  138. ,e.title
  139. FROM loghubods.video_action_log_rp a
  140. LEFT JOIN loghubods.user_wechat_identity_info_ha b
  141. ON a.mid = CONCAT('weixin_openid_',b.open_id)
  142. AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  143. LEFT JOIN loghubods.gzh_fans_info d
  144. ON b.union_id = d.union_id
  145. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  146. LEFT JOIN videoods.wx_video e
  147. ON a.videoid = e.id
  148. WHERE a.dt >= '{bizdatemin}'
  149. AND a.dt <= '{bizdatemax}'
  150. AND businesstype IN ('videoView','videoPlay','videoShareFriend')
  151. AND d.user_create_time IS NOT NULL
  152. AND account_name = '{account_name}'
  153. AND a.videoid IN (
  154. SELECT
  155. DISTINCT content_id AS videoid
  156. FROM
  157. videoods.content_profile
  158. WHERE status=3
  159. AND is_deleted = 0
  160. )
  161. ) t
  162. GROUP BY 公众号名
  163. ,videoid
  164. ,title
  165. ) a
  166. LEFT JOIN (
  167. SELECT t.account_name AS 公众号名
  168. ,t.videoid
  169. ,COUNT(DISTINCT s.machinecode) AS 当日分发回流人数
  170. ,COUNT(DISTINCT v.mid) AS 当日回流进入分发人数
  171. ,COUNT(v.mid) AS 当日回流进入分发曝光次数
  172. FROM (
  173. SELECT DISTINCT a.subsessionid
  174. ,a.videoid
  175. ,a.mid
  176. ,d.account_name
  177. ,GET_JSON_OBJECT(extparams,'$.recomTraceId') AS recomtraceid
  178. FROM loghubods.video_action_log_rp a
  179. LEFT JOIN loghubods.user_wechat_identity_info_ha b
  180. ON a.mid = CONCAT('weixin_openid_',b.open_id)
  181. AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  182. LEFT JOIN loghubods.gzh_fans_info d
  183. ON b.union_id = d.union_id
  184. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  185. WHERE a.dt >= '{bizdatemin}'
  186. AND a.dt <= '{bizdatemax}'
  187. AND a.businesstype = 'videoShareFriend'
  188. AND a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
  189. AND d.user_create_time IS NOT NULL
  190. AND d.account_name = '{account_name}'
  191. ) t
  192. LEFT JOIN (
  193. SELECT DISTINCT subsessionid
  194. ,machinecode
  195. ,recomtraceid
  196. ,clickobjectid
  197. FROM loghubods.user_share_log
  198. WHERE dt >= '{bizdatemin}'
  199. AND dt <= '{bizdatemax}'
  200. AND topic = 'click'
  201. ) s
  202. ON t.recomtraceid = s.recomtraceid
  203. AND t.videoid = s.clickobjectid
  204. LEFT JOIN (
  205. SELECT subsessionid
  206. ,mid
  207. ,videoid
  208. FROM loghubods.video_action_log_rp
  209. WHERE dt >= '{bizdatemin}'
  210. AND dt <= '{bizdatemax}'
  211. AND pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
  212. AND businesstype = 'videoView'
  213. ) v
  214. ON s.subsessionid = v.subsessionid
  215. AND s.machinecode = v.mid
  216. GROUP BY account_name
  217. ,t.videoid
  218. ) b
  219. ON a.公众号名 = b.公众号名
  220. AND a.videoid = b.videoid
  221. LEFT JOIN (
  222. SELECT d.account_name AS 公众号名
  223. ,a.videoid
  224. ,COUNT(DISTINCT a.mid) AS realplay_uv
  225. ,COUNT(
  226. CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
  227. ) AS 分发realplay_pv
  228. ,COUNT(CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_pv
  229. FROM loghubods.ods_video_play_log_day a
  230. LEFT JOIN (
  231. SELECT DISTINCT open_id
  232. ,union_id
  233. FROM loghubods.user_wechat_identity_info_ha
  234. WHERE dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  235. ) b
  236. ON a.mid = CONCAT('weixin_openid_',b.open_id)
  237. LEFT JOIN loghubods.gzh_fans_info d
  238. ON b.union_id = d.union_id
  239. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  240. WHERE a.dt >= '{bizdatemin}'
  241. AND a.dt <= '{bizdatemax}'
  242. AND a.businesstype = 'videoRealPlay'
  243. AND d.user_create_time IS NOT NULL
  244. AND d.account_name = '{account_name}'
  245. GROUP BY d.account_name
  246. ,a.videoid
  247. ORDER BY 分发realplay_pv DESC
  248. ) c
  249. ON a.公众号名 = c.公众号名
  250. AND a.videoid = c.videoid
  251. LEFT JOIN (
  252. SELECT t.account_name AS 公众号名
  253. ,t.videoid
  254. ,COUNT(DISTINCT s.machinecode) AS 头部回流人数
  255. FROM (
  256. SELECT DISTINCT a.shareobjectid AS videoid
  257. ,a.shareid
  258. ,a.machinecode
  259. ,d.account_name
  260. FROM loghubods.user_share_log a
  261. LEFT JOIN loghubods.user_wechat_identity_info_ha b
  262. ON a.machinecode = CONCAT('weixin_openid_',b.open_id)
  263. AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  264. LEFT JOIN loghubods.gzh_fans_info d
  265. ON b.union_id = d.union_id
  266. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  267. WHERE a.dt >= '{bizdatemin}'
  268. AND a.dt <= '{bizdatemax}'
  269. AND a.topic = 'share'
  270. AND a.pagesource REGEXP 'pages/user-videos-share$'
  271. AND d.user_create_time IS NOT NULL
  272. AND d.account_name = '{account_name}'
  273. ) t
  274. LEFT JOIN (
  275. SELECT DISTINCT shareid
  276. ,machinecode
  277. ,clickobjectid
  278. FROM loghubods.user_share_log
  279. WHERE dt >= '{bizdatemin}'
  280. AND dt <= '{bizdatemax}'
  281. AND topic = 'click'
  282. ) s
  283. ON t.shareid = s.shareid
  284. GROUP BY account_name
  285. ,t.videoid
  286. ) d
  287. ON a.公众号名 = d.公众号名
  288. AND a.videoid = d.videoid
  289. LEFT JOIN loghubods.video_merge_tag e
  290. ON a.videoid = e.videoid
  291. )
  292. ORDER BY 推荐曝光数 DESC
  293. '''
  294. result_list = []
  295. data = get_odps_data(sql_query)
  296. if data:
  297. for r in data:
  298. result_list.append(
  299. {
  300. "account_name": r[0],
  301. "videoid": r[1],
  302. "一级品类": r[2],
  303. "二级品类": r[3],
  304. "ext_data": {
  305. "头部曝光": r[4],
  306. "头部realplay": r[5],
  307. "头部分享": r[6],
  308. "头部回流数": r[7],
  309. "推荐曝光数": r[8],
  310. "推荐realplay": r[9],
  311. "推荐分享数": r[10],
  312. "推荐回流数": r[11],
  313. "vov分子": r[12],
  314. },
  315. }
  316. )
  317. output_file = Path(__file__).parent / f"{account_name}.json"
  318. with output_file.open("w", encoding="utf-8") as f:
  319. json.dump(result_list, f, ensure_ascii=False, indent=2)
  320. return result_list
  321. if __name__ == '__main__':
  322. result_list = get_changwen_weight('青史铁事漫谈')
  323. print(result_list)