|
@@ -194,6 +194,14 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table):
|
|
|
return ranked_df
|
|
|
|
|
|
|
|
|
+def check_result_data(df):
|
|
|
+ for gh_id in GH_IDS + ('default', ):
|
|
|
+ for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
|
|
|
+ sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
|
|
|
+ if len(sub_df) != SEND_N:
|
|
|
+ raise Exception(f"Result not enough for gh_id[{gh_id}] group[{group}]")
|
|
|
+
|
|
|
+
|
|
|
def build_and_transfer_data(run_dt, run_hour, project):
|
|
|
dt_version = f'{run_dt}{run_hour}'
|
|
|
|
|
@@ -203,6 +211,7 @@ def build_and_transfer_data(run_dt, run_hour, project):
|
|
|
base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT,
|
|
|
GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
|
|
|
final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
|
|
|
+ check_result_data(final_rank_df)
|
|
|
|
|
|
odps_instance = get_odps_instance(project)
|
|
|
odps_ranked_df = odps.DataFrame(final_rank_df)
|