소스 검색

update get feature data dt

liqian 2 년 전
부모
커밋
3008a5d55d
1개의 변경된 파일19개의 추가작업 그리고 6개의 파일을 삭제
  1. 19 6
      rule_rank_h_by_24h.py

+ 19 - 6
rule_rank_h_by_24h.py

@@ -37,7 +37,7 @@ def get_rov_redis_key(now_date):
     return key_name
 
 
-def h_data_check(project, table, now_date):
+def h_data_check(project, table, now_date, now_h):
     """检查数据是否准备好"""
     odps = ODPS(
         access_id=config_.ODPS_CONFIG['ACCESSID'],
@@ -51,7 +51,13 @@ def h_data_check(project, table, now_date):
     )
 
     try:
-        dt = datetime.strftime(now_date, '%Y%m%d%H')
+        # 23点开始到8点之前(不含8点),全部用22点生成那个列表
+        if now_h == 23:
+            dt = datetime.strftime(now_date - timedelta(hours=1), '%Y%m%d%H')
+        elif now_h < 8:
+            dt = f"{datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')}22"
+        else:
+            dt = datetime.strftime(now_date, '%Y%m%d%H')
         sql = f'select * from {project}.{table} where dt = {dt}'
         with odps.execute_sql(sql=sql).open_reader() as reader:
             data_count = reader.count
@@ -60,9 +66,16 @@ def h_data_check(project, table, now_date):
     return data_count
 
 
-def get_feature_data(now_date, project, table):
+def get_feature_data(now_date, now_h, project, table):
     """获取特征数据"""
-    dt = datetime.strftime(now_date, '%Y%m%d%H')
+    # 23点开始到8点之前(不含8点),全部用22点生成那个列表
+    if now_h == 23:
+        dt = datetime.strftime(now_date - timedelta(hours=1), '%Y%m%d%H')
+    elif now_h < 8:
+        dt = f"{datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')}22"
+    else:
+        dt = datetime.strftime(now_date, '%Y%m%d%H')
+    log_.info({'feature_dt': dt})
     # dt = '20220425'
     records = get_data_from_odps(date=dt, project=project, table=table)
     feature_data = []
@@ -155,7 +168,7 @@ def video_rank_h(df, now_date, now_h, rule_key, param):
 
 def rank_by_h(now_date, now_h, rule_params, project, table):
     # 获取特征数据
-    feature_df = get_feature_data(now_date=now_date, project=project, table=table)
+    feature_df = get_feature_data(now_date=now_date, now_h=now_h, project=project, table=table)
     # rank
     for key, value in rule_params.items():
         log_.info(f"rule = {key}, param = {value}")
@@ -212,7 +225,7 @@ def h_timer_check():
     now_min = datetime.now().minute
     now_h = datetime.now().hour
     # 查看当前天级更新的数据是否已准备好
-    h_data_count = h_data_check(project=project, table=table, now_date=now_date)
+    h_data_count = h_data_check(project=project, table=table, now_date=now_date, now_h=now_h)
     if h_data_count > 0:
         log_.info(f'h_by24h_data_count = {h_data_count}')
         # 数据准备好,进行更新