data_query_tools.py

from odps import ODPS
from odps.errors import ODPSError
from datetime import date, timedelta
import json
from pathlib import Path

from examples.demand.mysql import mysql_db


def get_odps_data(sql):
    # Connection configuration
    access_id = 'LTAI9EBa0bd5PrDa'
    access_key = 'vAalxds7YxhfOA2yVv8GziCg3Y87v5'
    project = 'loghubods'
    endpoint = 'http://service.odps.aliyun.com/api'
    # 1. Initialize the ODPS entry point
    o = ODPS(access_id, access_key, project, endpoint=endpoint)
    try:
        # 2. Execute the SQL and read the result
        # execute_sql waits for the job to finish; open_reader reads the data
        with o.execute_sql(sql).open_reader() as reader:
            # reader behaves like a List<Record> in Java,
            # so it can be converted directly to a Python list
            records = [record for record in reader]
            return records
    except ODPSError as e:
        print(f"ODPS error: {e}")
        return None
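
# Example usage (illustrative; the LIMIT query below is hypothetical, the table
# and columns are the ones referenced elsewhere in this module):
# rows = get_odps_data("SELECT videoid, merge_leve2 FROM loghubods.video_merge_tag LIMIT 10")
# if rows:
#     for row in rows:
#         # PyODPS Record objects support positional indexing
#         print(row[0], row[1])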


def get_rov_by_merge_leve2_and_video_ids(merge_leve2, video_ids):
    merge_level_in_clause = f"'{merge_leve2}'"
    video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in video_ids])
    end_date = (date.today() - timedelta(days=1)).strftime("%Y%m%d")
    start_date = (date.today() - timedelta(days=14)).strftime("%Y%m%d")
    sql_query = f'''
    SELECT
        v.videoid,
        CASE
            WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
            ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
        END AS avg_rov_t0
    FROM (
        SELECT
            t2.videoid,
            t2.merge_leve2
        FROM videoods.content_profile t1
        JOIN loghubods.video_merge_tag t2
            ON t1.content_id = t2.videoid
        WHERE
            t1.status = 3
            AND t1.is_deleted = 0
            AND t2.merge_leve2 IN ({merge_level_in_clause})
    ) v
    LEFT JOIN loghubods.video_dimension_detail_add_column t3
        ON v.videoid = t3.视频id
        AND t3.dt >= '{start_date}'
        AND t3.dt <= '{end_date}'
    WHERE v.videoid IN ({video_ids_in_clause})
    GROUP BY
        v.videoid
    ;
    '''
    data = get_odps_data(sql_query)
    result_dict = {}
    if data:
        result_dict = {r[0]: r[1] for r in data}
    return result_dict
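
# Example (illustrative; the category name and video ids are hypothetical):
# rov_map = get_rov_by_merge_leve2_and_video_ids('情感', ['1234567', '2345678'])
# The result maps videoid -> avg_rov_t0; videos with fewer than 1000
# distribution impressions over the 14-day window come back as 0.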


def get_changwen_weight(account_name):
    bizdatemax_date = date.today() - timedelta(days=1)
    bizdatemin_date = bizdatemax_date - timedelta(days=30)
    bizdatemax = bizdatemax_date.strftime("%Y%m%d")
    bizdatemin = bizdatemin_date.strftime("%Y%m%d")
    sql_query = f'''
    SELECT
        公众号名
        ,videoid
        ,一级品类
        ,二级品类
        ,头部曝光
        ,头部曝光uv
        ,头部realplay
        ,头部realplay_uv
        ,头部分享
        ,头部分享uv
        ,头部回流人数 AS 头部回流数
        ,推荐曝光数
        ,当日分发曝光uv
        ,推荐realplay
        ,分发realplay_uv
        ,推荐分享数
        ,当日分发分享uv
        ,推荐回流数
        ,当日回流进入分发曝光次数 AS vov分子
    FROM (
        SELECT DISTINCT a.公众号名
            ,a.videoid
            ,e.merge_leve1 AS 一级品类
            ,e.merge_leve2 AS 二级品类
            ,a.title
            ,a.进入分发人数
            ,头部曝光pv AS 头部曝光
            ,头部realplay_pv AS 头部realplay
            ,头部分享pv AS 头部分享
            ,a.当日分发曝光pv AS 推荐曝光数
            ,a.当日分发播放pv
            ,分发realplay_pv AS 推荐realplay
            ,分发realplay_pv / a.当日分发播放pv AS 真实播放率pv
            ,当日分发播放uv
            ,c.realplay_uv AS 分发真实播uv
            ,c.realplay_uv / a.当日分发播放uv AS 真实播放率uv
            ,a.当日分发分享pv AS 推荐分享数
            ,a.当日分发分享pv / a.当日分发曝光pv AS str
            ,NVL(b.当日分发回流人数, 0) AS 推荐回流数
            ,NVL(b.当日回流进入分发人数, 0) AS 当日回流进入分发人数
            ,NVL(b.当日回流进入分发曝光次数, 0) AS 当日回流进入分发曝光次数
            ,NVL(b.当日回流进入分发曝光次数, 0) / a.当日分发曝光pv AS vov分子
            ,d.头部回流人数
            ,当日分发曝光uv
            ,头部曝光uv
            ,当日分发分享uv
            ,头部分享uv
            ,分发realplay_uv
            ,头部realplay_uv
        FROM (
            SELECT account_name AS 公众号名
                ,videoid
                ,title
                ,COUNT(DISTINCT mid) AS 进入分发人数
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
                ) AS 当日分发曝光pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
                ) AS 当日分发曝光uv
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
                ) AS 头部曝光pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
                ) AS 头部曝光uv
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
                ) AS 当日分发播放pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
                ) AS 当日分发播放uv
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
                ) AS 当日分发分享pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
                ) AS 当日分发分享uv
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
                ) AS 头部分享pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
                ) AS 头部分享uv
            FROM (
                SELECT DISTINCT a.mid
                    ,a.videoid
                    ,a.businesstype
                    ,a.pagesource
                    ,a.subsessionid
                    ,account_name
                    ,e.title
                FROM loghubods.video_action_log_rp a
                LEFT JOIN loghubods.user_wechat_identity_info_ha b
                    ON a.mid = CONCAT('weixin_openid_', b.open_id)
                    AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                LEFT JOIN loghubods.gzh_fans_info d
                    ON b.union_id = d.union_id
                    AND d.dt = MAX_PT("loghubods.gzh_fans_info")
                LEFT JOIN videoods.wx_video e
                    ON a.videoid = e.id
                WHERE a.dt >= '{bizdatemin}'
                    AND a.dt <= '{bizdatemax}'
                    AND businesstype IN ('videoView', 'videoPlay', 'videoShareFriend')
                    AND d.user_create_time IS NOT NULL
                    AND account_name = '{account_name}'
                    AND a.videoid IN (
                        SELECT DISTINCT content_id AS videoid
                        FROM videoods.content_profile
                        WHERE status = 3
                            AND is_deleted = 0
                    )
            ) t
            GROUP BY 公众号名
                ,videoid
                ,title
        ) a
        LEFT JOIN (
            SELECT t.account_name AS 公众号名
                ,t.videoid
                ,COUNT(DISTINCT s.machinecode) AS 当日分发回流人数
                ,COUNT(DISTINCT v.mid) AS 当日回流进入分发人数
                ,COUNT(v.mid) AS 当日回流进入分发曝光次数
            FROM (
                SELECT DISTINCT a.subsessionid
                    ,a.videoid
                    ,a.mid
                    ,d.account_name
                    ,GET_JSON_OBJECT(extparams, '$.recomTraceId') AS recomtraceid
                FROM loghubods.video_action_log_rp a
                LEFT JOIN loghubods.user_wechat_identity_info_ha b
                    ON a.mid = CONCAT('weixin_openid_', b.open_id)
                    AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                LEFT JOIN loghubods.gzh_fans_info d
                    ON b.union_id = d.union_id
                    AND d.dt = MAX_PT("loghubods.gzh_fans_info")
                WHERE a.dt >= '{bizdatemin}'
                    AND a.dt <= '{bizdatemax}'
                    AND a.businesstype = 'videoShareFriend'
                    AND a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
                    AND d.user_create_time IS NOT NULL
                    AND d.account_name = '{account_name}'
            ) t
            LEFT JOIN (
                SELECT DISTINCT subsessionid
                    ,machinecode
                    ,recomtraceid
                    ,clickobjectid
                FROM loghubods.user_share_log
                WHERE dt >= '{bizdatemin}'
                    AND dt <= '{bizdatemax}'
                    AND topic = 'click'
            ) s
                ON t.recomtraceid = s.recomtraceid
                AND t.videoid = s.clickobjectid
            LEFT JOIN (
                SELECT subsessionid
                    ,mid
                    ,videoid
                FROM loghubods.video_action_log_rp
                WHERE dt >= '{bizdatemin}'
                    AND dt <= '{bizdatemax}'
                    AND pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
                    AND businesstype = 'videoView'
            ) v
                ON s.subsessionid = v.subsessionid
                AND s.machinecode = v.mid
            GROUP BY account_name
                ,t.videoid
        ) b
            ON a.公众号名 = b.公众号名
            AND a.videoid = b.videoid
        LEFT JOIN (
            SELECT d.account_name AS 公众号名
                ,a.videoid
                ,COUNT(DISTINCT a.mid) AS realplay_uv
                ,COUNT(
                    CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
                ) AS 分发realplay_pv
                ,COUNT(CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_pv
                ,COUNT(DISTINCT
                    CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
                ) AS 分发realplay_uv
                ,COUNT(DISTINCT CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_uv
            FROM loghubods.ods_video_play_log_day a
            LEFT JOIN (
                SELECT DISTINCT open_id
                    ,union_id
                FROM loghubods.user_wechat_identity_info_ha
                WHERE dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
            ) b
                ON a.mid = CONCAT('weixin_openid_', b.open_id)
            LEFT JOIN loghubods.gzh_fans_info d
                ON b.union_id = d.union_id
                AND d.dt = MAX_PT("loghubods.gzh_fans_info")
            WHERE a.dt >= '{bizdatemin}'
                AND a.dt <= '{bizdatemax}'
                AND a.businesstype = 'videoRealPlay'
                AND d.user_create_time IS NOT NULL
                AND d.account_name = '{account_name}'
            GROUP BY d.account_name
                ,a.videoid
            ORDER BY 分发realplay_pv DESC
        ) c
            ON a.公众号名 = c.公众号名
            AND a.videoid = c.videoid
        LEFT JOIN (
            SELECT t.account_name AS 公众号名
                ,t.videoid
                ,COUNT(DISTINCT s.machinecode) AS 头部回流人数
            FROM (
                SELECT DISTINCT a.shareobjectid AS videoid
                    ,a.shareid
                    ,a.machinecode
                    ,d.account_name
                FROM loghubods.user_share_log a
                LEFT JOIN loghubods.user_wechat_identity_info_ha b
                    ON a.machinecode = CONCAT('weixin_openid_', b.open_id)
                    AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                LEFT JOIN loghubods.gzh_fans_info d
                    ON b.union_id = d.union_id
                    AND d.dt = MAX_PT("loghubods.gzh_fans_info")
                WHERE a.dt >= '{bizdatemin}'
                    AND a.dt <= '{bizdatemax}'
                    AND a.topic = 'share'
                    AND a.pagesource REGEXP 'pages/user-videos-share$'
                    AND d.user_create_time IS NOT NULL
                    AND d.account_name = '{account_name}'
            ) t
            LEFT JOIN (
                SELECT DISTINCT shareid
                    ,machinecode
                    ,clickobjectid
                FROM loghubods.user_share_log
                WHERE dt >= '{bizdatemin}'
                    AND dt <= '{bizdatemax}'
                    AND topic = 'click'
            ) s
                ON t.shareid = s.shareid
            GROUP BY account_name
                ,t.videoid
        ) d
            ON a.公众号名 = d.公众号名
            AND a.videoid = d.videoid
        LEFT JOIN loghubods.video_merge_tag e
            ON a.videoid = e.videoid
    )
    ORDER BY 推荐曝光数 DESC
    '''
    result_list = []
    data = get_odps_data(sql_query)
    if data:
        for r in data:
            result_list.append(
                {
                    "account_name": r[0],
                    "videoid": r[1],
                    "一级品类": r[2],
                    "二级品类": r[3],
                    "ext_data": {
                        "头部曝光": r[4],
                        "头部曝光uv": r[5],
                        "头部realplay": r[6],
                        "头部realplay_uv": r[7],
                        "头部分享": r[8],
                        "头部分享uv": r[9],
                        "头部回流数": r[10],
                        "推荐曝光数": r[11],
                        "当日分发曝光uv": r[12],
                        "推荐realplay": r[13],
                        "分发realplay_uv": r[14],
                        "推荐分享数": r[15],
                        "当日分发分享uv": r[16],
                        "推荐回流数": r[17],
                        "vov分子": r[18],
                    },
                }
            )
    # Write the results to examples/demand/data/changwen_data/
    output_dir = Path(__file__).parent / "data" / "changwen_data"
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / f"{account_name}.json"
    with output_file.open("w", encoding="utf-8") as f:
        json.dump(result_list, f, ensure_ascii=False, indent=2)
    return result_list
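
# Example (illustrative; the account name is hypothetical):
# changwen_rows = get_changwen_weight('某公众号')
# The same entries are written to data/changwen_data/<account_name>.json, with
# the account/video identity at the top level and the metric columns under "ext_data".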


def get_zengzhang_weight(account_name):
    bizdatemax_date = date.today() - timedelta(days=1)
    bizdatemin_date = bizdatemax_date - timedelta(days=30)
    bizdatemax = bizdatemax_date.strftime("%Y%m%d")
    bizdatemin = bizdatemin_date.strftime("%Y%m%d")
    sql_query = f'''
    SELECT 合作方名
        ,合作方简称
        ,videoid
        ,一级品类
        ,二级品类
        ,SUM(头部曝光) AS 头部曝光
        ,SUM(头部曝光uv) AS 头部曝光uv
        ,SUM(头部realplay) AS 头部realplay
        ,SUM(头部realplay_uv) AS 头部realplay_uv
        ,SUM(头部分享) AS 头部分享
        ,SUM(头部分享uv) AS 头部分享uv
        ,SUM(头部回流数) AS 头部回流数
        ,SUM(推荐曝光数) AS 推荐曝光数
        ,SUM(当日分发曝光uv) AS 当日分发曝光uv
        ,SUM(推荐realplay) AS 推荐realplay
        ,SUM(分发realplay_uv) AS 分发realplay_uv
        ,SUM(推荐分享数) AS 推荐分享数
        ,SUM(当日分发分享uv) AS 当日分发分享uv
        ,SUM(推荐回流数) AS 推荐回流数
        ,SUM(vov分子) AS vov分子
    FROM loghubods.dws_growth_partner_vid_data
    WHERE dt BETWEEN '{bizdatemin}' AND '{bizdatemax}'
        AND 合作方名 = '{account_name}'
    GROUP BY 合作方名
        ,合作方简称
        ,videoid
        ,一级品类
        ,二级品类
    ORDER BY SUM(推荐曝光数)
    ;
    '''
    result_list = []
    data = get_odps_data(sql_query)
    if data:
        for r in data:
            result_list.append(
                {
                    "account_name": r[0],
                    "合作方简称": r[1],
                    "videoid": r[2],
                    "一级品类": r[3],
                    "二级品类": r[4],
                    "ext_data": {
                        "头部曝光": r[5],
                        "头部曝光uv": r[6],
                        "头部realplay": r[7],
                        "头部realplay_uv": r[8],
                        "头部分享": r[9],
                        "头部分享uv": r[10],
                        "头部回流数": r[11],
                        "推荐曝光数": r[12],
                        "当日分发曝光uv": r[13],
                        "推荐realplay": r[14],
                        "分发realplay_uv": r[15],
                        "推荐分享数": r[16],
                        "当日分发分享uv": r[17],
                        "推荐回流数": r[18],
                        "vov分子": r[19],
                    },
                }
            )
    # Write the results to examples/demand/data/zengzhang_data/
    output_dir = Path(__file__).parent / "data" / "zengzhang_data"
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / f"{account_name}.json"
    with output_file.open("w", encoding="utf-8") as f:
        json.dump(result_list, f, ensure_ascii=False, indent=2)
    return result_list
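
# Example (illustrative; the partner name is hypothetical):
# zengzhang_rows = get_zengzhang_weight('某合作方')
# Unlike get_changwen_weight, this reads the pre-aggregated
# loghubods.dws_growth_partner_vid_data table instead of the raw action logs.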


def get_merge_leve2_by_video_ids(video_ids, batch_size=2000):
    result = {}
    if not video_ids:
        return result
    normalized_ids = [str(video_id) for video_id in video_ids if video_id is not None]
    for i in range(0, len(normalized_ids), batch_size):
        batch_ids = normalized_ids[i:i + batch_size]
        escaped_ids = [video_id.replace("'", "''") for video_id in batch_ids]
        video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in escaped_ids])
        sql_query = f'''
        SELECT videoid, merge_leve2
        FROM loghubods.video_merge_tag
        WHERE videoid IN ({video_ids_in_clause})
        '''
        data = get_odps_data(sql_query)
        if not data:
            continue
        for row in data:
            result[str(row[0])] = row[1]
    return result
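
# Example (illustrative; the ids are hypothetical):
# tag_map = get_merge_leve2_by_video_ids(['1111111', '2222222'], batch_size=2000)
# IDs are chunked into IN (...) clauses of at most batch_size values per query;
# ids without a merge_leve2 tag are simply absent from the returned dict.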


def get_all_decode_task_result_rows():
    return mysql_db.select(
        "workflow_decode_task_result",
        columns="id, channel_content_id, merge_leve2",
    )


def update_decode_task_result_merge_leve2(channel_content_id, merge_leve2):
    return mysql_db.update(
        "workflow_decode_task_result",
        {"merge_leve2": str(merge_leve2)},
        "channel_content_id = %s",
        (str(channel_content_id),),
    )


def backfill_merge_leve2_for_decode_task_result():
    rows = get_all_decode_task_result_rows()
    updated_count = 0
    skipped_count = 0
    valid_content_ids = []
    for row in rows:
        channel_content_id = row.get("channel_content_id")
        if channel_content_id is None:
            skipped_count += 1
            continue
        channel_content_id = str(channel_content_id)
        if len(channel_content_id) > 8:
            skipped_count += 1
            continue
        valid_content_ids.append(channel_content_id)
    merge_leve2_map = get_merge_leve2_by_video_ids(valid_content_ids, batch_size=2000)
    for channel_content_id in valid_content_ids:
        merge_leve2 = merge_leve2_map.get(channel_content_id)
        if not merge_leve2:
            continue
        affected = update_decode_task_result_merge_leve2(channel_content_id, merge_leve2)
        if affected > 0:
            updated_count += affected
    return {
        "total": len(rows),
        "updated": updated_count,
        "skipped": skipped_count,
    }


if __name__ == '__main__':
    backfill_merge_leve2_for_decode_task_result()
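    # Other entry points in this module (illustrative; the account names are hypothetical):
    # get_changwen_weight('某公众号')
    # get_zengzhang_weight('某合作方')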