Merge branch 'bugfix/gh-reply-video' of algorithm/rov-offline into master

fengzhoutian · 7 months ago
parent commit bbbdb4aa3d
2 files changed, 44 additions and 20 deletions
  1. alg_growth_gh_reply_video_v1.py (+42, -19)
  2. db_helper.py (+2, -1)

alg_growth_gh_reply_video_v1.py (+42, -19)

@@ -46,6 +46,15 @@ def check_data_partition(project, table, data_dt, data_hr=None):
     return data_count
 
 
+def get_last_strategy_result(project, rank_table, dt_version, key):
+    strategy_df = get_odps_df_of_max_partition(
+        project, rank_table, { 'ctime': dt_version }
+    ).to_pandas()
+    sub_df = strategy_df.query(f'strategy_key == "{key}"')
+    sub_df = sub_df[['gh_id', 'video_id', 'strategy_key', 'sort']].drop_duplicates()
+    return sub_df
+
+
 def process_reply_stats(project, table, period, run_dt):
     # Fetch multiple days of reply conversion stats for aggregation
     df = get_odps_df_of_recent_partitions(project, table, period, {'dt': run_dt})
@@ -71,7 +80,7 @@ def process_reply_stats(project, table, period, run_dt):
 
     merged_df = pd.concat([df, default_stats_df]).reset_index(drop=True)
 
-    merged_df['score'] = merged_df['day0_return'] / (merged_df['first_visit_uv'] + 1000)
+    merged_df['score'] = merged_df['day0_return'] / (merged_df['send_count'] + 1000)
     return merged_df
 
 
@@ -84,9 +93,9 @@ def rank_for_layer1(run_dt, run_hour, project, table):
     np.random.seed(int(dt_version)+1)
 
     # TODO: revise the weight calculation strategy
-    sample_weights = df['rov']
+    df['score'] = df['rov']
 
-    sampled_df = df.sample(n=SEND_N, weights=sample_weights)
+    sampled_df = df.sample(n=SEND_N, weights=df['score'])
     sampled_df['sort'] = range(1, len(sampled_df) + 1)
     sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME
     sampled_df['dt_version'] = dt_version
@@ -96,11 +105,11 @@ def rank_for_layer1(run_dt, run_hour, project, table):
     gh_name_df['_tmpkey'] = 1
     extend_df = sampled_df.merge(gh_name_df, on='_tmpkey').drop('_tmpkey', axis=1)
 
-    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id']]
+    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
     return result_df
 
-def rank_for_layer2(run_dt, run_hour, project, table):
-    stats_df = process_reply_stats(project, table, STATS_PERIOD_DAYS, run_dt)
+def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
+    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)
 
     # Ensure reruns produce consistent results
     dt_version = f'{run_dt}{run_hour}'
@@ -120,20 +129,27 @@ def rank_for_layer2(run_dt, run_hour, project, table):
 
     # Basic per-account filtering
     df = stats_df.query('day0_return > 100')
-    # TODO: fetch send_count
-    # TODO: 个数不足时的兜底逻辑
+
+    # fallback to base if necessary
+    base_strategy_df = get_last_strategy_result(
+        project, rank_table, dt_version, BASE_GROUP_NAME)
+
     for gh_id in GH_IDS:
         sub_df = df.query(f'gh_id == "{gh_id}"')
+        if len(sub_df) < SEND_N:
+            LOGGER.warning(
+                "gh_id[{}] rows[{}] not enough for layer2, fallback to base"
+                .format(gh_id, len(sub_df)))
+            sub_df = base_strategy_df.query(f'gh_id == "{gh_id}"')
+            sub_df['score'] = sub_df['sort']
         sampled_df = sub_df.sample(n=SEND_N, weights=sub_df['score'])
         sampled_df['sort'] = range(1, len(sampled_df) + 1)
         sampled_dfs.append(sampled_df)
-        if len(sampled_df) != SEND_N:
-            raise
 
     extend_df = pd.concat(sampled_dfs)
     extend_df['strategy_key'] = EXPLORE2_GROUP_NAME
     extend_df['dt_version'] = dt_version
-    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id']]
+    result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
     return result_df
 
 def rank_for_base(run_dt, run_hour, project, stats_table, rank_table):
@@ -143,11 +159,8 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table):
     dt_version = f'{run_dt}{run_hour}'
 
     # Fetch current base info; the strategy table dt_version (ctime partition) uses the current time
-    strategy_df = get_odps_df_of_max_partition(
-        project, rank_table, { 'ctime': dt_version }
-    ).to_pandas()
-    base_strategy_df = strategy_df.query('strategy_key.str.contains("base")')
-    base_strategy_df = base_strategy_df[['gh_id', 'video_id', 'strategy_key']].drop_duplicates()
+    base_strategy_df = get_last_strategy_result(
+        project, rank_table, dt_version, BASE_GROUP_NAME)
 
     default_stats_df = stats_df.query('gh_id == "default"')
 
@@ -176,18 +189,28 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table):
     #ranked_df['sort'] = grouped_stats_df.groupby('gh_id')['score'].rank(ascending=False)
     ranked_df['strategy_key'] = BASE_GROUP_NAME
     ranked_df['dt_version'] = dt_version
-    ranked_df = ranked_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id']]
+    ranked_df = ranked_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
     return ranked_df
 
 
+def check_result_data(df):
+    for gh_id in GH_IDS + ('default', ):
+        for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
+            sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
+            if len(sub_df) != SEND_N:
+                raise Exception(f"Result not enough for gh_id[{gh_id}] group[{key}]")
+
+
 def build_and_transfer_data(run_dt, run_hour, project):
     dt_version = f'{run_dt}{run_hour}'
 
     layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE)
-    layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE)
+    layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT,
+                                  GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
     base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT,
                               GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
     final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
+    check_result_data(final_rank_df)
 
     odps_instance = get_odps_instance(project)
     odps_ranked_df = odps.DataFrame(final_rank_df)
@@ -198,7 +221,7 @@ def build_and_transfer_data(run_dt, run_hour, project):
     final_df = odps_ranked_df.join(video_df, on=('video_id', 'id'))
 
     final_df = final_df.to_pandas()
-    final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'title', 'cover_url']]
+    final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'title', 'cover_url', 'score']]
 
     # reverse sending order
     final_df['sort'] = SEND_N + 1 - final_df['sort']
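
The substantive change in rank_for_layer2 above is the fallback path: instead of the bare raise when an account has too few rows, the last published base strategy result is reused as the candidate pool, with its old sort rank standing in as the sampling weight. A minimal sketch of that pattern in plain pandas (SEND_N, GH_IDS, and all data below are illustrative placeholders, not values from the repo):

import pandas as pd

SEND_N = 2
GH_IDS = ('gh_a', 'gh_b')

def sample_with_fallback(stats_df, base_df):
    # Per-account weighted sampling; fall back to the last base result
    # when the account has fewer than SEND_N qualifying candidates.
    sampled = []
    for gh_id in GH_IDS:
        sub_df = stats_df.query(f'gh_id == "{gh_id}"')
        if len(sub_df) < SEND_N:
            sub_df = base_df.query(f'gh_id == "{gh_id}"').copy()
            sub_df['score'] = sub_df['sort']  # reuse the previous rank as weight
        sampled.append(sub_df.sample(n=SEND_N, weights=sub_df['score']))
    return pd.concat(sampled).reset_index(drop=True)

stats_df = pd.DataFrame({'gh_id': ['gh_a'] * 3, 'video_id': [1, 2, 3],
                         'score': [0.5, 0.3, 0.2]})   # gh_b has no rows at all
base_df = pd.DataFrame({'gh_id': ['gh_b'] * 2, 'video_id': [10, 11],
                        'sort': [1, 2]})
print(sample_with_fallback(stats_df, base_df))

The .copy() also sidesteps the pandas SettingWithCopyWarning that the in-place assignment sub_df['score'] = sub_df['sort'] in the diff can trigger on a query() result.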

db_helper.py (+2, -1)

@@ -385,7 +385,8 @@ class MysqlHelper(object):
                 conn.commit()
         except pymysql.MySQLError as e:
             print(f"Error in batch_insert: {e}")
-            connection.rollback()
+            conn.rollback()
+            raise e
         conn.close()
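
For reference, the corrected error handling in MysqlHelper.batch_insert now follows this shape (a sketch under assumed table and connection parameters, not the repo's code):

import pymysql

def batch_insert(rows):
    conn = pymysql.connect(host='localhost', user='user',
                           password='***', database='db')  # placeholder credentials
    try:
        with conn.cursor() as cursor:
            # executemany runs one parameterized INSERT per row in the batch
            cursor.executemany(
                'INSERT INTO t (gh_id, video_id) VALUES (%s, %s)', rows)
        conn.commit()
    except pymysql.MySQLError as e:
        print(f"Error in batch_insert: {e}")
        conn.rollback()  # the fix: conn, not the undefined name connection
        raise            # propagate the failure to the caller, as the diff now does
    finally:
        conn.close()     # runs on both success and failure

One caveat the diff leaves open: because the except block re-raises, the trailing conn.close() in the committed code is skipped on failure; moving the close into a finally block, as sketched here, keeps the connection from leaking.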