# -*- coding: utf-8 -*-
import argparse
import os

from odps import ODPS
# Connection settings for the MaxCompute (ODPS) service.
#
# Each value can be overridden through an environment variable so that
# credentials do not have to be edited in (or read from) the source file:
#   ODPS_ENDPOINT, ODPS_ACCESS_ID, ODPS_ACCESS_KEY
# An unset or empty variable falls back to the historical in-file value.
# TODO(security): rotate these credentials and remove the in-file fallbacks.
ODPS_CONFIG = {
    'ENDPOINT': os.environ.get('ODPS_ENDPOINT')
    or 'http://service.cn.maxcompute.aliyun.com/api',
    'ACCESSID': os.environ.get('ODPS_ACCESS_ID') or 'LTAIWYUujJAm7CbH',
    'ACCESSKEY': os.environ.get('ODPS_ACCESS_KEY')
    or 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
}
def check_data_hh(project, table, partition, hh) -> int:
    """Check whether an hourly partition is ready and return its row count.

    Args:
        project: Project that owns the table; also used to qualify the table
            name in the SQL statement.
        table: Table name inside ``project``.
        partition: Value of the ``dt`` partition column.
        hh: Value of the ``hh`` partition column.

    Returns:
        The number of rows in partition ``dt=<partition>,hh=<hh>``. ``0`` is
        returned when that partition does not exist, when it is empty, or
        when any exception is raised along the way (the error is printed and
        swallowed so callers can treat every failure as "not ready").
    """
    odps = ODPS(
        access_id=ODPS_CONFIG['ACCESSID'],
        secret_access_key=ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        t = odps.get_table(name=table)
        if not t.exist_partition(partition_spec=f'dt={partition},hh={hh}'):
            return 0

        # Count exactly the partition that was checked above. Filtering on
        # ``dt`` alone would also count rows from other hours of the same day
        # and report an empty hourly partition as ready.
        # Values are embedded as quoted literals; quotes the caller may have
        # put around them are stripped first so they are not doubled.
        dt_value = str(partition).strip("'\"")
        hh_value = str(hh).strip("'\"")
        sql = (
            f"select * from {project}.{table} "
            f"where dt = '{dt_value}' and hh = '{hh_value}'"
        )
        with odps.execute_sql(sql=sql).open_reader() as reader:
            return reader.count
    except Exception as e:
        # Deliberate best-effort: any failure is reported as "no data".
        print("error:" + str(e))
        return 0
def check_ad_origin_hive(args):
    """Verify that the hourly ad-sample partition contains data.

    Checks table ``alg_recsys_ad_sample_all`` in project ``loghubods`` for
    the partition given by ``args.partition`` (``dt``) and ``args.hh``
    (``hh``). The project and table are fixed here; no other attribute of
    ``args`` is consulted.

    Prints ``"0"`` when the row count is non-zero. Prints ``"1"`` and
    terminates the process with exit status 1 when the count is 0.

    Args:
        args: Parsed command-line namespace providing ``partition`` and
            ``hh``.

    Raises:
        SystemExit: With code 1 when the partition has no rows.
    """
    project = "loghubods"
    table = "alg_recsys_ad_sample_all"
    count = check_data_hh(project, table, args.partition, args.hh)
    if count == 0:
        print("1")
        # Raise SystemExit directly instead of calling the ``exit`` helper,
        # which is injected by the ``site`` module and is not guaranteed to
        # exist (e.g. under ``python -S``).
        raise SystemExit(1)
    print("0")
# Command-line entry point: parse the flags and dispatch on --excute_program.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='脚本utils')
    # NOTE: the flag spelling "excute_program" is part of the existing CLI
    # and is kept as-is so current invocations keep working.
    parser.add_argument('--excute_program', type=str, help='执行程序')
    parser.add_argument('--partition', type=str, help='表分区')
    parser.add_argument('--hh', type=str, help='小时级分区时的小时')
    parser.add_argument('--project', type=str, help='表空间')
    parser.add_argument('--table', type=str, help='表名')
    args = parser.parse_args()

    if args.excute_program == "check_ad_origin_hive":
        check_ad_origin_hive(args)
    else:
        print("无合法参数,验证失败。")
        # Raise SystemExit directly rather than relying on the site-provided
        # ``exit`` helper. On POSIX systems only the low 8 bits of the status
        # reach the parent process, so 999 is observed as 231.
        raise SystemExit(999)