123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- # -*- coding: utf-8 -*-
- from odps import ODPS
- import argparse
- ODPS_CONFIG = {
- 'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
- 'ACCESSID': 'LTAIWYUujJAm7CbH',
- 'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
- }
- def check_data(project, table, partition) -> int:
- """检查数据是否准备好,输出数据条数"""
- odps = ODPS(
- access_id=ODPS_CONFIG['ACCESSID'],
- secret_access_key=ODPS_CONFIG['ACCESSKEY'],
- project=project,
- endpoint=ODPS_CONFIG['ENDPOINT'],
- connect_timeout=3000,
- read_timeout=500000,
- pool_maxsize=1000,
- pool_connections=1000
- )
- try:
- t = odps.get_table(name=table)
- check_res = t.exist_partition(partition_spec=f'dt={partition}')
- if check_res:
- sql = f'select * from {project}.{table} where dt = {partition}'
- with odps.execute_sql(sql=sql).open_reader() as reader:
- data_count = reader.count
- else:
- data_count = 0
- except Exception as e:
- print("error:" + str(e))
- data_count = 0
- return data_count
- def check_origin_hive(args):
- project = "loghubods"
- table = "alg_recsys_view_sample_v2"
- partition = args.partition
- count = check_data(project, table, partition)
- if count == 0:
- print("1")
- exit(1)
- else:
- print("0")
- def check_item_hive(args):
- project = "loghubods"
- table = "alg_recsys_video_info"
- partition = args.partition
- count = check_data(project, table, partition)
- if count == 0:
- print("1")
- exit(1)
- else:
- print("0")
- def check_user_hive(args):
- project = "loghubods"
- table = "alg_recsys_user_info"
- partition = args.partition
- count = check_data(project, table, partition)
- if count == 0:
- print("1")
- exit(1)
- else:
- print("0")
- def check_hive(args):
- project = args.project
- table = args.table
- partition = args.partition
- count = check_data(project, table, partition)
- if count == 0:
- print("1")
- exit(1)
- else:
- print("0")
- if __name__ == '__main__':
- parser = argparse.ArgumentParser(description='脚本utils')
- parser.add_argument('--excute_program', type=str, help='执行程序')
- parser.add_argument('--partition', type=str, help='表分区')
- parser.add_argument('--project', type=str, help='表空间')
- parser.add_argument('--table', type=str, help='表名')
- args = parser.parse_args()
- if args.excute_program == "check_origin_hive":
- check_origin_hive(args)
- elif args.excute_program == "check_item_hive":
- check_item_hive(args)
- elif args.excute_program == "check_user_hive":
- check_user_hive(args)
- elif args.excute_program == "check_hive":
- check_hive(args)
- else:
- print("无合法参数,验证失败。")
- exit(999)
|