# data_query_tools.py
import json
import os
from datetime import date, datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo

from odps import ODPS
from odps.errors import ODPSError

from examples.demand.mysql import mysql_db
  8. def get_odps_data(sql):
  9. # 配置信息
  10. access_id = 'LTAI9EBa0bd5PrDa'
  11. access_key = 'vAalxds7YxhfOA2yVv8GziCg3Y87v5'
  12. project = 'loghubods'
  13. endpoint = 'http://service.odps.aliyun.com/api'
  14. # 1. 初始化 ODPS 入口
  15. o = ODPS(access_id, access_key, project, endpoint=endpoint)
  16. try:
  17. # 2. 执行 SQL 并获取结果
  18. # execute_sql 会等待任务完成,使用 open_reader 读取数据
  19. with o.execute_sql(sql).open_reader() as reader:
  20. # reader 类似于 Java 中的 List<Record>
  21. # 我们可以直接将其转换为 Python 的 list
  22. records = [record for record in reader]
  23. return records
  24. except ODPSError as e:
  25. print(f"ODPS 错误: {e}")
  26. return None
  27. def get_demand_merge_level2_names():
  28. date_time = datetime.now(ZoneInfo("Asia/Shanghai")).date() - timedelta(days=1)
  29. day = date_time.strftime("%Y%m%d")
  30. count = 500
  31. sql_query = f'''
  32. select *
  33. from (
  34. select
  35. dt,
  36. merge二级品类,
  37. sum(当日分发曝光pv) as 分发曝光pv,
  38. sum(累计分享回流uv) AS bn_总回流,
  39. sum(当日分发回流uv)/(sum(当日分发曝光pv)+100) as 质bn_rovn,
  40. case when sum(当日分发曝光pv)>=10000 then
  41. case when sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<0.035
  42. then -1*(count(DISTINCT 视频id)/avg(总日分发视频数))/((sum(累计分享回流uv)/avg(总日回流uv)))
  43. else 10*(sum(累计分享回流uv)/avg(总日回流uv)*sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))/(count(DISTINCT 视频id)/avg(总日分发视频数))
  44. end
  45. else 0 end AS 总供需分,
  46. case when sum(当日分发曝光pv)>=10000 then
  47. case when sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<0.035
  48. then -1*(COUNT(DISTINCT CASE WHEN 推荐天数间隔<3 THEN 视频id END ) /avg(总日分发视频数))/(sum(累计分享回流uv)/avg(总日回流uv))
  49. else 10*(sum(累计分享回流uv)/avg(总日回流uv)*sum(当日分发回流uv)/(sum(当日分发曝光pv)+1000))/(COUNT(DISTINCT CASE WHEN 推荐天数间隔<3 THEN 视频id END ) /avg(总日分发视频数))
  50. end
  51. else 0 end AS 新供需分,
  52. count(DISTINCT 视频id) as 分发视频量,
  53. count(DISTINCT if(推荐天数间隔<3,视频id,null)) as 3日新推荐视频量,
  54. case when sum(当日分发曝光pv)>=10000 and sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)>0.035
  55. then (avg(总日分发视频数)*(10*(sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))*(sum(累计分享回流uv)/avg(总日回流uv) ))/0.5-count(DISTINCT 视频id))/3
  56. end as 缺量,
  57. case when sum(当日分发曝光pv)>=10000 and sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<=0.035
  58. then (avg(总日分发视频数)*(10*(sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))*(sum(累计分享回流uv)/avg(总日回流uv) ))/(2)-count(DISTINCT 视频id))/3
  59. end as 控量,
  60. avg(总日回流uv) AS 总日回流uv,
  61. avg(总日分发视频数) AS 总日分发视频数,
  62. avg(总日推荐视频数) AS 总日推荐视频数,
  63. COUNT(DISTINCT CASE WHEN 总回流uv>0 THEN 视频id END )/avg(总日分发视频数) AS 回流视频个数占比,
  64. sum(当日分发回流uv) AS bn_当日分发回流,
  65. sum(当日分发回流uv)/avg(总日回流uv) AS 分发拉回回流uv占比,
  66. sum(累计分享回流uv)/avg(总日回流uv) AS 回流uv占比,
  67. count(DISTINCT 视频id)/avg(总日分发视频数) AS 分发视频量占比,
  68. COUNT(DISTINCT CASE WHEN 是否当日新推荐=1 THEN 视频id END ) /avg(总日分发视频数) AS 新推荐视频量占比
  69. from loghubods.video_dimension_detail_add_column
  70. where dt = '{day}'
  71. group by dt, merge二级品类
  72. ) t1
  73. where t1.缺量>= {count}
  74. '''
  75. data = get_odps_data(sql_query)
  76. result_list = []
  77. if data:
  78. for r in data:
  79. lack_count = r[9]
  80. if lack_count > 1000:
  81. count = 50
  82. elif 500 < lack_count <= 1000:
  83. count = 30
  84. elif 100 < lack_count <= 500:
  85. count = 20
  86. elif 50 < lack_count <= 100:
  87. count = 10
  88. else:
  89. count = 0
  90. if count == 0:
  91. continue
  92. result_list.append({
  93. "cluster_name": r[1],
  94. "platform_type": "piaoquan",
  95. "count": count,
  96. })
  97. return result_list
  98. def get_rov_by_merge_leve2_and_video_ids(merge_leve2_list, video_ids):
  99. if not merge_leve2_list or not video_ids:
  100. return {}
  101. merge_level_in_clause = ", ".join([f"'{m}'" for m in merge_leve2_list])
  102. video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in video_ids])
  103. end_date = (date.today() - timedelta(days=1)).strftime("%Y%m%d")
  104. start_date = (date.today() - timedelta(days=14)).strftime("%Y%m%d")
  105. sql_query = f'''
  106. SELECT
  107. v.videoid,
  108. CASE
  109. WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
  110. ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
  111. END AS avg_rov_t0
  112. FROM
  113. (
  114. SELECT
  115. t2.videoid,
  116. t2.merge_leve2
  117. FROM videoods.content_profile t1
  118. JOIN loghubods.video_merge_tag t2
  119. ON t1.content_id = t2.videoid
  120. WHERE
  121. t1.status = 3
  122. AND t1.is_deleted = 0
  123. AND t2.merge_leve2 IN ({merge_level_in_clause})
  124. ) v
  125. LEFT JOIN loghubods.video_dimension_detail_add_column t3
  126. ON v.videoid = t3.视频id
  127. AND t3.dt >= '{start_date}'
  128. AND t3.dt <= '{end_date}'
  129. WHERE v.videoid in ({video_ids_in_clause})
  130. GROUP BY
  131. v.videoid
  132. ;
  133. '''
  134. data = get_odps_data(sql_query)
  135. result_dict = {}
  136. if data:
  137. result_dict = {r[0]: r[1] for r in data}
  138. return result_dict
  139. def get_changwen_weight(account_name):
  140. bizdatemax_date = date.today() - timedelta(days=1)
  141. bizdatemin_date = bizdatemax_date - timedelta(days=30)
  142. bizdatemax = bizdatemax_date.strftime("%Y%m%d")
  143. bizdatemin = bizdatemin_date.strftime("%Y%m%d")
  144. sql_query = f'''
  145. SELECT
  146. 公众号名
  147. ,videoid
  148. ,一级品类
  149. ,二级品类
  150. ,头部曝光
  151. ,头部曝光uv
  152. ,头部realplay
  153. ,头部realplay_uv
  154. ,头部分享
  155. ,头部分享uv
  156. ,头部回流人数 AS 头部回流数
  157. ,推荐曝光数
  158. ,当日分发曝光uv
  159. ,推荐realplay
  160. ,分发realplay_uv
  161. ,推荐分享数
  162. ,当日分发分享uv
  163. ,推荐回流数
  164. ,当日回流进入分发曝光次数 AS vov分子
  165. FROM (
  166. SELECT DISTINCT a.公众号名
  167. ,a.videoid
  168. ,e.merge_leve1 AS 一级品类
  169. ,e.merge_leve2 AS 二级品类
  170. ,a.title
  171. ,a.进入分发人数
  172. ,头部曝光pv AS 头部曝光
  173. ,头部realplay_pv AS 头部realplay
  174. ,头部分享pv AS 头部分享
  175. ,a.当日分发曝光pv AS 推荐曝光数
  176. ,a.当日分发播放pv
  177. ,分发realplay_pv AS 推荐realplay
  178. ,分发realplay_pv / a.当日分发播放pv AS 真实播放率pv
  179. ,当日分发播放uv
  180. ,c.realplay_uv AS 分发真实播uv
  181. ,c.realplay_uv / a.当日分发播放uv AS 真实播放率uv
  182. ,a.当日分发分享pv AS 推荐分享数
  183. ,a.当日分发分享pv / a.当日分发曝光pv AS str
  184. ,NVL(b.当日分发回流人数,0) AS 推荐回流数
  185. ,NVL(b.当日回流进入分发人数,0) AS 当日回流进入分发人数
  186. ,NVL(b.当日回流进入分发曝光次数,0) AS 当日回流进入分发曝光次数
  187. ,NVL(b.当日回流进入分发曝光次数,0) / a.当日分发曝光pv AS vov分子
  188. ,d.头部回流人数
  189. ,当日分发曝光uv
  190. ,头部曝光uv
  191. ,当日分发分享uv
  192. ,头部分享uv
  193. ,分发realplay_uv
  194. ,头部realplay_uv
  195. FROM (
  196. SELECT account_name AS 公众号名
  197. ,videoid
  198. ,title
  199. ,COUNT(DISTINCT mid) AS 进入分发人数
  200. ,COUNT(
  201. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
  202. ) AS 当日分发曝光pv
  203. ,COUNT(DISTINCT
  204. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
  205. ) AS 当日分发曝光uv
  206. ,COUNT(
  207. CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
  208. ) AS 头部曝光pv
  209. ,COUNT(DISTINCT
  210. CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
  211. ) AS 头部曝光uv
  212. ,COUNT(
  213. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
  214. ) AS 当日分发播放pv
  215. ,COUNT(DISTINCT
  216. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
  217. ) AS 当日分发播放uv
  218. ,COUNT(
  219. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
  220. ) AS 当日分发分享pv
  221. ,COUNT(DISTINCT
  222. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
  223. ) AS 当日分发分享uv
  224. ,COUNT(
  225. CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
  226. ) AS 头部分享pv
  227. ,COUNT(DISTINCT
  228. CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
  229. ) AS 头部分享uv
  230. FROM (
  231. SELECT DISTINCT a.mid
  232. ,a.videoid
  233. ,a.businesstype
  234. ,a.pagesource
  235. ,a.subsessionid
  236. ,account_name
  237. ,e.title
  238. FROM loghubods.video_action_log_rp a
  239. LEFT JOIN loghubods.user_wechat_identity_info_ha b
  240. ON a.mid = CONCAT('weixin_openid_',b.open_id)
  241. AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  242. LEFT JOIN loghubods.gzh_fans_info d
  243. ON b.union_id = d.union_id
  244. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  245. LEFT JOIN videoods.wx_video e
  246. ON a.videoid = e.id
  247. WHERE a.dt >= '{bizdatemin}'
  248. AND a.dt <= '{bizdatemax}'
  249. AND businesstype IN ('videoView','videoPlay','videoShareFriend')
  250. AND d.user_create_time IS NOT NULL
  251. AND account_name = '{account_name}'
  252. AND a.videoid IN (
  253. SELECT
  254. DISTINCT content_id AS videoid
  255. FROM
  256. videoods.content_profile
  257. WHERE status=3
  258. AND is_deleted = 0
  259. )
  260. ) t
  261. GROUP BY 公众号名
  262. ,videoid
  263. ,title
  264. ) a
  265. LEFT JOIN (
  266. SELECT t.account_name AS 公众号名
  267. ,t.videoid
  268. ,COUNT(DISTINCT s.machinecode) AS 当日分发回流人数
  269. ,COUNT(DISTINCT v.mid) AS 当日回流进入分发人数
  270. ,COUNT(v.mid) AS 当日回流进入分发曝光次数
  271. FROM (
  272. SELECT DISTINCT a.subsessionid
  273. ,a.videoid
  274. ,a.mid
  275. ,d.account_name
  276. ,GET_JSON_OBJECT(extparams,'$.recomTraceId') AS recomtraceid
  277. FROM loghubods.video_action_log_rp a
  278. LEFT JOIN loghubods.user_wechat_identity_info_ha b
  279. ON a.mid = CONCAT('weixin_openid_',b.open_id)
  280. AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  281. LEFT JOIN loghubods.gzh_fans_info d
  282. ON b.union_id = d.union_id
  283. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  284. WHERE a.dt >= '{bizdatemin}'
  285. AND a.dt <= '{bizdatemax}'
  286. AND a.businesstype = 'videoShareFriend'
  287. AND a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
  288. AND d.user_create_time IS NOT NULL
  289. AND d.account_name = '{account_name}'
  290. ) t
  291. LEFT JOIN (
  292. SELECT DISTINCT subsessionid
  293. ,machinecode
  294. ,recomtraceid
  295. ,clickobjectid
  296. FROM loghubods.user_share_log
  297. WHERE dt >= '{bizdatemin}'
  298. AND dt <= '{bizdatemax}'
  299. AND topic = 'click'
  300. ) s
  301. ON t.recomtraceid = s.recomtraceid
  302. AND t.videoid = s.clickobjectid
  303. LEFT JOIN (
  304. SELECT subsessionid
  305. ,mid
  306. ,videoid
  307. FROM loghubods.video_action_log_rp
  308. WHERE dt >= '{bizdatemin}'
  309. AND dt <= '{bizdatemax}'
  310. AND pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
  311. AND businesstype = 'videoView'
  312. ) v
  313. ON s.subsessionid = v.subsessionid
  314. AND s.machinecode = v.mid
  315. GROUP BY account_name
  316. ,t.videoid
  317. ) b
  318. ON a.公众号名 = b.公众号名
  319. AND a.videoid = b.videoid
  320. LEFT JOIN (
  321. SELECT d.account_name AS 公众号名
  322. ,a.videoid
  323. ,COUNT(DISTINCT a.mid) AS realplay_uv
  324. ,COUNT(
  325. CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
  326. ) AS 分发realplay_pv
  327. ,COUNT(CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_pv
  328. ,COUNT(DISTINCT
  329. CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
  330. ) AS 分发realplay_uv
  331. ,COUNT(DISTINCT CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_uv
  332. FROM loghubods.ods_video_play_log_day a
  333. LEFT JOIN (
  334. SELECT DISTINCT open_id
  335. ,union_id
  336. FROM loghubods.user_wechat_identity_info_ha
  337. WHERE dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  338. ) b
  339. ON a.mid = CONCAT('weixin_openid_',b.open_id)
  340. LEFT JOIN loghubods.gzh_fans_info d
  341. ON b.union_id = d.union_id
  342. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  343. WHERE a.dt >= '{bizdatemin}'
  344. AND a.dt <= '{bizdatemax}'
  345. AND a.businesstype = 'videoRealPlay'
  346. AND d.user_create_time IS NOT NULL
  347. AND d.account_name = '{account_name}'
  348. GROUP BY d.account_name
  349. ,a.videoid
  350. ORDER BY 分发realplay_pv DESC
  351. ) c
  352. ON a.公众号名 = c.公众号名
  353. AND a.videoid = c.videoid
  354. LEFT JOIN (
  355. SELECT t.account_name AS 公众号名
  356. ,t.videoid
  357. ,COUNT(DISTINCT s.machinecode) AS 头部回流人数
  358. FROM (
  359. SELECT DISTINCT a.shareobjectid AS videoid
  360. ,a.shareid
  361. ,a.machinecode
  362. ,d.account_name
  363. FROM loghubods.user_share_log a
  364. LEFT JOIN loghubods.user_wechat_identity_info_ha b
  365. ON a.machinecode = CONCAT('weixin_openid_',b.open_id)
  366. AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  367. LEFT JOIN loghubods.gzh_fans_info d
  368. ON b.union_id = d.union_id
  369. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  370. WHERE a.dt >= '{bizdatemin}'
  371. AND a.dt <= '{bizdatemax}'
  372. AND a.topic = 'share'
  373. AND a.pagesource REGEXP 'pages/user-videos-share$'
  374. AND d.user_create_time IS NOT NULL
  375. AND d.account_name = '{account_name}'
  376. ) t
  377. LEFT JOIN (
  378. SELECT DISTINCT shareid
  379. ,machinecode
  380. ,clickobjectid
  381. FROM loghubods.user_share_log
  382. WHERE dt >= '{bizdatemin}'
  383. AND dt <= '{bizdatemax}'
  384. AND topic = 'click'
  385. ) s
  386. ON t.shareid = s.shareid
  387. GROUP BY account_name
  388. ,t.videoid
  389. ) d
  390. ON a.公众号名 = d.公众号名
  391. AND a.videoid = d.videoid
  392. LEFT JOIN loghubods.video_merge_tag e
  393. ON a.videoid = e.videoid
  394. )
  395. ORDER BY 推荐曝光数 DESC
  396. '''
  397. result_list = []
  398. data = get_odps_data(sql_query)
  399. if data:
  400. for r in data:
  401. result_list.append(
  402. {
  403. "account_name": r[0],
  404. "videoid": r[1],
  405. "一级品类": r[2],
  406. "二级品类": r[3],
  407. "ext_data": {
  408. "头部曝光": r[4],
  409. "头部曝光uv": r[5],
  410. "头部realplay": r[6],
  411. "头部realplay_uv": r[7],
  412. "头部分享": r[8],
  413. "头部分享uv": r[9],
  414. "头部回流数": r[10],
  415. "推荐曝光数": r[11],
  416. "当日分发曝光uv": r[12],
  417. "推荐realplay": r[13],
  418. "分发realplay_uv": r[14],
  419. "推荐分享数": r[15],
  420. "当日分发分享uv": r[16],
  421. "推荐回流数": r[17],
  422. "vov分子": r[18],
  423. },
  424. }
  425. )
  426. # 输出到 examples/demand/data/changwen_data/
  427. output_dir = Path(__file__).parent / "data" / "changwen_data"
  428. output_dir.mkdir(parents=True, exist_ok=True)
  429. output_file = output_dir / f"{account_name}.json"
  430. with output_file.open("w", encoding="utf-8") as f:
  431. json.dump(result_list, f, ensure_ascii=False, indent=2)
  432. return result_list
  433. def get_zengzhang_weight(account_name):
  434. bizdatemax_date = date.today() - timedelta(days=1)
  435. bizdatemin_date = bizdatemax_date - timedelta(days=30)
  436. bizdatemax = bizdatemax_date.strftime("%Y%m%d")
  437. bizdatemin = bizdatemin_date.strftime("%Y%m%d")
  438. sql_query = f'''
  439. SELECT 合作方名
  440. ,合作方简称
  441. ,videoid
  442. ,一级品类
  443. ,二级品类
  444. ,SUM(头部曝光) as 头部曝光
  445. ,SUM(头部曝光uv) as 头部曝光uv
  446. ,SUM(头部realplay) as 头部realplay
  447. ,SUM(头部realplay_uv) as 头部realplay_uv
  448. ,SUM(头部分享) as 头部分享
  449. ,SUM(头部分享uv) as 头部分享uv
  450. ,SUM(头部回流数) as 头部回流数
  451. ,SUM(推荐曝光数) as 推荐曝光数
  452. ,SUM(当日分发曝光uv) as 当日分发曝光uv
  453. ,SUM(推荐realplay) as 推荐realplay
  454. ,SUM(分发realplay_uv) as 分发realplay_uv
  455. ,SUM(推荐分享数) as 推荐分享数
  456. ,SUM(当日分发分享uv) as 当日分发分享uv
  457. ,SUM(推荐回流数) as 推荐回流数
  458. ,SUM(vov分子) as vov分子
  459. FROM loghubods.dws_growth_partner_vid_data
  460. WHERE dt BETWEEN '{bizdatemin}' AND '{bizdatemax}'
  461. AND 合作方名 = '{account_name}'
  462. GROUP BY 合作方名
  463. ,合作方简称
  464. ,videoid
  465. ,一级品类
  466. ,二级品类
  467. ORDER BY SUM(推荐曝光数)
  468. ;
  469. '''
  470. print(sql_query)
  471. result_list = []
  472. data = get_odps_data(sql_query)
  473. if data:
  474. for r in data:
  475. result_list.append(
  476. {
  477. "account_name": r[0],
  478. "合作方简称": r[1],
  479. "videoid": r[2],
  480. "一级品类": r[3],
  481. "二级品类": r[4],
  482. "ext_data": {
  483. "头部曝光": r[5],
  484. "头部曝光uv": r[6],
  485. "头部realplay": r[7],
  486. "头部realplay_uv": r[8],
  487. "头部分享": r[9],
  488. "头部分享uv": r[10],
  489. "头部回流数": r[11],
  490. "推荐曝光数": r[12],
  491. "当日分发曝光uv": r[13],
  492. "推荐realplay": r[14],
  493. "分发realplay_uv": r[15],
  494. "推荐分享数": r[16],
  495. "当日分发分享uv": r[17],
  496. "推荐回流数": r[18],
  497. "vov分子": r[19],
  498. },
  499. }
  500. )
  501. # 输出到 examples/demand/data/zengzhang_data/
  502. output_dir = Path(__file__).parent / "data" / "zengzhang_data"
  503. output_dir.mkdir(parents=True, exist_ok=True)
  504. output_file = output_dir / f"{account_name}.json"
  505. with output_file.open("w", encoding="utf-8") as f:
  506. json.dump(result_list, f, ensure_ascii=False, indent=2)
  507. return result_list
  508. def get_merge_leve2_by_video_ids(video_ids, batch_size=2000):
  509. result = {}
  510. if not video_ids:
  511. return result
  512. normalized_ids = [str(video_id) for video_id in video_ids if video_id is not None]
  513. for i in range(0, len(normalized_ids), batch_size):
  514. batch_ids = normalized_ids[i:i + batch_size]
  515. escaped_ids = [video_id.replace("'", "''") for video_id in batch_ids]
  516. video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in escaped_ids])
  517. sql_query = f'''
  518. SELECT videoid, merge_leve2
  519. FROM loghubods.video_merge_tag
  520. WHERE videoid IN ({video_ids_in_clause})
  521. '''
  522. data = get_odps_data(sql_query)
  523. if not data:
  524. continue
  525. for row in data:
  526. result[str(row[0])] = row[1]
  527. return result
  528. def get_all_decode_task_result_rows():
  529. return mysql_db.select(
  530. "workflow_decode_task_result",
  531. columns="id, channel_content_id, merge_leve2",
  532. )
  533. def get_channel_content_ids_by_merge_leve2_list(merge_leve2_list):
  534. """
  535. 在 MySQL 表 workflow_decode_task_result 中,按 merge_leve2 IN (给定列表) 一次查询,
  536. 返回 channel_content_id 组成的 list。
  537. """
  538. if not merge_leve2_list:
  539. return []
  540. normalized = [str(x).strip() for x in merge_leve2_list if x is not None and str(x).strip()]
  541. if not normalized:
  542. return []
  543. placeholders = ", ".join(["%s"] * len(normalized))
  544. rows = mysql_db.select(
  545. "workflow_decode_task_result",
  546. columns="channel_content_id",
  547. where=f"merge_leve2 IN ({placeholders})",
  548. where_params=tuple(normalized),
  549. )
  550. return [row.get("channel_content_id") for row in rows if row.get("channel_content_id") is not None]
  551. def update_decode_task_result_merge_leve2(channel_content_id, merge_leve2):
  552. return mysql_db.update(
  553. "workflow_decode_task_result",
  554. {"merge_leve2": str(merge_leve2)},
  555. "channel_content_id = %s",
  556. (str(channel_content_id),),
  557. )
  558. def backfill_merge_leve2_for_decode_task_result():
  559. rows = get_all_decode_task_result_rows()
  560. updated_count = 0
  561. skipped_count = 0
  562. valid_content_ids = []
  563. for row in rows:
  564. channel_content_id = row.get("channel_content_id")
  565. if channel_content_id is None:
  566. skipped_count += 1
  567. continue
  568. channel_content_id = str(channel_content_id)
  569. if len(channel_content_id) > 8:
  570. skipped_count += 1
  571. continue
  572. valid_content_ids.append(channel_content_id)
  573. merge_leve2_map = get_merge_leve2_by_video_ids(valid_content_ids, batch_size=2000)
  574. for channel_content_id in valid_content_ids:
  575. merge_leve2 = merge_leve2_map.get(channel_content_id)
  576. if not merge_leve2:
  577. continue
  578. affected = update_decode_task_result_merge_leve2(channel_content_id, merge_leve2)
  579. if affected > 0:
  580. updated_count += affected
  581. return {
  582. "total": len(rows),
  583. "updated": updated_count,
  584. "skipped": skipped_count,
  585. }
#
# if __name__ == '__main__':
#     backfill_merge_leve2_for_decode_task_result()