Browse Source

Update alg_growth_3rd_gh_reply_video_v1: add delta mode

StrayWarrior 4 months ago
parent
commit
648d6be073
1 changed file with 109 additions and 23 deletions
  1. 109 23
      alg_growth_3rd_gh_reply_video_v1.py

+ 109 - 23
alg_growth_3rd_gh_reply_video_v1.py

@@ -82,9 +82,10 @@ def get_last_strategy_result(project, rank_table, dt_version, key):
     strategy_df = get_odps_df_of_max_partition(
     strategy_df = get_odps_df_of_max_partition(
         project, rank_table, {'ctime': dt_version}
         project, rank_table, {'ctime': dt_version}
     ).to_pandas()
     ).to_pandas()
+    dt_version = strategy_df.iloc[0]['dt_version']
     sub_df = strategy_df.query(f'strategy_key == "{key}"')
     sub_df = strategy_df.query(f'strategy_key == "{key}"')
     sub_df = sub_df[['gh_id', 'video_id', 'strategy_key', 'sort']].drop_duplicates()
     sub_df = sub_df[['gh_id', 'video_id', 'strategy_key', 'sort']].drop_duplicates()
-    return sub_df
+    return sub_df, dt_version
 
 
 
 
 def process_reply_stats(project, table, period, run_dt):
 def process_reply_stats(project, table, period, run_dt):
@@ -162,7 +163,7 @@ def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
     df = stats_df.query('send_count > 200 and score > 0')
     df = stats_df.query('send_count > 200 and score > 0')
 
 
     # fallback to base if necessary
     # fallback to base if necessary
-    base_strategy_df = get_last_strategy_result(
+    base_strategy_df, _ = get_last_strategy_result(
         project, rank_table, dt_version, BASE_GROUP_NAME)
         project, rank_table, dt_version, BASE_GROUP_NAME)
 
 
     for gh_id in GH_IDS:
     for gh_id in GH_IDS:
@@ -193,7 +194,7 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table, stg_key):
     dt_version = f'{run_dt}{run_hour}'
     dt_version = f'{run_dt}{run_hour}'
 
 
     # 获取当前base信息, 策略表dt_version(ctime partition)采用当前时间
     # 获取当前base信息, 策略表dt_version(ctime partition)采用当前时间
-    base_strategy_df = get_last_strategy_result(
+    base_strategy_df, _ = get_last_strategy_result(
         project, rank_table, dt_version, stg_key)
         project, rank_table, dt_version, stg_key)
 
 
     default_stats_df = stats_df.query('gh_id == "default"')
     default_stats_df = stats_df.query('gh_id == "default"')
@@ -267,12 +268,7 @@ def postprocess_override_by_config(df, dt_version):
     return df
     return df
 
 
 
 
-def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
-    dt_version = f'{run_dt}{run_hour}'
-    dry_run = kwargs.get('dry_run', False)
-
-    gh_df = get_and_update_gh_ids(run_dt)
-
+def build_and_transfer_base_mode(gh_df, run_dt, run_hour, dt_version, dry_run):
     layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
     layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
     layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
     layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
     base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE,BASE_GROUP_NAME)
     base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE,BASE_GROUP_NAME)
@@ -282,16 +278,7 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
     final_rank_df = postprocess_override_by_config(final_rank_df, dt_version)
     final_rank_df = postprocess_override_by_config(final_rank_df, dt_version)
     check_result_data(final_rank_df)
     check_result_data(final_rank_df)
 
 
-    odps_instance = get_odps_instance(project)
-    odps_ranked_df = odps.DataFrame(final_rank_df)
-
-    video_df = get_dataframe_from_odps('videoods', 'wx_video')
-    video_df['cover_url'] = video_df['cover_img_path'] + CDN_IMG_OPERATOR
-    video_df = video_df['id', 'title', 'cover_url']
-    final_df = odps_ranked_df.join(video_df, on=('video_id', 'id'))
-
-    final_df = final_df.to_pandas()
-    final_df = final_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'title', 'cover_url', 'score']]
+    final_df = join_video_info(final_rank_df)
 
 
     # reverse sending order
     # reverse sending order
     final_df['sort'] = SEND_N + 1 - final_df['sort']
     final_df['sort'] = SEND_N + 1 - final_df['sort']
@@ -329,10 +316,112 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
         rows = mysql.execute(sql)
         rows = mysql.execute(sql)
 
 
 
 
def build_and_transfer_delta_mode(account_df, dt_version, dry_run):
    """Incrementally publish rank rows for accounts missing from the last strategy.

    Loads the most recent strategy result (ctime partition bounded by
    dt_version), finds accounts present in ``account_df`` but absent from that
    result, builds default rank rows for them across all strategy groups,
    inserts the delta into MySQL and writes the merged (old + delta) result
    back to the latest ODPS partition.

    Args:
        account_df: pandas DataFrame of accounts; rows must carry 'gh_id',
            'video_ids' and 'category1' fields.
        dt_version: current run version string 'yyyyMMddHH', used as the upper
            bound when locating the latest strategy partition.
        dry_run: if True, print the delta rows and skip all writes.
    """
    # Latest strategy info; the strategy table's dt_version (ctime partition)
    # is bounded by the current run time.
    last_strategy, last_dt_version = get_last_strategy_result(
        ODS_PROJECT, ODPS_RANK_RESULT_TABLE, dt_version, BASE_GROUP_NAME)
    account_map = {x['gh_id']: x for x in account_df.to_dict(orient='records')}
    all_accounts = account_df['gh_id'].unique()
    accounts_in_strategy = last_strategy['gh_id'].unique()
    delta_accounts = list(set(all_accounts) - set(accounts_in_strategy))
    if len(delta_accounts) > 0:
        LOGGER.info('Found {} new accounts: {}'.format(
            len(delta_accounts), ','.join(delta_accounts)))
    else:
        LOGGER.info('Found 0 new account, do nothing.')
        return

    # Fallback video per top-level category when an account has none configured.
    default_video = {
        '泛生活': [20463342],
        '泛历史': [13586800],
    }

    # New accounts have no history, so the strategy_status field can be ignored.
    # TODO: set default by history stats
    groups = (BASE_GROUP_NAME, EXPLORE1_GROUP_NAME, EXPLORE2_GROUP_NAME)
    rows = []
    for gh_id in delta_accounts:
        account_info = account_map[gh_id]
        configured_videos = account_info['video_ids']
        if configured_videos:
            LOGGER.info(f'{gh_id} has configured video IDs: {configured_videos}')
            video_ids = [int(x) for x in configured_videos.split(',')]
        else:
            video_ids = default_video[account_info['category1']]
        for group_key in groups:
            # NOTE(review): assumes video_ids has at least SEND_N entries,
            # otherwise video_ids[idx] raises IndexError — confirm upstream.
            for idx in range(SEND_N):
                # Bugfix: append inside the idx loop so every one of the
                # SEND_N rows is kept (previously only the last row survived).
                rows.append({
                    'strategy_key': group_key,
                    'gh_id': gh_id,
                    'sort': idx + 1,
                    'video_id': video_ids[idx],
                    'dt_version': last_dt_version,
                    'score': 0.0
                })
    df = pd.DataFrame(rows)
    final_df = join_video_info(df)
    if dry_run:
        print(final_df)
        return

    # Insert the delta rows into MySQL.
    data_to_insert = [tuple(row) for row in final_df.itertuples(index=False)]
    data_columns = list(final_df.columns)
    mysql = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
    mysql.batch_insert(RDS_RANK_RESULT_TABLE, data_to_insert, data_columns)

    # Write the full (previous + delta) result back to ODPS, overwriting the
    # partition of the last strategy version.
    last_odps_df = get_odps_df_of_max_partition(
        ODS_PROJECT, ODPS_RANK_RESULT_TABLE, {'ctime': dt_version}
    ).to_pandas()
    updated_odps_df = pd.concat([final_df, last_odps_df], ignore_index=True)

    odps_instance = get_odps_instance(ODS_PROJECT)
    t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
    target_dt = last_dt_version[0:8]
    target_hour = last_dt_version[8:10]
    part_spec_dict = {'dt': target_dt, 'hour': target_hour, 'ctime': last_dt_version}
    part_spec = ','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
    with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
        writer.write(list(updated_odps_df.itertuples(index=False)))
