Kaynağa Gözat

Update alg_growth_gh_reply_video_v1: add fallback for layer2

StrayWarrior 7 ay önce
ebeveyn
işleme
31279419a6
1 değiştirilmiş dosya ile 26 ekleme ve 11 silme
  1. 26 11
      alg_growth_gh_reply_video_v1.py

+ 26 - 11
alg_growth_gh_reply_video_v1.py

@@ -46,6 +46,15 @@ def check_data_partition(project, table, data_dt, data_hr=None):
     return data_count
 
 
+def get_last_strategy_result(project, rank_table, dt_version, key):
+    strategy_df = get_odps_df_of_max_partition(
+        project, rank_table, { 'ctime': dt_version }
+    ).to_pandas()
+    sub_df = strategy_df.query(f'strategy_key == "{key}"')
+    sub_df = sub_df[['gh_id', 'video_id', 'strategy_key', 'sort']].drop_duplicates()
+    return sub_df
+
+
 def process_reply_stats(project, table, period, run_dt):
     # 获取多天即转统计数据用于聚合
     df = get_odps_df_of_recent_partitions(project, table, period, {'dt': run_dt})
@@ -99,8 +108,8 @@ def rank_for_layer1(run_dt, run_hour, project, table):
     result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id']]
     return result_df
 
-def rank_for_layer2(run_dt, run_hour, project, table):
-    stats_df = process_reply_stats(project, table, STATS_PERIOD_DAYS, run_dt)
+def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
+    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)
 
     # 确保重跑时可获得一致结果
     dt_version = f'{run_dt}{run_hour}'
@@ -121,14 +130,22 @@ def rank_for_layer2(run_dt, run_hour, project, table):
     # 基础过滤for账号
     df = stats_df.query('day0_return > 100')
     # TODO: fetch send_count
-    # TODO: 个数不足时的兜底逻辑
+
+    # fallback to base if necessary
+    base_strategy_df = get_last_strategy_result(
+        project, rank_table, dt_version, BASE_GROUP_NAME)
+
     for gh_id in GH_IDS:
         sub_df = df.query(f'gh_id == "{gh_id}"')
+        if len(sub_df) < SEND_N:
+            LOGGER.warning(
+                "gh_id[{}] rows[{}] not enough for layer2, fallback to base"
+                .format(gh_id, len(sub_df)))
+            sub_df = base_strategy_df.query(f'gh_id == "{gh_id}"')
+            sub_df['score'] = sub_df['sort']
         sampled_df = sub_df.sample(n=SEND_N, weights=sub_df['score'])
         sampled_df['sort'] = range(1, len(sampled_df) + 1)
         sampled_dfs.append(sampled_df)
-        if len(sampled_df) != SEND_N:
-            raise
 
     extend_df = pd.concat(sampled_dfs)
     extend_df['strategy_key'] = EXPLORE2_GROUP_NAME
@@ -143,11 +160,8 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table):
     dt_version = f'{run_dt}{run_hour}'
 
     # 获取当前base信息, 策略表dt_version(ctime partition)采用当前时间
-    strategy_df = get_odps_df_of_max_partition(
-        project, rank_table, { 'ctime': dt_version }
-    ).to_pandas()
-    base_strategy_df = strategy_df.query('strategy_key.str.contains("base")')
-    base_strategy_df = base_strategy_df[['gh_id', 'video_id', 'strategy_key']].drop_duplicates()
+    base_strategy_df = get_last_strategy_result(
+        project, rank_table, dt_version, BASE_GROUP_NAME)
 
     default_stats_df = stats_df.query('gh_id == "default"')
 
@@ -184,7 +198,8 @@ def build_and_transfer_data(run_dt, run_hour, project):
     dt_version = f'{run_dt}{run_hour}'
 
     layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE)
-    layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE)
+    layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT,
+                                  GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
     base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT,
                               GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
     final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)