# -*- coding: utf-8 -*-
import argparse
import os
import sys

from odps import ODPS

# Credentials are read from the environment rather than hardcoded in the
# script; set ODPS_ACCESS_ID / ODPS_ACCESS_KEY before running.
ODPS_CONFIG = {
    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
    'ACCESSID': os.environ.get('ODPS_ACCESS_ID', ''),
    'ACCESSKEY': os.environ.get('ODPS_ACCESS_KEY', ''),
}


def check_origin_hive(args):
    project = "loghubods"
    table = args.table
    partitionDt = args.partitionDt
    partitionHh = args.partitionHh
    count = check_data(project, table, partitionDt, partitionHh)
    if count == 0:
        # Print "1" and exit non-zero so a calling shell script can retry.
        print("1")
        sys.exit(1)
    else:
        print('data exists, size:', count)
        print("0")


def check_data(project, table, partitionDt, partitionHh) -> int:
    """Check whether the data is ready; return the number of rows."""
    odps = ODPS(
        access_id=ODPS_CONFIG['ACCESSID'],
        secret_access_key=ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=ODPS_CONFIG['ENDPOINT'],
        # connect_timeout=300000,
        # read_timeout=500000,
        # pool_maxsize=1000,
        # pool_connections=1000
    )
    try:
        t = odps.get_table(name=table)
        # The original test `if not {partitionDtHh}:` wrapped the value in a
        # set literal, which is always truthy, so the hh branch was never
        # taken; test the value itself instead.
        if partitionHh:
            # Table is partitioned by both dt and hh.
            check_res = t.exist_partition(partition_spec=f'dt={partitionDt},hh={partitionHh}')
            sql = f'select * from {project}.{table} where dt = {partitionDt} and hh = {partitionHh}'
        else:
            # Table is partitioned by dt only.
            check_res = t.exist_partition(partition_spec=f'dt={partitionDt}')
            sql = f'select * from {project}.{table} where dt = {partitionDt}'
        if check_res:
            # reader.count is the number of rows in the result set, which
            # doubles as the row count for the partition.
            with odps.execute_sql(sql=sql).open_reader() as reader:
                data_count = reader.count
        else:
            data_count = 0
    except Exception as e:
        print("error:" + str(e))
        data_count = 0
    return data_count


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='script utils')
    parser.add_argument('--partitionDt', type=str, help='table partition dt')
    parser.add_argument('--partitionHh', type=str, help='table partition hh')
    parser.add_argument('--table', type=str, help='table name')
    argv = parser.parse_args()
    check_origin_hive(argv)
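
# Usage sketch. The script filename "check_origin_hive.py" is hypothetical;
# the sample table and partition values come from defaults that were
# previously commented out in this script:
#
#   export ODPS_ACCESS_ID=<your-access-id>
#   export ODPS_ACCESS_KEY=<your-access-key>
#   python check_origin_hive.py --table alg_recsys_sample_all --partitionDt 20240703 --partitionHh 10
#
# Because the script exits non-zero when the partition is missing or empty,
# a scheduler can poll it until the data lands, e.g.:
#
#   until python check_origin_hive.py --table alg_recsys_sample_all --partitionDt 20240703 --partitionHh 10; do
#       sleep 60
#   done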