Bladeren bron

Merge branch 'feature/20241022-gh-reply-hourly-update' of algorithm/rov-offline into master

fengzhoutian 6 maanden geleden
bovenliggende
commit
0e706f227c
1 gewijzigde bestanden met toevoegingen van 32 en 19 verwijderingen
  1. 32 19
      alg_growth_gh_reply_video_v1.py

+ 32 - 19
alg_growth_gh_reply_video_v1.py

@@ -30,6 +30,7 @@ CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/forma
 ODS_PROJECT = "loghubods"
 EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history'
 GH_REPLY_STATS_TABLE = 'alg_growth_gh_reply_video_stats'
+GH_REPLY_STATS_HOUR_TABLE = 'alg_growth_gh_reply_video_stats_hour'
 ODPS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
 RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
 GH_DETAIL = 'gh_detail'
@@ -76,10 +77,14 @@ def get_last_strategy_result(project, rank_table, dt_version, key):
     return sub_df
 
 
-def process_reply_stats(project, table, period, run_dt):
-    # 获取多天即转统计数据用于聚合
-    df = get_odps_df_of_recent_partitions(project, table, period, {'dt': run_dt})
-    df = df.to_pandas()
+def process_reply_stats(project, daily_table, hourly_table, period, run_dt, run_hour):
+    # 获取多天+当天即转统计数据用于聚合
+    df = get_odps_df_of_recent_partitions(
+        project, daily_table, period, {'dt': run_dt}).to_pandas()
+    hour_data_version = f'{run_dt}{run_hour}'
+    hourly_df = get_odps_df_of_recent_partitions(
+        project, hourly_table, 1, {'dt': hour_data_version}).to_pandas()
+    df = pd.concat([df, hourly_df]).reset_index(drop=True)
 
     df['video_id'] = df['video_id'].astype('int64')
     df = df[['gh_id', 'video_id', 'send_count', 'first_visit_uv', 'day0_return']]
@@ -125,8 +130,10 @@ def rank_for_layer1(run_dt, run_hour, project, table, gh_df):
     result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
     return result_df
 
-def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
-    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)
+def rank_for_layer2(run_dt, run_hour, rank_table):
+    stats_df = process_reply_stats(
+        ODS_PROJECT, GH_REPLY_STATS_TABLE, GH_REPLY_STATS_HOUR_TABLE,
+        STATS_PERIOD_DAYS, run_dt, run_hour)
 
     # 确保重跑时可获得一致结果
     dt_version = f'{run_dt}{run_hour}'
@@ -149,7 +156,7 @@ def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
 
     # fallback to base if necessary
     base_strategy_df = get_last_strategy_result(
-        project, rank_table, dt_version, BASE_GROUP_NAME)
+        ODS_PROJECT, rank_table, dt_version, BASE_GROUP_NAME)
     for gh_id in GH_IDS:
         if gh_id == 'default':
             continue
@@ -170,15 +177,17 @@ def rank_for_layer2(run_dt, run_hour, project, stats_table, rank_table):
     result_df = extend_df[['strategy_key', 'dt_version', 'gh_id', 'sort', 'video_id', 'score']]
     return result_df
 
-def rank_for_base(run_dt, run_hour, project, stats_table, rank_table):
-    stats_df = process_reply_stats(project, stats_table, STATS_PERIOD_DAYS, run_dt)
+def rank_for_base(run_dt, run_hour, rank_table):
+    stats_df = process_reply_stats(
+        ODS_PROJECT, GH_REPLY_STATS_TABLE, GH_REPLY_STATS_HOUR_TABLE,
+        STATS_PERIOD_DAYS, run_dt, run_hour)
 
     #TODO: support to set base manually
     dt_version = f'{run_dt}{run_hour}'
 
     # 获取当前base信息, 策略表dt_version(ctime partition)采用当前时间
     base_strategy_df = get_last_strategy_result(
-        project, rank_table, dt_version, BASE_GROUP_NAME)
+        ODS_PROJECT, rank_table, dt_version, BASE_GROUP_NAME)
 
     default_stats_df = stats_df.query('gh_id == "default"')
 
@@ -227,10 +236,8 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
     gh_df = get_and_update_gh_ids(next_dt)
 
     layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
-    layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT,
-                                  GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
-    base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT,
-                              GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
+    layer2_rank = rank_for_layer2( run_dt, run_hour, ODPS_RANK_RESULT_TABLE)
+    base_rank = rank_for_base(run_dt, run_hour, ODPS_RANK_RESULT_TABLE)
     final_rank_df = pd.concat([layer1_rank, layer2_rank, base_rank]).reset_index(drop=True)
     check_result_data(final_rank_df)
 
@@ -269,23 +276,29 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
 def main_loop():
     argparser = ArgumentParser()
     argparser.add_argument('-n', '--dry-run', action='store_true')
+    argparser.add_argument('--run-at',help='assume to run at date and hour, yyyyMMddHH')
     args = argparser.parse_args()
 
+    run_date = datetime.today()
+    if args.run_at:
+        run_date = datetime.strptime(args.run_at, "%Y%m%d%H")
+        LOGGER.info(f"Assume to run at {run_date.strftime('%Y-%m-%d %H:00')}")
+
     try:
         now_date = datetime.today()
         LOGGER.info(f"开始执行: {datetime.strftime(now_date, '%Y-%m-%d %H:%M')}")
-        now_hour = now_date.strftime("%H")
 
-        last_date = now_date - timedelta(1)
+        last_date = run_date - timedelta(1)
         last_dt = last_date.strftime("%Y%m%d")
         # 查看当前天级更新的数据是否已准备好
         # 当前上游统计表为天级更新,但字段设计为兼容小时级
         h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
         if h_data_count > 0:
             LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count))
-            run_dt = now_date.strftime("%Y%m%d")
-            LOGGER.info(f'run_dt: {run_dt}, run_hour: {now_hour}')
-            build_and_transfer_data(run_dt, now_hour, ODS_PROJECT,
+            run_dt = run_date.strftime("%Y%m%d")
+            run_hour = run_date.strftime("%H")
+            LOGGER.info(f'run_dt: {run_dt}, run_hour: {run_hour}')
+            build_and_transfer_data(run_dt, run_hour, ODS_PROJECT,
                                     dry_run=args.dry_run)
             LOGGER.info('数据更新完成')
         else: