checkHiveDataUtil.py

# -*- coding: utf-8 -*-
from odps import ODPS
from FeishuBot import FeishuBot
import argparse

ODPS_CONFIG = {
    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
    'ACCESSID': 'LTAIWYUujJAm7CbH',
    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
}


def check_origin_hive(args):
    project = "loghubods"
    # table = "alg_recsys_view_sample_v2"
    table = args.table
    beginStr = args.beginStr
    endStr = args.endStr
    # Check every hourly partition from beginStr to endStr; the data counts as
    # ready if at least one partition exists and is non-empty.
    # If all partitions are empty, exit with a non-zero status so the caller can alert.
    time_sequence = generate_time_sequence(beginStr, endStr)
    exist_partition = []
    for time_str in time_sequence:
        result = split_date_time(time_str)
        partitionDt = result[0]
        partitionHh = result[1]
        count = check_data(project, table, partitionDt, partitionHh)
        if count == 0:
            print(f'partition dt={partitionDt}/hh={partitionHh}: no data')
        else:
            exist_partition.append(f'partition dt={partitionDt}/hh={partitionHh}, rows: {count}')
    if len(exist_partition) == 0:
        exit(1)
    else:
        bot = FeishuBot()
        msg = (
            f'Recommendation model data update\n'
            f' --step1 [check hive data source] [success]:\n'
            f' beginStr: {beginStr}, endStr: {endStr}\n'
            f' detail: {exist_partition}')
        bot.send_message(msg)
        print("0")


def check_data(project, table, partitionDt, partitionDtHh) -> int:
    """Check whether the partition data is ready and return its row count."""
    odps = ODPS(
        access_id=ODPS_CONFIG['ACCESSID'],
        secret_access_key=ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=ODPS_CONFIG['ENDPOINT'],
        # connect_timeout=300000,
        # read_timeout=500000,
        # pool_maxsize=1000,
        # pool_connections=1000
    )
    try:
        t = odps.get_table(name=table)
        # The table is partitioned by both dt and hh.
        check_res = t.exist_partition(partition_spec=f'dt={partitionDt},hh={partitionDtHh}')
        if check_res:
            sql = f'select * from {project}.{table} where dt = {partitionDt} and hh = {partitionDtHh}'
            with odps.execute_sql(sql=sql).open_reader() as reader:
                data_count = reader.count
        else:
            data_count = 0
        # Variant kept from the original for tables that only have a dt partition:
        # check_res = t.exist_partition(partition_spec=f'dt={partitionDt}')
        # if check_res:
        #     sql = f'select * from {project}.{table} where dt = {partitionDt}'
        #     with odps.execute_sql(sql=sql).open_reader() as reader:
        #         data_count = reader.count
        # else:
        #     data_count = 0
    except Exception as e:
        print("error:" + str(e))
        data_count = 0
    return data_count
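
# Illustrative example (not part of the original script): counting one hourly
# partition directly, assuming the ODPS credentials above are valid:
#   check_data('loghubods', 'alg_recsys_view_sample_v2', '20240621', '01')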


def generate_time_sequence(beginStr, endStr):
    # Convert the boundary strings into datetime objects.
    from datetime import datetime, timedelta
    # Time format: YYYYMMDDHH
    time_format = "%Y%m%d%H"
    begin_time = datetime.strptime(beginStr, time_format)
    end_time = datetime.strptime(endStr, time_format)
    # Build the hourly sequence between the two timestamps (inclusive).
    time_sequence = []
    current_time = begin_time
    while current_time <= end_time:
        # Convert the datetime object back into the YYYYMMDDHH string format.
        time_sequence.append(current_time.strftime(time_format))
        # Step forward one hour.
        current_time += timedelta(hours=1)
    return time_sequence
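
# Illustrative example (not part of the original script): the sequence includes
# both endpoints and steps one hour at a time, e.g.
#   generate_time_sequence('2024062022', '2024062101')
#   -> ['2024062022', '2024062023', '2024062100', '2024062101']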


def split_date_time(date_time_str):
    # date_time_str is a 10-character string in the format YYYYMMDDHH.
    # Slice out the date part (first 8 characters) and the hour part (last 2).
    date_part = date_time_str[:8]
    time_part = date_time_str[8:10]  # hour only
    # Return both parts as a list.
    result = [date_part, time_part]
    return result
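
# Illustrative example (not part of the original script):
#   split_date_time('2024062101') -> ['20240621', '01']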


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Hive data check utility')
    parser.add_argument('--beginStr', type=str, help='start of the partition range, format YYYYMMDDHH')
    parser.add_argument('--endStr', type=str, help='end of the partition range, format YYYYMMDDHH')
    parser.add_argument('--table', type=str, help='table name')
    argv = parser.parse_args()
    check_origin_hive(argv)
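
# Example invocation (illustrative values; the table name comes from the
# commented-out default above, the time range is hypothetical):
#   python checkHiveDataUtil.py --table alg_recsys_view_sample_v2 \
#       --beginStr 2024062100 --endStr 2024062123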