# data_query_tools.py

from zoneinfo import ZoneInfo
from odps import ODPS
from odps.errors import ODPSError
from datetime import date, datetime, timedelta
import json
from pathlib import Path
from examples.demand.mysql import mysql_db


def get_odps_data(sql):
    # Connection settings
    access_id = 'LTAI9EBa0bd5PrDa'
    access_key = 'vAalxds7YxhfOA2yVv8GziCg3Y87v5'
    project = 'loghubods'
    endpoint = 'http://service.odps.aliyun.com/api'
    # 1. Initialize the ODPS entry point
    o = ODPS(access_id, access_key, project, endpoint=endpoint)
    try:
        # 2. Execute the SQL and fetch the result.
        # execute_sql waits for the task to finish; open_reader reads the data.
        with o.execute_sql(sql).open_reader() as reader:
            # The reader behaves like a List<Record> in Java,
            # so it can be materialized directly into a Python list.
            records = [record for record in reader]
            return records
    except ODPSError as e:
        print(f"ODPS error: {e}")
        return None
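
# A minimal usage sketch (the query is illustrative; the table is one queried
# elsewhere in this module). Records support positional indexing, so r[0] is
# the first selected column:
#
#   rows = get_odps_data("SELECT videoid, merge_leve2 FROM loghubods.video_merge_tag LIMIT 10")
#   if rows:  # None on ODPSError, possibly an empty list otherwise
#       for r in rows:
#           print(r[0], r[1])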


def execute_odps_sql(sql) -> bool:
    # Connection settings
    access_id = 'LTAI9EBa0bd5PrDa'
    access_key = 'vAalxds7YxhfOA2yVv8GziCg3Y87v5'
    project = 'loghubods'
    endpoint = 'http://service.odps.aliyun.com/api'
    o = ODPS(access_id, access_key, project, endpoint=endpoint)
    try:
        instance = o.execute_sql(sql)
        instance.wait_for_success()
        return True
    except ODPSError as e:
        print(f"ODPS error: {e}")
        return False
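
# get_odps_data is the read path (returns rows, or None on error), while
# execute_odps_sql is the write path: it waits for a DDL/DML statement to
# finish and only reports success. A sketch with an illustrative statement:
#
#   ok = execute_odps_sql("ALTER TABLE feature_point_data ADD IF NOT EXISTS PARTITION (dt='20240101')")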


def write_feature_point_data_to_hive(names: list[str]) -> int:
    """
    Write demand names into the Hive table feature_point_data,
    partitioned by the current day in Beijing time.
    Only the following columns are written:
    - 特征点
    - 总分发曝光pv (fixed at 5000)
    - 质bn_rovn (fixed at 0.1)
    """
    normalized_names = [
        str(name).strip()
        for name in names
        if name is not None and str(name).strip()
    ]
    if not normalized_names:
        return 0
    dt = datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y%m%d")
    select_parts = []
    for name in normalized_names:
        # Escape single quotes so the name is safe inside a SQL string literal.
        safe_name = name.replace("'", "''")
        select_parts.append(
            "SELECT "
            f"'{safe_name}' AS `特征点`, "
            "5000 AS `总分发曝光pv`, "
            "0.1 AS `质bn_rovn`"
        )
    union_sql = "\nUNION ALL\n".join(select_parts)
    insert_sql = f"""
    INSERT INTO TABLE feature_point_data
    PARTITION (dt='{dt}')
    (`特征点`, `总分发曝光pv`, `质bn_rovn`)
    {union_sql}
    """
    ok = execute_odps_sql(insert_sql)
    if not ok:
        return 0
    return len(normalized_names)
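
# A minimal usage sketch (the demand names are made up). None and blank
# entries are dropped before the INSERT, so the return value is the number of
# rows actually written, or 0 if the INSERT fails:
#
#   written = write_feature_point_data_to_hive(["历史故事", "  ", None])
#   # written == 1 on success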


def get_demand_merge_level2_names():
    date_time = datetime.now(ZoneInfo("Asia/Shanghai")).date() - timedelta(days=1)
    day = date_time.strftime("%Y%m%d")
    # Minimum shortfall (缺量) a category must reach to be selected.
    count = 50
    sql_query = f'''
    select *
    from (
        select
            dt,
            merge二级品类,
            sum(当日分发曝光pv) as 分发曝光pv,
            sum(累计分享回流uv) AS bn_总回流,
            sum(当日分发回流uv)/(sum(当日分发曝光pv)+100) as 质bn_rovn,
            case when sum(当日分发曝光pv)>=10000 then
                case when sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<0.035
                    then -1*(count(DISTINCT 视频id)/avg(总日分发视频数))/((sum(累计分享回流uv)/avg(总日回流uv)))
                    else 10*(sum(累计分享回流uv)/avg(总日回流uv)*sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))/(count(DISTINCT 视频id)/avg(总日分发视频数))
                end
            else 0 end AS 总供需分,
            case when sum(当日分发曝光pv)>=10000 then
                case when sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<0.035
                    then -1*(COUNT(DISTINCT CASE WHEN 推荐天数间隔<3 THEN 视频id END ) /avg(总日分发视频数))/(sum(累计分享回流uv)/avg(总日回流uv))
                    else 10*(sum(累计分享回流uv)/avg(总日回流uv)*sum(当日分发回流uv)/(sum(当日分发曝光pv)+1000))/(COUNT(DISTINCT CASE WHEN 推荐天数间隔<3 THEN 视频id END ) /avg(总日分发视频数))
                end
            else 0 end AS 新供需分,
            count(DISTINCT 视频id) as 分发视频量,
            count(DISTINCT if(推荐天数间隔<3,视频id,null)) as 3日新推荐视频量,
            case when sum(当日分发曝光pv)>=10000 and sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)>0.035
                then (avg(总日分发视频数)*(10*(sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))*(sum(累计分享回流uv)/avg(总日回流uv) ))/0.5-count(DISTINCT 视频id))/3
            end as 缺量,
            case when sum(当日分发曝光pv)>=10000 and sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<=0.035
                then (avg(总日分发视频数)*(10*(sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))*(sum(累计分享回流uv)/avg(总日回流uv) ))/(2)-count(DISTINCT 视频id))/3
            end as 控量,
            avg(总日回流uv) AS 总日回流uv,
            avg(总日分发视频数) AS 总日分发视频数,
            avg(总日推荐视频数) AS 总日推荐视频数,
            COUNT(DISTINCT CASE WHEN 总回流uv>0 THEN 视频id END )/avg(总日分发视频数) AS 回流视频个数占比,
            sum(当日分发回流uv) AS bn_当日分发回流,
            sum(当日分发回流uv)/avg(总日回流uv) AS 分发拉回回流uv占比,
            sum(累计分享回流uv)/avg(总日回流uv) AS 回流uv占比,
            count(DISTINCT 视频id)/avg(总日分发视频数) AS 分发视频量占比,
            COUNT(DISTINCT CASE WHEN 是否当日新推荐=1 THEN 视频id END ) /avg(总日分发视频数) AS 新推荐视频量占比
        from loghubods.video_dimension_detail_add_column
        where dt = '{day}'
        group by dt, merge二级品类
    ) t1
    where t1.缺量>= {count}
    '''
    data = get_odps_data(sql_query)
    result_list = []
    if data:
        for r in data:
            lack_count = r[9]  # column 缺量 (shortfall)
            if lack_count > 1000:
                count = 70
            elif 500 < lack_count <= 1000:
                count = 60
            elif 100 < lack_count <= 500:
                count = 40
            elif 50 < lack_count <= 100:
                count = 20
            else:
                count = 10
            if count == 0:
                continue
            result_list.append({
                "cluster_name": r[1],
                "platform_type": "piaoquan",
                "count": count,
            })
    return result_list
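
# The tiering above maps a category's shortfall (缺量, r[9]) to a request
# size: >1000 -> 70, (500, 1000] -> 60, (100, 500] -> 40, (50, 100] -> 20,
# otherwise 10. The result shape looks like (cluster name illustrative):
#
#   [{"cluster_name": "...", "platform_type": "piaoquan", "count": 70}]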


def get_rov_by_merge_leve2_and_video_ids(merge_leve2, video_ids):
    merge_level_in_clause = f"'{merge_leve2}'"
    video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in video_ids])
    end_date = (date.today() - timedelta(days=1)).strftime("%Y%m%d")
    start_date = (date.today() - timedelta(days=14)).strftime("%Y%m%d")
    sql_query = f'''
    SELECT
        v.videoid,
        CASE
            WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
            ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
        END AS avg_rov_t0
    FROM (
        SELECT
            t2.videoid,
            t2.merge_leve2
        FROM videoods.content_profile t1
        JOIN loghubods.video_merge_tag t2
            ON t1.content_id = t2.videoid
        WHERE t1.status = 3
            AND t1.is_deleted = 0
            AND t2.merge_leve2 IN ({merge_level_in_clause})
    ) v
    LEFT JOIN loghubods.video_dimension_detail_add_column t3
        ON v.videoid = t3.视频id
        AND t3.dt >= '{start_date}'
        AND t3.dt <= '{end_date}'
    WHERE v.videoid in ({video_ids_in_clause})
    GROUP BY v.videoid
    ;
    '''
    data = get_odps_data(sql_query)
    result_dict = {}
    if data:
        result_dict = {r[0]: r[1] for r in data}
    return result_dict


def get_rov_by_tree_and_video_ids(video_ids):
    video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in video_ids])
    # Window: the 7 days starting 365 days ago (the same week last year).
    last_year_today = date.today() - timedelta(days=365)
    start_date = last_year_today.strftime("%Y%m%d")
    end_date = (last_year_today + timedelta(days=7)).strftime("%Y%m%d")
    sql_query = f'''
    SELECT
        CAST(t3.视频id AS STRING) AS 视频id_str,
        CASE
            WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
            ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
        END AS avg_rov_t0
    FROM loghubods.video_dimension_detail_add_column t3
    WHERE t3.视频id in ({video_ids_in_clause})
        AND t3.dt >= '{start_date}'
        AND t3.dt <= '{end_date}'
    GROUP BY t3.视频id
    ;
    '''
    data = get_odps_data(sql_query)
    result_dict = {}
    if data:
        result_dict = {r[0]: r[1] for r in data}
    return result_dict
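
# Both ROV helpers return a {videoid: avg_rov_t0} dict; the SQL pins videos
# with under 1000 exposure pv in the window to 0. A minimal sketch with
# made-up ids:
#
#   rov_map = get_rov_by_tree_and_video_ids(["12345", "67890"])
#   rov = rov_map.get("12345", 0)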


def get_changwen_weight(account_name):
    bizdatemax_date = date.today() - timedelta(days=1)
    bizdatemin_date = bizdatemax_date - timedelta(days=30)
    bizdatemax = bizdatemax_date.strftime("%Y%m%d")
    bizdatemin = bizdatemin_date.strftime("%Y%m%d")
    sql_query = f'''
    SELECT
        公众号名
        ,videoid
        ,一级品类
        ,二级品类
        ,头部曝光
        ,头部曝光uv
        ,头部realplay
        ,头部realplay_uv
        ,头部分享
        ,头部分享uv
        ,头部回流人数 AS 头部回流数
        ,推荐曝光数
        ,当日分发曝光uv
        ,推荐realplay
        ,分发realplay_uv
        ,推荐分享数
        ,当日分发分享uv
        ,推荐回流数
        ,当日回流进入分发曝光次数 AS vov分子
    FROM (
        SELECT DISTINCT a.公众号名
            ,a.videoid
            ,e.merge_leve1 AS 一级品类
            ,e.merge_leve2 AS 二级品类
            ,a.title
            ,a.进入分发人数
            ,头部曝光pv AS 头部曝光
            ,头部realplay_pv AS 头部realplay
            ,头部分享pv AS 头部分享
            ,a.当日分发曝光pv AS 推荐曝光数
            ,a.当日分发播放pv
            ,分发realplay_pv AS 推荐realplay
            ,分发realplay_pv / a.当日分发播放pv AS 真实播放率pv
            ,当日分发播放uv
            ,c.realplay_uv AS 分发真实播uv
            ,c.realplay_uv / a.当日分发播放uv AS 真实播放率uv
            ,a.当日分发分享pv AS 推荐分享数
            ,a.当日分发分享pv / a.当日分发曝光pv AS str
            ,NVL(b.当日分发回流人数,0) AS 推荐回流数
            ,NVL(b.当日回流进入分发人数,0) AS 当日回流进入分发人数
            ,NVL(b.当日回流进入分发曝光次数,0) AS 当日回流进入分发曝光次数
            ,NVL(b.当日回流进入分发曝光次数,0) / a.当日分发曝光pv AS vov分子
            ,d.头部回流人数
            ,当日分发曝光uv
            ,头部曝光uv
            ,当日分发分享uv
            ,头部分享uv
            ,分发realplay_uv
            ,头部realplay_uv
        FROM (
            SELECT account_name AS 公众号名
                ,videoid
                ,title
                ,COUNT(DISTINCT mid) AS 进入分发人数
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
                ) AS 当日分发曝光pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
                ) AS 当日分发曝光uv
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
                ) AS 头部曝光pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
                ) AS 头部曝光uv
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
                ) AS 当日分发播放pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
                ) AS 当日分发播放uv
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
                ) AS 当日分发分享pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
                ) AS 当日分发分享uv
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
                ) AS 头部分享pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
                ) AS 头部分享uv
            FROM (
                SELECT DISTINCT a.mid
                    ,a.videoid
                    ,a.businesstype
                    ,a.pagesource
                    ,a.subsessionid
                    ,account_name
                    ,e.title
                FROM loghubods.video_action_log_rp a
                LEFT JOIN loghubods.user_wechat_identity_info_ha b
                    ON a.mid = CONCAT('weixin_openid_',b.open_id)
                    AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                LEFT JOIN loghubods.gzh_fans_info d
                    ON b.union_id = d.union_id
                    AND d.dt = MAX_PT("loghubods.gzh_fans_info")
                LEFT JOIN videoods.wx_video e
                    ON a.videoid = e.id
                WHERE a.dt >= '{bizdatemin}'
                    AND a.dt <= '{bizdatemax}'
                    AND businesstype IN ('videoView','videoPlay','videoShareFriend')
                    AND d.user_create_time IS NOT NULL
                    AND account_name = '{account_name}'
                    AND a.videoid IN (
                        SELECT DISTINCT content_id AS videoid
                        FROM videoods.content_profile
                        WHERE status=3
                            AND is_deleted = 0
                    )
            ) t
            GROUP BY 公众号名
                ,videoid
                ,title
        ) a
        LEFT JOIN (
            SELECT t.account_name AS 公众号名
                ,t.videoid
                ,COUNT(DISTINCT s.machinecode) AS 当日分发回流人数
                ,COUNT(DISTINCT v.mid) AS 当日回流进入分发人数
                ,COUNT(v.mid) AS 当日回流进入分发曝光次数
            FROM (
                SELECT DISTINCT a.subsessionid
                    ,a.videoid
                    ,a.mid
                    ,d.account_name
                    ,GET_JSON_OBJECT(extparams,'$.recomTraceId') AS recomtraceid
                FROM loghubods.video_action_log_rp a
                LEFT JOIN loghubods.user_wechat_identity_info_ha b
                    ON a.mid = CONCAT('weixin_openid_',b.open_id)
                    AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                LEFT JOIN loghubods.gzh_fans_info d
                    ON b.union_id = d.union_id
                    AND d.dt = MAX_PT("loghubods.gzh_fans_info")
                WHERE a.dt >= '{bizdatemin}'
                    AND a.dt <= '{bizdatemax}'
                    AND a.businesstype = 'videoShareFriend'
                    AND a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
                    AND d.user_create_time IS NOT NULL
                    AND d.account_name = '{account_name}'
            ) t
            LEFT JOIN (
                SELECT DISTINCT subsessionid
                    ,machinecode
                    ,recomtraceid
                    ,clickobjectid
                FROM loghubods.user_share_log
                WHERE dt >= '{bizdatemin}'
                    AND dt <= '{bizdatemax}'
                    AND topic = 'click'
            ) s
                ON t.recomtraceid = s.recomtraceid
                AND t.videoid = s.clickobjectid
            LEFT JOIN (
                SELECT subsessionid
                    ,mid
                    ,videoid
                FROM loghubods.video_action_log_rp
                WHERE dt >= '{bizdatemin}'
                    AND dt <= '{bizdatemax}'
                    AND pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
                    AND businesstype = 'videoView'
            ) v
                ON s.subsessionid = v.subsessionid
                AND s.machinecode = v.mid
            GROUP BY account_name
                ,t.videoid
        ) b
            ON a.公众号名 = b.公众号名
            AND a.videoid = b.videoid
        LEFT JOIN (
            SELECT d.account_name AS 公众号名
                ,a.videoid
                ,COUNT(DISTINCT a.mid) AS realplay_uv
                ,COUNT(
                    CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
                ) AS 分发realplay_pv
                ,COUNT(CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_pv
                ,COUNT(DISTINCT
                    CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
                ) AS 分发realplay_uv
                ,COUNT(DISTINCT CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_uv
            FROM loghubods.ods_video_play_log_day a
            LEFT JOIN (
                SELECT DISTINCT open_id
                    ,union_id
                FROM loghubods.user_wechat_identity_info_ha
                WHERE dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
            ) b
                ON a.mid = CONCAT('weixin_openid_',b.open_id)
            LEFT JOIN loghubods.gzh_fans_info d
                ON b.union_id = d.union_id
                AND d.dt = MAX_PT("loghubods.gzh_fans_info")
            WHERE a.dt >= '{bizdatemin}'
                AND a.dt <= '{bizdatemax}'
                AND a.businesstype = 'videoRealPlay'
                AND d.user_create_time IS NOT NULL
                AND d.account_name = '{account_name}'
            GROUP BY d.account_name
                ,a.videoid
            ORDER BY 分发realplay_pv DESC
        ) c
            ON a.公众号名 = c.公众号名
            AND a.videoid = c.videoid
        LEFT JOIN (
            SELECT t.account_name AS 公众号名
                ,t.videoid
                ,COUNT(DISTINCT s.machinecode) AS 头部回流人数
            FROM (
                SELECT DISTINCT a.shareobjectid AS videoid
                    ,a.shareid
                    ,a.machinecode
                    ,d.account_name
                FROM loghubods.user_share_log a
                LEFT JOIN loghubods.user_wechat_identity_info_ha b
                    ON a.machinecode = CONCAT('weixin_openid_',b.open_id)
                    AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                LEFT JOIN loghubods.gzh_fans_info d
                    ON b.union_id = d.union_id
                    AND d.dt = MAX_PT("loghubods.gzh_fans_info")
                WHERE a.dt >= '{bizdatemin}'
                    AND a.dt <= '{bizdatemax}'
                    AND a.topic = 'share'
                    AND a.pagesource REGEXP 'pages/user-videos-share$'
                    AND d.user_create_time IS NOT NULL
                    AND d.account_name = '{account_name}'
            ) t
            LEFT JOIN (
                SELECT DISTINCT shareid
                    ,machinecode
                    ,clickobjectid
                FROM loghubods.user_share_log
                WHERE dt >= '{bizdatemin}'
                    AND dt <= '{bizdatemax}'
                    AND topic = 'click'
            ) s
                ON t.shareid = s.shareid
            GROUP BY account_name
                ,t.videoid
        ) d
            ON a.公众号名 = d.公众号名
            AND a.videoid = d.videoid
        LEFT JOIN loghubods.video_merge_tag e
            ON a.videoid = e.videoid
    )
    ORDER BY 推荐曝光数 DESC
    '''
    result_list = []
    data = get_odps_data(sql_query)
    if data:
        for r in data:
            result_list.append(
                {
                    "account_name": r[0],
                    "videoid": r[1],
                    "一级品类": r[2],
                    "二级品类": r[3],
                    "ext_data": {
                        "头部曝光": r[4],
                        "头部曝光uv": r[5],
                        "头部realplay": r[6],
                        "头部realplay_uv": r[7],
                        "头部分享": r[8],
                        "头部分享uv": r[9],
                        "头部回流数": r[10],
                        "推荐曝光数": r[11],
                        "当日分发曝光uv": r[12],
                        "推荐realplay": r[13],
                        "分发realplay_uv": r[14],
                        "推荐分享数": r[15],
                        "当日分发分享uv": r[16],
                        "推荐回流数": r[17],
                        "vov分子": r[18],
                    },
                }
            )
    # Write the output to examples/demand/data/changwen_data/
    output_dir = Path(__file__).parent / "data" / "changwen_data"
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / f"{account_name}.json"
    with output_file.open("w", encoding="utf-8") as f:
        json.dump(result_list, f, ensure_ascii=False, indent=2)
    return result_list
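
# A minimal usage sketch (the account name is illustrative). Besides returning
# the rows, the call writes them to
# examples/demand/data/changwen_data/<account_name>.json as a side effect:
#
#   rows = get_changwen_weight("某公众号")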


def get_zengzhang_weight(account_name):
    bizdatemax_date = date.today() - timedelta(days=1)
    bizdatemin_date = bizdatemax_date - timedelta(days=30)
    bizdatemax = bizdatemax_date.strftime("%Y%m%d")
    bizdatemin = bizdatemin_date.strftime("%Y%m%d")
    sql_query = f'''
    SELECT 合作方名
        ,合作方简称
        ,videoid
        ,一级品类
        ,二级品类
        ,SUM(头部曝光) as 头部曝光
        ,SUM(头部曝光uv) as 头部曝光uv
        ,SUM(头部realplay) as 头部realplay
        ,SUM(头部realplay_uv) as 头部realplay_uv
        ,SUM(头部分享) as 头部分享
        ,SUM(头部分享uv) as 头部分享uv
        ,SUM(头部回流数) as 头部回流数
        ,SUM(推荐曝光数) as 推荐曝光数
        ,SUM(当日分发曝光uv) as 当日分发曝光uv
        ,SUM(推荐realplay) as 推荐realplay
        ,SUM(分发realplay_uv) as 分发realplay_uv
        ,SUM(推荐分享数) as 推荐分享数
        ,SUM(当日分发分享uv) as 当日分发分享uv
        ,SUM(推荐回流数) as 推荐回流数
        ,SUM(vov分子) as vov分子
    FROM loghubods.dws_growth_partner_vid_data
    WHERE dt BETWEEN '{bizdatemin}' AND '{bizdatemax}'
        AND 合作方名 = '{account_name}'
    GROUP BY 合作方名
        ,合作方简称
        ,videoid
        ,一级品类
        ,二级品类
    ORDER BY SUM(推荐曝光数)
    ;
    '''
    result_list = []
    data = get_odps_data(sql_query)
    if data:
        for r in data:
            result_list.append(
                {
                    "account_name": r[0],
                    "合作方简称": r[1],
                    "videoid": r[2],
                    "一级品类": r[3],
                    "二级品类": r[4],
                    "ext_data": {
                        "头部曝光": r[5],
                        "头部曝光uv": r[6],
                        "头部realplay": r[7],
                        "头部realplay_uv": r[8],
                        "头部分享": r[9],
                        "头部分享uv": r[10],
                        "头部回流数": r[11],
                        "推荐曝光数": r[12],
                        "当日分发曝光uv": r[13],
                        "推荐realplay": r[14],
                        "分发realplay_uv": r[15],
                        "推荐分享数": r[16],
                        "当日分发分享uv": r[17],
                        "推荐回流数": r[18],
                        "vov分子": r[19],
                    },
                }
            )
    # Write the output to examples/demand/data/zengzhang_data/
    output_dir = Path(__file__).parent / "data" / "zengzhang_data"
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / f"{account_name}.json"
    with output_file.open("w", encoding="utf-8") as f:
        json.dump(result_list, f, ensure_ascii=False, indent=2)
    return result_list


def get_merge_leve2_by_video_ids(video_ids, batch_size=2000):
    result = {}
    if not video_ids:
        return result
    normalized_ids = [str(video_id) for video_id in video_ids if video_id is not None]
    for i in range(0, len(normalized_ids), batch_size):
        batch_ids = normalized_ids[i:i + batch_size]
        escaped_ids = [video_id.replace("'", "''") for video_id in batch_ids]
        video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in escaped_ids])
        sql_query = f'''
        SELECT videoid, merge_leve2
        FROM loghubods.video_merge_tag
        WHERE videoid IN ({video_ids_in_clause})
        '''
        data = get_odps_data(sql_query)
        if not data:
            continue
        for row in data:
            result[str(row[0])] = row[1]
    return result
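
# The ids are queried in chunks of batch_size so the SQL IN clause stays
# bounded: 4500 ids with batch_size=2000 means three queries (2000 + 2000 +
# 500). A sketch with made-up ids:
#
#   tag_map = get_merge_leve2_by_video_ids([111, "222"])
#   # -> {"111": "...", "222": "..."} (keys are normalized to str)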


def get_all_decode_task_result_rows():
    return mysql_db.select(
        "workflow_decode_task_result",
        columns="id, channel_content_id, merge_leve2",
    )


def update_decode_task_result_merge_leve2(channel_content_id, merge_leve2):
    return mysql_db.update(
        "workflow_decode_task_result",
        {"merge_leve2": str(merge_leve2)},
        "channel_content_id = %s",
        (str(channel_content_id),),
    )


def backfill_merge_leve2_for_decode_task_result():
    rows = get_all_decode_task_result_rows()
    updated_count = 0
    skipped_count = 0
    valid_content_ids = []
    for row in rows:
        channel_content_id = row.get("channel_content_id")
        if channel_content_id is None:
            skipped_count += 1
            continue
        channel_content_id = str(channel_content_id)
        if len(channel_content_id) > 8:
            skipped_count += 1
            continue
        valid_content_ids.append(channel_content_id)
    merge_leve2_map = get_merge_leve2_by_video_ids(valid_content_ids, batch_size=2000)
    for channel_content_id in valid_content_ids:
        merge_leve2 = merge_leve2_map.get(channel_content_id)
        if not merge_leve2:
            continue
        affected = update_decode_task_result_merge_leve2(channel_content_id, merge_leve2)
        if affected > 0:
            updated_count += affected
    return {
        "total": len(rows),
        "updated": updated_count,
        "skipped": skipped_count,
    }


# if __name__ == '__main__':
#     backfill_merge_leve2_for_decode_task_result()