Browse Source

Update alg_growth_gh_reply_video_v1: add dry run function

StrayWarrior 7 months ago
parent
commit
8dcba04b65
1 changed files with 14 additions and 3 deletions
  1. 14 3
      alg_growth_gh_reply_video_v1.py

+ 14 - 3
alg_growth_gh_reply_video_v1.py

@@ -14,6 +14,7 @@ from my_config import set_config
 import numpy as np
 from log import Log
 import os
+from argparse import ArgumentParser
 
 CONFIG, _ = set_config()
 LOGGER = Log()
@@ -174,7 +175,7 @@ def rank_for_base(run_dt, run_hour, project, stats_table, rank_table):
             base_strategy_df,
             on=['gh_id', 'video_id'],
             how='left') \
-        .query('strategy_key.notna() or day0_return > 100')
+        .query('strategy_key.notna() or score > 0.1')
 
     # 合并default和分账号数据
     grouped_stats_df = pd.concat([default_stats_df, stats_with_strategy_df]).reset_index()
@@ -201,8 +202,9 @@ def check_result_data(df):
                 raise Exception(f"Result not enough for gh_id[{gh_id}] group[{group}]")
 
 
-def build_and_transfer_data(run_dt, run_hour, project):
+def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
     dt_version = f'{run_dt}{run_hour}'
+    dry_run = kwargs.get('dry_run', False)
 
     layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE)
     layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT,
@@ -226,6 +228,10 @@ def build_and_transfer_data(run_dt, run_hour, project):
     # reverse sending order
     final_df['sort'] = SEND_N + 1 - final_df['sort']
 
+    if dry_run:
+        print(final_df[['strategy_key', 'gh_id', 'sort', 'video_id', 'score', 'title']])
+        return
+
     # save to ODPS
     t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
     part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
@@ -241,6 +247,10 @@ def build_and_transfer_data(run_dt, run_hour, project):
 
 
 def main_loop():
+    argparser = ArgumentParser()
+    argparser.add_argument('-n', '--dry-run', action='store_true')
+    args = argparser.parse_args()
+
     try:
         now_date = datetime.today()
         LOGGER.info(f"开始执行: {datetime.strftime(now_date, '%Y-%m-%d %H:%M')}")
@@ -255,7 +265,8 @@ def main_loop():
             LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count))
             run_dt = now_date.strftime("%Y%m%d")
             LOGGER.info(f'run_dt: {run_dt}, run_hour: {now_hour}')
-            build_and_transfer_data(run_dt, now_hour, ODS_PROJECT)
+            build_and_transfer_data(run_dt, now_hour, ODS_PROJECT,
+                                    dry_run=args.dry_run)
             LOGGER.info('数据更新完成')
         else:
             LOGGER.info("上游数据未就绪,等待60s")