zhangbo 1 tahun lalu
induk
melakukan
69cc68049d
1 mengubah file dengan 57 tambahan dan 0 penghapusan
  1. 57 0
      zhangbo/utils.py

+ 57 - 0
zhangbo/utils.py

@@ -1,3 +1,60 @@
+# -*- coding: utf-8 -*-
+from odps import ODPS
+import argparse
 
+ODPS_CONFIG = {
+        'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
+        'ACCESSID': 'LTAIWYUujJAm7CbH',
+        'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
+}
 
+def check_data(project, table, partition) -> int:
+    """检查数据是否准备好,输出数据条数"""
+    odps = ODPS(
+        access_id=ODPS_CONFIG['ACCESSID'],
+        secret_access_key=ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+    try:
+        t = odps.get_table(name=table)
+        check_res = t.exist_partition(partition_spec=f'dt={partition}')
+        if check_res:
+            sql = f'select * from {project}.{table} where dt = {partition}'
+            with odps.execute_sql(sql=sql).open_reader() as reader:
+                data_count = reader.count
+        else:
+            data_count = 0
+    except Exception as e:
+        print("error:" + str(e))
+        data_count = 0
+    return data_count
+
+
+def check_origin_hive(args):
+    project = "loghubods"
+    table = "alg_recsys_view_sample_v2"
+    partition = args.partition
+    count = check_data(project, table, partition)
+    if count == 0:
+        print("上游未就绪,返回1")
+        exit(1)
+    else:
+        print("上游已就绪,返回0")
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description='脚本utils')
+    parser.add_argument('--excute_program', type=str, help='执行程序')
+    parser.add_argument('--partition', type=str, help='表分区')
+    args = parser.parse_args()
+    if args.excute_program == "check_origin_hive":
+        check_origin_hive(args)
+    else:
+        print("无合法参数,验证失败。")
+        exit(999)