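"""data_query_tools.py

Query tools for pulling video metrics out of MaxCompute (ODPS) and for
backfilling the `merge_leve2` category tag into the MySQL table
`workflow_decode_task_result`.
"""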

from odps import ODPS
from odps.errors import ODPSError
from datetime import date, timedelta
import json
from pathlib import Path
from agent import tool
from examples.demand.mysql import mysql_db
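
# Runs a single SQL statement against the `loghubods` MaxCompute project and
# returns the result rows as a list of Record objects, or None if the job fails.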
def get_odps_data(sql):
    # Connection settings
    access_id = 'LTAI9EBa0bd5PrDa'
    access_key = 'vAalxds7YxhfOA2yVv8GziCg3Y87v5'
    project = 'loghubods'
    endpoint = 'http://service.odps.aliyun.com/api'
    # 1. Initialize the ODPS entry point
    o = ODPS(access_id, access_key, project, endpoint=endpoint)
    try:
        # 2. Execute the SQL and fetch the result.
        # execute_sql waits for the job to finish; open_reader reads the rows.
        with o.execute_sql(sql).open_reader() as reader:
            # The reader behaves like a Java List<Record>,
            # so it can be materialized directly into a Python list.
            records = [record for record in reader]
            return records
    except ODPSError as e:
        print(f"ODPS error: {e}")
        return None
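
# For each requested video in the given level-2 category, averages rov_t0 over
# the last 14 days of loghubods.video_dimension_detail_add_column, forcing the
# value to 0 when the video had fewer than 1000 distribution impressions.
# A minimal usage sketch (the category name and ids are hypothetical):
#     rov_map = get_rov_by_merge_leve2_and_video_ids('情感', ['12345678', '23456789'])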
def get_rov_by_merge_leve2_and_video_ids(merge_leve2, video_ids):
    merge_level_in_clause = f"'{merge_leve2}'"
    video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in video_ids])
    end_date = (date.today() - timedelta(days=1)).strftime("%Y%m%d")
    start_date = (date.today() - timedelta(days=14)).strftime("%Y%m%d")
    sql_query = f'''
    SELECT
        v.videoid,
        CASE
            WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
            ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
        END AS avg_rov_t0
    FROM (
        SELECT
            t2.videoid,
            t2.merge_leve2
        FROM videoods.content_profile t1
        JOIN loghubods.video_merge_tag t2
            ON t1.content_id = t2.videoid
        WHERE
            t1.status = 3
            AND t1.is_deleted = 0
            AND t2.merge_leve2 IN ({merge_level_in_clause})
    ) v
    LEFT JOIN loghubods.video_dimension_detail_add_column t3
        ON v.videoid = t3.视频id
        AND t3.dt >= '{start_date}'
        AND t3.dt <= '{end_date}'
    WHERE v.videoid IN ({video_ids_in_clause})
    GROUP BY
        v.videoid
    ;
    '''
    data = get_odps_data(sql_query)
    result_dict = {}
    if data:
        result_dict = {r[0]: r[1] for r in data}
    return result_dict
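
# Pulls 30 days of per-video funnel metrics for one official account (exposure,
# real play, share and backflow, split into 头部 share-page traffic and 推荐
# distribution traffic), writes them to
# examples/demand/data/changwen_data/<account_name>.json, and returns the list.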
def get_changwen_weight(account_name):
    bizdatemax_date = date.today() - timedelta(days=1)
    bizdatemin_date = bizdatemax_date - timedelta(days=30)
    bizdatemax = bizdatemax_date.strftime("%Y%m%d")
    bizdatemin = bizdatemin_date.strftime("%Y%m%d")
    sql_query = f'''
    SELECT
        公众号名
        ,videoid
        ,一级品类
        ,二级品类
        ,头部曝光
        ,头部曝光uv
        ,头部realplay
        ,头部realplay_uv
        ,头部分享
        ,头部分享uv
        ,头部回流人数 AS 头部回流数
        ,推荐曝光数
        ,当日分发曝光uv
        ,推荐realplay
        ,分发realplay_uv
        ,推荐分享数
        ,当日分发分享uv
        ,推荐回流数
        ,当日回流进入分发曝光次数 AS vov分子
    FROM (
        SELECT DISTINCT a.公众号名
            ,a.videoid
            ,e.merge_leve1 AS 一级品类
            ,e.merge_leve2 AS 二级品类
            ,a.title
            ,a.进入分发人数
            ,头部曝光pv AS 头部曝光
            ,头部realplay_pv AS 头部realplay
            ,头部分享pv AS 头部分享
            ,a.当日分发曝光pv AS 推荐曝光数
            ,a.当日分发播放pv
            ,分发realplay_pv AS 推荐realplay
            ,分发realplay_pv / a.当日分发播放pv AS 真实播放率pv
            ,当日分发播放uv
            ,c.realplay_uv AS 分发真实播uv
            ,c.realplay_uv / a.当日分发播放uv AS 真实播放率uv
            ,a.当日分发分享pv AS 推荐分享数
            ,a.当日分发分享pv / a.当日分发曝光pv AS str
            ,NVL(b.当日分发回流人数,0) AS 推荐回流数
            ,NVL(b.当日回流进入分发人数,0) AS 当日回流进入分发人数
            ,NVL(b.当日回流进入分发曝光次数,0) AS 当日回流进入分发曝光次数
            ,NVL(b.当日回流进入分发曝光次数,0) / a.当日分发曝光pv AS vov分子
            ,d.头部回流人数
            ,当日分发曝光uv
            ,头部曝光uv
            ,当日分发分享uv
            ,头部分享uv
            ,分发realplay_uv
            ,头部realplay_uv
        FROM (
            SELECT account_name AS 公众号名
                ,videoid
                ,title
                ,COUNT(DISTINCT mid) AS 进入分发人数
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
                ) AS 当日分发曝光pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
                ) AS 当日分发曝光uv
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
                ) AS 头部曝光pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
                ) AS 头部曝光uv
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
                ) AS 当日分发播放pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
                ) AS 当日分发播放uv
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
                ) AS 当日分发分享pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
                ) AS 当日分发分享uv
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
                ) AS 头部分享pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
                ) AS 头部分享uv
            FROM (
                SELECT DISTINCT a.mid
                    ,a.videoid
                    ,a.businesstype
                    ,a.pagesource
                    ,a.subsessionid
                    ,account_name
                    ,e.title
                FROM loghubods.video_action_log_rp a
                LEFT JOIN loghubods.user_wechat_identity_info_ha b
                    ON a.mid = CONCAT('weixin_openid_',b.open_id)
                    AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                LEFT JOIN loghubods.gzh_fans_info d
                    ON b.union_id = d.union_id
                    AND d.dt = MAX_PT("loghubods.gzh_fans_info")
                LEFT JOIN videoods.wx_video e
                    ON a.videoid = e.id
                WHERE a.dt >= '{bizdatemin}'
                    AND a.dt <= '{bizdatemax}'
                    AND businesstype IN ('videoView','videoPlay','videoShareFriend')
                    AND d.user_create_time IS NOT NULL
                    AND account_name = '{account_name}'
                    AND a.videoid IN (
                        SELECT DISTINCT content_id AS videoid
                        FROM videoods.content_profile
                        WHERE status = 3
                            AND is_deleted = 0
                    )
            ) t
            GROUP BY 公众号名
                ,videoid
                ,title
        ) a
        LEFT JOIN (
            SELECT t.account_name AS 公众号名
                ,t.videoid
                ,COUNT(DISTINCT s.machinecode) AS 当日分发回流人数
                ,COUNT(DISTINCT v.mid) AS 当日回流进入分发人数
                ,COUNT(v.mid) AS 当日回流进入分发曝光次数
            FROM (
                SELECT DISTINCT a.subsessionid
                    ,a.videoid
                    ,a.mid
                    ,d.account_name
                    ,GET_JSON_OBJECT(extparams,'$.recomTraceId') AS recomtraceid
                FROM loghubods.video_action_log_rp a
                LEFT JOIN loghubods.user_wechat_identity_info_ha b
                    ON a.mid = CONCAT('weixin_openid_',b.open_id)
                    AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                LEFT JOIN loghubods.gzh_fans_info d
                    ON b.union_id = d.union_id
                    AND d.dt = MAX_PT("loghubods.gzh_fans_info")
                WHERE a.dt >= '{bizdatemin}'
                    AND a.dt <= '{bizdatemax}'
                    AND a.businesstype = 'videoShareFriend'
                    AND a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
                    AND d.user_create_time IS NOT NULL
                    AND d.account_name = '{account_name}'
            ) t
            LEFT JOIN (
                SELECT DISTINCT subsessionid
                    ,machinecode
                    ,recomtraceid
                    ,clickobjectid
                FROM loghubods.user_share_log
                WHERE dt >= '{bizdatemin}'
                    AND dt <= '{bizdatemax}'
                    AND topic = 'click'
            ) s
                ON t.recomtraceid = s.recomtraceid
                AND t.videoid = s.clickobjectid
            LEFT JOIN (
                SELECT subsessionid
                    ,mid
                    ,videoid
                FROM loghubods.video_action_log_rp
                WHERE dt >= '{bizdatemin}'
                    AND dt <= '{bizdatemax}'
                    AND pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
                    AND businesstype = 'videoView'
            ) v
                ON s.subsessionid = v.subsessionid
                AND s.machinecode = v.mid
            GROUP BY account_name
                ,t.videoid
        ) b
            ON a.公众号名 = b.公众号名
            AND a.videoid = b.videoid
        LEFT JOIN (
            SELECT d.account_name AS 公众号名
                ,a.videoid
                ,COUNT(DISTINCT a.mid) AS realplay_uv
                ,COUNT(
                    CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
                ) AS 分发realplay_pv
                ,COUNT(CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_pv
                ,COUNT(DISTINCT
                    CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
                ) AS 分发realplay_uv
                ,COUNT(DISTINCT CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_uv
            FROM loghubods.ods_video_play_log_day a
            LEFT JOIN (
                SELECT DISTINCT open_id
                    ,union_id
                FROM loghubods.user_wechat_identity_info_ha
                WHERE dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
            ) b
                ON a.mid = CONCAT('weixin_openid_',b.open_id)
            LEFT JOIN loghubods.gzh_fans_info d
                ON b.union_id = d.union_id
                AND d.dt = MAX_PT("loghubods.gzh_fans_info")
            WHERE a.dt >= '{bizdatemin}'
                AND a.dt <= '{bizdatemax}'
                AND a.businesstype = 'videoRealPlay'
                AND d.user_create_time IS NOT NULL
                AND d.account_name = '{account_name}'
            GROUP BY d.account_name
                ,a.videoid
            ORDER BY 分发realplay_pv DESC
        ) c
            ON a.公众号名 = c.公众号名
            AND a.videoid = c.videoid
        LEFT JOIN (
            SELECT t.account_name AS 公众号名
                ,t.videoid
                ,COUNT(DISTINCT s.machinecode) AS 头部回流人数
            FROM (
                SELECT DISTINCT a.shareobjectid AS videoid
                    ,a.shareid
                    ,a.machinecode
                    ,d.account_name
                FROM loghubods.user_share_log a
                LEFT JOIN loghubods.user_wechat_identity_info_ha b
                    ON a.machinecode = CONCAT('weixin_openid_',b.open_id)
                    AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                LEFT JOIN loghubods.gzh_fans_info d
                    ON b.union_id = d.union_id
                    AND d.dt = MAX_PT("loghubods.gzh_fans_info")
                WHERE a.dt >= '{bizdatemin}'
                    AND a.dt <= '{bizdatemax}'
                    AND a.topic = 'share'
                    AND a.pagesource REGEXP 'pages/user-videos-share$'
                    AND d.user_create_time IS NOT NULL
                    AND d.account_name = '{account_name}'
            ) t
            LEFT JOIN (
                SELECT DISTINCT shareid
                    ,machinecode
                    ,clickobjectid
                FROM loghubods.user_share_log
                WHERE dt >= '{bizdatemin}'
                    AND dt <= '{bizdatemax}'
                    AND topic = 'click'
            ) s
                ON t.shareid = s.shareid
            GROUP BY account_name
                ,t.videoid
        ) d
            ON a.公众号名 = d.公众号名
            AND a.videoid = d.videoid
        LEFT JOIN loghubods.video_merge_tag e
            ON a.videoid = e.videoid
    )
    ORDER BY 推荐曝光数 DESC
    '''
    result_list = []
    data = get_odps_data(sql_query)
    if data:
        for r in data:
            result_list.append(
                {
                    "account_name": r[0],
                    "videoid": r[1],
                    "一级品类": r[2],
                    "二级品类": r[3],
                    "ext_data": {
                        "头部曝光": r[4],
                        "头部曝光uv": r[5],
                        "头部realplay": r[6],
                        "头部realplay_uv": r[7],
                        "头部分享": r[8],
                        "头部分享uv": r[9],
                        "头部回流数": r[10],
                        "推荐曝光数": r[11],
                        "当日分发曝光uv": r[12],
                        "推荐realplay": r[13],
                        "分发realplay_uv": r[14],
                        "推荐分享数": r[15],
                        "当日分发分享uv": r[16],
                        "推荐回流数": r[17],
                        "vov分子": r[18],
                    },
                }
            )
    # Write the result to examples/demand/data/changwen_data/
    output_dir = Path(__file__).parent / "data" / "changwen_data"
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / f"{account_name}.json"
    with output_file.open("w", encoding="utf-8") as f:
        json.dump(result_list, f, ensure_ascii=False, indent=2)
    return result_list
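
# Looks up merge_leve2 for the given video ids in batches (default 2000 ids per
# IN clause), escaping single quotes so the generated SQL stays valid.
# Returns a dict mapping str(videoid) -> merge_leve2.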
def get_merge_leve2_by_video_ids(video_ids, batch_size=2000):
    result = {}
    if not video_ids:
        return result
    normalized_ids = [str(video_id) for video_id in video_ids if video_id is not None]
    for i in range(0, len(normalized_ids), batch_size):
        batch_ids = normalized_ids[i:i + batch_size]
        escaped_ids = [video_id.replace("'", "''") for video_id in batch_ids]
        video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in escaped_ids])
        sql_query = f'''
        SELECT videoid, merge_leve2
        FROM loghubods.video_merge_tag
        WHERE videoid IN ({video_ids_in_clause})
        '''
        data = get_odps_data(sql_query)
        if not data:
            continue
        for row in data:
            result[str(row[0])] = row[1]
    return result
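
# Reads every row of workflow_decode_task_result that the backfill below needs.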
def get_all_decode_task_result_rows():
    return mysql_db.select(
        "workflow_decode_task_result",
        columns="id, channel_content_id, merge_leve2",
    )
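
# Writes one merge_leve2 value back to MySQL; returns the affected row count.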
def update_decode_task_result_merge_leve2(channel_content_id, merge_leve2):
    return mysql_db.update(
        "workflow_decode_task_result",
        {"merge_leve2": str(merge_leve2)},
        "channel_content_id = %s",
        (str(channel_content_id),),
    )
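
# Backfill flow: load all MySQL rows, skip ids that are missing or longer than
# 8 characters, resolve merge_leve2 from ODPS in batches, then update each
# matching row. Returns simple counters, e.g.
#     {"total": 1200, "updated": 950, "skipped": 30}
# (the numbers are illustrative only).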
def backfill_merge_leve2_for_decode_task_result():
    rows = get_all_decode_task_result_rows()
    updated_count = 0
    skipped_count = 0
    valid_content_ids = []
    for row in rows:
        channel_content_id = row.get("channel_content_id")
        if channel_content_id is None:
            skipped_count += 1
            continue
        channel_content_id = str(channel_content_id)
        if len(channel_content_id) > 8:
            skipped_count += 1
            continue
        valid_content_ids.append(channel_content_id)
    merge_leve2_map = get_merge_leve2_by_video_ids(valid_content_ids, batch_size=2000)
    for channel_content_id in valid_content_ids:
        merge_leve2 = merge_leve2_map.get(channel_content_id)
        if not merge_leve2:
            continue
        affected = update_decode_task_result_merge_leve2(channel_content_id, merge_leve2)
        if affected > 0:
            updated_count += affected
    return {
        "total": len(rows),
        "updated": updated_count,
        "skipped": skipped_count,
    }
if __name__ == '__main__':
    backfill_merge_leve2_for_decode_task_result()