# data_query_tools.py
import json
import os
from datetime import date, datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo

from odps import ODPS
from odps.errors import ODPSError

from examples.demand.mysql import mysql_db
  8. def get_odps_data(sql):
  9. # 配置信息
  10. access_id = 'LTAI9EBa0bd5PrDa'
  11. access_key = 'vAalxds7YxhfOA2yVv8GziCg3Y87v5'
  12. project = 'loghubods'
  13. endpoint = 'http://service.odps.aliyun.com/api'
  14. # 1. 初始化 ODPS 入口
  15. o = ODPS(access_id, access_key, project, endpoint=endpoint)
  16. try:
  17. # 2. 执行 SQL 并获取结果
  18. # execute_sql 会等待任务完成,使用 open_reader 读取数据
  19. with o.execute_sql(sql).open_reader() as reader:
  20. # reader 类似于 Java 中的 List<Record>
  21. # 我们可以直接将其转换为 Python 的 list
  22. records = [record for record in reader]
  23. return records
  24. except ODPSError as e:
  25. print(f"ODPS 错误: {e}")
  26. return None
  27. def execute_odps_sql(sql) -> bool:
  28. # 配置信息
  29. access_id = 'LTAI9EBa0bd5PrDa'
  30. access_key = 'vAalxds7YxhfOA2yVv8GziCg3Y87v5'
  31. project = 'loghubods'
  32. endpoint = 'http://service.odps.aliyun.com/api'
  33. o = ODPS(access_id, access_key, project, endpoint=endpoint)
  34. try:
  35. instance = o.execute_sql(sql)
  36. instance.wait_for_success()
  37. return True
  38. except ODPSError as e:
  39. print(f"ODPS 错误: {e}")
  40. return False
  41. def write_feature_point_data_to_hive(names: list[str]) -> int:
  42. """
  43. 将需求名称写入 Hive 表 feature_point_data(按北京时间当天分区)。
  44. 仅写入以下字段:
  45. - 特征点
  46. - 总分发曝光pv(固定 5000)
  47. - 质bn_rovn(固定 0.1)
  48. """
  49. normalized_names = [str(name).strip() for name in names if name is not None and str(name).strip()]
  50. if not normalized_names:
  51. return 0
  52. dt = datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y%m%d")
  53. select_parts = []
  54. for name in normalized_names:
  55. safe_name = name.replace("'", "''")
  56. select_parts.append(
  57. "SELECT "
  58. f"'{safe_name}' AS `特征点`, "
  59. "5000 AS `总分发曝光pv`, "
  60. "0.1 AS `质bn_rovn`"
  61. )
  62. union_sql = "\nUNION ALL\n".join(select_parts)
  63. insert_sql = f"""
  64. INSERT INTO TABLE feature_point_data
  65. PARTITION (dt='{dt}')
  66. (`特征点`, `总分发曝光pv`, `质bn_rovn`)
  67. {union_sql}
  68. """
  69. ok = execute_odps_sql(insert_sql)
  70. if not ok:
  71. return 0
  72. return len(normalized_names)
  73. def get_demand_merge_level2_names():
  74. date_time = datetime.now(ZoneInfo("Asia/Shanghai")).date() - timedelta(days=1)
  75. day = date_time.strftime("%Y%m%d")
  76. count = 50
  77. sql_query = f'''
  78. select *
  79. from (
  80. select
  81. dt,
  82. merge二级品类,
  83. sum(当日分发曝光pv) as 分发曝光pv,
  84. sum(累计分享回流uv) AS bn_总回流,
  85. sum(当日分发回流uv)/(sum(当日分发曝光pv)+100) as 质bn_rovn,
  86. case when sum(当日分发曝光pv)>=10000 then
  87. case when sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<0.035
  88. then -1*(count(DISTINCT 视频id)/avg(总日分发视频数))/((sum(累计分享回流uv)/avg(总日回流uv)))
  89. else 10*(sum(累计分享回流uv)/avg(总日回流uv)*sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))/(count(DISTINCT 视频id)/avg(总日分发视频数))
  90. end
  91. else 0 end AS 总供需分,
  92. case when sum(当日分发曝光pv)>=10000 then
  93. case when sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<0.035
  94. then -1*(COUNT(DISTINCT CASE WHEN 推荐天数间隔<3 THEN 视频id END ) /avg(总日分发视频数))/(sum(累计分享回流uv)/avg(总日回流uv))
  95. else 10*(sum(累计分享回流uv)/avg(总日回流uv)*sum(当日分发回流uv)/(sum(当日分发曝光pv)+1000))/(COUNT(DISTINCT CASE WHEN 推荐天数间隔<3 THEN 视频id END ) /avg(总日分发视频数))
  96. end
  97. else 0 end AS 新供需分,
  98. count(DISTINCT 视频id) as 分发视频量,
  99. count(DISTINCT if(推荐天数间隔<3,视频id,null)) as 3日新推荐视频量,
  100. case when sum(当日分发曝光pv)>=10000 and sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)>0.035
  101. then (avg(总日分发视频数)*(10*(sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))*(sum(累计分享回流uv)/avg(总日回流uv) ))/0.5-count(DISTINCT 视频id))/3
  102. end as 缺量,
  103. case when sum(当日分发曝光pv)>=10000 and sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<=0.035
  104. then (avg(总日分发视频数)*(10*(sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))*(sum(累计分享回流uv)/avg(总日回流uv) ))/(2)-count(DISTINCT 视频id))/3
  105. end as 控量,
  106. avg(总日回流uv) AS 总日回流uv,
  107. avg(总日分发视频数) AS 总日分发视频数,
  108. avg(总日推荐视频数) AS 总日推荐视频数,
  109. COUNT(DISTINCT CASE WHEN 总回流uv>0 THEN 视频id END )/avg(总日分发视频数) AS 回流视频个数占比,
  110. sum(当日分发回流uv) AS bn_当日分发回流,
  111. sum(当日分发回流uv)/avg(总日回流uv) AS 分发拉回回流uv占比,
  112. sum(累计分享回流uv)/avg(总日回流uv) AS 回流uv占比,
  113. count(DISTINCT 视频id)/avg(总日分发视频数) AS 分发视频量占比,
  114. COUNT(DISTINCT CASE WHEN 是否当日新推荐=1 THEN 视频id END ) /avg(总日分发视频数) AS 新推荐视频量占比
  115. from loghubods.video_dimension_detail_add_column
  116. where dt = '{day}'
  117. group by dt, merge二级品类
  118. ) t1
  119. where t1.缺量>= {count}
  120. '''
  121. data = get_odps_data(sql_query)
  122. result_list = []
  123. if data:
  124. for r in data:
  125. lack_count = r[9]
  126. if lack_count > 1000:
  127. count = 70
  128. elif 500 < lack_count <= 1000:
  129. count = 60
  130. elif 100 < lack_count <= 500:
  131. count = 40
  132. elif 50 < lack_count <= 100:
  133. count = 20
  134. else:
  135. count = 10
  136. if count == 0:
  137. continue
  138. result_list.append({
  139. "cluster_name": r[1],
  140. "platform_type": "piaoquan",
  141. "count": count,
  142. })
  143. return result_list
  144. def get_rov_by_merge_leve2_and_video_ids(merge_leve2, video_ids):
  145. merge_level_in_clause = f"'{merge_leve2}'"
  146. video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in video_ids])
  147. end_date = (date.today() - timedelta(days=1)).strftime("%Y%m%d")
  148. start_date = (date.today() - timedelta(days=14)).strftime("%Y%m%d")
  149. # sql_query = f'''
  150. # SELECT
  151. # v.videoid,
  152. # CASE
  153. # WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
  154. # ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
  155. # END AS avg_rov_t0
  156. # FROM
  157. # (
  158. # SELECT
  159. # t2.videoid,
  160. # t2.merge_leve2
  161. # FROM videoods.content_profile t1
  162. # JOIN loghubods.video_merge_tag t2
  163. # ON t1.content_id = t2.videoid
  164. # WHERE
  165. # t1.status = 3
  166. # AND t1.is_deleted = 0
  167. # AND t2.merge_leve2 IN ({merge_level_in_clause})
  168. # ) v
  169. # LEFT JOIN loghubods.video_dimension_detail_add_column t3
  170. # ON v.videoid = t3.视频id
  171. # AND t3.dt >= '{start_date}'
  172. # AND t3.dt <= '{end_date}'
  173. # WHERE v.videoid in ({video_ids_in_clause})
  174. # GROUP BY
  175. # v.videoid
  176. # ;
  177. # '''
  178. sql_query = f'''
  179. SELECT
  180. CAST(t3.视频id AS STRING) AS 视频id_str,
  181. CASE
  182. WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
  183. ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
  184. END AS avg_rov_t0
  185. FROM
  186. loghubods.video_dimension_detail_add_column t3
  187. WHERE t3.视频id in ({video_ids_in_clause})
  188. AND t3.dt >= '{start_date}'
  189. AND t3.dt <= '{end_date}'
  190. GROUP BY
  191. t3.视频id
  192. ;
  193. '''
  194. data = get_odps_data(sql_query)
  195. result_dict = {}
  196. if data:
  197. result_dict = {r[0]: r[1] for r in data}
  198. return result_dict
  199. def get_rov_by_tree_and_video_ids(video_ids):
  200. video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in video_ids])
  201. last_year_today = date.today() - timedelta(days=365)
  202. start_date = last_year_today.strftime("%Y%m%d")
  203. end_date = (last_year_today + timedelta(days=7)).strftime("%Y%m%d")
  204. sql_query = f'''
  205. SELECT
  206. CAST(t3.视频id AS STRING) AS 视频id_str,
  207. CASE
  208. WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
  209. ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
  210. END AS avg_rov_t0
  211. FROM
  212. loghubods.video_dimension_detail_add_column t3
  213. WHERE t3.视频id in ({video_ids_in_clause})
  214. AND t3.dt >= '{start_date}'
  215. AND t3.dt <= '{end_date}'
  216. GROUP BY
  217. t3.视频id
  218. ;
  219. '''
  220. data = get_odps_data(sql_query)
  221. result_dict = {}
  222. if data:
  223. result_dict = {r[0]: r[1] for r in data}
  224. return result_dict
  225. def get_changwen_weight(account_name):
  226. bizdatemax_date = date.today() - timedelta(days=1)
  227. bizdatemin_date = bizdatemax_date - timedelta(days=30)
  228. bizdatemax = bizdatemax_date.strftime("%Y%m%d")
  229. bizdatemin = bizdatemin_date.strftime("%Y%m%d")
  230. sql_query = f'''
  231. SELECT
  232. 公众号名
  233. ,videoid
  234. ,一级品类
  235. ,二级品类
  236. ,头部曝光
  237. ,头部曝光uv
  238. ,头部realplay
  239. ,头部realplay_uv
  240. ,头部分享
  241. ,头部分享uv
  242. ,头部回流人数 AS 头部回流数
  243. ,推荐曝光数
  244. ,当日分发曝光uv
  245. ,推荐realplay
  246. ,分发realplay_uv
  247. ,推荐分享数
  248. ,当日分发分享uv
  249. ,推荐回流数
  250. ,当日回流进入分发曝光次数 AS vov分子
  251. FROM (
  252. SELECT DISTINCT a.公众号名
  253. ,a.videoid
  254. ,e.merge_leve1 AS 一级品类
  255. ,e.merge_leve2 AS 二级品类
  256. ,a.title
  257. ,a.进入分发人数
  258. ,头部曝光pv AS 头部曝光
  259. ,头部realplay_pv AS 头部realplay
  260. ,头部分享pv AS 头部分享
  261. ,a.当日分发曝光pv AS 推荐曝光数
  262. ,a.当日分发播放pv
  263. ,分发realplay_pv AS 推荐realplay
  264. ,分发realplay_pv / a.当日分发播放pv AS 真实播放率pv
  265. ,当日分发播放uv
  266. ,c.realplay_uv AS 分发真实播uv
  267. ,c.realplay_uv / a.当日分发播放uv AS 真实播放率uv
  268. ,a.当日分发分享pv AS 推荐分享数
  269. ,a.当日分发分享pv / a.当日分发曝光pv AS str
  270. ,NVL(b.当日分发回流人数,0) AS 推荐回流数
  271. ,NVL(b.当日回流进入分发人数,0) AS 当日回流进入分发人数
  272. ,NVL(b.当日回流进入分发曝光次数,0) AS 当日回流进入分发曝光次数
  273. ,NVL(b.当日回流进入分发曝光次数,0) / a.当日分发曝光pv AS vov分子
  274. ,d.头部回流人数
  275. ,当日分发曝光uv
  276. ,头部曝光uv
  277. ,当日分发分享uv
  278. ,头部分享uv
  279. ,分发realplay_uv
  280. ,头部realplay_uv
  281. FROM (
  282. SELECT account_name AS 公众号名
  283. ,videoid
  284. ,title
  285. ,COUNT(DISTINCT mid) AS 进入分发人数
  286. ,COUNT(
  287. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
  288. ) AS 当日分发曝光pv
  289. ,COUNT(DISTINCT
  290. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
  291. ) AS 当日分发曝光uv
  292. ,COUNT(
  293. CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
  294. ) AS 头部曝光pv
  295. ,COUNT(DISTINCT
  296. CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
  297. ) AS 头部曝光uv
  298. ,COUNT(
  299. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
  300. ) AS 当日分发播放pv
  301. ,COUNT(DISTINCT
  302. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
  303. ) AS 当日分发播放uv
  304. ,COUNT(
  305. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
  306. ) AS 当日分发分享pv
  307. ,COUNT(DISTINCT
  308. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
  309. ) AS 当日分发分享uv
  310. ,COUNT(
  311. CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
  312. ) AS 头部分享pv
  313. ,COUNT(DISTINCT
  314. CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
  315. ) AS 头部分享uv
  316. FROM (
  317. SELECT DISTINCT a.mid
  318. ,a.videoid
  319. ,a.businesstype
  320. ,a.pagesource
  321. ,a.subsessionid
  322. ,account_name
  323. ,e.title
  324. FROM loghubods.video_action_log_rp a
  325. LEFT JOIN loghubods.user_wechat_identity_info_ha b
  326. ON a.mid = CONCAT('weixin_openid_',b.open_id)
  327. AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  328. LEFT JOIN loghubods.gzh_fans_info d
  329. ON b.union_id = d.union_id
  330. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  331. LEFT JOIN videoods.wx_video e
  332. ON a.videoid = e.id
  333. WHERE a.dt >= '{bizdatemin}'
  334. AND a.dt <= '{bizdatemax}'
  335. AND businesstype IN ('videoView','videoPlay','videoShareFriend')
  336. AND d.user_create_time IS NOT NULL
  337. AND account_name = '{account_name}'
  338. AND a.videoid IN (
  339. SELECT
  340. DISTINCT content_id AS videoid
  341. FROM
  342. videoods.content_profile
  343. WHERE status=3
  344. AND is_deleted = 0
  345. )
  346. ) t
  347. GROUP BY 公众号名
  348. ,videoid
  349. ,title
  350. ) a
  351. LEFT JOIN (
  352. SELECT t.account_name AS 公众号名
  353. ,t.videoid
  354. ,COUNT(DISTINCT s.machinecode) AS 当日分发回流人数
  355. ,COUNT(DISTINCT v.mid) AS 当日回流进入分发人数
  356. ,COUNT(v.mid) AS 当日回流进入分发曝光次数
  357. FROM (
  358. SELECT DISTINCT a.subsessionid
  359. ,a.videoid
  360. ,a.mid
  361. ,d.account_name
  362. ,GET_JSON_OBJECT(extparams,'$.recomTraceId') AS recomtraceid
  363. FROM loghubods.video_action_log_rp a
  364. LEFT JOIN loghubods.user_wechat_identity_info_ha b
  365. ON a.mid = CONCAT('weixin_openid_',b.open_id)
  366. AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  367. LEFT JOIN loghubods.gzh_fans_info d
  368. ON b.union_id = d.union_id
  369. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  370. WHERE a.dt >= '{bizdatemin}'
  371. AND a.dt <= '{bizdatemax}'
  372. AND a.businesstype = 'videoShareFriend'
  373. AND a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
  374. AND d.user_create_time IS NOT NULL
  375. AND d.account_name = '{account_name}'
  376. ) t
  377. LEFT JOIN (
  378. SELECT DISTINCT subsessionid
  379. ,machinecode
  380. ,recomtraceid
  381. ,clickobjectid
  382. FROM loghubods.user_share_log
  383. WHERE dt >= '{bizdatemin}'
  384. AND dt <= '{bizdatemax}'
  385. AND topic = 'click'
  386. ) s
  387. ON t.recomtraceid = s.recomtraceid
  388. AND t.videoid = s.clickobjectid
  389. LEFT JOIN (
  390. SELECT subsessionid
  391. ,mid
  392. ,videoid
  393. FROM loghubods.video_action_log_rp
  394. WHERE dt >= '{bizdatemin}'
  395. AND dt <= '{bizdatemax}'
  396. AND pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
  397. AND businesstype = 'videoView'
  398. ) v
  399. ON s.subsessionid = v.subsessionid
  400. AND s.machinecode = v.mid
  401. GROUP BY account_name
  402. ,t.videoid
  403. ) b
  404. ON a.公众号名 = b.公众号名
  405. AND a.videoid = b.videoid
  406. LEFT JOIN (
  407. SELECT d.account_name AS 公众号名
  408. ,a.videoid
  409. ,COUNT(DISTINCT a.mid) AS realplay_uv
  410. ,COUNT(
  411. CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
  412. ) AS 分发realplay_pv
  413. ,COUNT(CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_pv
  414. ,COUNT(DISTINCT
  415. CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
  416. ) AS 分发realplay_uv
  417. ,COUNT(DISTINCT CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_uv
  418. FROM loghubods.ods_video_play_log_day a
  419. LEFT JOIN (
  420. SELECT DISTINCT open_id
  421. ,union_id
  422. FROM loghubods.user_wechat_identity_info_ha
  423. WHERE dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  424. ) b
  425. ON a.mid = CONCAT('weixin_openid_',b.open_id)
  426. LEFT JOIN loghubods.gzh_fans_info d
  427. ON b.union_id = d.union_id
  428. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  429. WHERE a.dt >= '{bizdatemin}'
  430. AND a.dt <= '{bizdatemax}'
  431. AND a.businesstype = 'videoRealPlay'
  432. AND d.user_create_time IS NOT NULL
  433. AND d.account_name = '{account_name}'
  434. GROUP BY d.account_name
  435. ,a.videoid
  436. ORDER BY 分发realplay_pv DESC
  437. ) c
  438. ON a.公众号名 = c.公众号名
  439. AND a.videoid = c.videoid
  440. LEFT JOIN (
  441. SELECT t.account_name AS 公众号名
  442. ,t.videoid
  443. ,COUNT(DISTINCT s.machinecode) AS 头部回流人数
  444. FROM (
  445. SELECT DISTINCT a.shareobjectid AS videoid
  446. ,a.shareid
  447. ,a.machinecode
  448. ,d.account_name
  449. FROM loghubods.user_share_log a
  450. LEFT JOIN loghubods.user_wechat_identity_info_ha b
  451. ON a.machinecode = CONCAT('weixin_openid_',b.open_id)
  452. AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  453. LEFT JOIN loghubods.gzh_fans_info d
  454. ON b.union_id = d.union_id
  455. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  456. WHERE a.dt >= '{bizdatemin}'
  457. AND a.dt <= '{bizdatemax}'
  458. AND a.topic = 'share'
  459. AND a.pagesource REGEXP 'pages/user-videos-share$'
  460. AND d.user_create_time IS NOT NULL
  461. AND d.account_name = '{account_name}'
  462. ) t
  463. LEFT JOIN (
  464. SELECT DISTINCT shareid
  465. ,machinecode
  466. ,clickobjectid
  467. FROM loghubods.user_share_log
  468. WHERE dt >= '{bizdatemin}'
  469. AND dt <= '{bizdatemax}'
  470. AND topic = 'click'
  471. ) s
  472. ON t.shareid = s.shareid
  473. GROUP BY account_name
  474. ,t.videoid
  475. ) d
  476. ON a.公众号名 = d.公众号名
  477. AND a.videoid = d.videoid
  478. LEFT JOIN loghubods.video_merge_tag e
  479. ON a.videoid = e.videoid
  480. )
  481. ORDER BY 推荐曝光数 DESC
  482. '''
  483. result_list = []
  484. data = get_odps_data(sql_query)
  485. if data:
  486. for r in data:
  487. result_list.append(
  488. {
  489. "account_name": r[0],
  490. "videoid": r[1],
  491. "一级品类": r[2],
  492. "二级品类": r[3],
  493. "ext_data": {
  494. "头部曝光": r[4],
  495. "头部曝光uv": r[5],
  496. "头部realplay": r[6],
  497. "头部realplay_uv": r[7],
  498. "头部分享": r[8],
  499. "头部分享uv": r[9],
  500. "头部回流数": r[10],
  501. "推荐曝光数": r[11],
  502. "当日分发曝光uv": r[12],
  503. "推荐realplay": r[13],
  504. "分发realplay_uv": r[14],
  505. "推荐分享数": r[15],
  506. "当日分发分享uv": r[16],
  507. "推荐回流数": r[17],
  508. "vov分子": r[18],
  509. },
  510. }
  511. )
  512. # 输出到 examples/demand/data/changwen_data/
  513. output_dir = Path(__file__).parent / "data" / "changwen_data"
  514. output_dir.mkdir(parents=True, exist_ok=True)
  515. output_file = output_dir / f"{account_name}.json"
  516. with output_file.open("w", encoding="utf-8") as f:
  517. json.dump(result_list, f, ensure_ascii=False, indent=2)
  518. return result_list
  519. def get_zengzhang_weight(account_name):
  520. bizdatemax_date = date.today() - timedelta(days=1)
  521. bizdatemin_date = bizdatemax_date - timedelta(days=30)
  522. bizdatemax = bizdatemax_date.strftime("%Y%m%d")
  523. bizdatemin = bizdatemin_date.strftime("%Y%m%d")
  524. sql_query = f'''
  525. SELECT 合作方名
  526. ,合作方简称
  527. ,videoid
  528. ,一级品类
  529. ,二级品类
  530. ,SUM(头部曝光) as 头部曝光
  531. ,SUM(头部曝光uv) as 头部曝光uv
  532. ,SUM(头部realplay) as 头部realplay
  533. ,SUM(头部realplay_uv) as 头部realplay_uv
  534. ,SUM(头部分享) as 头部分享
  535. ,SUM(头部分享uv) as 头部分享uv
  536. ,SUM(头部回流数) as 头部回流数
  537. ,SUM(推荐曝光数) as 推荐曝光数
  538. ,SUM(当日分发曝光uv) as 当日分发曝光uv
  539. ,SUM(推荐realplay) as 推荐realplay
  540. ,SUM(分发realplay_uv) as 分发realplay_uv
  541. ,SUM(推荐分享数) as 推荐分享数
  542. ,SUM(当日分发分享uv) as 当日分发分享uv
  543. ,SUM(推荐回流数) as 推荐回流数
  544. ,SUM(vov分子) as vov分子
  545. FROM loghubods.dws_growth_partner_vid_data
  546. WHERE dt BETWEEN '{bizdatemin}' AND '{bizdatemax}'
  547. AND 合作方名 = '{account_name}'
  548. GROUP BY 合作方名
  549. ,合作方简称
  550. ,videoid
  551. ,一级品类
  552. ,二级品类
  553. ORDER BY SUM(推荐曝光数)
  554. ;
  555. '''
  556. result_list = []
  557. data = get_odps_data(sql_query)
  558. if data:
  559. for r in data:
  560. result_list.append(
  561. {
  562. "account_name": r[0],
  563. "合作方简称": r[1],
  564. "videoid": r[2],
  565. "一级品类": r[3],
  566. "二级品类": r[4],
  567. "ext_data": {
  568. "头部曝光": r[5],
  569. "头部曝光uv": r[6],
  570. "头部realplay": r[7],
  571. "头部realplay_uv": r[8],
  572. "头部分享": r[9],
  573. "头部分享uv": r[10],
  574. "头部回流数": r[11],
  575. "推荐曝光数": r[12],
  576. "当日分发曝光uv": r[13],
  577. "推荐realplay": r[14],
  578. "分发realplay_uv": r[15],
  579. "推荐分享数": r[16],
  580. "当日分发分享uv": r[17],
  581. "推荐回流数": r[18],
  582. "vov分子": r[19],
  583. },
  584. }
  585. )
  586. # 输出到 examples/demand/data/zengzhang_data/
  587. output_dir = Path(__file__).parent / "data" / "zengzhang_data"
  588. output_dir.mkdir(parents=True, exist_ok=True)
  589. output_file = output_dir / f"{account_name}.json"
  590. with output_file.open("w", encoding="utf-8") as f:
  591. json.dump(result_list, f, ensure_ascii=False, indent=2)
  592. return result_list
  593. def get_merge_leve2_by_video_ids(video_ids, batch_size=2000):
  594. result = {}
  595. if not video_ids:
  596. return result
  597. normalized_ids = [str(video_id) for video_id in video_ids if video_id is not None]
  598. for i in range(0, len(normalized_ids), batch_size):
  599. batch_ids = normalized_ids[i:i + batch_size]
  600. escaped_ids = [video_id.replace("'", "''") for video_id in batch_ids]
  601. video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in escaped_ids])
  602. sql_query = f'''
  603. SELECT videoid, merge_leve2
  604. FROM loghubods.video_merge_tag
  605. WHERE videoid IN ({video_ids_in_clause})
  606. '''
  607. data = get_odps_data(sql_query)
  608. if not data:
  609. continue
  610. for row in data:
  611. result[str(row[0])] = row[1]
  612. return result
  613. def get_all_decode_task_result_rows():
  614. return mysql_db.select(
  615. "workflow_decode_task_result",
  616. columns="id, channel_content_id, merge_leve2",
  617. )
  618. def update_decode_task_result_merge_leve2(channel_content_id, merge_leve2):
  619. return mysql_db.update(
  620. "workflow_decode_task_result",
  621. {"merge_leve2": str(merge_leve2)},
  622. "channel_content_id = %s",
  623. (str(channel_content_id),),
  624. )
  625. def backfill_merge_leve2_for_decode_task_result():
  626. rows = get_all_decode_task_result_rows()
  627. updated_count = 0
  628. skipped_count = 0
  629. valid_content_ids = []
  630. for row in rows:
  631. channel_content_id = row.get("channel_content_id")
  632. if channel_content_id is None:
  633. skipped_count += 1
  634. continue
  635. channel_content_id = str(channel_content_id)
  636. if len(channel_content_id) > 8:
  637. skipped_count += 1
  638. continue
  639. valid_content_ids.append(channel_content_id)
  640. merge_leve2_map = get_merge_leve2_by_video_ids(valid_content_ids, batch_size=2000)
  641. for channel_content_id in valid_content_ids:
  642. merge_leve2 = merge_leve2_map.get(channel_content_id)
  643. if not merge_leve2:
  644. continue
  645. affected = update_decode_task_result_merge_leve2(channel_content_id, merge_leve2)
  646. if affected > 0:
  647. updated_count += affected
  648. return {
  649. "total": len(rows),
  650. "updated": updated_count,
  651. "skipped": skipped_count,
  652. }
#
# if __name__ == '__main__':
#     backfill_merge_leve2_for_decode_task_result()