@@ -44,6 +44,37 @@ def check_data_partition(project, table, data_dt, data_hr=None):
     data_count = 0
     return data_count
 
+
+def process_reply_stats(project, table, period, run_dt):
+    # Fetch several days of reply & return stats for aggregation
+    df = get_odps_df_of_recent_partitions(project, table, period, {'dt': run_dt})
+    df = df.to_pandas()
+
+    df['video_id'] = df['video_id'].astype('int64')
+    df = df[['gh_id', 'video_id', 'send_count', 'first_visit_uv', 'day0_return']]
+
+    # Aggregate within each account
+    df = df.groupby(['video_id', 'gh_id']).agg({
+        'send_count': 'sum',
+        'first_visit_uv': 'sum',
+        'day0_return': 'sum'
+    }).reset_index()
+
+    # Aggregate across all accounts to serve as the default
+    default_stats_df = df.groupby('video_id').agg({
+        'send_count': 'sum',
+        'first_visit_uv': 'sum',
+        'day0_return': 'sum'
+    }).reset_index()
+    default_stats_df['gh_id'] = 'default'
+
+    merged_df = pd.concat([df, default_stats_df]).reset_index(drop=True)
+
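+    # Score is a smoothed day-0 return rate; the +1000 damps scores of videos with few first visits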
+    merged_df['score'] = merged_df['day0_return'] / (merged_df['first_visit_uv'] + 1000)
+    return merged_df
+
+
 def rank_for_layer1(run_dt, run_hour, project, table):
     # TODO: add review & retirement handling
     df = get_odps_df_of_max_partition(project, table, {'dt': run_dt})
@@ -69,15 +100,8 @@ def rank_for_layer1(run_dt, run_hour, project, table):
     return result_df
 
 
 def rank_for_layer2(run_dt, run_hour, project, table):
-    df = get_odps_df_of_recent_partitions(project, table, STATS_PERIOD_DAYS, {'dt': run_dt})
-    df = df.to_pandas()
-    df['video_id'] = df['video_id'].astype('int64')
-    df = df[['gh_id', 'video_id', 'send_count', 'first_visit_uv', 'day0_return']]
-    df = df.groupby(['video_id', 'gh_id']).agg({
-        'send_count': 'sum',
-        'first_visit_uv': 'sum',
-        'day0_return': 'sum'
-    }).reset_index()
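+    # Replaced by the shared helper: per-account rows plus a synthetic 'default' account, score included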
+    stats_df = process_reply_stats(project, table, STATS_PERIOD_DAYS, run_dt)
     # Ensure reruns yield consistent results
     dt_version = f'{run_dt}{run_hour}'
@@ -90,23 +114,16 @@ def rank_for_layer2(run_dt, run_hour, project, table):
 
     sampled_dfs = []
     # Handle the default logic (default-explore2)
-    default_stats_df = df \
-        .groupby('video_id') \
-        .agg({'send_count': 'sum',
-              'first_visit_uv': 'sum',
-              'day0_return': 'sum'}) \
-        .reset_index()
-    default_stats_df['gh_id'] = 'default'
-    default_stats_df['score'] = default_stats_df['day0_return'] / (default_stats_df['first_visit_uv'] + 1000)
+    default_stats_df = stats_df.query('gh_id == "default"')
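+    # Weighted sampling: draw 2 videos with probability proportional to score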
     sampled_df = default_stats_df.sample(n=2, weights=default_stats_df['score'])
     sampled_df['sort'] = range(1, len(sampled_df) + 1)
     sampled_dfs.append(sampled_df)
 
     # Basic per-account filtering
-    df = df.query('day0_return > 100')
+    df = stats_df.query('day0_return > 100')
     # TODO: fetch send_count
     # TODO: fallback logic when there are not enough videos
-    df['score'] = df['day0_return'] / (df['first_visit_uv'] + 1000)
     for gh_id in GH_IDS:
         sub_df = df.query(f'gh_id == "{gh_id}"')
         sampled_df = sub_df.sample(n=2, weights=sub_df['score'])
@@ -122,6 +139,9 @@ def rank_for_layer2(run_dt, run_hour, project, table):
     return result_df
 
 
 def rank_for_base(run_dt, run_hour, project, stats_table, rank_table):
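+    # Multi-day stats from the shared helper (includes the 'default' pseudo-account)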
+    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)
+
     # TODO: support setting the base manually
     dt_version = f'{run_dt}{run_hour}'
@@ -132,26 +152,8 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table):
     base_strategy_df = strategy_df.query('strategy_key.str.contains("base")')
     base_strategy_df = base_strategy_df[['gh_id', 'video_id', 'strategy_key']].drop_duplicates()
 
-    # Fetch several days of reply & return stats and aggregate
-    stats_df = get_odps_df_of_recent_partitions(
-        project, stats_table, STATS_PERIOD_DAYS, {'dt': run_dt}
-    ).to_pandas()
-    stats_df['video_id'] = stats_df['video_id'].astype('int64')
-    stats_df = stats_df[['gh_id', 'video_id', 'send_count', 'first_visit_uv', 'day0_return']]
-    stats_df = stats_df.groupby(['video_id', 'gh_id']).agg({
-        'send_count': 'sum',
-        'first_visit_uv': 'sum',
-        'day0_return': 'sum'
-    }).reset_index()
-
-    # Aggregate all data as base exploitation data for new accounts (default-base)
-    default_stats_df = stats_df \
-        .groupby('video_id') \
-        .agg({'send_count': 'sum',
-              'first_visit_uv': 'sum',
-              'day0_return': 'sum'}) \
-        .reset_index()
-    default_stats_df['gh_id'] = 'default'
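+    # 'default' rows are already built inside process_reply_stats; just select them here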
+    default_stats_df = stats_df.query('gh_id == "default"')
 
     # Rank within each account to pick its base exploitation content (including default)
     # Make sure the current base strategy takes part in the ranking: join first, then filter
@@ -168,7 +170,7 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table):
     # Merge the default and per-account data
     grouped_stats_df = pd.concat([default_stats_df, stats_with_strategy_df]).reset_index()
-    grouped_stats_df['score'] = grouped_stats_df['day0_return'] / (grouped_stats_df['first_visit_uv'] + 1000)
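+    # score comes precomputed from process_reply_stats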
 
     def set_top_n(group, n=2):
         group_sorted = group.sort_values(by='score', ascending=False)
         top_n = group_sorted.head(n)