12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- # -*- coding: utf-8 -*-
import argparse
import os

from odps import ODPS

from FeishuBot import FeishuBot
# Connection settings for the ODPS (MaxCompute) client.
#
# Each value can be overridden through an environment variable so credentials
# do not have to live in the repository. The literals are kept only as
# fallbacks so existing deployments keep working unchanged.
# SECURITY: the fallback access key pair below is a secret committed to source
# control; it should be rotated and the defaults removed once the environment
# variables are provisioned.
ODPS_CONFIG = {
    'ENDPOINT': os.environ.get(
        'ODPS_ENDPOINT', 'http://service.cn.maxcompute.aliyun.com/api'
    ),
    'ACCESSID': os.environ.get('ODPS_ACCESS_ID', 'LTAIWYUujJAm7CbH'),
    'ACCESSKEY': os.environ.get(
        'ODPS_ACCESS_KEY', 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P'
    ),
}
def check_origin_hive(args):
    """Check that the requested source partition has data and report it.

    Reads ``table``, ``partitionDt`` and ``partitionHh`` from *args*, asks
    ``check_data`` for the row count of that partition in the ``loghubods``
    project, and then:

    * count is 0: prints ``"1"`` and terminates with exit status 1;
    * otherwise: sends a success message through ``FeishuBot`` and prints
      ``"0"``.

    The printed ``"0"``/``"1"`` flag is presumably read by the calling shell
    script -- TODO confirm against the caller.

    Args:
        args: Parsed command-line namespace exposing ``table``,
            ``partitionDt`` and ``partitionHh``.

    Raises:
        SystemExit: With code 1 when ``check_data`` reports zero rows.
    """
    project = "loghubods"
    table = args.table
    partitionDt = args.partitionDt
    partitionHh = args.partitionHh

    count = check_data(project, table, partitionDt, partitionHh)
    if count == 0:
        print("1")
        # The bare ``exit`` name is an interactive helper installed by the
        # ``site`` module and may be missing (e.g. ``python -S``); raising
        # SystemExit directly produces the same exit status without it.
        raise SystemExit(1)

    bot = FeishuBot()
    msg = (f'推荐模型数据更新 \n --step1【校验hive数据源】【success】:\n'
           f'{project}.{table},分区:dt={partitionDt}/hh={partitionHh},数据总量:{count}')
    bot.send_message(msg)
    print("0")
def check_data(project, table, partitionDt, partitionDtHh) -> int:
    """Return the number of rows in one ``dt``/``hh`` partition of a table.

    Connects to ODPS with the credentials in ``ODPS_CONFIG``, checks that the
    partition ``dt=<partitionDt>,hh=<partitionDtHh>`` exists and, if so,
    counts its rows with a ``count(1)`` query.

    Args:
        project: ODPS project that owns the table.
        table: Table name inside *project*.
        partitionDt: Value of the ``dt`` partition column.
        partitionDtHh: Value of the ``hh`` partition column.

    Returns:
        The row count of the partition, or 0 when the partition does not
        exist or any error occurs while querying (the error is printed).
    """
    odps = ODPS(
        access_id=ODPS_CONFIG['ACCESSID'],
        secret_access_key=ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=ODPS_CONFIG['ENDPOINT'],
    )
    try:
        t = odps.get_table(name=table)
        partition_spec = f'dt={partitionDt},hh={partitionDtHh}'
        if not t.exist_partition(partition_spec=partition_spec):
            return 0

        # Let the engine do the counting instead of fetching every row with
        # ``select *``: only a single value is transferred, and the result is
        # not subject to any cap on how many result rows may be read back.
        # Partition values are quoted so the filter is an exact string match,
        # consistent with the partition spec checked above, rather than
        # relying on an implicit numeric cast.
        sql = (
            f"select count(1) from {project}.{table} "
            f"where dt = '{partitionDt}' and hh = '{partitionDtHh}'"
        )
        with odps.execute_sql(sql=sql).open_reader() as reader:
            for record in reader:
                # The aggregate yields exactly one row with one column.
                return int(record[0])
        return 0
    except Exception as e:
        # Best effort by design: any failure is reported as "no data" so the
        # caller takes its not-ready path.
        print("error:" + str(e))
        return 0
if __name__ == '__main__':
    # Command-line entry point: collect the table name and its dt/hh
    # partition values, then run the source-data check.
    cli = argparse.ArgumentParser(description='脚本utils')
    for flag, help_text in (
        ('--partitionDt', '表分区Dt'),
        ('--partitionHh', '表分区Hh'),
        ('--table', '表名'),
    ):
        cli.add_argument(flag, type=str, help=help_text)
    check_origin_hive(cli.parse_args())
|