data_query_tools.py 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849
  1. import hashlib
  2. from zoneinfo import ZoneInfo
  3. from odps import ODPS
  4. from odps.errors import ODPSError
  5. from datetime import date, datetime, timedelta
  6. import json
  7. from pathlib import Path
  8. from examples.demand.mysql import mysql_db
  9. def get_odps_data(sql):
  10. # 配置信息
  11. access_id = 'LTAI9EBa0bd5PrDa'
  12. access_key = 'vAalxds7YxhfOA2yVv8GziCg3Y87v5'
  13. project = 'loghubods'
  14. endpoint = 'http://service.odps.aliyun.com/api'
  15. # 1. 初始化 ODPS 入口
  16. o = ODPS(access_id, access_key, project, endpoint=endpoint)
  17. try:
  18. # 2. 执行 SQL 并获取结果
  19. # execute_sql 会等待任务完成,使用 open_reader 读取数据
  20. with o.execute_sql(sql).open_reader() as reader:
  21. # reader 类似于 Java 中的 List<Record>
  22. # 我们可以直接将其转换为 Python 的 list
  23. records = [record for record in reader]
  24. return records
  25. except ODPSError as e:
  26. print(f"ODPS 错误: {e}")
  27. return None
  28. def execute_odps_sql(sql) -> bool:
  29. # 配置信息
  30. access_id = 'LTAI9EBa0bd5PrDa'
  31. access_key = 'vAalxds7YxhfOA2yVv8GziCg3Y87v5'
  32. project = 'loghubods'
  33. endpoint = 'http://service.odps.aliyun.com/api'
  34. o = ODPS(access_id, access_key, project, endpoint=endpoint)
  35. try:
  36. instance = o.execute_sql(sql)
  37. instance.wait_for_success()
  38. return True
  39. except ODPSError as e:
  40. print(f"ODPS 错误: {e}")
  41. return False
  42. _STRATEGY_GAP = "当下供需gap"
  43. _STRATEGY_GAP_FENCI = "当下供需gap-分词"
  44. _HIVE_TABLE = "loghubods.dwd_multi_demand_pool_di"
  45. _HIVE_DT_FMT = "%Y%m%d" # 分区格式:yyyymmdd,如 20260519
  46. _CHINA_TZ = ZoneInfo("Asia/Shanghai")
  47. def _hive_partition_dt() -> str:
  48. """中国时区(Asia/Shanghai)当天日期,格式 yyyymmdd。"""
  49. return datetime.now(_CHINA_TZ).date().strftime(_HIVE_DT_FMT)
  50. def _escape_odps_string(value: object) -> str:
  51. return str(value).replace("'", "''")
  52. def _format_odps_string_array(values: list) -> str:
  53. if not values:
  54. return "ARRAY()"
  55. parts = [f"'{_escape_odps_string(v)}'" for v in values]
  56. return f"ARRAY({','.join(parts)})"
  57. def _parse_ext_data(ext_data_raw: object) -> dict:
  58. if isinstance(ext_data_raw, dict):
  59. return ext_data_raw
  60. if isinstance(ext_data_raw, str) and ext_data_raw.strip():
  61. try:
  62. return json.loads(ext_data_raw)
  63. except json.JSONDecodeError:
  64. return {}
  65. return {}
  66. def _build_hive_select_part(
  67. strategy: str,
  68. demand_id: str,
  69. demand_name: str,
  70. weight: float,
  71. type_str: str,
  72. video_count: int,
  73. video_ids: list[str],
  74. extend_json: str,
  75. ) -> str:
  76. return (
  77. "SELECT "
  78. f"'{_escape_odps_string(strategy)}' AS strategy, "
  79. f"'{_escape_odps_string(demand_id)}' AS demand_id, "
  80. f"'{_escape_odps_string(demand_name)}' AS demand_name, "
  81. f"{weight} AS weight, "
  82. f"'{_escape_odps_string(type_str)}' AS `type`, "
  83. f"{video_count} AS video_count, "
  84. f"{_format_odps_string_array(video_ids)} AS video_list, "
  85. f"'{_escape_odps_string(extend_json)}' AS extend"
  86. )
  87. def _insert_hive_select_parts(select_parts: list[str], partition_dt: str) -> bool:
  88. if not select_parts:
  89. return True
  90. union_sql = "\nUNION ALL\n".join(select_parts)
  91. insert_sql = f"""
  92. INSERT INTO TABLE {_HIVE_TABLE}
  93. PARTITION (dt='{partition_dt}')
  94. (strategy, demand_id, demand_name, weight, `type`, video_count, video_list, extend)
  95. {union_sql}
  96. """
  97. return execute_odps_sql(insert_sql)
  98. def write_dwd_multi_demand_pool_di_to_hive(rows: list[dict]) -> int:
  99. """
  100. 将行数据映射并写入 loghubods.dwd_multi_demand_pool_di(尽力插入,不校验结果)。
  101. 分区与 demand_id 的日期均为中国时区当天(yyyymmdd),不使用行内 dt 字段。
  102. 执行两次 INSERT(同表、同分区),策略不同:
  103. 1) 当下供需gap: demand_name=merge_leve2+' '+name, demand_id=md5(strategy+demand_name+type+dt)
  104. 2) 当下供需gap-分词: demand_name=name, demand_id=md5(strategy+name+品类+type+dt)
  105. """
  106. if not rows:
  107. return 0
  108. china_today = _hive_partition_dt()
  109. gap_parts: list[str] = []
  110. fenci_parts: list[str] = []
  111. for row in rows:
  112. merge_leve2 = str(row.get("merge_leve2") or "").strip()
  113. name = str(row.get("name") or "").strip()
  114. if not merge_leve2 or not name:
  115. continue
  116. weight = round(float(row.get("score") or 0.0), 6)
  117. ext_data = _parse_ext_data(row.get("ext_data"))
  118. type_str = str(ext_data.get("type") or "").strip()
  119. video_ids = ext_data.get("video_ids") or []
  120. if not isinstance(video_ids, list):
  121. video_ids = []
  122. video_ids = [str(v).strip() for v in video_ids if v is not None and str(v).strip()]
  123. video_count = len(video_ids)
  124. extend_json = json.dumps({"品类": merge_leve2}, ensure_ascii=False)
  125. demand_name_gap = f"{merge_leve2} {name}"
  126. demand_id_gap = hashlib.md5(
  127. f"{_STRATEGY_GAP}{demand_name_gap}{type_str}{china_today}".encode("utf-8")
  128. ).hexdigest()
  129. gap_parts.append(
  130. _build_hive_select_part(
  131. _STRATEGY_GAP, demand_id_gap, demand_name_gap,
  132. weight, type_str, video_count, video_ids, extend_json,
  133. )
  134. )
  135. demand_id_fenci = hashlib.md5(
  136. f"{_STRATEGY_GAP_FENCI}{name}{merge_leve2}{type_str}{china_today}".encode("utf-8")
  137. ).hexdigest()
  138. fenci_parts.append(
  139. _build_hive_select_part(
  140. _STRATEGY_GAP_FENCI, demand_id_fenci, name,
  141. weight, type_str, video_count, video_ids, extend_json,
  142. )
  143. )
  144. if not gap_parts:
  145. return 0
  146. _insert_hive_select_parts(gap_parts, china_today)
  147. _insert_hive_select_parts(fenci_parts, china_today)
  148. return len(gap_parts) + len(fenci_parts)
  149. def write_feature_point_data_to_hive(names: list[str]) -> int:
  150. """
  151. 将需求名称写入 Hive 表 feature_point_data(按北京时间当天分区)。
  152. 仅写入以下字段:
  153. - 特征点
  154. - 总分发曝光pv(固定 5000)
  155. - 质bn_rovn(固定 0.1)
  156. """
  157. normalized_names = [str(name).strip() for name in names if name is not None and str(name).strip()]
  158. if not normalized_names:
  159. return 0
  160. dt = datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y%m%d")
  161. select_parts = []
  162. for name in normalized_names:
  163. safe_name = name.replace("'", "''")
  164. select_parts.append(
  165. "SELECT "
  166. f"'{safe_name}' AS `特征点`, "
  167. "5000 AS `总分发曝光pv`, "
  168. "0.1 AS `质bn_rovn`"
  169. )
  170. union_sql = "\nUNION ALL\n".join(select_parts)
  171. insert_sql = f"""
  172. INSERT INTO TABLE feature_point_data
  173. PARTITION (dt='{dt}')
  174. (`特征点`, `总分发曝光pv`, `质bn_rovn`)
  175. {union_sql}
  176. """
  177. ok = execute_odps_sql(insert_sql)
  178. if not ok:
  179. return 0
  180. return len(normalized_names)
  181. def get_demand_merge_level2_names():
  182. date_time = datetime.now(ZoneInfo("Asia/Shanghai")).date() - timedelta(days=1)
  183. day = date_time.strftime("%Y%m%d")
  184. count = 50
  185. sql_query = f'''
  186. select *
  187. from (
  188. select
  189. dt,
  190. merge二级品类,
  191. sum(当日分发曝光pv) as 分发曝光pv,
  192. sum(累计分享回流uv) AS bn_总回流,
  193. sum(当日分发回流uv)/(sum(当日分发曝光pv)+100) as 质bn_rovn,
  194. case when sum(当日分发曝光pv)>=10000 then
  195. case when sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<0.035
  196. then -1*(count(DISTINCT 视频id)/avg(总日分发视频数))/((sum(累计分享回流uv)/avg(总日回流uv)))
  197. else 10*(sum(累计分享回流uv)/avg(总日回流uv)*sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))/(count(DISTINCT 视频id)/avg(总日分发视频数))
  198. end
  199. else 0 end AS 总供需分,
  200. case when sum(当日分发曝光pv)>=10000 then
  201. case when sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<0.035
  202. then -1*(COUNT(DISTINCT CASE WHEN 推荐天数间隔<3 THEN 视频id END ) /avg(总日分发视频数))/(sum(累计分享回流uv)/avg(总日回流uv))
  203. else 10*(sum(累计分享回流uv)/avg(总日回流uv)*sum(当日分发回流uv)/(sum(当日分发曝光pv)+1000))/(COUNT(DISTINCT CASE WHEN 推荐天数间隔<3 THEN 视频id END ) /avg(总日分发视频数))
  204. end
  205. else 0 end AS 新供需分,
  206. count(DISTINCT 视频id) as 分发视频量,
  207. count(DISTINCT if(推荐天数间隔<3,视频id,null)) as 3日新推荐视频量,
  208. case when sum(当日分发曝光pv)>=10000 and sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)>0.035
  209. then (avg(总日分发视频数)*(10*(sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))*(sum(累计分享回流uv)/avg(总日回流uv) ))/0.5-count(DISTINCT 视频id))/3
  210. end as 缺量,
  211. case when sum(当日分发曝光pv)>=10000 and sum(当日分发回流uv)/(sum(当日分发曝光pv)+100)<=0.035
  212. then (avg(总日分发视频数)*(10*(sum(当日分发回流uv)/(sum(当日分发曝光pv)+100))*(sum(累计分享回流uv)/avg(总日回流uv) ))/(2)-count(DISTINCT 视频id))/3
  213. end as 控量,
  214. avg(总日回流uv) AS 总日回流uv,
  215. avg(总日分发视频数) AS 总日分发视频数,
  216. avg(总日推荐视频数) AS 总日推荐视频数,
  217. COUNT(DISTINCT CASE WHEN 总回流uv>0 THEN 视频id END )/avg(总日分发视频数) AS 回流视频个数占比,
  218. sum(当日分发回流uv) AS bn_当日分发回流,
  219. sum(当日分发回流uv)/avg(总日回流uv) AS 分发拉回回流uv占比,
  220. sum(累计分享回流uv)/avg(总日回流uv) AS 回流uv占比,
  221. count(DISTINCT 视频id)/avg(总日分发视频数) AS 分发视频量占比,
  222. COUNT(DISTINCT CASE WHEN 是否当日新推荐=1 THEN 视频id END ) /avg(总日分发视频数) AS 新推荐视频量占比
  223. from loghubods.video_dimension_detail_add_column
  224. where dt = '{day}'
  225. group by dt, merge二级品类
  226. ) t1
  227. where t1.缺量>= {count}
  228. '''
  229. data = get_odps_data(sql_query)
  230. result_list = []
  231. if data:
  232. for r in data:
  233. lack_count = r[9]
  234. if lack_count > 1000:
  235. count = 70
  236. elif 500 < lack_count <= 1000:
  237. count = 60
  238. elif 100 < lack_count <= 500:
  239. count = 40
  240. elif 50 < lack_count <= 100:
  241. count = 20
  242. else:
  243. count = 10
  244. if count == 0:
  245. continue
  246. result_list.append({
  247. "cluster_name": r[1],
  248. "platform_type": "piaoquan",
  249. "count": count,
  250. })
  251. return result_list
  252. def get_rov_by_merge_leve2_and_video_ids(merge_leve2, video_ids):
  253. merge_level_in_clause = f"'{merge_leve2}'"
  254. video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in video_ids])
  255. end_date = (date.today() - timedelta(days=1)).strftime("%Y%m%d")
  256. start_date = (date.today() - timedelta(days=14)).strftime("%Y%m%d")
  257. # sql_query = f'''
  258. # SELECT
  259. # v.videoid,
  260. # CASE
  261. # WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
  262. # ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
  263. # END AS avg_rov_t0
  264. # FROM
  265. # (
  266. # SELECT
  267. # t2.videoid,
  268. # t2.merge_leve2
  269. # FROM videoods.content_profile t1
  270. # JOIN loghubods.video_merge_tag t2
  271. # ON t1.content_id = t2.videoid
  272. # WHERE
  273. # t1.status = 3
  274. # AND t1.is_deleted = 0
  275. # AND t2.merge_leve2 IN ({merge_level_in_clause})
  276. # ) v
  277. # LEFT JOIN loghubods.video_dimension_detail_add_column t3
  278. # ON v.videoid = t3.视频id
  279. # AND t3.dt >= '{start_date}'
  280. # AND t3.dt <= '{end_date}'
  281. # WHERE v.videoid in ({video_ids_in_clause})
  282. # GROUP BY
  283. # v.videoid
  284. # ;
  285. # '''
  286. sql_query = f'''
  287. SELECT
  288. CAST(t3.视频id AS STRING) AS 视频id_str,
  289. CASE
  290. WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
  291. ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
  292. END AS avg_rov_t0
  293. FROM
  294. loghubods.video_dimension_detail_add_column t3
  295. WHERE t3.视频id in ({video_ids_in_clause})
  296. AND t3.dt >= '{start_date}'
  297. AND t3.dt <= '{end_date}'
  298. GROUP BY
  299. t3.视频id
  300. ;
  301. '''
  302. data = get_odps_data(sql_query)
  303. result_dict = {}
  304. if data:
  305. result_dict = {r[0]: r[1] for r in data}
  306. return result_dict
  307. def get_rov_by_tree_and_video_ids(video_ids):
  308. video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in video_ids])
  309. last_year_today = date.today() - timedelta(days=365)
  310. start_date = last_year_today.strftime("%Y%m%d")
  311. end_date = (last_year_today + timedelta(days=7)).strftime("%Y%m%d")
  312. sql_query = f'''
  313. SELECT
  314. CAST(t3.视频id AS STRING) AS 视频id_str,
  315. CASE
  316. WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
  317. ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
  318. END AS avg_rov_t0
  319. FROM
  320. loghubods.video_dimension_detail_add_column t3
  321. WHERE t3.视频id in ({video_ids_in_clause})
  322. AND t3.dt >= '{start_date}'
  323. AND t3.dt <= '{end_date}'
  324. GROUP BY
  325. t3.视频id
  326. ;
  327. '''
  328. data = get_odps_data(sql_query)
  329. result_dict = {}
  330. if data:
  331. result_dict = {r[0]: r[1] for r in data}
  332. return result_dict
  333. def get_changwen_weight(account_name):
  334. bizdatemax_date = date.today() - timedelta(days=1)
  335. bizdatemin_date = bizdatemax_date - timedelta(days=30)
  336. bizdatemax = bizdatemax_date.strftime("%Y%m%d")
  337. bizdatemin = bizdatemin_date.strftime("%Y%m%d")
  338. sql_query = f'''
  339. SELECT
  340. 公众号名
  341. ,videoid
  342. ,一级品类
  343. ,二级品类
  344. ,头部曝光
  345. ,头部曝光uv
  346. ,头部realplay
  347. ,头部realplay_uv
  348. ,头部分享
  349. ,头部分享uv
  350. ,头部回流人数 AS 头部回流数
  351. ,推荐曝光数
  352. ,当日分发曝光uv
  353. ,推荐realplay
  354. ,分发realplay_uv
  355. ,推荐分享数
  356. ,当日分发分享uv
  357. ,推荐回流数
  358. ,当日回流进入分发曝光次数 AS vov分子
  359. FROM (
  360. SELECT DISTINCT a.公众号名
  361. ,a.videoid
  362. ,e.merge_leve1 AS 一级品类
  363. ,e.merge_leve2 AS 二级品类
  364. ,a.title
  365. ,a.进入分发人数
  366. ,头部曝光pv AS 头部曝光
  367. ,头部realplay_pv AS 头部realplay
  368. ,头部分享pv AS 头部分享
  369. ,a.当日分发曝光pv AS 推荐曝光数
  370. ,a.当日分发播放pv
  371. ,分发realplay_pv AS 推荐realplay
  372. ,分发realplay_pv / a.当日分发播放pv AS 真实播放率pv
  373. ,当日分发播放uv
  374. ,c.realplay_uv AS 分发真实播uv
  375. ,c.realplay_uv / a.当日分发播放uv AS 真实播放率uv
  376. ,a.当日分发分享pv AS 推荐分享数
  377. ,a.当日分发分享pv / a.当日分发曝光pv AS str
  378. ,NVL(b.当日分发回流人数,0) AS 推荐回流数
  379. ,NVL(b.当日回流进入分发人数,0) AS 当日回流进入分发人数
  380. ,NVL(b.当日回流进入分发曝光次数,0) AS 当日回流进入分发曝光次数
  381. ,NVL(b.当日回流进入分发曝光次数,0) / a.当日分发曝光pv AS vov分子
  382. ,d.头部回流人数
  383. ,当日分发曝光uv
  384. ,头部曝光uv
  385. ,当日分发分享uv
  386. ,头部分享uv
  387. ,分发realplay_uv
  388. ,头部realplay_uv
  389. FROM (
  390. SELECT account_name AS 公众号名
  391. ,videoid
  392. ,title
  393. ,COUNT(DISTINCT mid) AS 进入分发人数
  394. ,COUNT(
  395. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
  396. ) AS 当日分发曝光pv
  397. ,COUNT(DISTINCT
  398. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
  399. ) AS 当日分发曝光uv
  400. ,COUNT(
  401. CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
  402. ) AS 头部曝光pv
  403. ,COUNT(DISTINCT
  404. CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
  405. ) AS 头部曝光uv
  406. ,COUNT(
  407. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
  408. ) AS 当日分发播放pv
  409. ,COUNT(DISTINCT
  410. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
  411. ) AS 当日分发播放uv
  412. ,COUNT(
  413. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
  414. ) AS 当日分发分享pv
  415. ,COUNT(DISTINCT
  416. CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
  417. ) AS 当日分发分享uv
  418. ,COUNT(
  419. CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
  420. ) AS 头部分享pv
  421. ,COUNT(DISTINCT
  422. CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
  423. ) AS 头部分享uv
  424. FROM (
  425. SELECT DISTINCT a.mid
  426. ,a.videoid
  427. ,a.businesstype
  428. ,a.pagesource
  429. ,a.subsessionid
  430. ,account_name
  431. ,e.title
  432. FROM loghubods.video_action_log_rp a
  433. LEFT JOIN loghubods.user_wechat_identity_info_ha b
  434. ON a.mid = CONCAT('weixin_openid_',b.open_id)
  435. AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  436. LEFT JOIN loghubods.gzh_fans_info d
  437. ON b.union_id = d.union_id
  438. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  439. LEFT JOIN videoods.wx_video e
  440. ON a.videoid = e.id
  441. WHERE a.dt >= '{bizdatemin}'
  442. AND a.dt <= '{bizdatemax}'
  443. AND businesstype IN ('videoView','videoPlay','videoShareFriend')
  444. AND d.user_create_time IS NOT NULL
  445. AND account_name = '{account_name}'
  446. AND a.videoid IN (
  447. SELECT
  448. DISTINCT content_id AS videoid
  449. FROM
  450. videoods.content_profile
  451. WHERE status=3
  452. AND is_deleted = 0
  453. )
  454. ) t
  455. GROUP BY 公众号名
  456. ,videoid
  457. ,title
  458. ) a
  459. LEFT JOIN (
  460. SELECT t.account_name AS 公众号名
  461. ,t.videoid
  462. ,COUNT(DISTINCT s.machinecode) AS 当日分发回流人数
  463. ,COUNT(DISTINCT v.mid) AS 当日回流进入分发人数
  464. ,COUNT(v.mid) AS 当日回流进入分发曝光次数
  465. FROM (
  466. SELECT DISTINCT a.subsessionid
  467. ,a.videoid
  468. ,a.mid
  469. ,d.account_name
  470. ,GET_JSON_OBJECT(extparams,'$.recomTraceId') AS recomtraceid
  471. FROM loghubods.video_action_log_rp a
  472. LEFT JOIN loghubods.user_wechat_identity_info_ha b
  473. ON a.mid = CONCAT('weixin_openid_',b.open_id)
  474. AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  475. LEFT JOIN loghubods.gzh_fans_info d
  476. ON b.union_id = d.union_id
  477. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  478. WHERE a.dt >= '{bizdatemin}'
  479. AND a.dt <= '{bizdatemax}'
  480. AND a.businesstype = 'videoShareFriend'
  481. AND a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
  482. AND d.user_create_time IS NOT NULL
  483. AND d.account_name = '{account_name}'
  484. ) t
  485. LEFT JOIN (
  486. SELECT DISTINCT subsessionid
  487. ,machinecode
  488. ,recomtraceid
  489. ,clickobjectid
  490. FROM loghubods.user_share_log
  491. WHERE dt >= '{bizdatemin}'
  492. AND dt <= '{bizdatemax}'
  493. AND topic = 'click'
  494. ) s
  495. ON t.recomtraceid = s.recomtraceid
  496. AND t.videoid = s.clickobjectid
  497. LEFT JOIN (
  498. SELECT subsessionid
  499. ,mid
  500. ,videoid
  501. FROM loghubods.video_action_log_rp
  502. WHERE dt >= '{bizdatemin}'
  503. AND dt <= '{bizdatemax}'
  504. AND pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
  505. AND businesstype = 'videoView'
  506. ) v
  507. ON s.subsessionid = v.subsessionid
  508. AND s.machinecode = v.mid
  509. GROUP BY account_name
  510. ,t.videoid
  511. ) b
  512. ON a.公众号名 = b.公众号名
  513. AND a.videoid = b.videoid
  514. LEFT JOIN (
  515. SELECT d.account_name AS 公众号名
  516. ,a.videoid
  517. ,COUNT(DISTINCT a.mid) AS realplay_uv
  518. ,COUNT(
  519. CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
  520. ) AS 分发realplay_pv
  521. ,COUNT(CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_pv
  522. ,COUNT(DISTINCT
  523. CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
  524. ) AS 分发realplay_uv
  525. ,COUNT(DISTINCT CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_uv
  526. FROM loghubods.ods_video_play_log_day a
  527. LEFT JOIN (
  528. SELECT DISTINCT open_id
  529. ,union_id
  530. FROM loghubods.user_wechat_identity_info_ha
  531. WHERE dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  532. ) b
  533. ON a.mid = CONCAT('weixin_openid_',b.open_id)
  534. LEFT JOIN loghubods.gzh_fans_info d
  535. ON b.union_id = d.union_id
  536. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  537. WHERE a.dt >= '{bizdatemin}'
  538. AND a.dt <= '{bizdatemax}'
  539. AND a.businesstype = 'videoRealPlay'
  540. AND d.user_create_time IS NOT NULL
  541. AND d.account_name = '{account_name}'
  542. GROUP BY d.account_name
  543. ,a.videoid
  544. ORDER BY 分发realplay_pv DESC
  545. ) c
  546. ON a.公众号名 = c.公众号名
  547. AND a.videoid = c.videoid
  548. LEFT JOIN (
  549. SELECT t.account_name AS 公众号名
  550. ,t.videoid
  551. ,COUNT(DISTINCT s.machinecode) AS 头部回流人数
  552. FROM (
  553. SELECT DISTINCT a.shareobjectid AS videoid
  554. ,a.shareid
  555. ,a.machinecode
  556. ,d.account_name
  557. FROM loghubods.user_share_log a
  558. LEFT JOIN loghubods.user_wechat_identity_info_ha b
  559. ON a.machinecode = CONCAT('weixin_openid_',b.open_id)
  560. AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
  561. LEFT JOIN loghubods.gzh_fans_info d
  562. ON b.union_id = d.union_id
  563. AND d.dt = MAX_PT("loghubods.gzh_fans_info")
  564. WHERE a.dt >= '{bizdatemin}'
  565. AND a.dt <= '{bizdatemax}'
  566. AND a.topic = 'share'
  567. AND a.pagesource REGEXP 'pages/user-videos-share$'
  568. AND d.user_create_time IS NOT NULL
  569. AND d.account_name = '{account_name}'
  570. ) t
  571. LEFT JOIN (
  572. SELECT DISTINCT shareid
  573. ,machinecode
  574. ,clickobjectid
  575. FROM loghubods.user_share_log
  576. WHERE dt >= '{bizdatemin}'
  577. AND dt <= '{bizdatemax}'
  578. AND topic = 'click'
  579. ) s
  580. ON t.shareid = s.shareid
  581. GROUP BY account_name
  582. ,t.videoid
  583. ) d
  584. ON a.公众号名 = d.公众号名
  585. AND a.videoid = d.videoid
  586. LEFT JOIN loghubods.video_merge_tag e
  587. ON a.videoid = e.videoid
  588. )
  589. ORDER BY 推荐曝光数 DESC
  590. '''
  591. result_list = []
  592. data = get_odps_data(sql_query)
  593. if data:
  594. for r in data:
  595. result_list.append(
  596. {
  597. "account_name": r[0],
  598. "videoid": r[1],
  599. "一级品类": r[2],
  600. "二级品类": r[3],
  601. "ext_data": {
  602. "头部曝光": r[4],
  603. "头部曝光uv": r[5],
  604. "头部realplay": r[6],
  605. "头部realplay_uv": r[7],
  606. "头部分享": r[8],
  607. "头部分享uv": r[9],
  608. "头部回流数": r[10],
  609. "推荐曝光数": r[11],
  610. "当日分发曝光uv": r[12],
  611. "推荐realplay": r[13],
  612. "分发realplay_uv": r[14],
  613. "推荐分享数": r[15],
  614. "当日分发分享uv": r[16],
  615. "推荐回流数": r[17],
  616. "vov分子": r[18],
  617. },
  618. }
  619. )
  620. # 输出到 examples/demand/data/changwen_data/
  621. output_dir = Path(__file__).parent / "data" / "changwen_data"
  622. output_dir.mkdir(parents=True, exist_ok=True)
  623. output_file = output_dir / f"{account_name}.json"
  624. with output_file.open("w", encoding="utf-8") as f:
  625. json.dump(result_list, f, ensure_ascii=False, indent=2)
  626. return result_list
  627. def get_zengzhang_weight(account_name):
  628. bizdatemax_date = date.today() - timedelta(days=1)
  629. bizdatemin_date = bizdatemax_date - timedelta(days=30)
  630. bizdatemax = bizdatemax_date.strftime("%Y%m%d")
  631. bizdatemin = bizdatemin_date.strftime("%Y%m%d")
  632. sql_query = f'''
  633. SELECT 合作方名
  634. ,合作方简称
  635. ,videoid
  636. ,一级品类
  637. ,二级品类
  638. ,SUM(头部曝光) as 头部曝光
  639. ,SUM(头部曝光uv) as 头部曝光uv
  640. ,SUM(头部realplay) as 头部realplay
  641. ,SUM(头部realplay_uv) as 头部realplay_uv
  642. ,SUM(头部分享) as 头部分享
  643. ,SUM(头部分享uv) as 头部分享uv
  644. ,SUM(头部回流数) as 头部回流数
  645. ,SUM(推荐曝光数) as 推荐曝光数
  646. ,SUM(当日分发曝光uv) as 当日分发曝光uv
  647. ,SUM(推荐realplay) as 推荐realplay
  648. ,SUM(分发realplay_uv) as 分发realplay_uv
  649. ,SUM(推荐分享数) as 推荐分享数
  650. ,SUM(当日分发分享uv) as 当日分发分享uv
  651. ,SUM(推荐回流数) as 推荐回流数
  652. ,SUM(vov分子) as vov分子
  653. FROM loghubods.dws_growth_partner_vid_data
  654. WHERE dt BETWEEN '{bizdatemin}' AND '{bizdatemax}'
  655. AND 合作方名 = '{account_name}'
  656. GROUP BY 合作方名
  657. ,合作方简称
  658. ,videoid
  659. ,一级品类
  660. ,二级品类
  661. ORDER BY SUM(推荐曝光数)
  662. ;
  663. '''
  664. result_list = []
  665. data = get_odps_data(sql_query)
  666. if data:
  667. for r in data:
  668. result_list.append(
  669. {
  670. "account_name": r[0],
  671. "合作方简称": r[1],
  672. "videoid": r[2],
  673. "一级品类": r[3],
  674. "二级品类": r[4],
  675. "ext_data": {
  676. "头部曝光": r[5],
  677. "头部曝光uv": r[6],
  678. "头部realplay": r[7],
  679. "头部realplay_uv": r[8],
  680. "头部分享": r[9],
  681. "头部分享uv": r[10],
  682. "头部回流数": r[11],
  683. "推荐曝光数": r[12],
  684. "当日分发曝光uv": r[13],
  685. "推荐realplay": r[14],
  686. "分发realplay_uv": r[15],
  687. "推荐分享数": r[16],
  688. "当日分发分享uv": r[17],
  689. "推荐回流数": r[18],
  690. "vov分子": r[19],
  691. },
  692. }
  693. )
  694. # 输出到 examples/demand/data/zengzhang_data/
  695. output_dir = Path(__file__).parent / "data" / "zengzhang_data"
  696. output_dir.mkdir(parents=True, exist_ok=True)
  697. output_file = output_dir / f"{account_name}.json"
  698. with output_file.open("w", encoding="utf-8") as f:
  699. json.dump(result_list, f, ensure_ascii=False, indent=2)
  700. return result_list
  701. def get_merge_leve2_by_video_ids(video_ids, batch_size=2000):
  702. result = {}
  703. if not video_ids:
  704. return result
  705. normalized_ids = [str(video_id) for video_id in video_ids if video_id is not None]
  706. for i in range(0, len(normalized_ids), batch_size):
  707. batch_ids = normalized_ids[i:i + batch_size]
  708. escaped_ids = [video_id.replace("'", "''") for video_id in batch_ids]
  709. video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in escaped_ids])
  710. sql_query = f'''
  711. SELECT videoid, merge_leve2
  712. FROM loghubods.video_merge_tag
  713. WHERE videoid IN ({video_ids_in_clause})
  714. '''
  715. data = get_odps_data(sql_query)
  716. if not data:
  717. continue
  718. for row in data:
  719. result[str(row[0])] = row[1]
  720. return result
  721. def get_all_decode_task_result_rows():
  722. return mysql_db.select(
  723. "workflow_decode_task_result",
  724. columns="id, channel_content_id, merge_leve2",
  725. )
  726. def update_decode_task_result_merge_leve2(channel_content_id, merge_leve2):
  727. return mysql_db.update(
  728. "workflow_decode_task_result",
  729. {"merge_leve2": str(merge_leve2)},
  730. "channel_content_id = %s",
  731. (str(channel_content_id),),
  732. )
  733. def backfill_merge_leve2_for_decode_task_result():
  734. rows = get_all_decode_task_result_rows()
  735. updated_count = 0
  736. skipped_count = 0
  737. valid_content_ids = []
  738. for row in rows:
  739. channel_content_id = row.get("channel_content_id")
  740. if channel_content_id is None:
  741. skipped_count += 1
  742. continue
  743. channel_content_id = str(channel_content_id)
  744. if len(channel_content_id) > 8:
  745. skipped_count += 1
  746. continue
  747. valid_content_ids.append(channel_content_id)
  748. merge_leve2_map = get_merge_leve2_by_video_ids(valid_content_ids, batch_size=2000)
  749. for channel_content_id in valid_content_ids:
  750. merge_leve2 = merge_leve2_map.get(channel_content_id)
  751. if not merge_leve2:
  752. continue
  753. affected = update_decode_task_result_merge_leve2(channel_content_id, merge_leve2)
  754. if affected > 0:
  755. updated_count += affected
  756. return {
  757. "total": len(rows),
  758. "updated": updated_count,
  759. "skipped": skipped_count,
  760. }
  761. #
  762. # if __name__ == '__main__':
  763. # backfill_merge_leve2_for_decode_task_result()