alg_growth_3rd_gh_reply_video_v1.py

# -*- coding: utf-8 -*-
import pandas as pd
import traceback
import odps
from odps import ODPS
from threading import Timer
from datetime import datetime, timedelta
from db_helper import MysqlHelper
from my_utils import check_table_partition_exits_v2, get_dataframe_from_odps, \
    get_odps_df_of_max_partition, get_odps_instance, get_odps_df_of_recent_partitions
from my_utils import request_post, send_msg_to_feishu
from my_config import set_config
import numpy as np
from log import Log
import os
from argparse import ArgumentParser
from constants import AutoReplyAccountType

CONFIG, _ = set_config()
LOGGER = Log()

BASE_GROUP_NAME = '3rd-party-base'
EXPLORE1_GROUP_NAME = '3rd-party-explore1'
EXPLORE2_GROUP_NAME = '3rd-party-explore2'
# TODO: fetch gh_id from external data source
GH_IDS = (
  24. GH_IDS = (
  25. 'gh_2863155cedcb',
  26. 'gh_c1acd6bac0f8',
  27. 'gh_da993c5f7f64',
  28. 'gh_495d71abbda2',
  29. 'gh_e2318164f869',
  30. 'gh_fc4ec610756e',
  31. 'gh_2450ad774945',
  32. 'gh_175925e40318',
  33. 'gh_994adaf7a539',
  34. 'gh_250c51d5ce69',
  35. 'gh_37adcd637351',
  36. 'gh_a36405f4e5d3',
  37. 'gh_ee5b4b07ed8b',
  38. 'gh_11debb2c8392',
  39. 'gh_d645c1ef7fb0',
  40. 'gh_1899b728af86',
  41. 'gh_059a27ea86b2',
  42. 'gh_6454c103be14',
  43. 'gh_63745bad4f21',
  44. 'gh_8a29eebc2012',
  45. 'gh_57bc9846c86a',
  46. 'gh_570967881eae',
  47. 'gh_197a0d8caa31',
  48. 'gh_93af434e3f47',
  49. 'gh_184f2d765f55',
  50. 'gh_8157d8fd284e',
  51. 'gh_8e1d1f19d44f',
  52. 'gh_1da8f62f4a0d',
  53. 'gh_fd4df7c45bb9',
  54. 'gh_dcfcf74b0846',
  55. 'gh_3afc3a8b8a3d',
  56. 'gh_ef699270bf64',
  57. 'gh_ba870f8b178b',
  58. 'gh_58cdb2f1f0d0',
  59. 'gh_3dce33de6994',
  60. 'gh_543e6d7e15f3',
  61. 'gh_0d55fee6b78d',
  62. 'gh_1c11651e0df4',
  63. 'gh_7f4741dd5fea',
  64. 'gh_33e9e4cbed84',
  65. 'gh_23fa67ebb016',
  66. 'gh_33c26df4eab0',
  67. 'gh_01c93b07605f',
  68. 'gh_c655e3c8a121',
  69. 'gh_83adb78f4ede',
  70. 'gh_0cc7c6d712eb',
  71. 'gh_1b8bfb5c4ffd',
  72. 'gh_e4fb77b1023b',
  73. 'gh_f994f5d9a4b6',
  74. 'gh_e0ca8ba4ed91',
  75. 'gh_b2b4d5aa6b49',
  76. 'gh_53759f90b0c5',
  77. 'gh_d219a0cc8a35',
  78. 'gh_930b5ef5a185',
  79. 'gh_22cad14dc4ec',
  80. 'gh_8734c02f2983',
  81. 'gh_8d68e68f2d08',
  82. 'gh_c603033bf881',
  83. 'gh_55ac4e447179',
  84. 'gh_8b5c838ac19a',
  85. 'gh_aed71f26e7e6',
  86. 'gh_330ef0db846d',
  87. 'gh_87eca527c626',
  88. 'gh_7a14b4c15090',
  89. 'gh_b74693fed783',
  90. 'gh_e1594e6db64b',
  91. 'gh_d32daba8ccf8',
  92. 'gh_23e084923def',
  93. 'gh_148aa4a776ce',
  94. 'gh_0df4d6b647ea',
  95. 'gh_041d1c819c30',
  96. 'gh_7e33fbba4398',
  97. 'gh_354ab82cf9b3',
  98. 'gh_b1f1d7a1f351',
  99. 'gh_793647539ef5',
  100. 'gh_1ff0c29f8408',
  101. 'gh_ecef1c08bcf4',
  102. 'gh_22f53f6a2b5d',
  103. 'gh_34820675d0fc',
  104. 'gh_4175a8f745f6',
  105. 'gh_81145598368a',
  106. 'gh_5f0bb5822e10',
  107. 'gh_65d8db4e97ca',
  108. 'gh_a09594d52fda',
  109. 'gh_4411cf1e5f4e',
  110. 'gh_9ee5f5e8425f',
  111. 'gh_df24adad2521',
  112. 'gh_30b472377707',
  113. 'gh_bb6775b47656',
  114. 'gh_69808935bba0',
  115. 'gh_fb77872bf907',
  116. 'gh_830c4aa1b262',
  117. 'gh_b5393e35caa4',
  118. 'gh_fa7dceae7c9d',
  119. 'gh_449fb0c2d817',
  120. 'gh_d6e75ad9094f',
  121. 'gh_1cbb453a800e',
  122. 'gh_1b243f162cbd',
  123. 'gh_50db6881c86e',
  124. 'gh_9d94775e8137',
  125. 'gh_d37101fb9b98',
  126. 'gh_ed86d05703eb',
  127. 'gh_ac4072121e24',
  128. 'gh_620af8e24fb9',
  129. 'gh_ee4783ded544',
  130. 'gh_d2bb5f1b9498',
  131. 'gh_5044de6e1597',
  132. 'gh_d94de77a8d08',
  133. 'gh_98624814f69a',
  134. 'gh_4c38b9d4474a',
  135. 'gh_f2a6c90c56cb',
  136. 'gh_26f1353fda5a',
  137. 'gh_143743361496',
  138. 'gh_126c99b39cea',
  139. 'gh_53e6e0a1b1bd',
  140. 'gh_859aafbcda3d',
  141. 'gh_cfce2617bd82',
  142. 'gh_db8ea2bc6687',
  143. 'gh_c4708b8cfe39',
  144. 'gh_57d2388bd01d',
  145. 'gh_5fffe35cc12a',
  146. 'gh_45980a6448f3',
  147. 'gh_f5120c12ee23',
  148. 'gh_bf79e3645d7a',
  149. 'gh_6c6d81dd642d',
  150. 'gh_57ee6a6ef204',
  151. 'gh_45be25d1c06b',
  152. 'gh_3ee85ba7c3ae',
  153. 'gh_7c89d5a3e745',
  154. 'gh_c46be9ea4eef',
  155. 'gh_cedc3c4eb48b',
  156. 'gh_8a91fa7f32aa',
  157. 'gh_5207b355776f',
  158. 'gh_6c7f73de400b',
  159. 'gh_d2f3805f8fa3',
  160. 'gh_7dd47f8aca4e',
  161. 'gh_967f9abb9ccd',
  162. 'gh_f46c6c9b53fa',
  163. 'gh_086abf2a536b',
  164. 'gh_6e11282216f3',
  165. 'gh_f5332b8dfb63',
  166. 'gh_f78610e292ba',
  167. 'gh_06699758fa4b',
  168. 'gh_92323d0bea11',
  169. 'gh_517aed4e8197',
  170. 'gh_c80462b5a330',
  171. 'gh_1b1c3ced734e',
  172. 'gh_dd54e30b03ad',
  173. 'gh_cadd0ea4fab3',
  174. 'gh_ef07a709127e',
  175. 'gh_ab6ca922e605',
  176. 'gh_8b69b67ea723',
  177. 'gh_363c54315788',
  178. 'gh_a363987c60bf',
  179. 'gh_86ca35774fcf',
  180. 'gh_518694803ae7',
  181. 'gh_f98d5f17e9ea',
  182. 'gh_5e0cd3f7b457',
  183. 'gh_9e0d149e2c0a',
  184. 'gh_7e77b09bb4f5',
  185. 'gh_261bbd99a906',
  186. 'gh_2dc8e3a7b6c9',
  187. 'gh_1ec8dae66c97',
  188. 'gh_7f062810b4e7',
  189. 'gh_3c112c0c9c8b',
  190. 'gh_01cd19465b39',
  191. 'gh_8cc8ae6eb9a5',
  192. 'gh_210f7ce6f418',
  193. 'gh_04804a94e325',
  194. 'gh_4685665647f0',
  195. 'gh_d7fa96aeb839',
  196. 'gh_210cb680d83d',
  197. 'gh_862b00a394e3',
  198. 'gh_3cf7b310906a',
  199. 'gh_669555ebea28',
  200. 'gh_aaac62205137',
  201. 'gh_0a03f8fa63ba',
  202. 'gh_b8b2d4184832',
  203. 'gh_819a632d4bb1',
  204. 'gh_db09b87a0fc9',
  205. 'gh_b673c01e7bd8',
  206. 'gh_6da61a15044a',
  207. 'gh_2f1fab4efaef',
  208. 'gh_da22f64152d5',
  209. 'gh_ff9fe99f2097',
  210. 'gh_33731afddbdb',
  211. 'gh_4d2f75c3c3fe',
  212. 'gh_40ff43c50773',
  213. 'gh_56b65b7d4520',
  214. 'gh_ff16c412ab97',
  215. 'gh_8bf689ae15cc',
  216. 'gh_650b17dbba8f',
  217. 'gh_b63b9dde3f4b',
  218. 'gh_36e74017026e',
  219. 'gh_a8851bfa953b',
  220. 'gh_ec5beb465640',
  221. 'gh_133c36b99b14',
  222. 'gh_b144210318e5',
  223. 'gh_3bffce62dbb4',
  224. 'gh_2fbff340c683',
  225. 'gh_3ceae370dcf5',
  226. 'gh_530b634707b0',
  227. 'gh_b7cdece20099',
  228. 'gh_9e0c7a370aaf',
  229. 'gh_96412c0393e3',
  230. 'gh_c8060587e6d1',
  231. 'gh_0d3c97cc30cc',
  232. 'gh_491189a534f2',
  233. 'gh_fe9620386c2c',
  234. 'gh_9d50b7067f07',
  235. 'gh_e1331141406a',
  236. 'gh_d6db13fcf14d',
  237. 'gh_5522900b6a67',
  238. 'gh_a7c21403c493',
  239. 'gh_eeec7c2e28a5',
  240. 'gh_c783350a9660',
  241. )
TARGET_GH_IDS = (
    'gh_250c51d5ce69',
    'gh_8a29eebc2012',
    'gh_ff16c412ab97',
    'gh_1014734791e0',
    'gh_570967881eae',
    'gh_a7c21403c493',
    'gh_7f062810b4e7',
    'gh_c8060587e6d1',
    'gh_1da8f62f4a0d',
    'gh_56b65b7d4520',
    'gh_eeec7c2e28a5',
    'gh_7c89d5a3e745',
    'gh_ee5b4b07ed8b',
    'gh_0d3c97cc30cc',
    'gh_c783350a9660',
)
CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/format,jpg/watermark,image_eXNoL3BpYy93YXRlcm1hcmtlci9pY29uX3BsYXlfd2hpdGUucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLHdfMTQ0,g_center"

ODS_PROJECT = "loghubods"
EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history'
GH_REPLY_STATS_TABLE = 'alg_growth_3rd_gh_reply_video_stats'
# ODPS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
ODPS_3RD_RANK_RESULT_TABLE = 'alg_3rd_gh_autoreply_video_rank_data'
GH_DETAIL = 'gh_detail'
RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
STATS_PERIOD_DAYS = 5
SEND_N = 1

def get_and_update_gh_ids(run_dt):
    gh = get_odps_df_of_max_partition(ODS_PROJECT, GH_DETAIL, {'dt': run_dt})
    gh = gh.to_pandas()
    gh = gh[gh['type'] == AutoReplyAccountType.EXTERNAL_GZH.value]
    # handle the "default" account separately
    if 'default' not in gh['gh_id'].values:
        new_row = pd.DataFrame({'gh_id': ['default'], 'gh_name': ['默认'], 'type': [2], 'category1': ['泛生活']},
                               index=[0])
        gh = pd.concat([gh, new_row], ignore_index=True)
    gh = gh.drop_duplicates(subset=['gh_id'])
    global GH_IDS
    GH_IDS = tuple(gh['gh_id'])
    return gh

def check_data_partition(project, table, data_dt, data_hr=None):
    """Check whether the upstream data partition is ready."""
    try:
        partition_spec = {'dt': data_dt}
        if data_hr:
            partition_spec['hour'] = data_hr
        part_exist, data_count = check_table_partition_exits_v2(
            project, table, partition_spec)
    except Exception as e:
        data_count = 0
    return data_count

def get_last_strategy_result(project, rank_table, dt_version, key):
    strategy_df = get_odps_df_of_max_partition(
        project, rank_table, {'ctime': dt_version}
    ).to_pandas()
    sub_df = strategy_df.query(f'strategy_key == "{key}"')
    sub_df = sub_df[['gh_id', 'video_id', 'strategy_key', 'sort']].drop_duplicates()
    return sub_df

def process_reply_stats(project, table, period, run_dt):
    # fetch several days of reply conversion stats for aggregation
    df = get_odps_df_of_recent_partitions(project, table, period, {'dt': run_dt})
    df = df.to_pandas()
    df['video_id'] = df['video_id'].astype('int64')
    df = df[['gh_id', 'video_id', 'send_count', 'first_visit_uv', 'day0_return']]
    # aggregate within each account
    df = df.groupby(['video_id', 'gh_id']).agg({
        'send_count': 'sum',
        'first_visit_uv': 'sum',
        'day0_return': 'sum'
    }).reset_index()
    # aggregate over all accounts to build the "default" account
    default_stats_df = df.groupby('video_id').agg({
        'send_count': 'sum',
        'first_visit_uv': 'sum',
        'day0_return': 'sum'
    }).reset_index()
    default_stats_df['gh_id'] = 'default'
    merged_df = pd.concat([df, default_stats_df]).reset_index(drop=True)
    merged_df['score'] = merged_df['day0_return'] / (merged_df['send_count'] + 500)
    return merged_df

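# A minimal sketch (illustration only, not called by the pipeline) of the
# additive smoothing used for `score` above: the pseudo-count of 500 sends
# keeps low-volume videos from topping the ranking on a few lucky returns.
# The numbers below are hypothetical.
def _smoothed_score_example():
    def score(s):
        return s['day0_return'] / (s['send_count'] + 500)
    small = {'send_count': 10, 'day0_return': 5}      # raw ratio 0.5
    large = {'send_count': 5000, 'day0_return': 900}  # raw ratio 0.18
    return score(small), score(large)  # ~0.0098 vs ~0.1636: the larger sample wins
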
def rank_for_layer1(run_dt, run_hour, project, table, gh):
    # TODO: add content review & retirement filtering
    df = get_odps_df_of_max_partition(project, table, {'dt': run_dt})
    df = df.to_pandas()
    # seed with dt_version so reruns produce identical results
    dt_version = f'{run_dt}{run_hour}'
    np.random.seed(int(dt_version) + 1)
    # TODO: revise the weighting strategy
    df['score'] = df['ros']
    # weighted random sampling within each category1 group
    sampled_df = df.groupby('category1').apply(
        lambda x: x.sample(n=SEND_N, weights=x['score'], replace=False)).reset_index(drop=True)
    sampled_df['sort'] = sampled_df.groupby('category1')['score'].rank(method='first', ascending=False).astype(int)
    # order by score within each category
    sampled_df = sampled_df.sort_values(by=['category1', 'score'], ascending=[True, False]).reset_index(drop=True)
    sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME
    sampled_df['dt_version'] = dt_version
    extend_df = sampled_df.merge(gh, on='category1')
    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return result_df

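# A minimal sketch (toy data, assumptions only) of the reproducibility property
# relied on above: seeding numpy with the dt_version makes the per-category
# weighted draw identical across reruns of the same dt/hour.
def _layer1_sampling_example():
    toy = pd.DataFrame({
        'category1': ['泛生活', '泛生活', '情感', '情感'],
        'video_id': [1, 2, 3, 4],
        'score': [0.9, 0.1, 0.5, 0.5],
    })
    np.random.seed(int('2024062300') + 1)  # hypothetical dt_version
    picked = toy.groupby('category1').apply(
        lambda x: x.sample(n=1, weights=x['score'])).reset_index(drop=True)
    return picked[['category1', 'video_id']]  # same rows on every rerun
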
def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)
    # seed with dt_version so reruns produce identical results
    dt_version = f'{run_dt}{run_hour}'
    np.random.seed(int(dt_version) + 1)
    # TODO: compute cross-account correlation
    ## pair accounts up and take the intersection of videos that have RoVn values;
    ## build one vector per account from its (smoothed) RoVn values,
    ## then compute the correlation coefficient or cosine similarity,
    ## and finally take a weighted sum of each video's RoVn
    # current basic implementation: compute the layer-2 exploration score within each account only
    sampled_dfs = []
    # handle the default account (default-explore2)
    default_stats_df = stats_df.query('gh_id == "default"')
    sampled_df = default_stats_df.sample(n=SEND_N, weights=default_stats_df['score'])
    sampled_df['sort'] = range(1, len(sampled_df) + 1)
    sampled_dfs.append(sampled_df)
    # basic per-account filtering
    df = stats_df.query('day0_return > 100')
    # fallback to base if necessary
    base_strategy_df = get_last_strategy_result(
        project, rank_table, dt_version, BASE_GROUP_NAME)
    for gh_id in GH_IDS:
        if gh_id == 'default':
            continue
        sub_df = df.query(f'gh_id == "{gh_id}"')
        if len(sub_df) < SEND_N:
            LOGGER.warning(
                "gh_id[{}] rows[{}] not enough for layer2, fallback to base"
                .format(gh_id, len(sub_df)))
            sub_df = base_strategy_df.query(f'gh_id == "{gh_id}"')
            sub_df['score'] = sub_df['sort']
        sampled_df = sub_df.sample(n=SEND_N, weights=sub_df['score'])
        sampled_df['sort'] = range(1, len(sampled_df) + 1)
        sampled_dfs.append(sampled_df)
    extend_df = pd.concat(sampled_dfs)
    extend_df['strategy_key'] = EXPLORE2_GROUP_NAME
    extend_df['dt_version'] = dt_version
    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return result_df

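# A minimal sketch of the cross-account correlation described in the TODO above
# (an assumed implementation, not production logic): per account, build a vector
# of smoothed scores over the videos all compared accounts share, then measure
# pairwise cosine similarity. `stats_df` is assumed to be the output of
# process_reply_stats (columns gh_id / video_id / score).
def _account_similarity_example(stats_df):
    pivot = stats_df.pivot_table(index='video_id', columns='gh_id', values='score')
    pivot = pivot.dropna()  # keep only videos observed in every account
    sims = {}
    cols = list(pivot.columns)
    for i in range(len(cols)):
        for j in range(i + 1, len(cols)):
            a, b = pivot[cols[i]].values, pivot[cols[j]].values
            denom = np.linalg.norm(a) * np.linalg.norm(b)
            sims[(cols[i], cols[j])] = float(a.dot(b) / denom) if denom else 0.0
    return sims
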
def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)
    # TODO: support setting the base manually
    dt_version = f'{run_dt}{run_hour}'
    # fetch the current base; the strategy table dt_version (ctime partition) uses the current time
    base_strategy_df = get_last_strategy_result(
        project, rank_table, dt_version, stg_key)
    default_stats_df = stats_df.query('gh_id == "default"')
    # rank within each account to decide the base content for that account (including default);
    # the current base strategy must take part in the ranking, so join first and filter afterwards
    gh_ids_str = ','.join(f'"{x}"' for x in GH_IDS)
    stats_df = stats_df.query(f'gh_id in ({gh_ids_str})')
    stats_with_strategy_df = stats_df \
        .merge(
            base_strategy_df,
            on=['gh_id', 'video_id'],
            how='left') \
        .query('strategy_key.notna() or score > 0.1')
    # merge the default and per-account data
    grouped_stats_df = pd.concat([default_stats_df, stats_with_strategy_df]).reset_index()

    def set_top_n(group, n=2):
        group_sorted = group.sort_values(by='score', ascending=False)
        top_n = group_sorted.head(n)
        top_n['sort'] = range(1, len(top_n) + 1)
        return top_n

    ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, SEND_N)
    ranked_df = ranked_df.reset_index(drop=True)
    ranked_df['strategy_key'] = stg_key
    ranked_df['dt_version'] = dt_version
    ranked_df = ranked_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return ranked_df

def check_result_data(df):
    for gh_id in GH_IDS:
        for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
            sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
            if len(sub_df) != SEND_N:
                raise Exception(f"Result not enough for gh_id[{gh_id}], group[{key}]")

def rank_for_base_designate(run_dt, run_hour, stg_key):
    dt_version = f'{run_dt}{run_hour}'
    ranked_df = pd.DataFrame()  # start from an empty DataFrame
    for gh_id in GH_IDS:
        if gh_id in TARGET_GH_IDS:
            temp_df = pd.DataFrame({
                'strategy_key': [stg_key],
                'dt_version': [dt_version],
                'gh_id': [gh_id],
                'sort': [1],
                'video_id': [13586800],
                'score': [0.5]
            })
        else:
            temp_df = pd.DataFrame({
                'strategy_key': [stg_key],
                'dt_version': [dt_version],
                'gh_id': [gh_id],
                'sort': [1],
                'video_id': [20463342],
                'score': [0.5]
            })
        ranked_df = pd.concat([ranked_df, temp_df], ignore_index=True)
    return ranked_df

def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
    dt_version = f'{run_dt}{run_hour}'
    dry_run = kwargs.get('dry_run', False)
    gh_df = get_and_update_gh_ids(run_dt)
    layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
    # layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_3RD_RANK_RESULT_TABLE)
    # base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_3RD_RANK_RESULT_TABLE, BASE_GROUP_NAME)
    layer2_rank = rank_for_base_designate(run_dt, run_hour, EXPLORE2_GROUP_NAME)
    base_rank = rank_for_base_designate(run_dt, run_hour, BASE_GROUP_NAME)
    final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
    check_result_data(final_rank_df)
    odps_instance = get_odps_instance(project)
    odps_ranked_df = odps.DataFrame(final_rank_df)
    video_df = get_dataframe_from_odps('videoods', 'wx_video')
    video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
    video_df = video_df['id', 'title', 'cover_url']
    final_df = odps_ranked_df.join(video_df, on=('video_id', 'id'))
    final_df = final_df.to_pandas()
    final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'title', 'cover_url', 'score']]
    # reverse the sending order
    final_df['sort'] = SEND_N + 1 - final_df['sort']
    if dry_run:
        print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']])
        return
    # save to ODPS
    t = odps_instance.get_table(ODPS_3RD_RANK_RESULT_TABLE)
    part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
    part_spec = ','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
    with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
        writer.write(list(final_df.itertuples(index=False)))
    # sync to MySQL
    data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
    data_columns = list(final_df.columns)
    mysql = MysqlHelper(CONFIG.MYSQL_CRAWLER_INFO)
    mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)

def main_loop():
    argparser = ArgumentParser()
    argparser.add_argument('-n', '--dry-run', action='store_true')
    args = argparser.parse_args()
    try:
        now_date = datetime.today()
        LOGGER.info(f"开始执行: {datetime.strftime(now_date, '%Y-%m-%d %H:%M')}")
        now_hour = now_date.strftime("%H")
        last_date = now_date - timedelta(1)
        last_dt = last_date.strftime("%Y%m%d")
        # check whether the daily-updated data is ready;
        # the upstream stats table is updated daily, but its schema is designed to be hour-compatible
        h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
        if h_data_count > 0:
            LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count))
            run_dt = now_date.strftime("%Y%m%d")
            LOGGER.info(f'run_dt: {run_dt}, run_hour: {now_hour}')
            build_and_transfer_data(run_dt, now_hour, ODS_PROJECT,
                                    dry_run=args.dry_run)
            LOGGER.info('数据更新完成')
        else:
            LOGGER.info("上游数据未就绪,等待60s")
            Timer(60, main_loop).start()
        return
    except Exception as e:
        LOGGER.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
        if CONFIG.ENV_TEXT == '开发环境':
            return
        send_msg_to_feishu(
            webhook=CONFIG.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=CONFIG.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text=f"rov-offline{CONFIG.ENV_TEXT} - 数据更新失败\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )

if __name__ == '__main__':
    LOGGER.info("%s 开始执行" % os.path.basename(__file__))
    LOGGER.info(f"environment: {CONFIG.ENV_TEXT}")
    main_loop()
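
# Usage sketch: run once for the current day; with --dry-run the computed rank
# is printed instead of being written to ODPS/MySQL.
#   python alg_growth_3rd_gh_reply_video_v1.py --dry-run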