Browse Source

feat:添加校验AD数据方法

zhaohaipeng 9 months ago
parent
commit
6fe639e90c
1 changed files with 27 additions and 1 deletions
  1. 27 1
      zhangbo/utils.py

+ 27 - 1
zhangbo/utils.py

@@ -35,6 +35,32 @@ def check_data(project, table, partition) -> int:
     return data_count
 
 
+def check_data_hh(project, table, partition) -> int:
+    """检查数据是否准备好,输出数据条数"""
+    odps = ODPS(
+        access_id=ODPS_CONFIG['ACCESSID'],
+        secret_access_key=ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+    try:
+        t = odps.get_table(name=table)
+        check_res = t.exist_partition(partition_spec=f'dt={partition},hh=0')
+        if check_res:
+            sql = f'select * from {project}.{table} where dt = {partition}'
+            with odps.execute_sql(sql=sql).open_reader() as reader:
+                data_count = reader.count
+        else:
+            data_count = 0
+    except Exception as e:
+        print("error:" + str(e))
+        data_count = 0
+    return data_count
+
 def check_origin_hive(args):
     project = "loghubods"
     table = "alg_recsys_view_sample_v2"
@@ -82,7 +108,7 @@ def check_ad_origin_hive(args):
     project = "loghubods"
     table = "alg_recsys_ad_sample_all_new"
     partition = args.partition
-    count = check_data(project, table, partition)
+    count = check_data_hh(project, table, partition)
     if count == 0:
         print("1")
         exit(1)