# -*- coding: utf-8 -*-
import argparse
import os
import sys

from odps import ODPS
# Connection settings used to build the MaxCompute (ODPS) client.
# Each entry can be overridden with an environment variable (ODPS_ENDPOINT,
# ODPS_ACCESSID, ODPS_ACCESSKEY); the literals below are only fallbacks.
# NOTE(review): the fallback access key pair is committed in plain text. It is
# kept so existing deployments keep working - rotate it and remove the
# fallbacks once the environment variables are provisioned.
ODPS_CONFIG = {
    'ENDPOINT': os.environ.get(
        'ODPS_ENDPOINT', 'http://service.cn.maxcompute.aliyun.com/api'
    ),
    'ACCESSID': os.environ.get('ODPS_ACCESSID', 'LTAIWYUujJAm7CbH'),
    'ACCESSKEY': os.environ.get(
        'ODPS_ACCESSKEY', 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P'
    ),
}
def check_data(project, table, partition) -> int:
    """Check whether a table partition is ready and return its row count.

    Args:
        project: MaxCompute project used for the client and to qualify the
            table name in the query.
        table: Name of the table to inspect.
        partition: Value of the ``dt`` partition column to inspect.

    Returns:
        The number of rows in partition ``dt=<partition>``, or 0 when the
        partition does not exist or any error occurs while checking.
    """
    odps = ODPS(
        access_id=ODPS_CONFIG['ACCESSID'],
        secret_access_key=ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        target = odps.get_table(name=table)
        if not target.exist_partition(partition_spec=f'dt={partition}'):
            return 0
        # Let the engine count the rows instead of selecting every column of
        # every row just to read the size of the result set.
        # NOTE(review): the partition value is interpolated unquoted, exactly
        # as before; confirm the type of `dt` before adding quotes.
        sql = f'select count(1) from {project}.{table} where dt = {partition}'
        with odps.execute_sql(sql=sql).open_reader() as reader:
            for record in reader:
                # The aggregate yields a single row holding the count.
                return int(record[0])
        return 0
    except Exception as e:
        # Deliberate best-effort: any failure is reported and treated as
        # "data not ready" so callers only have to look at the count.
        print("error:" + str(e))
        return 0
def check_origin_hive(args):
    """Verify that the sample table partition contains data.

    Checks partition ``args.partition`` of
    ``loghubods.alg_recsys_view_sample_v2`` via ``check_data``.

    Args:
        args: Parsed command-line namespace; only ``args.partition`` is read.

    Prints "0" when the partition has rows. Prints "1" and terminates the
    process with exit status 1 when ``check_data`` reports 0 rows.
    """
    project = "loghubods"
    table = "alg_recsys_view_sample_v2"
    row_count = check_data(project, table, args.partition)
    if row_count == 0:
        print("1")
        # sys.exit instead of the site-provided exit(), which is intended for
        # interactive sessions and is not guaranteed to exist.
        sys.exit(1)
    print("0")
def check_item_hive(args):
    """Verify that the video-info table partition contains data.

    Checks partition ``args.partition`` of
    ``loghubods.alg_recsys_video_info`` via ``check_data``.

    Args:
        args: Parsed command-line namespace; only ``args.partition`` is read.

    Prints "0" when the partition has rows. Prints "1" and terminates the
    process with exit status 1 when ``check_data`` reports 0 rows.
    """
    project = "loghubods"
    table = "alg_recsys_video_info"
    row_count = check_data(project, table, args.partition)
    if row_count == 0:
        print("1")
        # sys.exit instead of the site-provided exit(), which is intended for
        # interactive sessions and is not guaranteed to exist.
        sys.exit(1)
    print("0")
def check_user_hive(args):
    """Verify that the user-info table partition contains data.

    Checks partition ``args.partition`` of
    ``loghubods.alg_recsys_user_info`` via ``check_data``.

    Args:
        args: Parsed command-line namespace; only ``args.partition`` is read.

    Prints "0" when the partition has rows. Prints "1" and terminates the
    process with exit status 1 when ``check_data`` reports 0 rows.
    """
    project = "loghubods"
    table = "alg_recsys_user_info"
    row_count = check_data(project, table, args.partition)
    if row_count == 0:
        print("1")
        # sys.exit instead of the site-provided exit(), which is intended for
        # interactive sessions and is not guaranteed to exist.
        sys.exit(1)
    print("0")
if __name__ == '__main__':
    # Command-line entry point: run the check named by --excute_program
    # against the partition given by --partition.
    parser = argparse.ArgumentParser(description='脚本utils')
    parser.add_argument('--excute_program', type=str, help='执行程序')
    parser.add_argument('--partition', type=str, help='表分区')
    args = parser.parse_args()

    # Dispatch table instead of independent `if` statements: previously the
    # trailing `else` belonged only to the last `if`, so a successful
    # check_origin_hive / check_item_hive run still fell through to the
    # "invalid argument" failure path.
    checks = {
        "check_origin_hive": check_origin_hive,
        "check_item_hive": check_item_hive,
        "check_user_hive": check_user_hive,
    }
    check = checks.get(args.excute_program)
    if check is None:
        print("无合法参数,验证失败。")
        # NOTE(review): 999 is kept as-is, but it is outside the 0-255 range;
        # on POSIX the parent process observes 999 & 0xFF == 231.
        sys.exit(999)
    check(args)