alg_growth_3rd_gh_reply_video_v1.py

# -*- coding: utf-8 -*-
import pandas as pd
import traceback
import odps
from odps import ODPS
import json
import time
from datetime import datetime, timedelta
from db_helper import MysqlHelper
from my_utils import check_table_partition_exits_v2, get_dataframe_from_odps, \
    get_odps_df_of_max_partition, get_odps_instance, get_odps_df_of_recent_partitions
from my_utils import request_post, send_msg_to_feishu
from my_config import set_config
import numpy as np
from log import Log
import os
from argparse import ArgumentParser
from constants import AutoReplyAccountType

CONFIG, _ = set_config()
LOGGER = Log()

BASE_GROUP_NAME = '3rd-party-base'
EXPLORE1_GROUP_NAME = '3rd-party-explore1'
EXPLORE2_GROUP_NAME = '3rd-party-explore2'
# GH_IDS will be updated by get_and_update_gh_ids
GH_IDS = ('default',)

pd.set_option('display.max_rows', None)

CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/format,jpg/watermark,image_eXNoL3BpYy93YXRlcm1hcmtlci9pY29uX3BsYXlfd2hpdGUucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLHdfMTQ0,g_center"

ODS_PROJECT = "loghubods"
EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history'
GH_REPLY_STATS_TABLE = 'alg_growth_3rd_gh_reply_video_stats'
ODPS_RANK_RESULT_TABLE = 'alg_3rd_gh_autoreply_video_rank_data'
GH_DETAIL = 'gh_detail'
RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
STATS_PERIOD_DAYS = 5
SEND_N = 1


def get_and_update_gh_ids(run_dt):
    """Load the active external official accounts and refresh the global GH_IDS tuple."""
    gh = get_odps_df_of_max_partition(ODS_PROJECT, GH_DETAIL, {'dt': run_dt})
    gh = gh.to_pandas()
    gh_type = AutoReplyAccountType.EXTERNAL_GZH.value
    gh = gh.query(f'type == {gh_type} and is_delete == 0')
    # handle the 'default' account separately: make sure it is always present
    if 'default' not in gh['gh_id'].values:
        new_row = pd.DataFrame({'gh_id': ['default'], 'gh_name': ['默认'], 'type': [2], 'category1': ['泛生活']},
                               index=[0])
        gh = pd.concat([gh, new_row], ignore_index=True)
    gh = gh.drop_duplicates(subset=['gh_id'])
    global GH_IDS
    GH_IDS = tuple(gh['gh_id'])
    return gh
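

# Assumed shape of the gh_detail rows used above (inferred from how they are used in this
# job, not from the table DDL): at least gh_id, gh_name, type, is_delete and category1;
# category1 is what rank_for_layer1 later joins on to fan sampled videos out to accounts.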


def check_data_partition(project, table, data_dt, data_hr=None):
    """Check whether the data partition is ready; return its row count, or 0 on error."""
    try:
        partition_spec = {'dt': data_dt}
        if data_hr:
            partition_spec['hour'] = data_hr
        part_exist, data_count = check_table_partition_exits_v2(
            project, table, partition_spec)
    except Exception:
        data_count = 0
    return data_count


def get_last_strategy_result(project, rank_table, dt_version, key):
    strategy_df = get_odps_df_of_max_partition(
        project, rank_table, {'ctime': dt_version}
    ).to_pandas()
    sub_df = strategy_df.query(f'strategy_key == "{key}"')
    sub_df = sub_df[['gh_id', 'video_id', 'strategy_key', 'sort']].drop_duplicates()
    return sub_df


def process_reply_stats(project, table, period, run_dt):
    # fetch several days of reply/return stats for aggregation
    df = get_odps_df_of_recent_partitions(project, table, period, {'dt': run_dt})
    df = df.to_pandas()
    df['video_id'] = df['video_id'].astype('int64')
    df = df[['gh_id', 'video_id', 'send_count', 'first_visit_uv', 'day0_return']]
    # aggregate within each account
    df = df.groupby(['video_id', 'gh_id']).agg({
        'send_count': 'sum',
        'first_visit_uv': 'sum',
        'day0_return': 'sum'
    }).reset_index()
    # aggregate across all accounts as the 'default' account
    default_stats_df = df.groupby('video_id').agg({
        'send_count': 'sum',
        'first_visit_uv': 'sum',
        'day0_return': 'sum'
    }).reset_index()
    default_stats_df['gh_id'] = 'default'
    merged_df = pd.concat([df, default_stats_df]).reset_index(drop=True)
    # smoothed day-0 return rate: the +100 in the denominator damps low-traffic videos
    merged_df['score'] = merged_df['day0_return'] / (merged_df['first_visit_uv'] + 100)
    return merged_df
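

# A minimal worked example of the smoothed score above (illustrative toy numbers only,
# not production data; this helper is never called by the job): adding 100 to
# first_visit_uv keeps low-traffic videos from ranking high on a handful of lucky returns.
def _demo_smoothed_score():
    demo = pd.DataFrame({
        'video_id': [1, 2],
        'first_visit_uv': [50, 5000],
        'day0_return': [10, 400],
    })
    # video 1: 10 / (50 + 100) = about 0.067, although its raw ratio 0.20 looks better
    # video 2: 400 / (5000 + 100) = about 0.078, barely changed from its raw ratio 0.08
    demo['score'] = demo['day0_return'] / (demo['first_visit_uv'] + 100)
    return demo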


def rank_for_layer1(run_dt, run_hour, project, table, gh):
    # TODO: add review & retirement filtering
    df = get_odps_df_of_max_partition(project, table, {'dt': run_dt})
    df = df.to_pandas()
    # seed from dt_version so re-runs of the same hour produce the same result
    dt_version = f'{run_dt}{run_hour}'
    np.random.seed(int(dt_version) + 1)
    # TODO: revise the weighting strategy
    df['score'] = df['ros']
    # weighted random sampling without replacement within each category1
    sampled_df = df.groupby('category1').apply(
        lambda x: x.sample(n=SEND_N, weights=x['score'], replace=False)).reset_index(drop=True)
    sampled_df['sort'] = sampled_df.groupby('category1')['score'].rank(method='first', ascending=False).astype(int)
    # order by score within each category
    sampled_df = sampled_df.sort_values(by=['category1', 'score'], ascending=[True, False]).reset_index(drop=True)
    sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME
    sampled_df['dt_version'] = dt_version
    # fan the sampled videos out to every account of the same category
    extend_df = sampled_df.merge(gh, on='category1')
    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return result_df
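

# Reproducibility check for the dt_version-derived seed used above: re-running the same
# (run_dt, run_hour) draws the same weighted sample. A hypothetical helper for local
# verification only; it is not called by the job.
def _demo_seeded_sampling(dt_version='2024010100'):
    pool = pd.DataFrame({'video_id': range(10), 'score': np.arange(1, 11, dtype=float)})
    draws = []
    for _ in range(2):
        np.random.seed(int(dt_version) + 1)
        draws.append(tuple(pool.sample(n=SEND_N, weights=pool['score'])['video_id']))
    return draws[0] == draws[1]  # expected to be True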


def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)
    # seed from dt_version so re-runs of the same hour produce the same result
    dt_version = f'{run_dt}{run_hour}'
    np.random.seed(int(dt_version) + 1)
    # TODO: compute inter-account correlation
    #  - for every pair of accounts, take the intersection of videos that have RoVn values;
    #    the (smoothed) RoVn scores within each account form a vector
    #  - compute the correlation coefficient or cosine similarity of the two vectors
    #  - combine each video's RoVn as a weighted sum
    # current basic version: compute the layer-2 exploration score within each account only
    sampled_dfs = []
    # handle the 'default' account (default-explore2)
    default_stats_df = stats_df.query('gh_id == "default"')
    sampled_df = default_stats_df.sample(n=SEND_N, weights=default_stats_df['score'])
    sampled_df['sort'] = range(1, len(sampled_df) + 1)
    sampled_dfs.append(sampled_df)
    # basic per-account filtering
    df = stats_df.query('send_count > 200 and score > 0')
    # fall back to the base strategy when an account has too few candidates
    base_strategy_df = get_last_strategy_result(
        project, rank_table, dt_version, BASE_GROUP_NAME)
    for gh_id in GH_IDS:
        if gh_id == 'default':
            continue
        sub_df = df.query(f'gh_id == "{gh_id}"')
        if len(sub_df) < SEND_N:
            LOGGER.warning(
                "gh_id[{}] rows[{}] not enough for layer2, fallback to base"
                .format(gh_id, len(sub_df)))
            sub_df = base_strategy_df.query(f'gh_id == "{gh_id}"').copy()
            sub_df['score'] = sub_df['sort']
        sampled_df = sub_df.sample(n=SEND_N, weights=sub_df['score'])
        sampled_df['sort'] = range(1, len(sampled_df) + 1)
        sampled_dfs.append(sampled_df)
    extend_df = pd.concat(sampled_dfs)
    extend_df['strategy_key'] = EXPLORE2_GROUP_NAME
    extend_df['dt_version'] = dt_version
    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return result_df
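

# A minimal sketch of the inter-account correlation idea in the TODO above, assuming the
# smoothed per-account score from process_reply_stats stands in for the "RoVn" value: for
# each pair of accounts, take the videos both have scored and compute cosine similarity of
# the two score vectors. Hypothetical helper, not wired into the ranking.
def _account_similarity_sketch(stats_df):
    # rows: accounts, columns: videos, values: smoothed score
    pivot = stats_df.pivot_table(index='gh_id', columns='video_id', values='score')
    sims = {}
    gh_ids = list(pivot.index)
    for i, a in enumerate(gh_ids):
        for b in gh_ids[i + 1:]:
            both = pivot.loc[[a, b]].dropna(axis=1)  # videos scored by both accounts
            if both.shape[1] < 2:
                continue  # not enough overlap to compare
            va, vb = both.loc[a].values, both.loc[b].values
            denom = np.linalg.norm(va) * np.linalg.norm(vb)
            if denom > 0:
                sims[(a, b)] = float(np.dot(va, vb) / denom)
    return sims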


def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)
    # TODO: support setting the base manually
    dt_version = f'{run_dt}{run_hour}'
    # fetch the current base strategy; the strategy table's dt_version (ctime partition)
    # uses the current run time
    base_strategy_df = get_last_strategy_result(
        project, rank_table, dt_version, stg_key)
    default_stats_df = stats_df.query('gh_id == "default"')
    # rank within each account to pick the base (exploitation) content for that account
    # (including default); the current base picks must take part in the ranking,
    # so join first and filter afterwards
    non_default_ids = list(filter(lambda x: x != 'default', GH_IDS))
    gh_ids_str = ','.join(f'"{x}"' for x in non_default_ids)
    stats_df = stats_df.query(f'gh_id in ({gh_ids_str})')
    stats_with_strategy_df = stats_df \
        .merge(
            base_strategy_df,
            on=['gh_id', 'video_id'],
            how='outer') \
        .query('strategy_key.notna() or (send_count > 500 and score > 0.05)') \
        .fillna({'score': 0.0})
    # merge the default data with the per-account data
    grouped_stats_df = pd.concat([default_stats_df, stats_with_strategy_df]).reset_index()

    def set_top_n(group, n=2):
        group_sorted = group.sort_values(by='score', ascending=False)
        top_n = group_sorted.head(n)
        top_n['sort'] = range(1, len(top_n) + 1)
        return top_n

    ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, SEND_N)
    ranked_df = ranked_df.reset_index(drop=True)
    ranked_df['strategy_key'] = stg_key
    ranked_df['dt_version'] = dt_version
    ranked_df = ranked_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
    return ranked_df
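

# A toy illustration of the outer-merge filter in rank_for_base (hypothetical data; the
# helper is not called by the job): a row survives either because it belongs to the current
# base strategy (strategy_key is not NaN after the merge) or because it clears the
# send_count/score thresholds.
def _demo_base_filter():
    stats = pd.DataFrame({
        'gh_id': ['gh_a', 'gh_a'],
        'video_id': [1, 2],
        'send_count': [1000, 300],
        'score': [0.10, 0.20],
    })
    base = pd.DataFrame({
        'gh_id': ['gh_a'], 'video_id': [2],
        'strategy_key': [BASE_GROUP_NAME], 'sort': [1],
    })
    merged = stats.merge(base, on=['gh_id', 'video_id'], how='outer')
    # video 1 passes the thresholds; video 2 stays only because it is the current base pick
    return merged.query('strategy_key.notna() or (send_count > 500 and score > 0.05)')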


def check_result_data(df):
    # every (account, strategy) pair must end up with exactly SEND_N rows
    for gh_id in GH_IDS:
        for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
            sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
            n_records = len(sub_df)
            if n_records != SEND_N:
                raise Exception(f"Unexpected record count: {gh_id},{key},{n_records}")


def postprocess_override_by_config(df, dt_version):
    config = json.load(open("configs/3rd_gh_reply_video.json"))
    override_data = {
        'strategy_key': [],
        'gh_id': [],
        'sort': [],
        'video_id': []
    }
    for gh_id in config:
        gh_config = config[gh_id]
        for key in gh_config:
            for video_config in gh_config[key]:
                # drop the currently ranked video at this position before overriding it
                position = video_config['position']
                video_id = video_config['video_id']
                df = df.drop(df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}" and sort == {position}').index)
                override_data['strategy_key'].append(key)
                override_data['gh_id'].append(gh_id)
                override_data['sort'].append(position)
                override_data['video_id'].append(video_id)
    n_records = len(override_data['strategy_key'])
    override_data['dt_version'] = [dt_version] * n_records
    override_data['score'] = [0.0] * n_records
    df_to_append = pd.DataFrame(override_data)
    df = pd.concat([df, df_to_append], ignore_index=True)
    return df
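

# The override config is read as {gh_id: {strategy_key: [{"position": ..., "video_id": ...}]}}.
# An assumed example of what configs/3rd_gh_reply_video.json could contain (placeholder
# account and video ids, not real data):
# {
#     "gh_example_id": {
#         "3rd-party-base": [
#             {"position": 1, "video_id": 1234567}
#         ]
#     }
# }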


def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
    dt_version = f'{run_dt}{run_hour}'
    dry_run = kwargs.get('dry_run', False)
    gh_df = get_and_update_gh_ids(run_dt)

    layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
    layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
    base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE, BASE_GROUP_NAME)
    final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
    final_rank_df = postprocess_override_by_config(final_rank_df, dt_version)
    check_result_data(final_rank_df)

    # join video title and cover image via ODPS
    odps_instance = get_odps_instance(project)
    odps_ranked_df = odps.DataFrame(final_rank_df)
    video_df = get_dataframe_from_odps('videoods', 'wx_video')
    video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
    video_df = video_df['id', 'title', 'cover_url']
    final_df = odps_ranked_df.join(video_df, on=('video_id', 'id'))
    final_df = final_df.to_pandas()
    final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'title', 'cover_url', 'score']]

    # reverse the sending order (sort 1 becomes SEND_N)
    final_df['sort'] = SEND_N + 1 - final_df['sort']

    if dry_run:
        print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']]
              .sort_values(by=['strategy_key', 'gh_id', 'sort']))
        return

    # save to ODPS
    t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
    part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
    part_spec = ','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
    with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
        writer.write(list(final_df.itertuples(index=False)))

    # sync to MySQL
    data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
    data_columns = list(final_df.columns)
    max_time_to_delete = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    mysql = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
    mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)

    # soft-delete older rows of the same dt_version (rows created before this insert)
    for key in final_df['strategy_key'].unique():
        sql = f"""
            update {RDS_RANK_RESULT_TABLE}
            set is_delete = 1
            where
                dt_version = '{dt_version}'
                and strategy_key = '{key}'
                and create_time < '{max_time_to_delete}'
                and is_delete = 0
        """
        rows = mysql.execute(sql)
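

# Worked examples for the steps above (illustrative values only):
# - sort reversal: with SEND_N = 3, sort 1 -> 3, 2 -> 2, 3 -> 1; with the current
#   SEND_N = 1 it is the identity mapping.
# - ODPS partition spec: for run_dt='20240615', run_hour='00' the string built above is
#   "dt=20240615,hour=00,ctime=2024061500".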


def main():
    argparser = ArgumentParser()
    argparser.add_argument('-n', '--dry-run', action='store_true')
    argparser.add_argument('--run-at', help='assume to run at date and hour, yyyyMMddHH')
    args = argparser.parse_args()
    run_date = datetime.today()
    if args.run_at:
        run_date = datetime.strptime(args.run_at, "%Y%m%d%H")
        LOGGER.info(f"Assume to run at {run_date.strftime('%Y-%m-%d %H:00')}")
    try:
        now_date = datetime.today()
        LOGGER.info(f"开始执行: {datetime.strftime(now_date, '%Y-%m-%d %H:%M')}")
        last_date = run_date - timedelta(1)
        last_dt = last_date.strftime("%Y%m%d")
        # check whether the day-level upstream data is ready;
        # the upstream stats table is updated daily, but its fields are hour-compatible
        while True:
            h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
            if h_data_count > 0:
                LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count))
                run_dt = run_date.strftime("%Y%m%d")
                run_hour = run_date.strftime("%H")
                LOGGER.info(f'run_dt: {run_dt}, run_hour: {run_hour}')
                build_and_transfer_data(run_dt, run_hour, ODS_PROJECT,
                                        dry_run=args.dry_run)
                LOGGER.info('数据更新完成')
                return
            else:
                LOGGER.info("上游数据未就绪,等待60s")
                time.sleep(60)
    except Exception as e:
        LOGGER.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
        if CONFIG.ENV_TEXT == '开发环境':
            return
        send_msg_to_feishu(
            webhook=CONFIG.FEISHU_ROBOT['growth_task_robot'].get('webhook'),
            key_word=CONFIG.FEISHU_ROBOT['growth_task_robot'].get('key_word'),
            msg_text=f"rov-offline{CONFIG.ENV_TEXT} - 数据更新失败\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )


if __name__ == '__main__':
    LOGGER.info("%s 开始执行" % os.path.basename(__file__))
    LOGGER.info(f"environment: {CONFIG.ENV_TEXT}")
    main()