checkHiveDataUtil.py

# -*- coding: utf-8 -*-
import argparse
import sys

from odps import ODPS

ODPS_CONFIG = {
    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
    'ACCESSID': 'LTAIWYUujJAm7CbH',
    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
}
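# The credentials above are hardcoded in the source. A common alternative
# (an assumption about deployment, not something this script does) is to let
# environment variables override them:
#   import os
#   ODPS_CONFIG['ACCESSID'] = os.environ.get('ODPS_ACCESS_ID', ODPS_CONFIG['ACCESSID'])
#   ODPS_CONFIG['ACCESSKEY'] = os.environ.get('ODPS_ACCESS_KEY', ODPS_CONFIG['ACCESSKEY'])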
def check_origin_hive(args):
    project = "loghubods"
    # table = "alg_recsys_view_sample_v2"
    table = args.table
    partitionDt = args.partitionDt
    partitionHh = args.partitionHh
    count = check_data(project, table, partitionDt, partitionHh)
    if count == 0:
        # No data yet: print "1" for the calling script and exit non-zero.
        print("1")
        sys.exit(1)
    else:
        print('data exists, size:', count)
        print("0")
def check_data(project, table, partitionDt, partitionHh) -> int:
    """Check whether the data is ready; return the row count of the partition."""
    odps = ODPS(
        access_id=ODPS_CONFIG['ACCESSID'],
        secret_access_key=ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=ODPS_CONFIG['ENDPOINT'],
        # connect_timeout=300000,
        # read_timeout=500000,
        # pool_maxsize=1000,
        # pool_connections=1000
    )
    try:
        t = odps.get_table(name=table)
        # check_res = t.exist_partition(partition_spec=f'dt={partition}')
        if partitionHh:
            # Table has an hh partition level: check dt + hh.
            check_res = t.exist_partition(partition_spec=f'dt={partitionDt},hh={partitionHh}')
            if check_res:
                # Partition values arrive as strings, so quote them in the predicate.
                sql = f"select * from {project}.{table} where dt = '{partitionDt}' and hh = '{partitionHh}'"
                with odps.execute_sql(sql=sql).open_reader() as reader:
                    data_count = reader.count
            else:
                data_count = 0
        else:
            # dt-only partition.
            check_res = t.exist_partition(partition_spec=f'dt={partitionDt}')
            if check_res:
                sql = f"select * from {project}.{table} where dt = '{partitionDt}'"
                with odps.execute_sql(sql=sql).open_reader() as reader:
                    data_count = reader.count
            else:
                data_count = 0
    except Exception as e:
        print("error:" + str(e))
        data_count = 0
    return data_count
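# A direct-call sketch of check_data. The table and dt values mirror the
# commented-out defaults in the __main__ block below; the hh value is illustrative:
#   rows = check_data('loghubods', 'alg_recsys_sample_all', '20240703', '10')  # dt + hh table
#   rows = check_data('loghubods', 'alg_recsys_sample_all', '20240703', None)  # dt-only table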
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='script utils')
    # parser.add_argument('--excute_program', type=str, help='program to execute')
    parser.add_argument('--partitionDt', type=str, help='table partition dt')
    parser.add_argument('--partitionHh', type=str, help='table partition hh')
    # parser.add_argument('--project', type=str, help='project (table namespace)')
    parser.add_argument('--table', type=str, help='table name')
    argv = parser.parse_args()
    # args = parser.parse_args()
    # table = argv[1]
    # partition = argv[2]
    # table = 'alg_recsys_sample_all'
    # partition = '20240703'
    check_origin_hive(argv)
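# Shell usage sketch (table and dt mirror the commented-out defaults above;
# the hh value is illustrative):
#   python checkHiveDataUtil.py --table alg_recsys_sample_all --partitionDt 20240703 --partitionHh 10
# The script prints the row count plus "0" when the partition has data, or "1"
# (and exits with status 1) when it is empty, so a wrapper can gate downstream jobs
# on the output or the exit code.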