alg_growth_3rd_gh_reply_video_v1.py

# -*- coding: utf-8 -*-
import pandas as pd
import traceback
import odps
from odps import ODPS
import json
import time
from pymysql.cursors import DictCursor
from datetime import datetime, timedelta
from db_helper import MysqlHelper
from my_utils import check_table_partition_exits_v2, get_dataframe_from_odps, \
    get_odps_df_of_max_partition, get_odps_instance, get_odps_df_of_recent_partitions
from my_utils import request_post, send_msg_to_feishu
from my_config import set_config
import numpy as np
from log import Log
import os
from argparse import ArgumentParser
from constants import AutoReplyAccountType
from alg_growth_common import check_unsafe_video, filter_unsafe_video, filter_audit_failed_video

CONFIG, _ = set_config()
LOGGER = Log()

BASE_GROUP_NAME = '3rd-party-base'
EXPLORE1_GROUP_NAME = '3rd-party-explore1'
EXPLORE2_GROUP_NAME = '3rd-party-explore2'
# GH_IDS will be updated by get_and_update_gh_ids
GH_IDS = ('default',)
account_map = {}

pd.set_option('display.max_rows', None)

CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/format,jpg/watermark,image_eXNoL3BpYy93YXRlcm1hcmtlci9pY29uX3BsYXlfd2hpdGUucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLHdfMTQ0,g_center"

ODS_PROJECT = "loghubods"
EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history'
GH_REPLY_STATS_TABLE = 'alg_growth_3rd_gh_reply_video_stats'
ODPS_RANK_RESULT_TABLE = 'alg_3rd_gh_autoreply_video_rank_data'
GH_DETAIL = 'gh_detail'
RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
STATS_PERIOD_DAYS = 5
DEFAULT_SEND_N = 1
MAX_SEND_N = 3

# fallback video ids per category1, used when an account has no rankable candidates
default_video = {
    '泛生活': [20463342, 14095344, 13737337],
    '泛历史': [13586800, 12794884, 12117356],
}


def get_and_update_gh_ids(run_dt):
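    """Load auto-reply account config from MySQL and refresh module globals.

    Reads non-deleted EXTERNAL_GZH accounts from the gh_detail table, appends a
    virtual 'default' account if absent, de-duplicates by gh_id, then updates the
    global GH_IDS tuple and account_map dict (send_n normalized to int).
    Returns the account DataFrame.
    """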
    db = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
    gh_type = AutoReplyAccountType.EXTERNAL_GZH.value
    sqlstr = f"""
        SELECT gh_id, gh_name, category1, category2, channel,
               video_ids, strategy_status,
               autoreply_send_minigram_num as send_n
        FROM {GH_DETAIL}
        WHERE is_delete = 0 AND `type` = {gh_type}
        """
    account_data = db.get_data(sqlstr, DictCursor)
    account_df = pd.DataFrame(account_data)

    # handle the virtual 'default' account separately
    if 'default' not in account_df['gh_id'].values:
        new_row = pd.DataFrame({'gh_id': ['default'], 'gh_name': ['默认'],
                                'type': [2], 'category1': ['泛生活'], 'send_n': 1},
                               index=[0])
        account_df = pd.concat([account_df, new_row], ignore_index=True)

    account_df = account_df.drop_duplicates(subset=['gh_id'])
    global GH_IDS
    GH_IDS = tuple(account_df['gh_id'])

    global account_map
    account_map = {x['gh_id']: x for x in account_df.to_dict(orient='records')}
    for gh_id in account_map:
        account_info = account_map[gh_id]
        account_info['send_n'] = int(account_info.get('send_n', 1))

    return account_df


def check_data_partition(project, table, data_dt, data_hr=None):
    """Check whether the upstream data partition is ready; return its row count (0 if missing or on error)."""
    try:
        partition_spec = {'dt': data_dt}
        if data_hr:
            partition_spec['hour'] = data_hr
        part_exist, data_count = check_table_partition_exits_v2(
            project, table, partition_spec)
    except Exception as e:
        data_count = 0
    return data_count


def get_last_strategy_result(project, rank_table, dt_version, key):
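    """Read the rank table's max ctime partition (selected relative to dt_version) and
    return the de-duplicated rows of the given strategy_key, plus the dt_version that
    was actually read from the data."""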
    strategy_df = get_odps_df_of_max_partition(
        project, rank_table, {'ctime': dt_version}
    ).to_pandas()
    dt_version = strategy_df.iloc[0]['dt_version']

    sub_df = strategy_df.query(f'strategy_key == "{key}"')
    sub_df = sub_df[['gh_id', 'video_id', 'strategy_key', 'sort']].drop_duplicates()
    return sub_df, dt_version


def process_reply_stats(project, table, period, run_dt):
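    """Aggregate the most recent `period` days of reply stats per (gh_id, video_id).

    Unsafe videos are dropped, an all-account aggregate is added under gh_id='default',
    and a smoothed score is computed as day0_return / (first_visit_uv + 100); e.g.
    30 day-0 returns on 200 first visits gives 30 / (200 + 100) = 0.1, so the +100
    prior damps videos with little exposure.
    """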
    # fetch several days of send / day-0 return stats for aggregation
    df = get_odps_df_of_recent_partitions(project, table, period, {'dt': run_dt})
    df = df.to_pandas()

    df['video_id'] = df['video_id'].astype('int64')
    df = df[['gh_id', 'video_id', 'send_count', 'first_visit_uv', 'day0_return']]

    # drop unsafe videos uniformly when collecting stats
    df = filter_unsafe_video(df)

    # aggregate within each account
    df = df.groupby(['video_id', 'gh_id']).agg({
        'send_count': 'sum',
        'first_visit_uv': 'sum',
        'day0_return': 'sum'
    }).reset_index()

    # aggregate across all accounts as the 'default' account
    default_stats_df = df.groupby('video_id').agg({
        'send_count': 'sum',
        'first_visit_uv': 'sum',
        'day0_return': 'sum'
    }).reset_index()
    default_stats_df['gh_id'] = 'default'

    merged_df = pd.concat([df, default_stats_df]).reset_index(drop=True)
    merged_df['score'] = merged_df['day0_return'] / (merged_df['first_visit_uv'] + 100)
    return merged_df


def rank_for_layer1(run_dt, run_hour, project, table, gh):
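    """Layer-1 exploration: uniformly sample MAX_SEND_N candidates per category1 from
    the explore pool table, keep the top send_n for each account, and invert `sort`
    so that a larger value means a higher send position (sort = send_n + 1 - rank)."""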
    # TODO: add audit & retirement filtering
    df = get_odps_df_of_max_partition(project, table, {'dt': run_dt})
    df = df.to_pandas()
    df = filter_unsafe_video(df)

    # seed with dt_version so reruns produce identical results
    dt_version = f'{run_dt}{run_hour}'
    np.random.seed(int(dt_version) + 1)

    # TODO: revise the weighting strategy
    df['score'] = 1.0

    # weighted random sampling within each category1 group
    sampled_df = df.groupby('category1').apply(
        lambda x: x.sample(n=MAX_SEND_N, weights=x['score'], replace=False)).reset_index(drop=True)
    sampled_df['sort'] = sampled_df.groupby('category1')['score'].rank(
        method='first', ascending=False).astype(int)

    # order by score within each category
    sampled_df = sampled_df.sort_values(by=['category1', 'score'], ascending=[True, False]).reset_index(drop=True)

    sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME
    sampled_df['dt_version'] = dt_version

    merged_dfs = []
    for gh_id in GH_IDS:
        sub_gh_df = gh.query(f'gh_id == "{gh_id}"')
        account_info = account_map[gh_id]
        send_n = account_info['send_n']
        sub_video_df = sampled_df.query(f'sort <= {send_n}').copy()
        merged_df = sub_video_df.merge(sub_gh_df, on='category1')
        merged_df['sort'] = send_n + 1 - merged_df['sort']
        merged_dfs.append(merged_df)

    extend_df = pd.concat(merged_dfs)
    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return result_df


def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
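    """Layer-2 exploration: within each account, sample send_n videos weighted by the
    smoothed return score; fall back to the last base strategy result, and then to the
    category's default_video list, when an account has too few candidates. The virtual
    'default' account is sampled separately with DEFAULT_SEND_N."""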
    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)

    # seed with dt_version so reruns produce identical results
    dt_version = f'{run_dt}{run_hour}'
    np.random.seed(int(dt_version) + 1)

    # TODO: compute inter-account correlation
    ## for each pair of accounts, take the intersection of videos that have RoVn values;
    ## the (smoothed) per-video RoVn values within an account form a vector
    ## compute the correlation coefficient or cosine similarity between those vectors
    ## then take a weighted sum of each video's RoVn
    # current basic implementation: the layer-2 exploration score is computed within each account only

    sampled_dfs = []
    # handle the 'default' account (default-explore2)
    default_stats_df = stats_df.query('gh_id == "default"')
    sampled_df = default_stats_df.sample(n=DEFAULT_SEND_N, weights=default_stats_df['score'])
    sampled_df['sort'] = range(1, len(sampled_df) + 1)
    sampled_dfs.append(sampled_df)

    # basic per-account filtering
    df = stats_df.query('send_count > 200 and score > 0')
    df = filter_audit_failed_video(df)

    # fallback to base if necessary
    base_strategy_df, _ = get_last_strategy_result(
        project, rank_table, dt_version, BASE_GROUP_NAME)
    base_strategy_df = filter_audit_failed_video(base_strategy_df)

    for gh_id in GH_IDS:
        if gh_id == 'default':
            continue
        account_info = account_map[gh_id]
        send_n = account_info['send_n']
        sub_df = df.query(f'gh_id == "{gh_id}"')
        if len(sub_df) < send_n:
            LOGGER.warning(
                "gh_id[{}] rows[{}] not enough for layer2, fallback to base"
                .format(gh_id, len(sub_df)))
            sub_df = base_strategy_df.query(f'gh_id == "{gh_id}"').copy()
            if len(sub_df) < send_n:
                LOGGER.warning(
                    "gh_id[{}] rows[{}] still not enough for layer2, add backup"
                    .format(gh_id, len(sub_df)))
                rows = []
                idx = len(sub_df)
                exist_video_ids = sub_df['video_id'].unique()
                for video_id in default_video[account_info['category1']]:
                    if video_id in exist_video_ids:
                        continue
                    row = {
                        'gh_id': gh_id,
                        'sort': idx + 1,
                        'video_id': video_id,
                        'strategy_key': ''  # this is not important
                    }
                    rows.append(row)
                appx_df = pd.DataFrame(rows)
                sub_df = pd.concat([sub_df, appx_df])
            sub_df['score'] = sub_df['sort']
        sampled_df = sub_df.sample(n=send_n, weights=sub_df['score'])
        sampled_df['sort'] = range(send_n, 0, -1)
        sampled_dfs.append(sampled_df)

    extend_df = pd.concat(sampled_dfs)
    extend_df['strategy_key'] = EXPLORE2_GROUP_NAME
    extend_df['dt_version'] = dt_version
    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return result_df


def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
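    """Base (exploitation) strategy: rank videos per account by smoothed score, with the
    rows of the current base strategy always kept in the ranking; take the top send_n
    per account, pad from default_video when there are not enough rows, and invert
    `sort` so a larger value means a higher send position."""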
    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)

    # TODO: support to set base manually
    dt_version = f'{run_dt}{run_hour}'

    # fetch the current base strategy; the strategy table's dt_version (ctime partition) uses the current time
    base_strategy_df, _ = get_last_strategy_result(
        project, rank_table, dt_version, stg_key)

    default_stats_df = stats_df.query('gh_id == "default"')

    # rank within each account (including default) to decide its base exploitation content
    # make sure the current base strategy takes part in the ranking: join first, then filter
    non_default_ids = list(filter(lambda x: x != 'default', GH_IDS))
    gh_ids_str = ','.join(f'"{x}"' for x in non_default_ids)
    stats_df = stats_df.query(f'gh_id in ({gh_ids_str})')

    stats_with_strategy_df = stats_df \
        .merge(
            base_strategy_df,
            on=['gh_id', 'video_id'],
            how='outer') \
        .query('strategy_key.notna() or (send_count > 500 and score > 0.05)') \
        .fillna({'score': 0.0})

    # merge the default and per-account data
    grouped_stats_df = pd.concat([default_stats_df, stats_with_strategy_df]).reset_index()
    grouped_stats_df = filter_audit_failed_video(grouped_stats_df)

    def set_top_n(group, n=2):
        group_sorted = group.sort_values(by='score', ascending=False)
        top_n = group_sorted.head(n)
        top_n['sort'] = range(1, len(top_n) + 1)
        return top_n

    ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, MAX_SEND_N)

    sampled_dfs = []
    for gh_id in GH_IDS:
        account_info = account_map[gh_id]
        send_n = account_info['send_n']
        sub_df = ranked_df.query(f'gh_id == "{gh_id}" and sort <= {send_n}').copy()
        if len(sub_df) < send_n:
            LOGGER.warning(
                "gh_id[{}] rows[{}] still not enough for base, add backup"
                .format(gh_id, len(sub_df)))
            rows = []
            idx = len(sub_df)
            exist_video_ids = sub_df['video_id'].unique()
            for video_id in default_video[account_info['category1']]:
                if video_id in exist_video_ids:
                    continue
                row = {
                    'gh_id': gh_id,
                    'sort': idx + 1,
                    'video_id': video_id,
                    'score': 0.0,
                    'strategy_key': ''  # this is not important
                }
                rows.append(row)
                if len(sub_df) + len(rows) >= send_n:
                    break
            appx_df = pd.DataFrame(rows)
            sub_df = pd.concat([sub_df, appx_df])
        sub_df['sort'] = send_n + 1 - sub_df['sort']
        sampled_dfs.append(sub_df)

    ranked_df = pd.concat(sampled_dfs)
    ranked_df = ranked_df.reset_index(drop=True)
    ranked_df['strategy_key'] = stg_key
    ranked_df['dt_version'] = dt_version
    ranked_df = ranked_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return ranked_df


def check_result_data(df):
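    """Sanity-check the final rank data: no unsafe videos, and every (gh_id, strategy_key)
    combination must contain exactly send_n records; raises Exception otherwise."""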
    check_unsafe_video(df)
    for gh_id in GH_IDS:
        account_info = account_map[gh_id]
        for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
            sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
            n_records = len(sub_df)
            if n_records != account_info['send_n']:
                raise Exception(f"Unexpected record count: {gh_id},{key},{n_records}")


def postprocess_override_by_config(df, gh_df, dt_version):
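    """Apply manual overrides: for accounts with strategy_status == 0, replace the ranked
    videos of all three strategy groups with the account's configured video_ids (positions
    1..send_n), then force-swap one known unsafe video id."""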
    override_config = gh_df.query('strategy_status == 0').to_dict(orient='records')
    override_data = {
        'strategy_key': [],
        'gh_id': [],
        'sort': [],
        'video_id': []
    }
    for row in override_config:
        gh_id = row['gh_id']
        account_info = account_map[gh_id]
        try:
            video_ids = json.loads(row['video_ids'])
            if not isinstance(video_ids, list):
                raise Exception("video_ids is not list")
            video_ids = video_ids[:account_info['send_n']]
        except Exception as e:
            LOGGER.error(f"json parse error: {e}. content: {row['video_ids']}")
            continue
        for idx, video_id in enumerate(video_ids):
            for key in (BASE_GROUP_NAME, EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME):
                df = df.drop(df.query(
                    f'gh_id == "{gh_id}" and strategy_key == "{key}" and sort == {idx + 1}'
                ).index)
                override_data['strategy_key'].append(key)
                override_data['gh_id'].append(gh_id)
                override_data['sort'].append(idx + 1)
                override_data['video_id'].append(video_id)

    n_records = len(override_data['strategy_key'])
    override_data['dt_version'] = [dt_version] * n_records
    override_data['score'] = [0.0] * n_records
    df_to_append = pd.DataFrame(override_data)
    df = pd.concat([df, df_to_append], ignore_index=True)

    # forcibly replace a known unsafe video
    idx = df[df['video_id'] == 14403867].index
    df.loc[idx, 'video_id'] = 20463342
    return df


def build_and_transfer_base_mode(gh_df, run_dt, run_hour, dt_version, dry_run):
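    """Full (base-mode) run: build layer1/layer2/base ranks, apply config overrides,
    validate, join video metadata, then either write a dry-run diff to tmp_delta_data.csv
    or publish the result to the ODPS partition and sync it to MySQL (soft-deleting older
    rows of the same dt_version)."""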
    layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
    layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
    base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE,
                              BASE_GROUP_NAME)

    final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
    final_rank_df = postprocess_override_by_config(final_rank_df, gh_df, dt_version)
    check_result_data(final_rank_df)

    final_df = join_video_info(final_rank_df)

    if dry_run:
        print("==== ALL DATA ====")
        print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']]
              .sort_values(by=['strategy_key', 'gh_id', 'sort']))

        last_odps_df = get_odps_df_of_max_partition(
            ODS_PROJECT, ODPS_RANK_RESULT_TABLE, {'ctime': dt_version}
        ).to_pandas()
        merged_df = last_odps_df.merge(
            final_df, on=['strategy_key', 'gh_id', 'sort'], how='outer')
        delta_df = merged_df.query('title_x != title_y')
        delta_df = delta_df[['strategy_key', 'gh_id', 'sort',
                             'title_x', 'score_x', 'title_y', 'score_y']]
        delta_df.to_csv('tmp_delta_data.csv')
        return

    # save to ODPS
    odps_instance = get_odps_instance(ODS_PROJECT)
    t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
    part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
    part_spec = ','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
    with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
        writer.write(list(final_df.itertuples(index=False)))

    # sync to MySQL
    data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
    data_columns = list(final_df.columns)
    max_time_to_delete = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    mysql = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
    mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)

    # remove old data of the same version
    for key in final_df['strategy_key'].unique():
        sql = f"""
            update {RDS_RANK_RESULT_TABLE}
            set is_delete = 1
            where
                dt_version = '{dt_version}'
                and strategy_key = '{key}'
                and create_time < '{max_time_to_delete}'
                and is_delete = 0
        """
        rows = mysql.execute(sql)


def build_and_transfer_delta_mode(account_df, dt_version, dry_run):
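    """Delta-mode run: find accounts that exist in the DB config but are missing from the
    latest published strategy, build default rows for them (configured video_ids if set,
    otherwise the category's default_video list, send_n rows per strategy group), then
    insert them into MySQL and rewrite the latest ODPS partition with the merged data."""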
    # fetch the latest strategy; the strategy table's dt_version (ctime partition) uses the current time
    last_strategy, last_dt_version = get_last_strategy_result(
        ODS_PROJECT, ODPS_RANK_RESULT_TABLE, dt_version, BASE_GROUP_NAME)
    all_accounts = account_df['gh_id'].unique()
    accounts_in_strategy = last_strategy['gh_id'].unique()
    delta_accounts = [x for x in set(all_accounts) - set(accounts_in_strategy)]
    if len(delta_accounts) > 0:
        LOGGER.info('Found {} new accounts: {}'.format(
            len(delta_accounts), ','.join(delta_accounts)))
    else:
        LOGGER.info('Found 0 new account, do nothing.')
        return

    # new accounts have no history, so the strategy_status field can be ignored
    # TODO: set default by history stats
    groups = (BASE_GROUP_NAME, EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME)
    rows = []
    for gh_id in delta_accounts:
        account_info = account_map[gh_id]
        configured_videos = account_info['video_ids']
        video_ids = default_video[account_info['category1']]
        if configured_videos:
            LOGGER.info(f'{gh_id} has configured video IDs: {configured_videos}')
            try:
                video_ids = [int(x) for x in configured_videos.split(',')]
            except Exception as e:
                print('invalid configured video_ids, use default instead')
        for group_key in groups:
            for idx in range(account_info['send_n']):
                row = {
                    'strategy_key': group_key,
                    'gh_id': gh_id,
                    'sort': idx + 1,
                    'video_id': video_ids[idx],
                    'dt_version': last_dt_version,
                    'score': 0.0
                }
                rows.append(row)

    df = pd.DataFrame(rows)
    final_df = join_video_info(df)

    if dry_run:
        print(final_df)
        return

    # write the incremental records to MySQL
    data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
    data_columns = list(final_df.columns)
    mysql = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
    mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)

    # write the full record set back to ODPS
    last_odps_df = get_odps_df_of_max_partition(
        ODS_PROJECT, ODPS_RANK_RESULT_TABLE, {'ctime': dt_version}
    ).to_pandas()
    updated_odps_df = pd.concat([final_df, last_odps_df], ignore_index=True)

    odps_instance = get_odps_instance(ODS_PROJECT)
    t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
    target_dt = last_dt_version[0:8]
    target_hour = last_dt_version[8:10]
    part_spec_dict = {'dt': target_dt, 'hour': target_hour, 'ctime': last_dt_version}
    part_spec = ','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
    with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
        writer.write(list(updated_odps_df.itertuples(index=False)))


def join_video_info(df):
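    """Join video title and CDN cover URL (looked up from the wx_video table) onto the
    rank rows and project the final output columns."""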
    db = MysqlHelper(CONFIG.MYSQL_INFO)
    video_ids = df['video_id'].unique().tolist()
    video_ids_str = ','.join([str(x) for x in video_ids])
    sql = f"""
        SELECT id as video_id, title, cover_img_path FROM wx_video
        WHERE id in ({video_ids_str})
    """
    rows = db.get_data(sql, DictCursor)
    video_df = pd.DataFrame(rows)
    video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
    final_df = df.merge(video_df, on='video_id')

    # odps_instance = get_odps_instance(ODS_PROJECT)
    # odps_df = odps.DataFrame(df)
    # video_df = get_dataframe_from_odps('videoods', 'wx_video')
    # video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
    # video_df = video_df['id', 'title', 'cover_url']
    # final_df = odps_df.join(video_df, on=('video_id', 'id'))
    # final_df = final_df.to_pandas()

    final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'title', 'cover_url', 'score']]
    return final_df


def build_and_transfer_data(run_dt, run_hour, **kwargs):
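    """Entry point for a single run: refresh account config from MySQL, then dispatch to
    delta mode or base mode according to kwargs['mode']."""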
    dt_version = f'{run_dt}{run_hour}'
    dry_run = kwargs.get('dry_run', False)
    mode = kwargs.get('mode')

    gh_df = get_and_update_gh_ids(run_dt)

    if mode == 'delta':
        return build_and_transfer_delta_mode(gh_df, dt_version, dry_run)
    else:
        return build_and_transfer_base_mode(gh_df, run_dt, run_hour, dt_version, dry_run)


def main():
    LOGGER.info("%s 开始执行" % os.path.basename(__file__))
    LOGGER.info(f"environment: {CONFIG.ENV_TEXT}")
    argparser = ArgumentParser()
    argparser.add_argument('-n', '--dry-run', action='store_true')
    argparser.add_argument('--run-at', help='assume to run at date and hour, yyyyMMddHH')
    argparser.add_argument('--mode', default='base', choices=['base', 'delta'], help='run mode')
    args = argparser.parse_args()

    run_date = datetime.today()
    if args.run_at:
        run_date = datetime.strptime(args.run_at, "%Y%m%d%H")
        LOGGER.info(f"Assume to run at {run_date.strftime('%Y-%m-%d %H:00')}")

    try:
        now_date = datetime.today()
        LOGGER.info(f"开始执行: {datetime.strftime(now_date, '%Y-%m-%d %H:%M')}")

        last_date = run_date - timedelta(1)
        last_dt = last_date.strftime("%Y%m%d")

        # check whether the daily-updated upstream data is ready;
        # the upstream stats table is updated daily, but its schema is hour-compatible
        while True:
            h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
            if h_data_count > 0:
                LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count))
                run_dt = run_date.strftime("%Y%m%d")
                run_hour = run_date.strftime("%H")
                LOGGER.info(f'run_dt: {run_dt}, run_hour: {run_hour}')
                build_and_transfer_data(run_dt, run_hour, dry_run=args.dry_run, mode=args.mode)
                LOGGER.info('数据更新完成')
                return
            else:
                LOGGER.info("上游数据未就绪,等待60s")
                time.sleep(60)
    except Exception as e:
        LOGGER.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
        if CONFIG.ENV_TEXT == '开发环境' or args.dry_run:
            return
        send_msg_to_feishu(
            webhook=CONFIG.FEISHU_ROBOT['growth_task_robot'].get('webhook'),
            key_word=CONFIG.FEISHU_ROBOT['growth_task_robot'].get('key_word'),
            msg_text=f"rov-offline{CONFIG.ENV_TEXT} - 数据更新失败\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )
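
# Example invocations (dates are hypothetical; -n / --dry-run prints the result instead of
# publishing to ODPS / MySQL):
#   python alg_growth_3rd_gh_reply_video_v1.py --run-at 2024060112 --mode base -n
#   python alg_growth_3rd_gh_reply_video_v1.py --mode delta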


if __name__ == '__main__':
    main()