utils.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. # -*- coding: utf-8 -*-
import argparse
import os
from collections.abc import Iterable

from odps import ODPS
  5. ODPS_CONFIG = {
  6. 'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
  7. 'ACCESSID': 'LTAIWYUujJAm7CbH',
  8. 'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
  9. }
  10. def check_data(project, table, partition) -> int:
  11. """检查数据是否准备好,输出数据条数"""
  12. odps = ODPS(
  13. access_id=ODPS_CONFIG['ACCESSID'],
  14. secret_access_key=ODPS_CONFIG['ACCESSKEY'],
  15. project=project,
  16. endpoint=ODPS_CONFIG['ENDPOINT'],
  17. connect_timeout=3000,
  18. read_timeout=500000,
  19. pool_maxsize=1000,
  20. pool_connections=1000
  21. )
  22. try:
  23. t = odps.get_table(name=table)
  24. check_res = t.exist_partition(partition_spec=f'dt={partition}')
  25. if check_res:
  26. sql = f'select * from {project}.{table} where dt = {partition}'
  27. with odps.execute_sql(sql=sql).open_reader() as reader:
  28. data_count = reader.count
  29. else:
  30. data_count = 0
  31. except Exception as e:
  32. print("error:" + str(e))
  33. data_count = 0
  34. return data_count
  35. def check_origin_hive(args):
  36. project = "loghubods"
  37. table = "alg_recsys_view_sample_v2"
  38. partition = args.partition
  39. count = check_data(project, table, partition)
  40. if count == 0:
  41. print("上游未就绪,返回1")
  42. exit(1)
  43. else:
  44. print("上游已就绪,返回0")
  45. if __name__ == '__main__':
  46. parser = argparse.ArgumentParser(description='脚本utils')
  47. parser.add_argument('--excute_program', type=str, help='执行程序')
  48. parser.add_argument('--partition', type=str, help='表分区')
  49. args = parser.parse_args()
  50. if args.excute_program == "check_origin_hive":
  51. check_origin_hive(args)
  52. else:
  53. print("无合法参数,验证失败。")
  54. exit(999)