# -*- coding: utf-8 -*-
import argparse
import sys
from datetime import datetime, timedelta

from odps import ODPS

from FeishuBot import FeishuBot

ODPS_CONFIG = {
    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
    'ACCESSID': 'LTAIWYUujJAm7CbH',
    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
}


def check_origin_hive(args):
    project = "loghubods"
    # table = "alg_recsys_view_sample_v2"
    table = args.table
    beginStr = args.beginStr
    endStr = args.endStr
    # Check every hourly partition between beginStr and endStr; as long as at least one
    # partition contains data, the range is considered usable.
    # If every partition is empty, exit with a non-zero status so the caller can alarm.
    time_sequence = generate_time_sequence(beginStr, endStr)
    exist_partition = []
    for time_str in time_sequence:
        result = split_date_time(time_str)
        partitionDt = result[0]
        partitionHh = result[1]
        count = check_data(project, table, partitionDt, partitionHh)
        if count == 0:
            print(f'partition dt={partitionDt}/hh={partitionHh} is empty')
        else:
            exist_partition.append(f'partition dt={partitionDt}/hh={partitionHh}, rows: {count}')
    if len(exist_partition) == 0:
        sys.exit(1)
    else:
        bot = FeishuBot()
        msg = (
            f'Recommendation model data update\n'
            f' --step1 [check hive data source][success]:\n'
            f' beginStr: {beginStr}, endStr: {endStr}\n'
            f' detail: {exist_partition}')
        bot.send_message(msg)


def check_data(project, table, partitionDt, partitionDtHh) -> int:
    """Check whether the partition data is ready; return its row count (0 if missing or on error)."""
    odps = ODPS(
        access_id=ODPS_CONFIG['ACCESSID'],
        secret_access_key=ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=ODPS_CONFIG['ENDPOINT'],
        # connect_timeout=300000,
        # read_timeout=500000,
        # pool_maxsize=1000,
        # pool_connections=1000
    )
    try:
        t = odps.get_table(name=table)
        # check_res = t.exist_partition(partition_spec=f'dt={partition}')
        # The table is partitioned by both dt and hh.
        # if not {partitionDtHh}:
        check_res = t.exist_partition(partition_spec=f'dt={partitionDt},hh={partitionDtHh}')
        if check_res:
            sql = f'select * from {project}.{table} where dt = {partitionDt} and hh = {partitionDtHh}'
            with odps.execute_sql(sql=sql).open_reader() as reader:
                data_count = reader.count
        else:
            data_count = 0
        # Fallback for tables partitioned only by dt:
        # else:
        #     check_res = t.exist_partition(partition_spec=f'dt={partitionDt}')
        #     if check_res:
        #         sql = f'select * from {project}.{table} where dt = {partitionDt}'
        #         with odps.execute_sql(sql=sql).open_reader() as reader:
        #             data_count = reader.count
        #     else:
        #         data_count = 0
    except Exception as e:
        print("error:" + str(e))
        data_count = 0
    return data_count


def generate_time_sequence(beginStr, endStr):
    # Hourly partition format: YYYYMMDDHH
    time_format = "%Y%m%d%H"
    # Parse the boundary strings into datetime objects
    begin_time = datetime.strptime(beginStr, time_format)
    end_time = datetime.strptime(endStr, time_format)
    # Build the hourly sequence from begin_time to end_time (inclusive)
    time_sequence = []
    current_time = begin_time
    while current_time <= end_time:
        # Convert the datetime back into a YYYYMMDDHH string
        time_sequence.append(current_time.strftime(time_format))
        # Advance by one hour
        current_time += timedelta(hours=1)
    return time_sequence


def split_date_time(date_time_str):
    # date_time_str is expected to be a 10-character string in YYYYMMDDHH format.
    # Slice out the date part (first 8 characters) and the hour part (characters 9-10).
    date_part = date_time_str[:8]
    time_part = date_time_str[8:10]  # hour only
    # Return both parts as a list: [dt, hh]
    result = [date_part, time_part]
    return result


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Check that the source Hive (MaxCompute) partitions contain data')
    parser.add_argument('--beginStr', type=str, help='start of the partition range, format YYYYMMDDHH')
    parser.add_argument('--endStr', type=str, help='end of the partition range (inclusive), format YYYYMMDDHH')
    parser.add_argument('--table', type=str, help='table name')
    argv = parser.parse_args()
    check_origin_hive(argv)
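
# Example invocation (a minimal sketch: the script filename and the concrete partition
# range below are assumptions for illustration; the table name is taken from the
# commented-out default above):
#
#   python check_origin_hive.py \
#       --beginStr 2024062000 \
#       --endStr 2024062023 \
#       --table alg_recsys_view_sample_v2
#
# This walks every hourly partition dt=20240620/hh=00..23 of
# loghubods.alg_recsys_view_sample_v2, exits with status 1 if all of them are empty,
# and otherwise sends a Feishu success message listing the non-empty partitions.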