utils.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. # -*- coding: utf-8 -*-
import argparse
import os
import sys

from odps import ODPS
  4. ODPS_CONFIG = {
  5. 'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
  6. 'ACCESSID': 'LTAIWYUujJAm7CbH',
  7. 'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
  8. }
  9. def check_data(project, table, partition) -> int:
  10. """检查数据是否准备好,输出数据条数"""
  11. odps = ODPS(
  12. access_id=ODPS_CONFIG['ACCESSID'],
  13. secret_access_key=ODPS_CONFIG['ACCESSKEY'],
  14. project=project,
  15. endpoint=ODPS_CONFIG['ENDPOINT'],
  16. connect_timeout=3000,
  17. read_timeout=500000,
  18. pool_maxsize=1000,
  19. pool_connections=1000
  20. )
  21. try:
  22. t = odps.get_table(name=table)
  23. check_res = t.exist_partition(partition_spec=f'dt={partition}')
  24. if check_res:
  25. sql = f'select * from {project}.{table} where dt = {partition}'
  26. with odps.execute_sql(sql=sql).open_reader() as reader:
  27. data_count = reader.count
  28. else:
  29. data_count = 0
  30. except Exception as e:
  31. print("error:" + str(e))
  32. data_count = 0
  33. return data_count
  34. def check_data_hh(project, table, partition) -> int:
  35. """检查数据是否准备好,输出数据条数"""
  36. odps = ODPS(
  37. access_id=ODPS_CONFIG['ACCESSID'],
  38. secret_access_key=ODPS_CONFIG['ACCESSKEY'],
  39. project=project,
  40. endpoint=ODPS_CONFIG['ENDPOINT'],
  41. connect_timeout=3000,
  42. read_timeout=500000,
  43. pool_maxsize=1000,
  44. pool_connections=1000
  45. )
  46. try:
  47. t = odps.get_table(name=table)
  48. check_res = t.exist_partition(partition_spec=f'dt={partition},hh=0')
  49. if check_res:
  50. sql = f'select * from {project}.{table} where dt = {partition}'
  51. with odps.execute_sql(sql=sql).open_reader() as reader:
  52. data_count = reader.count
  53. else:
  54. data_count = 0
  55. except Exception as e:
  56. print("error:" + str(e))
  57. data_count = 0
  58. return data_count
  59. def check_origin_hive(args):
  60. project = "loghubods"
  61. table = "alg_recsys_view_sample_v2"
  62. partition = args.partition
  63. count = check_data(project, table, partition)
  64. if count == 0:
  65. print("1")
  66. exit(1)
  67. else:
  68. print("0")
  69. def check_item_hive(args):
  70. project = "loghubods"
  71. table = "alg_recsys_video_info"
  72. partition = args.partition
  73. count = check_data(project, table, partition)
  74. if count == 0:
  75. print("1")
  76. exit(1)
  77. else:
  78. print("0")
  79. def check_user_hive(args):
  80. project = "loghubods"
  81. table = "alg_recsys_user_info"
  82. partition = args.partition
  83. count = check_data(project, table, partition)
  84. if count == 0:
  85. print("1")
  86. exit(1)
  87. else:
  88. print("0")
  89. def check_hive(args):
  90. project = args.project
  91. table = args.table
  92. partition = args.partition
  93. count = check_data(project, table, partition)
  94. if count == 0:
  95. print("1")
  96. exit(1)
  97. else:
  98. print("0")
  99. def check_ad_origin_hive(args):
  100. project = "loghubods"
  101. table = "alg_recsys_ad_sample_all_new"
  102. partition = args.partition
  103. count = check_data_hh(project, table, partition)
  104. if count == 0:
  105. print("1")
  106. exit(1)
  107. else:
  108. print("0")
  109. if __name__ == '__main__':
  110. parser = argparse.ArgumentParser(description='脚本utils')
  111. parser.add_argument('--excute_program', type=str, help='执行程序')
  112. parser.add_argument('--partition', type=str, help='表分区')
  113. parser.add_argument('--project', type=str, help='表空间')
  114. parser.add_argument('--table', type=str, help='表名')
  115. args = parser.parse_args()
  116. if args.excute_program == "check_origin_hive":
  117. check_origin_hive(args)
  118. elif args.excute_program == "check_item_hive":
  119. check_item_hive(args)
  120. elif args.excute_program == "check_user_hive":
  121. check_user_hive(args)
  122. elif args.excute_program == "check_ad_origin_hive":
  123. check_ad_origin_hive(args)
  124. elif args.excute_program == "check_hive":
  125. check_hive(args)
  126. else:
  127. print("无合法参数,验证失败。")
  128. exit(999)