checkHiveDataUtil.py

# -*- coding: utf-8 -*-
# import argparse
import sys

from odps import ODPS

ODPS_CONFIG = {
    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
    'ACCESSID': 'LTAIWYUujJAm7CbH',
    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
}
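
# A minimal hardening sketch (assumption, not part of the original script):
# the keys above could be read from the environment instead of source, e.g.
#
#   import os
#   ODPS_CONFIG['ACCESSID'] = os.environ.get('ODPS_ACCESS_ID', ODPS_CONFIG['ACCESSID'])
#   ODPS_CONFIG['ACCESSKEY'] = os.environ.get('ODPS_ACCESS_KEY', ODPS_CONFIG['ACCESSKEY'])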


def check_origin_hive(table, partition):
    project = "loghubods"
    # table = "alg_recsys_view_sample_v2"
    # table = args.table
    # partition = args.partition
    count = check_data(project, table, partition)
    if count == 0:
        # no data yet: print "1" and signal failure via the exit code
        print("1")
        sys.exit(1)
    else:
        print('data exists, size:', count)
        print("0")


def check_data(project, table, partition) -> int:
    """Check whether the data is ready; return the number of rows."""
    odps = ODPS(
        access_id=ODPS_CONFIG['ACCESSID'],
        secret_access_key=ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000,
    )
    try:
        t = odps.get_table(name=table)
        check_res = t.exist_partition(partition_spec=f'dt={partition}')
        if check_res:
            # dt is a string partition column, so quote the value in the predicate
            sql = f'select * from {project}.{table} where dt = "{partition}"'
            with odps.execute_sql(sql=sql).open_reader() as reader:
                data_count = reader.count
        else:
            data_count = 0
    except Exception as e:
        print("error:" + str(e))
        data_count = 0
    return data_count
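

# Alternative sketch (assumption, not in the original script): a
# "select count(*)" pushes the aggregation into MaxCompute instead of
# materializing every row of the partition in the query result;
# check_data_fast is a hypothetical helper name.
def check_data_fast(odps, project, table, partition) -> int:
    """Return the row count of one dt partition via count(*)."""
    sql = f'select count(*) from {project}.{table} where dt = "{partition}"'
    with odps.execute_sql(sql=sql).open_reader() as reader:
        for record in reader:
            return record[0]  # single row, single column: the count
    return 0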


if __name__ == '__main__':
    # parser = argparse.ArgumentParser(description='script utils')
    # parser.add_argument('--excute_program', type=str, help='program to execute')
    # parser.add_argument('--partition', type=str, help='table partition')
    # parser.add_argument('--project', type=str, help='project (namespace)')
    # parser.add_argument('--table', type=str, help='table name')
    # argv = sys.argv
    # args = parser.parse_args()
    # table = argv[1]
    # partition = argv[2]
    table = 'alg_recsys_sample_all'
    partition = '20240703'
    # check_origin_hive prints its result itself and returns nothing,
    # so there is no value worth printing here
    check_origin_hive(table, partition)
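
# Usage sketch (assumption): an upstream scheduler can key on the exit code,
# 0 when the partition holds data and 1 when it is missing or empty, e.g.
#
#   python checkHiveDataUtil.py && echo "partition ready"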