
update data_check

liqian · 2 years ago
commit 1e2d62c339
4 changed files with 60 additions and 17 deletions
  1. region_rule_rank_h.py        +9 -5
  2. region_rule_rank_h_by24h.py  +9 -5
  3. rule_rank_h_by_24h.py        +8 -4
  4. utils.py                     +34 -3

+ 9 - 5
region_rule_rank_h.py

@@ -13,7 +13,7 @@ import math
 from functools import reduce
 from odps import ODPS
 from threading import Timer
-from utils import MysqlHelper, RedisHelper, get_data_from_odps, filter_video_status, filter_shield_video
+from utils import MysqlHelper, RedisHelper, get_data_from_odps, filter_video_status, filter_shield_video, check_table_partition_exits
 from config import set_config
 from log import Log
 from check_video_limit_distribute import update_limit_video_score
@@ -65,9 +65,13 @@ def h_data_check(project, table, now_date):
 
     try:
         dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
-        sql = f'select * from {project}.{table} where dt = {dt}'
-        with odps.execute_sql(sql=sql).open_reader() as reader:
-            data_count = reader.count
+        check_res = check_table_partition_exits(date=dt, project=project, table=table)
+        if check_res:
+            sql = f'select * from {project}.{table} where dt = {dt}'
+            with odps.execute_sql(sql=sql).open_reader() as reader:
+                data_count = reader.count
+        else:
+            data_count = 0
     except Exception as e:
         data_count = 0
     return data_count
@@ -561,7 +565,7 @@ def h_timer_check():
     # Check whether the data for the current hour is ready
     h_data_count = h_data_check(project=project, table=table, now_date=now_date)
     if h_data_count > 0:
-        log_.info(f'h_data_count = {h_data_count}')
+        log_.info(f'region_h_data_count = {h_data_count}')
         # Data is ready; run the update
         rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params,
                   project=project, table=table, region_code_list=region_code_list)

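All three data-check hunks apply the same guard: confirm the hourly partition exists before counting its rows, instead of letting the select fail into the except branch and silently zeroing the count. A minimal standalone sketch of the pattern (assuming an already-initialized PyODPS client; partition_row_count is an illustrative name, not part of the commit):

    from odps import ODPS  # pip install pyodps

    def partition_row_count(odps_client, project, table, dt):
        """Row count of partition dt, or 0 if the partition has not landed yet."""
        t = odps_client.get_table(name=table)
        # Cheap metadata lookup; avoids issuing SQL against a missing partition.
        if not t.exist_partition(partition_spec=f'dt={dt}'):
            return 0
        sql = f'select * from {project}.{table} where dt = {dt}'
        with odps_client.execute_sql(sql=sql).open_reader() as reader:
            return reader.count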
+ 9 - 5
region_rule_rank_h_by24h.py

@@ -13,7 +13,7 @@ import math
 from functools import reduce
 from odps import ODPS
 from threading import Timer
-from utils import RedisHelper, get_data_from_odps, filter_video_status
+from utils import RedisHelper, get_data_from_odps, filter_video_status, check_table_partition_exits
 from config import set_config
 from log import Log
 
@@ -71,9 +71,13 @@ def data_check(project, table, now_date):
 
     try:
         dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
-        sql = f'select * from {project}.{table} where dt = {dt}'
-        with odps.execute_sql(sql=sql).open_reader() as reader:
-            data_count = reader.count
+        check_res = check_table_partition_exits(date=dt, project=project, table=table)
+        if check_res:
+            sql = f'select * from {project}.{table} where dt = {dt}'
+            with odps.execute_sql(sql=sql).open_reader() as reader:
+                data_count = reader.count
+        else:
+            data_count = 0
     except Exception as e:
         data_count = 0
     return data_count
@@ -362,7 +366,7 @@ def h_timer_check():
     # Check whether the current day's data is ready
     h_data_count = data_check(project=project, table=table, now_date=now_date)
     if h_data_count > 0:
-        log_.info(f'24h_data_count = {h_data_count}')
+        log_.info(f'region_24h_data_count = {h_data_count}')
         # Data is ready; run the update
         rank_by_24h(now_date=now_date, now_h=now_h, rule_params=rule_params,
                     project=project, table=table, region_code_list=region_code_list)

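The same guard appears here. One design note on the query it protects: only the row count is consumed, so an aggregate would keep the readiness probe light on wide tables. A hedged alternative sketch (not what this commit does; names are illustrative):

    def partition_ready(odps_client, project, table, dt):
        """True when partition dt exists and holds at least one row."""
        t = odps_client.get_table(name=table)
        if not t.exist_partition(partition_spec=f'dt={dt}'):
            return False
        # Let ODPS aggregate instead of materializing every row client-side.
        sql = f'select count(1) as cnt from {project}.{table} where dt = {dt}'
        with odps_client.execute_sql(sql=sql).open_reader() as reader:
            for record in reader:
                return int(record['cnt']) > 0
        return False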
+ 8 - 4
rule_rank_h_by_24h.py

@@ -6,7 +6,7 @@ from threading import Timer
 from datetime import datetime, timedelta
 from get_data import get_data_from_odps
 from db_helper import RedisHelper
-from utils import filter_video_status
+from utils import filter_video_status, check_table_partition_exits
 from config import set_config
 from log import Log
 
@@ -67,9 +67,13 @@ def h_data_check(project, table, now_date, now_h):
             dt = f"{datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')}22"
         else:
             dt = datetime.strftime(now_date, '%Y%m%d%H')
-        sql = f'select * from {project}.{table} where dt = {dt}'
-        with odps.execute_sql(sql=sql).open_reader() as reader:
-            data_count = reader.count
+        check_res = check_table_partition_exits(date=dt, project=project, table=table)
+        if check_res:
+            sql = f'select * from {project}.{table} where dt = {dt}'
+            with odps.execute_sql(sql=sql).open_reader() as reader:
+                data_count = reader.count
+        else:
+            data_count = 0
     except Exception as e:
         data_count = 0
     return data_count

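In rule_rank_h_by_24h.py the partition key itself is conditional (early hours fall back to the previous day's 22h partition, per the visible else branch), and the new guard sits after that resolution, so both branches are covered. A sketch of the key computation feeding the check (the exact fallback condition lies outside the visible hunk, so it is abstracted into a flag here):

    from datetime import datetime, timedelta

    def resolve_dt(now_date, use_previous_day):
        """use_previous_day stands in for the commit's hour-based condition."""
        if use_previous_day:
            # Around midnight the latest 24h partition is yesterday's 22h.
            return f"{datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')}22"
        return datetime.strftime(now_date, '%Y%m%d%H')

    dt = resolve_dt(datetime.now(), use_previous_day=False)
    # dt then feeds check_table_partition_exits(date=dt, project=..., table=...)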
+ 34 - 3
utils.py

@@ -57,6 +57,33 @@ def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=
     return records
 
 
+def check_table_partition_exits(date, project, table, connect_timeout=3000, read_timeout=500000,
+                                pool_maxsize=1000, pool_connections=1000):
+    """
+    判断表中是否存在这个分区
+    :param date: 日期 type-string '%Y%m%d'
+    :param project: type-string
+    :param table: 表名 type-string
+    :param connect_timeout: 连接超时设置
+    :param read_timeout: 读取超时设置
+    :param pool_maxsize:
+    :param pool_connections:
+    :return: records
+    """
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=connect_timeout,
+        read_timeout=read_timeout,
+        pool_maxsize=pool_maxsize,
+        pool_connections=pool_connections
+    )
+    t = odps.get_table(name=table)
+    return t.exist_partition(partition_spec=f'dt={date}')
+
+
 def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
     """
     Write data into a pickle file
@@ -351,6 +378,10 @@ if __name__ == '__main__':
     # data_test = [9.20273281e+03, 7.00795065e+03, 5.54813112e+03, 9.97402494e-01, 9.96402495e-01, 9.96402494e-01]
     # data_normalization(data_test)
     # request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': []})
-    video_ids = [110, 112, 113, 115, 116, 117, 8289883]
-    update_video_w_h_rate(video_ids=video_ids, key_name='')
-
+    # video_ids = [110, 112, 113, 115, 116, 117, 8289883]
+    # update_video_w_h_rate(video_ids=video_ids, key_name='')
+    project = config_.PROJECT_24H_APP_TYPE
+    table = config_.TABLE_24H_APP_TYPE
+    dt = '2022080115'
+    check_res = check_table_partition_exits(date=dt, project=project, table=table)
+    print(check_res)
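One caveat worth noting: unlike the data-check functions, check_table_partition_exits itself has no try/except, so a missing table or a transient network error would propagate to any caller outside those try blocks. A defensive wrapper, if the helper is ever reused elsewhere (illustrative, not part of the commit):

    from utils import check_table_partition_exits

    def safe_partition_check(date, project, table):
        """Swallow lookup errors and treat them as 'partition not ready'."""
        try:
            return check_table_partition_exits(date=date, project=project, table=table)
        except Exception:
            return False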