# data_query_tools.py
import hashlib
import json
import os
from datetime import date, datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo

from odps import ODPS
from odps.errors import ODPSError

from examples.demand.mysql import mysql_db
  9. def get_odps_data(sql):
  10. # 配置信息
  11. access_id = 'LTAI9EBa0bd5PrDa'
  12. access_key = 'vAalxds7YxhfOA2yVv8GziCg3Y87v5'
  13. project = 'loghubods'
  14. endpoint = 'http://service.odps.aliyun.com/api'
  15. # 1. 初始化 ODPS 入口
  16. o = ODPS(access_id, access_key, project, endpoint=endpoint)
  17. try:
  18. # 2. 执行 SQL 并获取结果
  19. # execute_sql 会等待任务完成,使用 open_reader 读取数据
  20. with o.execute_sql(sql).open_reader() as reader:
  21. # reader 类似于 Java 中的 List<Record>
  22. # 我们可以直接将其转换为 Python 的 list
  23. records = [record for record in reader]
  24. return records
  25. except ODPSError as e:
  26. print(f"ODPS 错误: {e}")
  27. return None
  28. def execute_odps_sql(sql) -> bool:
  29. # 配置信息
  30. access_id = 'LTAI9EBa0bd5PrDa'
  31. access_key = 'vAalxds7YxhfOA2yVv8GziCg3Y87v5'
  32. project = 'loghubods'
  33. endpoint = 'http://service.odps.aliyun.com/api'
  34. o = ODPS(access_id, access_key, project, endpoint=endpoint)
  35. try:
  36. instance = o.execute_sql(sql)
  37. instance.wait_for_success()
  38. return True
  39. except ODPSError as e:
  40. print(f"ODPS 错误: {e}")
  41. return False
  42. _STRATEGY_GAP = "当下供需gap"
  43. _STRATEGY_GAP_FENCI = "当下供需gap-分词"
  44. _HIVE_TABLE = "loghubods.dwd_multi_demand_pool_di"
  45. _HIVE_DT_FMT = "%Y%m%d" # 分区格式:yyyymmdd,如 20260519
  46. _CHINA_TZ = ZoneInfo("Asia/Shanghai")
  47. def _hive_partition_dt() -> str:
  48. """中国时区(Asia/Shanghai)当天日期,格式 yyyymmdd。"""
  49. return datetime.now(_CHINA_TZ).date().strftime(_HIVE_DT_FMT)
  50. def _escape_odps_string(value: object) -> str:
  51. return str(value).replace("'", "''")
  52. def _format_odps_string_array(values: list) -> str:
  53. if not values:
  54. return "ARRAY()"
  55. parts = [f"'{_escape_odps_string(v)}'" for v in values]
  56. return f"ARRAY({','.join(parts)})"
  57. def _parse_ext_data(ext_data_raw: object) -> dict:
  58. if isinstance(ext_data_raw, dict):
  59. return ext_data_raw
  60. if isinstance(ext_data_raw, str) and ext_data_raw.strip():
  61. try:
  62. return json.loads(ext_data_raw)
  63. except json.JSONDecodeError:
  64. return {}
  65. return {}
  66. def _build_hive_select_part(
  67. strategy: str,
  68. demand_id: str,
  69. demand_name: str,
  70. weight: float,
  71. type_str: str,
  72. video_count: int,
  73. video_ids: list[str],
  74. extend_json: str,
  75. ) -> str:
  76. return (
  77. "SELECT "
  78. f"'{_escape_odps_string(strategy)}' AS strategy, "
  79. f"'{_escape_odps_string(demand_id)}' AS demand_id, "
  80. f"'{_escape_odps_string(demand_name)}' AS demand_name, "
  81. f"{weight} AS weight, "
  82. f"'{_escape_odps_string(type_str)}' AS `type`, "
  83. f"{video_count} AS video_count, "
  84. f"{_format_odps_string_array(video_ids)} AS video_list, "
  85. f"'{_escape_odps_string(extend_json)}' AS extend"
  86. )
  87. def _insert_hive_select_parts(select_parts: list[str], partition_dt: str) -> bool:
  88. if not select_parts:
  89. return True
  90. union_sql = "\nUNION ALL\n".join(select_parts)
  91. insert_sql = f"""
  92. INSERT INTO TABLE {_HIVE_TABLE}
  93. PARTITION (dt='{partition_dt}')
  94. (strategy, demand_id, demand_name, weight, `type`, video_count, video_list, extend)
  95. {union_sql}
  96. """
  97. return execute_odps_sql(insert_sql)
  98. def write_dwd_multi_demand_pool_di_to_hive(rows: list[dict]) -> int:
  99. """
  100. 将行数据映射并写入 loghubods.dwd_multi_demand_pool_di(尽力插入,不校验结果)。
  101. 分区与 demand_id 的日期均为中国时区当天(yyyymmdd),不使用行内 dt 字段。
  102. 执行两次 INSERT(同表、同分区),策略不同:
  103. 1) 当下供需gap: demand_name=merge_leve2+' '+name, demand_id=md5(strategy+demand_name+dt)
  104. 2) 当下供需gap-分词: demand_name=name, demand_id=md5(strategy+name+dt)
  105. """
  106. if not rows:
  107. return 0
  108. china_today = _hive_partition_dt()
  109. gap_parts: list[str] = []
  110. fenci_parts: list[str] = []
  111. for row in rows:
  112. merge_leve2 = str(row.get("merge_leve2") or "").strip()
  113. name = str(row.get("name") or "").strip()
  114. if not merge_leve2 or not name:
  115. continue
  116. weight = round(float(row.get("score") or 0.0), 6)
  117. ext_data = _parse_ext_data(row.get("ext_data"))
  118. type_str = str(ext_data.get("type") or "").strip()
  119. video_ids = ext_data.get("video_ids") or []
  120. if not isinstance(video_ids, list):
  121. video_ids = []
  122. video_ids = [str(v).strip() for v in video_ids if v is not None and str(v).strip()]
  123. video_count = len(video_ids)
  124. extend_json = json.dumps({"品类": merge_leve2}, ensure_ascii=False)
  125. demand_name_gap = f"{merge_leve2} {name}"
  126. demand_id_gap = hashlib.md5(f"{_STRATEGY_GAP}{demand_name_gap}{china_today}".encode("utf-8")).hexdigest()
  127. gap_parts.append(
  128. _build_hive_select_part(
  129. _STRATEGY_GAP, demand_id_gap, demand_name_gap,
  130. weight, type_str, video_count, video_ids, extend_json,
  131. )
  132. )
  133. demand_id_fenci = hashlib.md5(f"{_STRATEGY_GAP_FENCI}{name}{china_today}".encode("utf-8")).hexdigest()
  134. fenci_parts.append(
  135. _build_hive_select_part(
  136. _STRATEGY_GAP_FENCI, demand_id_fenci, name,
  137. weight, type_str, video_count, video_ids, extend_json,
  138. )
  139. )
  140. if not gap_parts:
  141. return 0
  142. _insert_hive_select_parts(gap_parts, china_today)
  143. _insert_hive_select_parts(fenci_parts, china_today)
  144. return len(gap_parts) + len(fenci_parts)
  145. def write_feature_point_data_to_hive(names: list[str]) -> int:
  146. """
  147. 将需求名称写入 Hive 表 feature_point_data(按北京时间当天分区)。
  148. 仅写入以下字段:
  149. - 特征点
  150. - 总分发曝光pv(固定 5000)
  151. - 质bn_rovn(固定 0.1)
  152. """
  153. normalized_names = [str(name).strip() for name in names if name is not None and str(name).strip()]
  154. if not normalized_names:
  155. return 0
  156. dt = datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y%m%d")
  157. select_parts = []
  158. for name in normalized_names:
  159. safe_name = name.replace("'", "''")
  160. select_parts.append(
  161. "SELECT "
  162. f"'{safe_name}' AS `特征点`, "
  163. "5000 AS `总分发曝光pv`, "
  164. "0.1 AS `质bn_rovn`"
  165. )
  166. union_sql = "\nUNION ALL\n".join(select_parts)
  167. insert_sql = f"""
  168. INSERT INTO TABLE feature_point_data
  169. PARTITION (dt='{dt}')
  170. (`特征点`, `总分发曝光pv`, `质bn_rovn`)
  171. {union_sql}
  172. """
  173. ok = execute_odps_sql(insert_sql)
  174. if not ok:
  175. return 0
  176. return len(normalized_names)
  177. def get_demand_merge_level2_names():
  178. date_time = datetime.now(ZoneInfo("Asia/Shanghai")).date() - timedelta(days=1)
  179. day = date_time.strftime("%Y%m%d")
  180. count = 50
  181. sql_query = f'''
  182. select *
  183. from (
  184. select
  185. dt,
  186. merge二级品类,
  187. sum(当日分发曝光pv) as 分发曝光pv,
  188. sum(累计分享回流uv) AS bn_总回流,
  189. sum(当日分发回流uv)/(sum(当日分发曝光pv)+100) as 质bn_rovn,
  190. case when sum(当日分发曝光pv)>=10000 then
  191. case when sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<0.035
  192. then -1*(count(DISTINCT 视频id)/avg(总日分发视频数))/((sum(累计分享回流uv)/avg(总日回流uv)))
  193. else 10*(sum(累计分享回流uv)/avg(总日回流uv)*sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))/(count(DISTINCT 视频id)/avg(总日分发视频数))
  194. end
  195. else 0 end AS 总供需分,
  196. case when sum(当日分发曝光pv)>=10000 then
  197. case when sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<0.035
  198. then -1*(COUNT(DISTINCT CASE WHEN 推荐天数间隔<3 THEN 视频id END ) /avg(总日分发视频数))/(sum(累计分享回流uv)/avg(总日回流uv))
  199. else 10*(sum(累计分享回流uv)/avg(总日回流uv)*sum(当日分发回流uv)/(sum(当日分发曝光pv)+1000))/(COUNT(DISTINCT CASE WHEN 推荐天数间隔<3 THEN 视频id END ) /avg(总日分发视频数))
  200. end
  201. else 0 end AS 新供需分,
  202. count(DISTINCT 视频id) as 分发视频量,
  203. count(DISTINCT if(推荐天数间隔<3,视频id,null)) as 3日新推荐视频量,
  204. case when sum(当日分发曝光pv)>=10000 and sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)>0.035
  205. then (avg(总日分发视频数)*(10*(sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))*(sum(累计分享回流uv)/avg(总日回流uv) ))/0.5-count(DISTINCT 视频id))/3
  206. end as 缺量,
  207. case when sum(当日分发曝光pv)>=10000 and sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<=0.035
  208. then (avg(总日分发视频数)*(10*(sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))*(sum(累计分享回流uv)/avg(总日回流uv) ))/(2)-count(DISTINCT 视频id))/3
  209. end as 控量,
  210. avg(总日回流uv) AS 总日回流uv,
  211. avg(总日分发视频数) AS 总日分发视频数,
  212. avg(总日推荐视频数) AS 总日推荐视频数,
  213. COUNT(DISTINCT CASE WHEN 总回流uv>0 THEN 视频id END )/avg(总日分发视频数) AS 回流视频个数占比,
  214. sum(当日分发回流uv) AS bn_当日分发回流,
  215. sum(当日分发回流uv)/avg(总日回流uv) AS 分发拉回回流uv占比,
  216. sum(累计分享回流uv)/avg(总日回流uv) AS 回流uv占比,
  217. count(DISTINCT 视频id)/avg(总日分发视频数) AS 分发视频量占比,
  218. COUNT(DISTINCT CASE WHEN 是否当日新推荐=1 THEN 视频id END ) /avg(总日分发视频数) AS 新推荐视频量占比
  219. from loghubods.video_dimension_detail_add_column
  220. where dt = '{day}'
  221. group by dt, merge二级品类
  222. ) t1
  223. where t1.缺量>= {count}
  224. '''
  225. data = get_odps_data(sql_query)
  226. result_list = []
  227. if data:
  228. for r in data:
  229. lack_count = r[9]
  230. if lack_count > 1000:
  231. count = 70
  232. elif 500 < lack_count <= 1000:
  233. count = 60
  234. elif 100 < lack_count <= 500:
  235. count = 40
  236. elif 50 < lack_count <= 100:
  237. count = 20
  238. else:
  239. count = 10
  240. if count == 0:
  241. continue
  242. result_list.append({
  243. "cluster_name": r[1],
  244. "platform_type": "piaoquan",
  245. "count": count,
  246. })
  247. return result_list
  248. def get_rov_by_merge_leve2_and_video_ids(merge_leve2, video_ids):
  249. merge_level_in_clause = f"'{merge_leve2}'"
  250. video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in video_ids])
  251. end_date = (date.today() - timedelta(days=1)).strftime("%Y%m%d")
  252. start_date = (date.today() - timedelta(days=14)).strftime("%Y%m%d")
  253. # sql_query = f'''
  254. # SELECT
  255. # v.videoid,
  256. # CASE
  257. # WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
  258. # ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
  259. # END AS avg_rov_t0
  260. # FROM
  261. # (
  262. # SELECT
  263. # t2.videoid,
  264. # t2.merge_leve2
  265. # FROM videoods.content_profile t1
  266. # JOIN loghubods.video_merge_tag t2
  267. # ON t1.content_id = t2.videoid
  268. # WHERE
  269. # t1.status = 3
  270. # AND t1.is_deleted = 0
  271. # AND t2.merge_leve2 IN ({merge_level_in_clause})
  272. # ) v
  273. # LEFT JOIN loghubods.video_dimension_detail_add_column t3
  274. # ON v.videoid = t3.视频id
  275. # AND t3.dt >= '{start_date}'
  276. # AND t3.dt <= '{end_date}'
  277. # WHERE v.videoid in ({video_ids_in_clause})
  278. # GROUP BY
  279. # v.videoid
  280. # ;
  281. # '''
  282. sql_query = f'''
  283. SELECT
  284. CAST(t3.视频id AS STRING) AS 视频id_str,
  285. CASE
  286. WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
  287. ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
  288. END AS avg_rov_t0
  289. FROM
  290. loghubods.video_dimension_detail_add_column t3
  291. WHERE t3.视频id in ({video_ids_in_clause})
  292. AND t3.dt >= '{start_date}'
  293. AND t3.dt <= '{end_date}'
  294. GROUP BY
  295. t3.视频id
  296. ;
  297. '''
  298. data = get_odps_data(sql_query)
  299. result_dict = {}
  300. if data:
  301. result_dict = {r[0]: r[1] for r in data}
  302. return result_dict
  303. def get_rov_by_tree_and_video_ids(video_ids):
  304. video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in video_ids])
  305. last_year_today = date.today() - timedelta(days=365)
  306. start_date = last_year_today.strftime("%Y%m%d")
  307. end_date = (last_year_today + timedelta(days=7)).strftime("%Y%m%d")
  308. sql_query = f'''
  309. SELECT
  310. CAST(t3.视频id AS STRING) AS 视频id_str,
  311. CASE
  312. WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
  313. ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
  314. END AS avg_rov_t0
  315. FROM
  316. loghubods.video_dimension_detail_add_column t3
  317. WHERE t3.视频id in ({video_ids_in_clause})
  318. AND t3.dt >= '{start_date}'
  319. AND t3.dt <= '{end_date}'
  320. GROUP BY
  321. t3.视频id
  322. ;
  323. '''
  324. data = get_odps_data(sql_query)
  325. result_dict = {}
  326. if data:
  327. result_dict = {r[0]: r[1] for r in data}
  328. return result_dict
  329. def get_changwen_weight(account_name):
  330. bizdatemax_date = date.today() - timedelta(days=1)
  331. bizdatemin_date = bizdatemax_date - timedelta(days=30)
  332. bizdatemax = bizdatemax_date.strftime("%Y%m%d")
  333. bizdatemin = bizdatemin_date.strftime("%Y%m%d")
  334. sql_query = f'''
  335. SELECT
  336. 公众号名
  337. ,videoid
  338. ,一级品类
  339. ,二级品类
  340. ,头部曝光
  341. ,头部曝光uv
  342. ,头部realplay
  343. ,头部realplay_uv
  344. ,头部分享
  345. ,头部分享uv
  346. ,头部回流人数 AS 头部回流数
  347. ,推荐曝光数
  348. ,当日分发曝光uv
  349. ,推荐realplay
  350. ,分发realplay_uv
  351. ,推荐分享数
  352. ,当日分发分享uv
  353. ,推荐回流数
  354. ,当日回流进入分发曝光次数 AS vov分子
  355. FROM (
  356. SELECT DISTINCT a.公众号名
  357. ,a.videoid
  358. ,e.merge_leve1 AS 一级品类
  359. ,e.merge_leve2 AS 二级品类
  360. ,a.title
  361. ,a.进入分发人数
  362. ,头部曝光pv AS 头部曝光
  363. ,头部realplay_pv AS 头部realplay
  364. ,头部分享pv AS 头部分享
  365. ,a.当日分发曝光pv AS 推荐曝光数
  366. ,a.当日分发播放pv
  367. ,分发realplay_pv AS 推荐realplay
  368. ,分发realplay_pv / a.当日分发播放pv AS 真实播放率pv
  369. ,当日分发播放uv
  370. ,c.realplay_uv AS 分发真实播uv
  371. ,c.realplay_uv / a.当日分发播放uv AS 真实播放率uv
  372. ,a.当日分发分享pv AS 推荐分享数
  373. ,a.当日分发分享pv / a.当日分发曝光pv AS str
  374. ,NVL(b.当日分发回流人数,0) AS 推荐回流数
  375. ,NVL(b.当日回流进入分发人数,0) AS 当日回流进入分发人数
  376. ,NVL(b.当日回流进入分发曝光次数,0) AS 当日回流进入分发曝光次数
  377. ,NVL(b.当日回流进入分发曝光次数,0) / a.当日分发曝光pv AS vov分子
  378. ,d.头部回流人数
  379. ,当日分发曝光uv
  380. ,头部曝光uv
  381. ,当日分发分享uv
  382. ,头部分享uv
  383. ,分发realplay_uv
  384. ,头部realplay_uv
  385. FROM (
  386. SELECT account_name AS 公众号名
  387. ,videoid
  388. ,title
  389. ,COUNT(DISTINCT mid) AS 进入分发人数
  390. ,COUNT(
  391. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
  392. ) AS 当日分发曝光pv
  393. ,COUNT(DISTINCT
  394. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
  395. ) AS 当日分发曝光uv
  396. ,COUNT(
  397. CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
  398. ) AS 头部曝光pv
  399. ,COUNT(DISTINCT
  400. CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
  401. ) AS 头部曝光uv
  402. ,COUNT(
  403. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
  404. ) AS 当日分发播放pv
  405. ,COUNT(DISTINCT
  406. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
  407. ) AS 当日分发播放uv
  408. ,COUNT(
  409. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
  410. ) AS 当日分发分享pv
  411. ,COUNT(DISTINCT
  412. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
  413. ) AS 当日分发分享uv
  414. ,COUNT(
  415. CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
  416. ) AS 头部分享pv
  417. ,COUNT(DISTINCT
  418. CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
  419. ) AS 头部分享uv
  420. FROM (
  421. SELECT DISTINCT a.mid
  422. ,a.videoid
  423. ,a.businesstype
  424. ,a.pagesource
  425. ,a.subsessionid
  426. ,account_name
  427. ,e.title
  428. FROM loghubods.video_action_log_rp a
  429. LEFT JOIN loghubods.user_wechat_identity_info_ha b
  430. ON a.mid = CONCAT('weixin_openid_',b.open_id)
  431. AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  432. LEFT JOIN loghubods.gzh_fans_info d
  433. ON b.union_id = d.union_id
  434. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  435. LEFT JOIN videoods.wx_video e
  436. ON a.videoid = e.id
  437. WHERE a.dt >= '{bizdatemin}'
  438. AND a.dt <= '{bizdatemax}'
  439. AND businesstype IN ('videoView','videoPlay','videoShareFriend')
  440. AND d.user_create_time IS NOT NULL
  441. AND account_name = '{account_name}'
  442. AND a.videoid IN (
  443. SELECT
  444. DISTINCT content_id AS videoid
  445. FROM
  446. videoods.content_profile
  447. WHERE status=3
  448. AND is_deleted = 0
  449. )
  450. ) t
  451. GROUP BY 公众号名
  452. ,videoid
  453. ,title
  454. ) a
  455. LEFT JOIN (
  456. SELECT t.account_name AS 公众号名
  457. ,t.videoid
  458. ,COUNT(DISTINCT s.machinecode) AS 当日分发回流人数
  459. ,COUNT(DISTINCT v.mid) AS 当日回流进入分发人数
  460. ,COUNT(v.mid) AS 当日回流进入分发曝光次数
  461. FROM (
  462. SELECT DISTINCT a.subsessionid
  463. ,a.videoid
  464. ,a.mid
  465. ,d.account_name
  466. ,GET_JSON_OBJECT(extparams,'$.recomTraceId') AS recomtraceid
  467. FROM loghubods.video_action_log_rp a
  468. LEFT JOIN loghubods.user_wechat_identity_info_ha b
  469. ON a.mid = CONCAT('weixin_openid_',b.open_id)
  470. AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  471. LEFT JOIN loghubods.gzh_fans_info d
  472. ON b.union_id = d.union_id
  473. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  474. WHERE a.dt >= '{bizdatemin}'
  475. AND a.dt <= '{bizdatemax}'
  476. AND a.businesstype = 'videoShareFriend'
  477. AND a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
  478. AND d.user_create_time IS NOT NULL
  479. AND d.account_name = '{account_name}'
  480. ) t
  481. LEFT JOIN (
  482. SELECT DISTINCT subsessionid
  483. ,machinecode
  484. ,recomtraceid
  485. ,clickobjectid
  486. FROM loghubods.user_share_log
  487. WHERE dt >= '{bizdatemin}'
  488. AND dt <= '{bizdatemax}'
  489. AND topic = 'click'
  490. ) s
  491. ON t.recomtraceid = s.recomtraceid
  492. AND t.videoid = s.clickobjectid
  493. LEFT JOIN (
  494. SELECT subsessionid
  495. ,mid
  496. ,videoid
  497. FROM loghubods.video_action_log_rp
  498. WHERE dt >= '{bizdatemin}'
  499. AND dt <= '{bizdatemax}'
  500. AND pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
  501. AND businesstype = 'videoView'
  502. ) v
  503. ON s.subsessionid = v.subsessionid
  504. AND s.machinecode = v.mid
  505. GROUP BY account_name
  506. ,t.videoid
  507. ) b
  508. ON a.公众号名 = b.公众号名
  509. AND a.videoid = b.videoid
  510. LEFT JOIN (
  511. SELECT d.account_name AS 公众号名
  512. ,a.videoid
  513. ,COUNT(DISTINCT a.mid) AS realplay_uv
  514. ,COUNT(
  515. CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
  516. ) AS 分发realplay_pv
  517. ,COUNT(CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_pv
  518. ,COUNT(DISTINCT
  519. CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
  520. ) AS 分发realplay_uv
  521. ,COUNT(DISTINCT CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_uv
  522. FROM loghubods.ods_video_play_log_day a
  523. LEFT JOIN (
  524. SELECT DISTINCT open_id
  525. ,union_id
  526. FROM loghubods.user_wechat_identity_info_ha
  527. WHERE dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  528. ) b
  529. ON a.mid = CONCAT('weixin_openid_',b.open_id)
  530. LEFT JOIN loghubods.gzh_fans_info d
  531. ON b.union_id = d.union_id
  532. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  533. WHERE a.dt >= '{bizdatemin}'
  534. AND a.dt <= '{bizdatemax}'
  535. AND a.businesstype = 'videoRealPlay'
  536. AND d.user_create_time IS NOT NULL
  537. AND d.account_name = '{account_name}'
  538. GROUP BY d.account_name
  539. ,a.videoid
  540. ORDER BY 分发realplay_pv DESC
  541. ) c
  542. ON a.公众号名 = c.公众号名
  543. AND a.videoid = c.videoid
  544. LEFT JOIN (
  545. SELECT t.account_name AS 公众号名
  546. ,t.videoid
  547. ,COUNT(DISTINCT s.machinecode) AS 头部回流人数
  548. FROM (
  549. SELECT DISTINCT a.shareobjectid AS videoid
  550. ,a.shareid
  551. ,a.machinecode
  552. ,d.account_name
  553. FROM loghubods.user_share_log a
  554. LEFT JOIN loghubods.user_wechat_identity_info_ha b
  555. ON a.machinecode = CONCAT('weixin_openid_',b.open_id)
  556. AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  557. LEFT JOIN loghubods.gzh_fans_info d
  558. ON b.union_id = d.union_id
  559. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  560. WHERE a.dt >= '{bizdatemin}'
  561. AND a.dt <= '{bizdatemax}'
  562. AND a.topic = 'share'
  563. AND a.pagesource REGEXP 'pages/user-videos-share$'
  564. AND d.user_create_time IS NOT NULL
  565. AND d.account_name = '{account_name}'
  566. ) t
  567. LEFT JOIN (
  568. SELECT DISTINCT shareid
  569. ,machinecode
  570. ,clickobjectid
  571. FROM loghubods.user_share_log
  572. WHERE dt >= '{bizdatemin}'
  573. AND dt <= '{bizdatemax}'
  574. AND topic = 'click'
  575. ) s
  576. ON t.shareid = s.shareid
  577. GROUP BY account_name
  578. ,t.videoid
  579. ) d
  580. ON a.公众号名 = d.公众号名
  581. AND a.videoid = d.videoid
  582. LEFT JOIN loghubods.video_merge_tag e
  583. ON a.videoid = e.videoid
  584. )
  585. ORDER BY 推荐曝光数 DESC
  586. '''
  587. result_list = []
  588. data = get_odps_data(sql_query)
  589. if data:
  590. for r in data:
  591. result_list.append(
  592. {
  593. "account_name": r[0],
  594. "videoid": r[1],
  595. "一级品类": r[2],
  596. "二级品类": r[3],
  597. "ext_data": {
  598. "头部曝光": r[4],
  599. "头部曝光uv": r[5],
  600. "头部realplay": r[6],
  601. "头部realplay_uv": r[7],
  602. "头部分享": r[8],
  603. "头部分享uv": r[9],
  604. "头部回流数": r[10],
  605. "推荐曝光数": r[11],
  606. "当日分发曝光uv": r[12],
  607. "推荐realplay": r[13],
  608. "分发realplay_uv": r[14],
  609. "推荐分享数": r[15],
  610. "当日分发分享uv": r[16],
  611. "推荐回流数": r[17],
  612. "vov分子": r[18],
  613. },
  614. }
  615. )
  616. # 输出到 examples/demand/data/changwen_data/
  617. output_dir = Path(__file__).parent / "data" / "changwen_data"
  618. output_dir.mkdir(parents=True, exist_ok=True)
  619. output_file = output_dir / f"{account_name}.json"
  620. with output_file.open("w", encoding="utf-8") as f:
  621. json.dump(result_list, f, ensure_ascii=False, indent=2)
  622. return result_list
  623. def get_zengzhang_weight(account_name):
  624. bizdatemax_date = date.today() - timedelta(days=1)
  625. bizdatemin_date = bizdatemax_date - timedelta(days=30)
  626. bizdatemax = bizdatemax_date.strftime("%Y%m%d")
  627. bizdatemin = bizdatemin_date.strftime("%Y%m%d")
  628. sql_query = f'''
  629. SELECT 合作方名
  630. ,合作方简称
  631. ,videoid
  632. ,一级品类
  633. ,二级品类
  634. ,SUM(头部曝光) as 头部曝光
  635. ,SUM(头部曝光uv) as 头部曝光uv
  636. ,SUM(头部realplay) as 头部realplay
  637. ,SUM(头部realplay_uv) as 头部realplay_uv
  638. ,SUM(头部分享) as 头部分享
  639. ,SUM(头部分享uv) as 头部分享uv
  640. ,SUM(头部回流数) as 头部回流数
  641. ,SUM(推荐曝光数) as 推荐曝光数
  642. ,SUM(当日分发曝光uv) as 当日分发曝光uv
  643. ,SUM(推荐realplay) as 推荐realplay
  644. ,SUM(分发realplay_uv) as 分发realplay_uv
  645. ,SUM(推荐分享数) as 推荐分享数
  646. ,SUM(当日分发分享uv) as 当日分发分享uv
  647. ,SUM(推荐回流数) as 推荐回流数
  648. ,SUM(vov分子) as vov分子
  649. FROM loghubods.dws_growth_partner_vid_data
  650. WHERE dt BETWEEN '{bizdatemin}' AND '{bizdatemax}'
  651. AND 合作方名 = '{account_name}'
  652. GROUP BY 合作方名
  653. ,合作方简称
  654. ,videoid
  655. ,一级品类
  656. ,二级品类
  657. ORDER BY SUM(推荐曝光数)
  658. ;
  659. '''
  660. result_list = []
  661. data = get_odps_data(sql_query)
  662. if data:
  663. for r in data:
  664. result_list.append(
  665. {
  666. "account_name": r[0],
  667. "合作方简称": r[1],
  668. "videoid": r[2],
  669. "一级品类": r[3],
  670. "二级品类": r[4],
  671. "ext_data": {
  672. "头部曝光": r[5],
  673. "头部曝光uv": r[6],
  674. "头部realplay": r[7],
  675. "头部realplay_uv": r[8],
  676. "头部分享": r[9],
  677. "头部分享uv": r[10],
  678. "头部回流数": r[11],
  679. "推荐曝光数": r[12],
  680. "当日分发曝光uv": r[13],
  681. "推荐realplay": r[14],
  682. "分发realplay_uv": r[15],
  683. "推荐分享数": r[16],
  684. "当日分发分享uv": r[17],
  685. "推荐回流数": r[18],
  686. "vov分子": r[19],
  687. },
  688. }
  689. )
  690. # 输出到 examples/demand/data/zengzhang_data/
  691. output_dir = Path(__file__).parent / "data" / "zengzhang_data"
  692. output_dir.mkdir(parents=True, exist_ok=True)
  693. output_file = output_dir / f"{account_name}.json"
  694. with output_file.open("w", encoding="utf-8") as f:
  695. json.dump(result_list, f, ensure_ascii=False, indent=2)
  696. return result_list
  697. def get_merge_leve2_by_video_ids(video_ids, batch_size=2000):
  698. result = {}
  699. if not video_ids:
  700. return result
  701. normalized_ids = [str(video_id) for video_id in video_ids if video_id is not None]
  702. for i in range(0, len(normalized_ids), batch_size):
  703. batch_ids = normalized_ids[i:i + batch_size]
  704. escaped_ids = [video_id.replace("'", "''") for video_id in batch_ids]
  705. video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in escaped_ids])
  706. sql_query = f'''
  707. SELECT videoid, merge_leve2
  708. FROM loghubods.video_merge_tag
  709. WHERE videoid IN ({video_ids_in_clause})
  710. '''
  711. data = get_odps_data(sql_query)
  712. if not data:
  713. continue
  714. for row in data:
  715. result[str(row[0])] = row[1]
  716. return result
  717. def get_all_decode_task_result_rows():
  718. return mysql_db.select(
  719. "workflow_decode_task_result",
  720. columns="id, channel_content_id, merge_leve2",
  721. )
  722. def update_decode_task_result_merge_leve2(channel_content_id, merge_leve2):
  723. return mysql_db.update(
  724. "workflow_decode_task_result",
  725. {"merge_leve2": str(merge_leve2)},
  726. "channel_content_id = %s",
  727. (str(channel_content_id),),
  728. )
  729. def backfill_merge_leve2_for_decode_task_result():
  730. rows = get_all_decode_task_result_rows()
  731. updated_count = 0
  732. skipped_count = 0
  733. valid_content_ids = []
  734. for row in rows:
  735. channel_content_id = row.get("channel_content_id")
  736. if channel_content_id is None:
  737. skipped_count += 1
  738. continue
  739. channel_content_id = str(channel_content_id)
  740. if len(channel_content_id) > 8:
  741. skipped_count += 1
  742. continue
  743. valid_content_ids.append(channel_content_id)
  744. merge_leve2_map = get_merge_leve2_by_video_ids(valid_content_ids, batch_size=2000)
  745. for channel_content_id in valid_content_ids:
  746. merge_leve2 = merge_leve2_map.get(channel_content_id)
  747. if not merge_leve2:
  748. continue
  749. affected = update_decode_task_result_merge_leve2(channel_content_id, merge_leve2)
  750. if affected > 0:
  751. updated_count += affected
  752. return {
  753. "total": len(rows),
  754. "updated": updated_count,
  755. "skipped": skipped_count,
  756. }
#
# if __name__ == '__main__':
#     backfill_merge_leve2_for_decode_task_result()