|
@@ -4,6 +4,7 @@ import pandas as pd
|
|
|
import traceback
|
|
|
import odps
|
|
|
from odps import ODPS
|
|
|
+import json
|
|
|
from threading import Timer
|
|
|
from datetime import datetime, timedelta
|
|
|
from db_helper import MysqlHelper
|
|
@@ -237,8 +238,38 @@ def check_result_data(df):
|
|
|
for gh_id in GH_IDS:
|
|
|
for key in (EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME, BASE_GROUP_NAME):
|
|
|
sub_df = df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}"')
|
|
|
- if len(sub_df) != SEND_N:
|
|
|
- raise Exception(f"Result not enough for gh_id[{gh_id}], group[{key}]")
|
|
|
+ n_records = len(sub_df)
|
|
|
+ if n_records != SEND_N:
|
|
|
+ raise Exception(f"Unexpected record count: {gh_id},{key},{n_records}")
|
|
|
+
|
|
|
+
|
|
|
def postprocess_override_by_config(df, dt_version,
                                   config_path="configs/3rd_gh_reply_video.json"):
    """Replace ranked rows with the videos pinned in a JSON override config.

    The config is read as a nested mapping of the form
    ``{gh_id: {strategy_key: [{"position": ..., "video_id": ...}, ...]}}``.
    For every entry, rows of ``df`` whose ``gh_id``, ``strategy_key`` and
    ``sort`` match the entry are dropped, and a replacement row carrying the
    configured ``video_id`` is appended with ``score`` 0.0 and the given
    ``dt_version``.

    Args:
        df: DataFrame with at least ``gh_id``, ``strategy_key`` and ``sort``
            columns.
        dt_version: Value written to the ``dt_version`` column of every
            appended row.
        config_path: Path of the JSON override file. Defaults to the path
            that was previously hard-coded.

    Returns:
        A new DataFrame with a fresh 0..n-1 index; the input is not modified
        in place.

    Raises:
        OSError: If the config file cannot be opened.
        json.JSONDecodeError: If the config file is not valid JSON.
        KeyError: If an entry lacks ``position`` or ``video_id``.
    """
    # Use a context manager so the file handle is closed deterministically.
    with open(config_path, encoding="utf-8") as config_file:
        config = json.load(config_file)

    override_rows = []
    for gh_id, gh_config in config.items():
        for key, video_configs in gh_config.items():
            for video_config in video_configs:
                position = video_config['position']
                video_id = video_config['video_id']
                # Remove whatever currently occupies the overridden slot.
                replaced = df.query(
                    f'gh_id == "{gh_id}" and strategy_key == "{key}" '
                    f'and sort == {position}'
                )
                df = df.drop(replaced.index)
                override_rows.append({
                    'strategy_key': key,
                    'gh_id': gh_id,
                    'sort': position,
                    'video_id': video_id,
                    'dt_version': dt_version,
                    'score': 0.0,
                })

    if not override_rows:
        # Nothing to append: skip concatenating an empty frame, which can
        # disturb column dtypes, but still hand back a renumbered index.
        return df.reset_index(drop=True)

    df_to_append = pd.DataFrame(override_rows)
    return pd.concat([df, df_to_append], ignore_index=True)
|
|
|
|
|
|
|
|
|
def rank_for_base_designate(run_dt, run_hour, stg_key):
|
|
@@ -274,13 +305,15 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
|
|
|
|
|
|
gh_df = get_and_update_gh_ids(run_dt)
|
|
|
|
|
|
- layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, ,gh_df)
|
|
|
+ layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
|
|
|
|
|
|
|
|
|
layer2_rank = rank_for_base_designate(run_dt, run_hour, EXPLORE2_GROUP_NAME)
|
|
|
base_rank = rank_for_base_designate(run_dt, run_hour, BASE_GROUP_NAME)
|
|
|
|
|
|
final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
|
|
|
+
|
|
|
+ final_rank_df = postprocess_override_by_config(final_rank_df, dt_version)
|
|
|
check_result_data(final_rank_df)
|
|
|
|
|
|
odps_instance = get_odps_instance(project)
|