| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 | # -*- coding: utf-8 -*-from odps import ODPSfrom FeishuBot import FeishuBotimport argparseODPS_CONFIG = {    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',    'ACCESSID': 'LTAIWYUujJAm7CbH',    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',}def check_origin_hive(args):    project = "loghubods"    table = args.table    beginStr = args.beginStr    endStr = args.endStr    # 检查从begin到end的每一个小时级分区数据是否存在,有一个存在即算存在可以处理    # 如果全都为空报警    time_sequence = generate_time_sequence(beginStr, endStr)    # exist_partition = []    for time_str in time_sequence:        result = split_date_time(time_str)        partitionDt = result[0]        partitionHh = result[1]        count = check_data(project, table, partitionDt, partitionHh)        if count == 0:            bot = FeishuBot()            # msg = (            #     f'推荐模型数据更新 \n【任务名称】:step1校验hive数据源\n【是否成功】:success\n【信息】:table:{table},beginStr:{beginStr},endStr:{endStr}\n【详细日志】:{exist_partition}')            msg = (                f'推荐模型数据更新 \n【任务名称】:step1校验hive数据源\n【是否成功】:error\n【信息】:table:{table},{time_str}分区数据不存在,继续检查')            bot.send_message(msg)            print('1')            exit(1)        else:            continue    print('0')        # exist_partition.append(f'分区:dt={partitionDt}/hh={partitionHh},数据:{count}')# if len(exist_partition) == 0:#     print('1')#     exit(1)# else:#     bot = FeishuBot()#     msg = (#         f'推荐模型数据更新 \n【任务名称】:step1校验hive数据源\n【是否成功】:success\n【信息】:table:{table},beginStr:{beginStr},endStr:{endStr}\n【详细日志】:{exist_partition}')#     bot.send_message(msg)# print('0')def check_data(project, table, partitionDt, partitionDtHh) -> int:    """检查数据是否准备好,输出数据条数"""    odps = ODPS(        access_id=ODPS_CONFIG['ACCESSID'],        secret_access_key=ODPS_CONFIG['ACCESSKEY'],        project=project,        endpoint=ODPS_CONFIG['ENDPOINT'],        # connect_timeout=300000,        # read_timeout=500000,        # pool_maxsize=1000,        # pool_connections=1000    )    try:        t = odps.get_table(name=table)        # check_res = t.exist_partition(partition_spec=f'dt={partition}')        # 含有hh分区        # if not {partitionDtHh}:        check_res = t.exist_partition(partition_spec=f'dt={partitionDt},hh={partitionDtHh}')        if check_res:            sql = f'select * from {project}.{table} where dt = {partitionDt} and hh={partitionDtHh}'            with odps.execute_sql(sql=sql).open_reader() as reader:                data_count = reader.count        else:            data_count = 0        # else:        #     check_res = t.exist_partition(partition_spec=f'dt={partitionDt}')        #     if check_res:        #         sql = f'select * from {project}.{table} where dt = {partitionDt}'        #         with odps.execute_sql(sql=sql).open_reader() as reader:        #             data_count = reader.count        #     else:        #         data_count = 0    except Exception as e:        print("error:" + str(e))        data_count = 0    return data_countdef generate_time_sequence(beginStr, endStr):    # 将字符串时间转换为datetime对象    from datetime import datetime, timedelta    # 定义时间格式    time_format = "%Y%m%d%H"    # 转换字符串为datetime对象    begin_time = datetime.strptime(beginStr, time_format)    end_time = datetime.strptime(endStr, time_format)    # 生成时间序列    time_sequence = []    current_time = begin_time    while current_time <= end_time:        # 将datetime对象转换回指定格式的字符串        time_sequence.append(current_time.strftime(time_format))        # 增加一个小时        current_time += timedelta(hours=1)    return time_sequencedef split_date_time(date_time_str):    # 假设date_time_str是一个长度为12的字符串,格式为YYYYMMDDHH    # 切片获取日期部分(前8位)和时间部分(后4位中的前2位,因为后两位可能是分钟或秒,但这里只取小时)    date_part = date_time_str[:8]    time_part = date_time_str[8:10]  # 只取小时部分    # 将结果存储在一个数组中(在Python中通常使用列表)    result = [date_part, time_part]    return resultif __name__ == '__main__':    parser = argparse.ArgumentParser(description='脚本utils')    parser.add_argument('--beginStr', type=str, help='表分区Dt,beginStr')    parser.add_argument('--endStr', type=str, help='表分区Hh,endStr')    parser.add_argument('--table', type=str, help='表名')    argv = parser.parse_args()    check_origin_hive(argv)
 |