data_query_tools.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617
import json
import os
from datetime import date, datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo

from odps import ODPS
from odps.errors import ODPSError

from examples.demand.mysql import mysql_db
  8. def get_odps_data(sql):
  9. # 配置信息
  10. access_id = 'LTAI9EBa0bd5PrDa'
  11. access_key = 'vAalxds7YxhfOA2yVv8GziCg3Y87v5'
  12. project = 'loghubods'
  13. endpoint = 'http://service.odps.aliyun.com/api'
  14. # 1. 初始化 ODPS 入口
  15. o = ODPS(access_id, access_key, project, endpoint=endpoint)
  16. try:
  17. # 2. 执行 SQL 并获取结果
  18. # execute_sql 会等待任务完成,使用 open_reader 读取数据
  19. with o.execute_sql(sql).open_reader() as reader:
  20. # reader 类似于 Java 中的 List<Record>
  21. # 我们可以直接将其转换为 Python 的 list
  22. records = [record for record in reader]
  23. return records
  24. except ODPSError as e:
  25. print(f"ODPS 错误: {e}")
  26. return None
  27. def get_demand_merge_level2_names():
  28. date_time = datetime.now(ZoneInfo("Asia/Shanghai")).date() - timedelta(days=1)
  29. day = date_time.strftime("%Y%m%d")
  30. count = 500
  31. sql_query = f'''
  32. select *
  33. from (
  34. select
  35. dt,
  36. merge二级品类,
  37. sum(当日分发曝光pv) as 分发曝光pv,
  38. sum(累计分享回流uv) AS bn_总回流,
  39. sum(当日分发回流uv)/(sum(当日分发曝光pv)+100) as 质bn_rovn,
  40. case when sum(当日分发曝光pv)>=10000 then
  41. case when sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<0.035
  42. then -1*(count(DISTINCT 视频id)/avg(总日分发视频数))/((sum(累计分享回流uv)/avg(总日回流uv)))
  43. else 10*(sum(累计分享回流uv)/avg(总日回流uv)*sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))/(count(DISTINCT 视频id)/avg(总日分发视频数))
  44. end
  45. else 0 end AS 总供需分,
  46. case when sum(当日分发曝光pv)>=10000 then
  47. case when sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<0.035
  48. then -1*(COUNT(DISTINCT CASE WHEN 推荐天数间隔<3 THEN 视频id END ) /avg(总日分发视频数))/(sum(累计分享回流uv)/avg(总日回流uv))
  49. else 10*(sum(累计分享回流uv)/avg(总日回流uv)*sum(当日分发回流uv)/(sum(当日分发曝光pv)+1000))/(COUNT(DISTINCT CASE WHEN 推荐天数间隔<3 THEN 视频id END ) /avg(总日分发视频数))
  50. end
  51. else 0 end AS 新供需分,
  52. count(DISTINCT 视频id) as 分发视频量,
  53. count(DISTINCT if(推荐天数间隔<3,视频id,null)) as 3日新推荐视频量,
  54. case when sum(当日分发曝光pv)>=10000 and sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)>0.035
  55. then (avg(总日分发视频数)*(10*(sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))*(sum(累计分享回流uv)/avg(总日回流uv) ))/0.5-count(DISTINCT 视频id))/3
  56. end as 缺量,
  57. case when sum(当日分发曝光pv)>=10000 and sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<=0.035
  58. then (avg(总日分发视频数)*(10*(sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))*(sum(累计分享回流uv)/avg(总日回流uv) ))/(2)-count(DISTINCT 视频id))/3
  59. end as 控量,
  60. avg(总日回流uv) AS 总日回流uv,
  61. avg(总日分发视频数) AS 总日分发视频数,
  62. avg(总日推荐视频数) AS 总日推荐视频数,
  63. COUNT(DISTINCT CASE WHEN 总回流uv>0 THEN 视频id END )/avg(总日分发视频数) AS 回流视频个数占比,
  64. sum(当日分发回流uv) AS bn_当日分发回流,
  65. sum(当日分发回流uv)/avg(总日回流uv) AS 分发拉回回流uv占比,
  66. sum(累计分享回流uv)/avg(总日回流uv) AS 回流uv占比,
  67. count(DISTINCT 视频id)/avg(总日分发视频数) AS 分发视频量占比,
  68. COUNT(DISTINCT CASE WHEN 是否当日新推荐=1 THEN 视频id END ) /avg(总日分发视频数) AS 新推荐视频量占比
  69. from loghubods.video_dimension_detail_add_column
  70. where dt = '{day}'
  71. group by dt, merge二级品类
  72. ) t1
  73. where t1.缺量>= {count}
  74. '''
  75. data = get_odps_data(sql_query)
  76. result_list = []
  77. if data:
  78. for r in data:
  79. lack_count = r[9]
  80. if lack_count > 1000:
  81. count = 50
  82. elif 500 < lack_count <= 1000:
  83. count = 30
  84. elif 100 < lack_count <= 500:
  85. count = 20
  86. elif 50 < lack_count <= 100:
  87. count = 10
  88. else:
  89. count = 0
  90. if count == 0:
  91. continue
  92. result_list.append({
  93. "cluster_name": r[1],
  94. "platform_type": "piaoquan",
  95. "count": count,
  96. })
  97. return result_list
  98. def get_rov_by_merge_leve2_and_video_ids(merge_leve2, video_ids):
  99. merge_level_in_clause = f"'{merge_leve2}'"
  100. video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in video_ids])
  101. end_date = (date.today() - timedelta(days=1)).strftime("%Y%m%d")
  102. start_date = (date.today() - timedelta(days=14)).strftime("%Y%m%d")
  103. sql_query = f'''
  104. SELECT
  105. v.videoid,
  106. CASE
  107. WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
  108. ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
  109. END AS avg_rov_t0
  110. FROM
  111. (
  112. SELECT
  113. t2.videoid,
  114. t2.merge_leve2
  115. FROM videoods.content_profile t1
  116. JOIN loghubods.video_merge_tag t2
  117. ON t1.content_id = t2.videoid
  118. WHERE
  119. t1.status = 3
  120. AND t1.is_deleted = 0
  121. AND t2.merge_leve2 IN ({merge_level_in_clause})
  122. ) v
  123. LEFT JOIN loghubods.video_dimension_detail_add_column t3
  124. ON v.videoid = t3.视频id
  125. AND t3.dt >= '{start_date}'
  126. AND t3.dt <= '{end_date}'
  127. WHERE v.videoid in ({video_ids_in_clause})
  128. GROUP BY
  129. v.videoid
  130. ;
  131. '''
  132. data = get_odps_data(sql_query)
  133. result_dict = {}
  134. if data:
  135. result_dict = {r[0]: r[1] for r in data}
  136. return result_dict
