Ver Fonte

Update alg_growth_3rd_gh_reply_video_v1: support manually run

StrayWarrior há 6 meses atrás
pai
commit
488fb85ba3
1 ficheiros alterados com 15 adições e 10 exclusões
  1. alg_growth_3rd_gh_reply_video_v1.py (+15, -10)

+ 15 - 10
alg_growth_3rd_gh_reply_video_v1.py

@@ -49,8 +49,7 @@ CDN_IMG_OPERATOR = "?x-oss-process=image/resize,m_fill,w_600,h_480,limit_0/forma
 ODS_PROJECT = "loghubods"
 EXPLORE_POOL_TABLE = 'alg_growth_video_return_stats_history'
 GH_REPLY_STATS_TABLE = 'alg_growth_3rd_gh_reply_video_stats'
-# ODPS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
-ODPS_3RD_RANK_RESULT_TABLE = 'alg_3rd_gh_autoreply_video_rank_data'
+ODPS_RANK_RESULT_TABLE = 'alg_3rd_gh_autoreply_video_rank_data'
 GH_DETAIL = 'gh_detail'
 RDS_RANK_RESULT_TABLE = 'alg_gh_autoreply_video_rank_data'
 STATS_PERIOD_DAYS = 5
@@ -276,8 +275,8 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
     gh_df = get_and_update_gh_ids(run_dt)
 
     layer1_rank = rank_for_layer1(run_dt, run_hour, ODS_PROJECT, EXPLORE_POOL_TABLE, gh_df)
-    # layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_3RD_RANK_RESULT_TABLE)
-    # base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_3RD_RANK_RESULT_TABLE,BASE_GROUP_NAME)
+    # layer2_rank = rank_for_layer2(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE)
+    # base_rank = rank_for_base(run_dt, run_hour, ODS_PROJECT, GH_REPLY_STATS_TABLE, ODPS_RANK_RESULT_TABLE,BASE_GROUP_NAME)
     layer2_rank = rank_for_base_designate(run_dt, run_hour, EXPLORE2_GROUP_NAME)
     base_rank = rank_for_base_designate(run_dt, run_hour, BASE_GROUP_NAME)
 
@@ -303,7 +302,7 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
         return
 
     # save to ODPS
-    t = odps_instance.get_table(ODPS_3RD_RANK_RESULT_TABLE)
+    t = odps_instance.get_table(ODPS_RANK_RESULT_TABLE)
     part_spec_dict = {'dt': run_dt, 'hour': run_hour, 'ctime': dt_version}
     part_spec = ','.join(['{}={}'.format(k, part_spec_dict[k]) for k in part_spec_dict.keys()])
     with t.open_writer(partition=part_spec, create_partition=True, overwrite=True) as writer:
@@ -319,23 +318,29 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
 def main_loop():
     argparser = ArgumentParser()
     argparser.add_argument('-n', '--dry-run', action='store_true')
+    argparser.add_argument('--run-at',help='assume to run at date and hour, yyyyMMddHH')
     args = argparser.parse_args()
 
+    run_date = datetime.today()
+    if args.run_at:
+        run_date = datetime.strptime(args.run_at, "%Y%m%d%H")
+        LOGGER.info(f"Assume to run at {run_date.strftime('%Y-%m-%d %H:00')}")
+
     try:
         now_date = datetime.today()
         LOGGER.info(f"开始执行: {datetime.strftime(now_date, '%Y-%m-%d %H:%M')}")
-        now_hour = now_date.strftime("%H")
 
-        last_date = now_date - timedelta(1)
+        last_date = run_date - timedelta(1)
         last_dt = last_date.strftime("%Y%m%d")
         # 查看当前天级更新的数据是否已准备好
         # 当前上游统计表为天级更新,但字段设计为兼容小时级
         h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
         if h_data_count > 0:
             LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count))
-            run_dt = now_date.strftime("%Y%m%d")
-            LOGGER.info(f'run_dt: {run_dt}, run_hour: {now_hour}')
-            build_and_transfer_data(run_dt, now_hour, ODS_PROJECT,
+            run_dt = run_date.strftime("%Y%m%d")
+            run_hour = run_date.strftime("%H")
+            LOGGER.info(f'run_dt: {run_dt}, run_hour: {run_hour}')
+            build_and_transfer_data(run_dt, run_hour, ODS_PROJECT,
                                     dry_run=args.dry_run)
             LOGGER.info('数据更新完成')
         else: