@@ -240,6 +240,7 @@ def check_result_data(df):
 
 
 def postprocess_override_by_config(df, dt_version):
+    return df
     config = json.load(open("configs/3rd_gh_reply_video.json"))
     override_data = {
         'strategy_key': [],
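Note on the hunk above: the added `return df` short-circuits postprocess_override_by_config before the JSON config is even loaded, so the config-driven override is disabled while its call site stays intact. A minimal sketch of the resulting behavior, assuming the body below the return from the visible context lines (not the full original):

import json

import pandas as pd


def postprocess_override_by_config(df: pd.DataFrame, dt_version: str) -> pd.DataFrame:
    # Early return added by this change: callers get the ranked frame back
    # unchanged and nothing below ever executes.
    return df
    # -- unreachable while the early return is in place --
    config = json.load(open("configs/3rd_gh_reply_video.json"))
    override_data = {'strategy_key': []}
    ...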
@@ -274,7 +275,6 @@ def build_and_transfer_base_mode(gh_df, run_dt, run_hour, dt_version, dry_run):
     base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE,BASE_GROUP_NAME)
 
     final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
-
     final_rank_df = postprocess_override_by_config(final_rank_df, dt_version)
     check_result_data(final_rank_df)
 
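This hunk only drops a stray blank line; its context shows the ranking pipeline: the three per-layer rank frames are stacked, the (now disabled) override hook runs, and the result is validated. A toy illustration of the concat step, with stand-in frames for layer1_rank/layer2_rank/base_rank:

import pandas as pd

layer1_rank = pd.DataFrame({'gh_id': ['g1'], 'sort': [1]})
layer2_rank = pd.DataFrame({'gh_id': ['g2'], 'sort': [1]})
base_rank = pd.DataFrame({'gh_id': ['g3'], 'sort': [1]})

# reset_index(drop=True) rebuilds a contiguous 0..n-1 index; without it the
# concatenated frame would repeat each input's own index (0, 0, 0 here).
final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
print(final_rank_df)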
@@ -284,11 +284,23 @@ def build_and_transfer_base_mode(gh_df, run_dt, run_hour, dt_version, dry_run):
     final_df['sort'] = SEND_N + 1 - final_df['sort']
 
     if dry_run:
+        print("==== ALL DATA ====")
         print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']]
               .sort_values(by=['strategy_key', 'gh_id', 'sort']))
+
+        last_odps_df = get_odps_df_of_max_partition(
+            ODS_PROJECT, ODPS_RANK_RESULT_TABLE, {'ctime': dt_version}
+        ).to_pandas()
+        merged_df = last_odps_df.merge(
+            final_df, on=['strategy_key', 'gh_id', 'sort'], how='outer')
+        delta_df = merged_df.query('title_x != title_y')
+        delta_df = delta_df[['strategy_key', 'gh_id', 'sort',
+                             'title_x', 'score_x', 'title_y', 'score_y']]
+        delta_df.to_csv('tmp_delta_data.csv')
         return
 
     # save to ODPS
+    odps_instance = get_odps_instance(ODS_PROJECT)
     t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
     part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
     part_spec = ','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
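The dry-run branch above now also diffs the new result against the most recent partition already in ODPS: an outer merge on the key columns, then every row whose title changed is written to tmp_delta_data.csv. pandas suffixes the overlapping columns with _x (left side, the last partition) and _y (right side, the new frame), and because NaN compares unequal under query(), rows present on only one side of the merge also land in the delta. A self-contained sketch of the pattern with toy frames (the real code pulls the left frame via get_odps_df_of_max_partition):

import pandas as pd

last_odps_df = pd.DataFrame({
    'strategy_key': ['s1', 's1'], 'gh_id': ['g1', 'g2'], 'sort': [1, 1],
    'title': ['old title', 'kept title'], 'score': [0.10, 0.20]})
final_df = pd.DataFrame({
    'strategy_key': ['s1', 's1'], 'gh_id': ['g1', 'g3'], 'sort': [1, 1],
    'title': ['new title', 'added title'], 'score': [0.30, 0.40]})

merged_df = last_odps_df.merge(
    final_df, on=['strategy_key', 'gh_id', 'sort'], how='outer')
# g1 shows a changed title; g2 (removed) and g3 (added) also match because
# one side of the comparison is NaN and NaN != anything evaluates to True.
delta_df = merged_df.query('title_x != title_y')
print(delta_df[['strategy_key', 'gh_id', 'sort',
                'title_x', 'score_x', 'title_y', 'score_y']])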
@@ -452,8 +464,7 @@ def main():
             time.sleep(60)
         except Exception as e:
             LOGGER.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
-            return
-    if CONFIG.ENV_TEXT == '开发环境':
+    if CONFIG.ENV_TEXT == '开发环境' or args.dry_run:
         return
     send_msg_to_feishu(
         webhook=CONFIG.FEISHU_ROBOT['growth_task_robot'].get('webhook'),
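Two behavior changes in the last hunk: the `return` inside the except block is gone, so a failed update (the log message 数据更新失败 means "data update failed") now falls through to the notification code instead of silently ending main(), and the guard before send_msg_to_feishu is widened so dry runs stay as quiet as the development environment (开发环境). A stand-in sketch of the resulting control flow, with CONFIG/args/LOGGER and the Feishu call replaced by plain parameters and prints:

def notify_after_update(env_text: str, dry_run: bool, failed: bool) -> None:
    if failed:
        print('update failed, logged')  # stands in for LOGGER.error(...)
        # no early return here any more: execution continues to the guard
    if env_text == '开发环境' or dry_run:
        return  # development environment or dry run: no Feishu message
    print('send_msg_to_feishu(...)')  # stands in for the real alert call


notify_after_update('prod', dry_run=False, failed=True)    # alert is sent
notify_after_update('开发环境', dry_run=False, failed=True)  # stays silent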