123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- # -*- coding: utf-8 -*-
- from odps import ODPS
- from FeishuBot import FeishuBot
- import argparse
import os

# MaxCompute (ODPS) connection settings used by check_data().
# SECURITY NOTE(review): an AccessKey ID/secret pair was committed here in plain
# text. Treat it as leaked: rotate it in the Aliyun console and provide the
# credentials through the ODPS_ENDPOINT / ODPS_ACCESSID / ODPS_ACCESSKEY
# environment variables. The literal fallbacks remain only so existing
# deployments keep working until their environment is configured; delete them
# once the key has been rotated.
ODPS_CONFIG = {
    'ENDPOINT': os.environ.get('ODPS_ENDPOINT', 'http://service.cn.maxcompute.aliyun.com/api'),
    'ACCESSID': os.environ.get('ODPS_ACCESSID', 'LTAIWYUujJAm7CbH'),
    'ACCESSKEY': os.environ.get('ODPS_ACCESSKEY', 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P'),
}
def check_origin_hive(args):
    """Verify that every hourly partition between beginStr and endStr has data.

    Walks the hourly partitions ``dt=YYYYMMDD, hh=HH`` of ``loghubods.<table>``
    from ``args.beginStr`` to ``args.endStr`` (both inclusive, ``YYYYMMDDHH``).
    On the first partition whose row count is 0, it sends a Feishu alert,
    prints ``1`` and exits with status 1. A count of 0 means the partition is
    missing or empty, or the check itself failed inside ``check_data``. If every
    partition has data, it prints ``0``.

    The printed ``0``/``1`` is the script's result on stdout. Presumably a
    scheduler or shell wrapper consumes it (confirm against the caller).

    Args:
        args: ``argparse.Namespace`` carrying ``table``, ``beginStr`` and ``endStr``.
    """
    import sys

    project = "loghubods"
    table = args.table
    beginStr = args.beginStr
    endStr = args.endStr
    # Every hour in the range must be non-empty; stop and alert at the first gap.
    for time_str in generate_time_sequence(beginStr, endStr):
        partitionDt, partitionHh = split_date_time(time_str)
        count = check_data(project, table, partitionDt, partitionHh)
        if count == 0:
            msg = (
                f'推荐模型数据更新 \n【任务名称】:step1校验hive数据源\n【是否成功】:error\n【信息】:table:{table},{time_str}分区数据不存在,继续检查')
            FeishuBot().send_message(msg)
            print('1')
            # sys.exit instead of the site-injected exit(): exit() is absent
            # under `python -S` and in some embedded or frozen interpreters.
            sys.exit(1)
    print('0')
def check_data(project, table, partitionDt, partitionDtHh) -> int:
    """Return the row count of partition ``dt=partitionDt, hh=partitionDtHh``.

    Returns 0 when the partition does not exist. Any exception raised while
    talking to MaxCompute (ODPS) is reported on stderr and also mapped to 0.
    Callers therefore cannot tell "missing/empty" apart from "check failed".

    Args:
        project: ODPS project name.
        table: table name inside ``project``.
        partitionDt: value of the ``dt`` partition column (``YYYYMMDD``).
        partitionDtHh: value of the ``hh`` partition column (``HH``).
    """
    import sys

    odps = ODPS(
        access_id=ODPS_CONFIG['ACCESSID'],
        secret_access_key=ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=ODPS_CONFIG['ENDPOINT'],
    )
    try:
        t = odps.get_table(name=table)
        if not t.exist_partition(partition_spec=f'dt={partitionDt},hh={partitionDtHh}'):
            return 0
        # Quote the partition values. This assumes dt/hh are STRING partition
        # columns (confirm); quoting avoids an implicit numeric cast that can
        # defeat partition pruning.
        # count(1) lets the server count rows instead of materialising the
        # whole partition with `select *` just to read its size.
        sql = (f"select count(1) from {project}.{table} "
               f"where dt = '{partitionDt}' and hh = '{partitionDtHh}'")
        with odps.execute_sql(sql=sql).open_reader() as reader:
            for record in reader:
                return int(record[0])
        return 0
    except Exception as e:
        # stdout carries the script's 0/1 result, so keep diagnostics off it.
        print("error:" + str(e), file=sys.stderr)
        return 0
def generate_time_sequence(beginStr, endStr):
    """Return every hour from beginStr to endStr (inclusive) as ``YYYYMMDDHH`` strings.

    Both bounds are parsed with the format ``%Y%m%d%H``. The result is empty
    when endStr is earlier than beginStr.
    """
    from datetime import datetime, timedelta

    fmt = "%Y%m%d%H"
    one_hour = timedelta(hours=1)
    start = datetime.strptime(beginStr, fmt)
    stop = datetime.strptime(endStr, fmt)
    # Both points lie on the hour grid, so this floor division is exact.
    hour_span = (stop - start) // one_hour
    return [(start + step * one_hour).strftime(fmt) for step in range(hour_span + 1)]
def split_date_time(date_time_str):
    """Split a ``YYYYMMDDHH`` timestamp into ``[date, hour]``.

    The date is the first 8 characters (``YYYYMMDD``) and the hour is
    characters 8-9 (``HH``). Anything after the hour is ignored, and a string
    shorter than 10 characters yields truncated parts without raising.

    Returns:
        A two-element list ``[date_part, hour_part]``.
    """
    return [date_time_str[:8], date_time_str[8:10]]
if __name__ == '__main__':
    # Command-line entry point. All three options are required: without them
    # generate_time_sequence would crash on strptime(None) instead of showing usage.
    parser = argparse.ArgumentParser(description='脚本utils')
    parser.add_argument('--beginStr', type=str, required=True, help='起始小时(含),格式YYYYMMDDHH')
    parser.add_argument('--endStr', type=str, required=True, help='结束小时(含),格式YYYYMMDDHH')
    parser.add_argument('--table', type=str, required=True, help='表名')
    argv = parser.parse_args()
    check_origin_hive(argv)
|