+
+
def join_video_info(df):
    """Attach video title and cover URL to each rank row.

    Converts ``df`` (pandas) to a PyODPS DataFrame, joins it against
    videoods.wx_video on video_id == id, and returns a pandas DataFrame
    with a fixed column order.
    """
    instance = get_odps_instance(ODS_PROJECT)
    rank_df = odps.DataFrame(df)

    videos = get_dataframe_from_odps('videoods', 'wx_video')
    videos['cover_url'] = videos['cover_img_path'] + CDN_IMG_OPERATOR
    videos = videos['id', 'title', 'cover_url']

    joined = rank_df.join(videos, on=('video_id', 'id')).to_pandas()
    wanted_columns = ['strategy_key', 'dt_version', 'gh_id', 'sort',
                      'video_id', 'title', 'cover_url', 'score']
    return joined[wanted_columns]
+
+
def build_and_transfer_data(run_dt, run_hour, **kwargs):
    """Entry point: refresh account list, then run delta or base (full) mode.

    Keyword args:
        dry_run: if True, downstream writers only print their results.
        mode: 'delta' for incremental new-account handling; anything else
            (including absent) runs the full base pipeline.
    """
    dt_version = f'{run_dt}{run_hour}'
    is_dry_run = kwargs.get('dry_run', False)
    run_mode = kwargs.get('mode')

    account_df = get_and_update_gh_ids(run_dt)

    if run_mode != 'delta':
        return build_and_transfer_base_mode(
            account_df, run_dt, run_hour, dt_version, is_dry_run)
    return build_and_transfer_delta_mode(account_df, dt_version, is_dry_run)
+
+
 def main():
 def main():
+    LOGGER.info("%s 开始执行" % os.path.basename(__file__))
+    LOGGER.info(f"environment: {CONFIG.ENV_TEXT}")
+
     argparser = ArgumentParser()
     argparser = ArgumentParser()
     argparser.add_argument('-n', '--dry-run', action='store_true')
     argparser.add_argument('-n', '--dry-run', action='store_true')
     argparser.add_argument('--run-at',help='assume to run at date and hour, yyyyMMddHH')
     argparser.add_argument('--run-at',help='assume to run at date and hour, yyyyMMddHH')
+    argparser.add_argument('--mode', default='base', choices=['base', 'delta'], help='run mode')
     args = argparser.parse_args()
     args = argparser.parse_args()
 
 
     run_date = datetime.today()
     run_date = datetime.today()
@@ -355,8 +444,7 @@ def main():
                 run_dt = run_date.strftime("%Y%m%d")
                 run_dt = run_date.strftime("%Y%m%d")
                 run_hour = run_date.strftime("%H")
                 run_hour = run_date.strftime("%H")
                 LOGGER.info(f'run_dt: {run_dt}, run_hour: {run_hour}')
                 LOGGER.info(f'run_dt: {run_dt}, run_hour: {run_hour}')
-                build_and_transfer_data(run_dt, run_hour, ODS_PROJECT,
-                                        dry_run=args.dry_run)
+                build_and_transfer_data(run_dt, run_hour, dry_run=args.dry_run, mode=args.mode)
                 LOGGER.info('数据更新完成')
                 LOGGER.info('数据更新完成')
                 return
                 return
             else:
             else:
@@ -377,6 +465,4 @@ def main():
 
 
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':
-    LOGGER.info("%s 开始执行" % os.path.basename(__file__))
-    LOGGER.info(f"environment: {CONFIG.ENV_TEXT}")
     main()
     main()