checkHiveDataUtil.py

# -*- coding: utf-8 -*-
"""Check whether an ODPS (MaxCompute) table partition is ready and report its row count."""
import argparse
import sys

from odps import ODPS
from FeishuBot import FeishuBot

ODPS_CONFIG = {
    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
    'ACCESSID': 'LTAIWYUujJAm7CbH',
    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
}
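
# Sketch (an assumption, not part of the original script): the access key pair could be
# loaded from environment variables instead of being hardcoded in the repository, e.g.:
#   import os
#   ODPS_CONFIG['ACCESSID'] = os.environ.get('ODPS_ACCESS_ID', ODPS_CONFIG['ACCESSID'])
#   ODPS_CONFIG['ACCESSKEY'] = os.environ.get('ODPS_ACCESS_KEY', ODPS_CONFIG['ACCESSKEY'])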


def check_origin_hive(args):
    project = "loghubods"
    table = args.table  # e.g. alg_recsys_view_sample_v2
    partitionDt = args.partitionDt
    partitionHh = args.partitionHh
    count = check_data(project, table, partitionDt, partitionHh)
    if count == 0:
        # Partition missing or empty: print "1" for the calling shell and exit non-zero.
        print("1")
        sys.exit(1)
    else:
        # Data is ready: notify the Feishu group, then print "0" for the calling shell.
        bot = FeishuBot()
        msg = (f'推荐模型数据更新 \n --step1【校验hive数据源】【success】:\n'
               f'{project}.{table},分区:dt={partitionDt}/hh={partitionHh},数据总量:{count}')
        bot.send_message(msg)
        print("0")


def check_data(project, table, partitionDt, partitionHh) -> int:
    """Check whether the data is ready and return the number of rows."""
    odps = ODPS(
        access_id=ODPS_CONFIG['ACCESSID'],
        secret_access_key=ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=ODPS_CONFIG['ENDPOINT'],
        # Optional tuning: connect_timeout=300000, read_timeout=500000,
        # pool_maxsize=1000, pool_connections=1000
    )
    try:
        t = odps.get_table(name=table)
        # The table is partitioned by both dt (date) and hh (hour).
        check_res = t.exist_partition(partition_spec=f'dt={partitionDt},hh={partitionHh}')
        if check_res:
            sql = f"select * from {project}.{table} where dt = '{partitionDt}' and hh = '{partitionHh}'"
            with odps.execute_sql(sql=sql).open_reader() as reader:
                data_count = reader.count
        else:
            data_count = 0
        # For a table partitioned only by dt, check exist_partition(partition_spec=f'dt={partitionDt}')
        # and drop the hh filter from the query instead.
    except Exception as e:
        print("error:" + str(e))
        data_count = 0
    return data_count
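

# A lighter-weight alternative (a sketch, not part of the original flow): count rows with
# count(1) so the query does not have to materialize every column of the partition.
# The helper name below is illustrative only.
def count_rows(odps, project, table, partitionDt, partitionHh) -> int:
    """Return the row count of the dt/hh partition using count(1)."""
    sql = f"select count(1) as cnt from {project}.{table} where dt = '{partitionDt}' and hh = '{partitionHh}'"
    with odps.execute_sql(sql).open_reader() as reader:
        for record in reader:
            return int(record['cnt'])
    return 0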


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Utility script: check that a table partition is ready')
    parser.add_argument('--partitionDt', type=str, help='dt (date) partition of the table')
    parser.add_argument('--partitionHh', type=str, help='hh (hour) partition of the table')
    parser.add_argument('--table', type=str, help='table name')
    argv = parser.parse_args()
    check_origin_hive(argv)
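
# Example invocation (the table and dt values come from sample values commented in the
# original script; the hh value is illustrative):
#   python checkHiveDataUtil.py --table alg_recsys_sample_all --partitionDt 20240703 --partitionHh 10
# Prints "0" and sends a Feishu notification when the partition has data; prints "1" and
# exits non-zero otherwise.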