@@ -267,6 +267,7 @@ EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history'
 GH_REPLY_STATS_TABLE = 'alg_growth_3rd_gh_reply_video_stats'
 # ODPS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
 ODPS_3RD_RANK_RESULT_TABLE = 'alg_3rd_gh_autoreply_video_rank_data'
+GH_DETAIL = 'gh_detail'
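+# account detail table: one row per gh account with gh_id, gh_name, type and category1 (fields assumed from the usage below)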
 RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
 STATS_PERIOD_DAYS = 5
 SEND_N = 1
@@ -323,9 +324,11 @@ def process_reply_stats(project, table, period, run_dt):
     return merged_df


-def rank_for_layer1(run_dt, run_hour, project, table):
+def rank_for_layer1(run_dt, run_hour, gh):
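+    # gh: account detail DataFrame (gh_id / gh_name / type / category1); videos are matched to accounts by category1 below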
     # TODO: add review & retirement handling
-    df = get_odps_df_of_max_partition(project, table, {'dt': run_dt})
+    gh = gh[gh['type'] == 2]
+
+    df = get_odps_df_of_max_partition(ODS_PROJECT, EXPLORE_POOL_TABLE, {'dt': run_dt})
     df = df.to_pandas()
     # ensure consistent results when re-running
     dt_version = f'{run_dt}{run_hour}'
@@ -333,17 +336,15 @@ def rank_for_layer1(run_dt, run_hour, project, table):

     # TODO: revise the weight calculation strategy
     df['score'] = df['ros']
-
-    sampled_df = df.sample(n=SEND_N, weights=df['score'])
-    sampled_df['sort'] = range(1, len(sampled_df) + 1)
+    # within each category1, keep only the SEND_N highest-scoring videos
+    sampled_df = df.groupby('category1').apply(lambda x: x.nlargest(SEND_N, 'score')).reset_index(drop=True)
+    # add the 'sort' column
+    sampled_df['sort'] = sampled_df.groupby('category1')['score'].rank(method='first', ascending=False).astype(int)
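+    # note: method='first' breaks score ties by row order, keeping ranks consistent with the nlargest selection above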
+    # order rows by category and score
+    sampled_df = sampled_df.sort_values(by=['category1', 'score'], ascending=[True, False]).reset_index(drop=True)
     sampled_df['strategy_key'] = EXPLORE1_GROUP_NAME
     sampled_df['dt_version'] = dt_version
-
-    gh_name_df = pd.DataFrame({'gh_id': GH_IDS + ('default',)})
-    sampled_df['_tmpkey'] = 1
-    gh_name_df['_tmpkey'] = 1
-    extend_df = sampled_df.merge(gh_name_df, on='_tmpkey').drop('_tmpkey', axis=1)
-
+    extend_df = sampled_df.merge(gh, on='category1')
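+    # joining on category1 replaces the old cross join: each account now receives only the top videos of its own category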
     result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
     return result_df

@@ -428,7 +429,6 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):

     ranked_df = grouped_stats_df.groupby('gh_id').apply(set_top_n, SEND_N)
     ranked_df = ranked_df.reset_index(drop=True)
-    # ranked_df['sort'] = grouped_stats_df.groupby('gh_id')['score'].rank(ascending=False)
     ranked_df['strategy_key'] = stg_key
     ranked_df['dt_version'] = dt_version
     ranked_df = ranked_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
@@ -436,7 +436,7 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):


 def check_result_data(df):
-    for gh_id in GH_IDS + ('default',):
+    for gh_id in GH_IDS:
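+        # GH_IDS is now rebuilt from gh_detail in build_and_transfer_data and already contains the 'default' account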
         for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
             sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
             if len(sub_df) != SEND_N:
@@ -447,7 +447,7 @@ def rank_for_base_designate(run_dt, run_hour, stg_key):
     dt_version = f'{run_dt}{run_hour}'
     ranked_df = pd.DataFrame()  # initialize an empty DataFrame

-    for gh_id in GH_IDS + ('default',):
+    for gh_id in GH_IDS:
         if gh_id in TARGET_GH_IDS:
             temp_df = pd.DataFrame({
                 'strategy_key': [stg_key],
@@ -471,13 +471,27 @@ def rank_for_base_designate(run_dt, run_hour, stg_key):


 def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
-    dt_version = f'{run_dt}{run_hour}'
+    gh = get_odps_df_of_max_partition(ODS_PROJECT, GH_DETAIL, {'dt': run_dt})
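+    # load account detail from the newest available partition (assumes get_odps_df_of_max_partition picks the max dt <= run_dt)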
+    gh = gh.to_pandas()
+    gh = gh[gh['type'] == 2]
+    if 'default' not in gh['gh_id'].values:
+        # if the default account is missing, add a row for it
+        new_row = pd.DataFrame({'gh_id': ['default'], 'gh_name': ['默认'], 'type': [2], 'category1': ['泛生活']},
+                               index=[0])
+        # append the new row with pd.concat
+        gh = pd.concat([gh, new_row], ignore_index=True)
+
+    gh = gh.drop_duplicates(subset=['gh_id'])
+    gh_ids = tuple(gh['gh_id'])
+    global GH_IDS
+    GH_IDS = gh_ids
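+    # rebuild the module-level GH_IDS from gh_detail each run, so check_result_data and rank_for_base_designate iterate the same account set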
+
     dry_run = kwargs.get('dry_run', False)

-    # layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE)
+    layer1_rank = rank_for_layer1(run_dt, run_hour, gh)
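+    # layer1 is ranked from the explore pool per category again, instead of the designated fallback used before this change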
     # layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_3RD_RANK_RESULT_TABLE)
     # base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_3RD_RANK_RESULT_TABLE,BASE_GROUP_NAME)
-    layer1_rank = rank_for_base_designate(run_dt, run_hour, EXPLORE1_GROUP_NAME)
+    # layer1_rank = rank_for_base_designate(run_dt, run_hour, EXPLORE1_GROUP_NAME)
     layer2_rank = rank_for_base_designate(run_dt, run_hour, EXPLORE2_GROUP_NAME)
     base_rank = rank_for_base_designate(run_dt, run_hour, BASE_GROUP_NAME)

@@ -503,17 +517,17 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
         return

     # save to ODPS
-    t = odps_instance.get_table(ODPS_3RD_RANK_RESULT_TABLE)
-    part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
-    part_spec = ','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
-    with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
-        writer.write(list(final_df.itertuples(index=False)))
+    # t = odps_instance.get_table(ODPS_3RD_RANK_RESULT_TABLE)
+    # part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
+    # part_spec = ','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
+    # with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
+    #     writer.write(list(final_df.itertuples(index=False)))

     # sync to MySQL
-    data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
-    data_columns = list(final_df.columns)
-    mysql = MysqlHelper(CONFIG.MYSQL_CRAWLER_INFO)
-    mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)
+    # data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
+    # data_columns = list(final_df.columns)
+    # mysql = MysqlHelper(CONFIG.MYSQL_CRAWLER_INFO)
+    # mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)
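+    # NOTE: both the ODPS write and the MySQL sync above are disabled in this change, so final_df is not persisted on this path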


 def main_loop():