def get_changwen_weight(account_name):
    """Pull 30-day per-video distribution/share metrics for one official account
    (公众号) from ODPS and persist them as JSON.

    The query joins action, play and share logs over the last 30 days (ending
    yesterday) for the given account, producing head-traffic (头部) and
    recommendation-feed (推荐/分发) exposure, play, share and return metrics
    per video, plus category tags from video_merge_tag.

    Parameters:
        account_name: the account name; interpolated directly into the SQL and
            also used as the output file name.
            # NOTE(review): interpolated without escaping — assumed to come
            # from trusted internal config; confirm before exposing upstream.

    Returns:
        list[dict]: one entry per video with account, videoid, category fields
        and an "ext_data" metrics payload. The same list is written to
        examples/demand/data/changwen_data/<account_name>.json.
    """
    # 30-day window ending yesterday, formatted as ODPS partition keys.
    bizdatemax_date = date.today() - timedelta(days=1)
    bizdatemin_date = bizdatemax_date - timedelta(days=30)
    bizdatemax = bizdatemax_date.strftime("%Y%m%d")
    bizdatemin = bizdatemin_date.strftime("%Y%m%d")
    # Outer SELECT column order defines the positional r[0..18] mapping below —
    # keep the two in sync if either changes.
    sql_query = f'''
SELECT
公众号名
,videoid
,一级品类
,二级品类
,头部曝光
,头部曝光uv
,头部realplay
,头部realplay_uv
,头部分享
,头部分享uv
,头部回流人数 AS 头部回流数
,推荐曝光数
,当日分发曝光uv
,推荐realplay
,分发realplay_uv
,推荐分享数
,当日分发分享uv
,推荐回流数
,当日回流进入分发曝光次数 AS vov分子
FROM (
SELECT DISTINCT a.公众号名
,a.videoid
,e.merge_leve1 AS 一级品类
,e.merge_leve2 AS 二级品类
,a.title
,a.进入分发人数
,头部曝光pv AS 头部曝光
,头部realplay_pv AS 头部realplay
,头部分享pv AS 头部分享
,a.当日分发曝光pv AS 推荐曝光数
,a.当日分发播放pv
,分发realplay_pv AS 推荐realplay
,分发realplay_pv / a.当日分发播放pv AS 真实播放率pv
,当日分发播放uv
,c.realplay_uv AS 分发真实播uv
,c.realplay_uv / a.当日分发播放uv AS 真实播放率uv
,a.当日分发分享pv AS 推荐分享数
,a.当日分发分享pv / a.当日分发曝光pv AS str
,NVL(b.当日分发回流人数,0) AS 推荐回流数
,NVL(b.当日回流进入分发人数,0) AS 当日回流进入分发人数
,NVL(b.当日回流进入分发曝光次数,0) AS 当日回流进入分发曝光次数
,NVL(b.当日回流进入分发曝光次数,0) / a.当日分发曝光pv AS vov分子
,d.头部回流人数
,当日分发曝光uv
,头部曝光uv
,当日分发分享uv
,头部分享uv
,分发realplay_uv
,头部realplay_uv
FROM (
SELECT account_name AS 公众号名
,videoid
,title
,COUNT(DISTINCT mid) AS 进入分发人数
,COUNT(
CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
) AS 当日分发曝光pv
,COUNT(DISTINCT
CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
) AS 当日分发曝光uv
,COUNT(
CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
) AS 头部曝光pv
,COUNT(DISTINCT
CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
) AS 头部曝光uv
,COUNT(
CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
) AS 当日分发播放pv
,COUNT(DISTINCT
CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
) AS 当日分发播放uv
,COUNT(
CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
) AS 当日分发分享pv
,COUNT(DISTINCT
CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
) AS 当日分发分享uv
,COUNT(
CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
) AS 头部分享pv
,COUNT(DISTINCT
CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
) AS 头部分享uv
FROM (
SELECT DISTINCT a.mid
,a.videoid
,a.businesstype
,a.pagesource
,a.subsessionid
,account_name
,e.title
FROM loghubods.video_action_log_rp a
LEFT JOIN loghubods.user_wechat_identity_info_ha b
ON a.mid = CONCAT('weixin_openid_',b.open_id)
AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
LEFT JOIN loghubods.gzh_fans_info d
ON b.union_id = d.union_id
AND d.dt = MAX_PT("loghubods.gzh_fans_info")
LEFT JOIN videoods.wx_video e
ON a.videoid = e.id
WHERE a.dt >= '{bizdatemin}'
AND a.dt <= '{bizdatemax}'
AND businesstype IN ('videoView','videoPlay','videoShareFriend')
AND d.user_create_time IS NOT NULL
AND account_name = '{account_name}'
AND a.videoid IN (
SELECT
DISTINCT content_id AS videoid
FROM
videoods.content_profile
WHERE status=3
AND is_deleted = 0
)
) t
GROUP BY 公众号名
,videoid
,title
) a
LEFT JOIN (
SELECT t.account_name AS 公众号名
,t.videoid
,COUNT(DISTINCT s.machinecode) AS 当日分发回流人数
,COUNT(DISTINCT v.mid) AS 当日回流进入分发人数
,COUNT(v.mid) AS 当日回流进入分发曝光次数
FROM (
SELECT DISTINCT a.subsessionid
,a.videoid
,a.mid
,d.account_name
,GET_JSON_OBJECT(extparams,'$.recomTraceId') AS recomtraceid
FROM loghubods.video_action_log_rp a
LEFT JOIN loghubods.user_wechat_identity_info_ha b
ON a.mid = CONCAT('weixin_openid_',b.open_id)
AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
LEFT JOIN loghubods.gzh_fans_info d
ON b.union_id = d.union_id
AND d.dt = MAX_PT("loghubods.gzh_fans_info")
WHERE a.dt >= '{bizdatemin}'
AND a.dt <= '{bizdatemax}'
AND a.businesstype = 'videoShareFriend'
AND a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
AND d.user_create_time IS NOT NULL
AND d.account_name = '{account_name}'
) t
LEFT JOIN (
SELECT DISTINCT subsessionid
,machinecode
,recomtraceid
,clickobjectid
FROM loghubods.user_share_log
WHERE dt >= '{bizdatemin}'
AND dt <= '{bizdatemax}'
AND topic = 'click'
) s
ON t.recomtraceid = s.recomtraceid
AND t.videoid = s.clickobjectid
LEFT JOIN (
SELECT subsessionid
,mid
,videoid
FROM loghubods.video_action_log_rp
WHERE dt >= '{bizdatemin}'
AND dt <= '{bizdatemax}'
AND pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
AND businesstype = 'videoView'
) v
ON s.subsessionid = v.subsessionid
AND s.machinecode = v.mid
GROUP BY account_name
,t.videoid
) b
ON a.公众号名 = b.公众号名
AND a.videoid = b.videoid
LEFT JOIN (
SELECT d.account_name AS 公众号名
,a.videoid
,COUNT(DISTINCT a.mid) AS realplay_uv
,COUNT(
CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
) AS 分发realplay_pv
,COUNT(CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_pv
,COUNT(DISTINCT
CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
) AS 分发realplay_uv
,COUNT(DISTINCT CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_uv
FROM loghubods.ods_video_play_log_day a
LEFT JOIN (
SELECT DISTINCT open_id
,union_id
FROM loghubods.user_wechat_identity_info_ha
WHERE dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
) b
ON a.mid = CONCAT('weixin_openid_',b.open_id)
LEFT JOIN loghubods.gzh_fans_info d
ON b.union_id = d.union_id
AND d.dt = MAX_PT("loghubods.gzh_fans_info")
WHERE a.dt >= '{bizdatemin}'
AND a.dt <= '{bizdatemax}'
AND a.businesstype = 'videoRealPlay'
AND d.user_create_time IS NOT NULL
AND d.account_name = '{account_name}'
GROUP BY d.account_name
,a.videoid
ORDER BY 分发realplay_pv DESC
) c
ON a.公众号名 = c.公众号名
AND a.videoid = c.videoid
LEFT JOIN (
SELECT t.account_name AS 公众号名
,t.videoid
,COUNT(DISTINCT s.machinecode) AS 头部回流人数
FROM (
SELECT DISTINCT a.shareobjectid AS videoid
,a.shareid
,a.machinecode
,d.account_name
FROM loghubods.user_share_log a
LEFT JOIN loghubods.user_wechat_identity_info_ha b
ON a.machinecode = CONCAT('weixin_openid_',b.open_id)
AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
LEFT JOIN loghubods.gzh_fans_info d
ON b.union_id = d.union_id
AND d.dt = MAX_PT("loghubods.gzh_fans_info")
WHERE a.dt >= '{bizdatemin}'
AND a.dt <= '{bizdatemax}'
AND a.topic = 'share'
AND a.pagesource REGEXP 'pages/user-videos-share$'
AND d.user_create_time IS NOT NULL
AND d.account_name = '{account_name}'
) t
LEFT JOIN (
SELECT DISTINCT shareid
,machinecode
,clickobjectid
FROM loghubods.user_share_log
WHERE dt >= '{bizdatemin}'
AND dt <= '{bizdatemax}'
AND topic = 'click'
) s
ON t.shareid = s.shareid
GROUP BY account_name
,t.videoid
) d
ON a.公众号名 = d.公众号名
AND a.videoid = d.videoid
LEFT JOIN loghubods.video_merge_tag e
ON a.videoid = e.videoid
)
ORDER BY 推荐曝光数 DESC
'''
    result_list = []
    data = get_odps_data(sql_query)
    if data:
        for r in data:
            # Positional mapping mirrors the outer SELECT column order above.
            result_list.append(
                {
                    "account_name": r[0],
                    "videoid": r[1],
                    "一级品类": r[2],
                    "二级品类": r[3],
                    "ext_data": {
                        "头部曝光": r[4],
                        "头部曝光uv": r[5],
                        "头部realplay": r[6],
                        "头部realplay_uv": r[7],
                        "头部分享": r[8],
                        "头部分享uv": r[9],
                        "头部回流数": r[10],
                        "推荐曝光数": r[11],
                        "当日分发曝光uv": r[12],
                        "推荐realplay": r[13],
                        "分发realplay_uv": r[14],
                        "推荐分享数": r[15],
                        "当日分发分享uv": r[16],
                        "推荐回流数": r[17],
                        "vov分子": r[18],
                    },
                }
            )
    # Write results to examples/demand/data/changwen_data/
    output_dir = Path(__file__).parent / "data" / "changwen_data"
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / f"{account_name}.json"
    with output_file.open("w", encoding="utf-8") as f:
        json.dump(result_list, f, ensure_ascii=False, indent=2)
    return result_list
  431. def get_zengzhang_weight(account_name):
  432. bizdatemax_date = date.today() - timedelta(days=1)
  433. bizdatemin_date = bizdatemax_date - timedelta(days=30)
  434. bizdatemax = bizdatemax_date.strftime("%Y%m%d")
  435. bizdatemin = bizdatemin_date.strftime("%Y%m%d")
  436. sql_query = f'''
  437. SELECT 合作方名
  438. ,合作方简称
  439. ,videoid
  440. ,一级品类
  441. ,二级品类
  442. ,SUM(头部曝光) as 头部曝光
  443. ,SUM(头部曝光uv) as 头部曝光uv
  444. ,SUM(头部realplay) as 头部realplay
  445. ,SUM(头部realplay_uv) as 头部realplay_uv
  446. ,SUM(头部分享) as 头部分享
  447. ,SUM(头部分享uv) as 头部分享uv
  448. ,SUM(头部回流数) as 头部回流数
  449. ,SUM(推荐曝光数) as 推荐曝光数
  450. ,SUM(当日分发曝光uv) as 当日分发曝光uv
  451. ,SUM(推荐realplay) as 推荐realplay
  452. ,SUM(分发realplay_uv) as 分发realplay_uv
  453. ,SUM(推荐分享数) as 推荐分享数
  454. ,SUM(当日分发分享uv) as 当日分发分享uv
  455. ,SUM(推荐回流数) as 推荐回流数
  456. ,SUM(vov分子) as vov分子
  457. FROM loghubods.dws_growth_partner_vid_data
  458. WHERE dt BETWEEN '{bizdatemin}' AND '{bizdatemax}'
  459. AND 合作方名 = '{account_name}'
  460. GROUP BY 合作方名
  461. ,合作方简称
  462. ,videoid
  463. ,一级品类
  464. ,二级品类
  465. ORDER BY SUM(推荐曝光数)
  466. ;
  467. '''
  468. result_list = []
  469. data = get_odps_data(sql_query)
  470. if data:
  471. for r in data:
  472. result_list.append(
  473. {
  474. "account_name": r[0],
  475. "合作方简称": r[1],
  476. "videoid": r[2],
  477. "一级品类": r[3],
  478. "二级品类": r[4],
  479. "ext_data": {
  480. "头部曝光": r[5],
  481. "头部曝光uv": r[6],
  482. "头部realplay": r[7],
  483. "头部realplay_uv": r[8],
  484. "头部分享": r[9],
  485. "头部分享uv": r[10],
  486. "头部回流数": r[11],
  487. "推荐曝光数": r[12],
  488. "当日分发曝光uv": r[13],
  489. "推荐realplay": r[14],
  490. "分发realplay_uv": r[15],
  491. "推荐分享数": r[16],
  492. "当日分发分享uv": r[17],
  493. "推荐回流数": r[18],
  494. "vov分子": r[19],
  495. },
  496. }
  497. )
  498. # 输出到 examples/demand/data/zengzhang_data/
  499. output_dir = Path(__file__).parent / "data" / "zengzhang_data"
  500. output_dir.mkdir(parents=True, exist_ok=True)
  501. output_file = output_dir / f"{account_name}.json"
  502. with output_file.open("w", encoding="utf-8") as f:
  503. json.dump(result_list, f, ensure_ascii=False, indent=2)
  504. return result_list
  505. def get_merge_leve2_by_video_ids(video_ids, batch_size=2000):
  506. result = {}
  507. if not video_ids:
  508. return result
  509. normalized_ids = [str(video_id) for video_id in video_ids if video_id is not None]
  510. for i in range(0, len(normalized_ids), batch_size):
  511. batch_ids = normalized_ids[i:i + batch_size]
  512. escaped_ids = [video_id.replace("'", "''") for video_id in batch_ids]
  513. video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in escaped_ids])
  514. sql_query = f'''
  515. SELECT videoid, merge_leve2
  516. FROM loghubods.video_merge_tag
  517. WHERE videoid IN ({video_ids_in_clause})
  518. '''
  519. data = get_odps_data(sql_query)
  520. if not data:
  521. continue
  522. for row in data:
  523. result[str(row[0])] = row[1]
  524. return result
  525. def get_all_decode_task_result_rows():
  526. return mysql_db.select(
  527. "workflow_decode_task_result",
  528. columns="id, channel_content_id, merge_leve2",
  529. )
  530. def update_decode_task_result_merge_leve2(channel_content_id, merge_leve2):
  531. return mysql_db.update(
  532. "workflow_decode_task_result",
  533. {"merge_leve2": str(merge_leve2)},
  534. "channel_content_id = %s",
  535. (str(channel_content_id),),
  536. )
  537. def backfill_merge_leve2_for_decode_task_result():
  538. rows = get_all_decode_task_result_rows()
  539. updated_count = 0
  540. skipped_count = 0
  541. valid_content_ids = []
  542. for row in rows:
  543. channel_content_id = row.get("channel_content_id")
  544. if channel_content_id is None:
  545. skipped_count += 1
  546. continue
  547. channel_content_id = str(channel_content_id)
  548. if len(channel_content_id) > 8:
  549. skipped_count += 1
  550. continue
  551. valid_content_ids.append(channel_content_id)
  552. merge_leve2_map = get_merge_leve2_by_video_ids(valid_content_ids, batch_size=2000)
  553. for channel_content_id in valid_content_ids:
  554. merge_leve2 = merge_leve2_map.get(channel_content_id)
  555. if not merge_leve2:
  556. continue
  557. affected = update_decode_task_result_merge_leve2(channel_content_id, merge_leve2)
  558. if affected > 0:
  559. updated_count += affected
  560. return {
  561. "total": len(rows),
  562. "updated": updated_count,
  563. "skipped": skipped_count,
  564. }
  565. #
  566. # if __name__ == '__main__':
  567. # backfill_merge_leve2_for_decode_task_result()