فهرست منبع

Update alg_growth_gh_reply_video_v1: support overwrite old data

StrayWarrior 7 ماه پیش
والد
کامیت
1d5ab962dc
2 فایل تغییر یافته به همراه 46 افزوده شده و 0 حذف شده
  1. 45 0
      alg_growth_gh_reply_video_v1.py
  2. 1 0
      configs/gh_reply_video.json

+ 45 - 0
alg_growth_gh_reply_video_v1.py

@@ -258,6 +258,35 @@ def check_result_data(df):
                 raise Exception(f"Result not enough for gh_id[{gh_id}] group[{key}]")
 
 
+def postprocess_override_by_config(df, dt_version):
+    """Replace ranked rows with the manual overrides listed in the config file.
+
+    Reads ``configs/gh_reply_video.json``, which is iterated as
+    ``{gh_id: {strategy_key: [{"position": ..., "video_id": ...}, ...]}}``.
+    For every configured entry, the rows of ``df`` matching the same
+    ``gh_id`` / ``strategy_key`` / ``sort == position`` are dropped and one
+    override row is appended with the configured ``video_id``, the given
+    ``dt_version`` and a ``score`` of ``0.0``.
+
+    Args:
+        df: ranking result; must contain the columns ``gh_id``,
+            ``strategy_key`` and ``sort`` (they are used for matching).
+        dt_version: version value written into every appended override row.
+
+    Returns:
+        A DataFrame with the overrides applied and a fresh 0..n-1 index.
+        ``df`` itself is not modified in place.
+    """
+    # Use a context manager so the config file handle is always closed.
+    with open("configs/gh_reply_video.json", encoding="utf-8") as config_file:
+        config = json.load(config_file)
+
+    override_data = {
+        'strategy_key': [],
+        'gh_id': [],
+        'sort': [],
+        'video_id': []
+    }
+
+    for gh_id, gh_config in config.items():
+        for key, video_configs in gh_config.items():
+            for video_config in video_configs:
+                position = video_config['position']
+                video_id = video_config['video_id']
+                # Remove the computed row currently occupying this slot.
+                df = df.drop(df.query(f'gh_id == "{gh_id}" and strategy_key == "{key}" and sort == {position}').index)
+                override_data['strategy_key'].append(key)
+                override_data['gh_id'].append(gh_id)
+                override_data['sort'].append(position)
+                override_data['video_id'].append(video_id)
+
+    n_records = len(override_data['strategy_key'])
+    if not n_records:
+        # Nothing to override (e.g. the config is "{}"): skip concatenating an
+        # empty, object-typed frame, which can trigger pandas warnings or
+        # change column dtypes. Still hand back a freshly numbered index,
+        # as the concat path below does.
+        return df.reset_index(drop=True)
+
+    override_data['dt_version'] = [dt_version] * n_records
+    override_data['score'] = [0.0] * n_records
+    df_to_append = pd.DataFrame(override_data)
+    df = pd.concat([df, df_to_append], ignore_index=True)
+    return df
+
+
 def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
     dt_version = f'{run_dt}{run_hour}'
     dry_run = kwargs.get('dry_run', False)
@@ -268,7 +297,10 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
     layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
     layer2_rank = rank_for_layer2( run_dt, run_hour, ODPS_RANK_RESULT_TABLE)
     base_rank = rank_for_base(run_dt, run_hour, ODPS_RANK_RESULT_TABLE)
+
     final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
+    final_rank_df = postprocess_override_by_config(final_rank_df, dt_version)
+
     check_result_data(final_rank_df)
 
     odps_instance = get_odps_instance(project)
@@ -303,6 +335,19 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
     mysql = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
     mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)
 
+    # remove old data of same version
+    for key in final_df['strategy_key'].unique():
+        sql = f"""
+            update {RDS_RANK_RESULT_TABLE}
+            set is_delete = 1
+            where
+                dt_version = '{dt_version}'
+                and strategy_key = '{key}'
+                and create_time < '{max_time_to_delete}'
+                and is_delete = 0
+        """
+        rows = mysql.execute(sql)
+
 
 def main_loop():
     argparser = ArgumentParser()

+ 1 - 0
configs/gh_reply_video.json

@@ -0,0 +1 @@
+{}