checkHiveDataUtil.py

# -*- coding: utf-8 -*-
from odps import ODPS
from FeishuBot import FeishuBot
import argparse
import sys

ODPS_CONFIG = {
    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
    'ACCESSID': 'LTAIWYUujJAm7CbH',
    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
}


def check_origin_hive(args):
    project = "loghubods"
    # table = "alg_recsys_view_sample_v2"
    table = args.table
    partitionDt = args.partitionDt
    partitionHh = args.partitionHh
    count = check_data(project, table, partitionDt, partitionHh)
    if count == 0:
        # No data in the partition: print "1" for the caller and exit non-zero.
        print("1")
        sys.exit(1)
    else:
        # print('data exists, size:', count)
        # TODO: notify that the Aliyun data is readable and the next step can start.
        bot = FeishuBot()
        msg = (f'Recommendation model data update\n'
               f'step1 [check hive data source] [success]:\n'
               f'read {project}.{table}, partition dt={partitionDt}/hh={partitionHh}, total rows: {count}')
        bot.send_message(msg)
        print("0")


def check_data(project, table, partitionDt, partitionHh) -> int:
    """Check whether the partition data is ready and return the row count."""
    odps = ODPS(
        access_id=ODPS_CONFIG['ACCESSID'],
        secret_access_key=ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=ODPS_CONFIG['ENDPOINT'],
        # connect_timeout=300000,
        # read_timeout=500000,
        # pool_maxsize=1000,
        # pool_connections=1000
    )
    try:
        t = odps.get_table(name=table)
        # The table is partitioned by dt and hh.
        check_res = t.exist_partition(partition_spec=f'dt={partitionDt},hh={partitionHh}')
        if check_res:
            sql = f"select * from {project}.{table} where dt = '{partitionDt}' and hh = '{partitionHh}'"
            with odps.execute_sql(sql=sql).open_reader() as reader:
                data_count = reader.count
        else:
            data_count = 0
        # dt-only variant, kept for tables without an hh partition:
        # check_res = t.exist_partition(partition_spec=f'dt={partitionDt}')
        # if check_res:
        #     sql = f"select * from {project}.{table} where dt = '{partitionDt}'"
        #     with odps.execute_sql(sql=sql).open_reader() as reader:
        #         data_count = reader.count
        # else:
        #     data_count = 0
    except Exception as e:
        print("error:" + str(e))
        data_count = 0
    return data_count
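
# A minimal standalone-usage sketch of check_data (illustrative only: the table and dt
# values come from the commented samples in this file, the hh value '10' is an assumed
# placeholder):
#
#   rows = check_data('loghubods', 'alg_recsys_sample_all', '20240703', '10')
#   if rows > 0:
#       print('partition is ready, downstream step can start')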


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='utility script')
    # parser.add_argument('--excute_program', type=str, help='program to execute')
    parser.add_argument('--partitionDt', type=str, help='table partition dt')
    parser.add_argument('--partitionHh', type=str, help='table partition hh')
    # parser.add_argument('--project', type=str, help='project name')
    parser.add_argument('--table', type=str, help='table name')
    argv = parser.parse_args()
    # Sample values:
    # table = 'alg_recsys_sample_all'
    # partition = '20240703'
    check_origin_hive(argv)

# Local debug variant of the notification, kept commented out:
# if __name__ == '__main__':
#     project = '1'
#     table = '1'
#     partitionDt = '1'
#     partitionHh = '1'
#     count = '1'
#     bot = FeishuBot()
#     msg = f'read project: {project}, table: {table}, partition dt={partitionDt}/hh={partitionHh}, total rows: {count}'
#     bot.send_message(msg)
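
# Usage sketch (assumed invocation; the table and dt values come from the commented
# samples above, the hh value is a placeholder):
#
#   python checkHiveDataUtil.py --table alg_recsys_sample_all --partitionDt 20240703 --partitionHh 10
#
# The script prints "0" and sends a Feishu notification when the dt/hh partition exists
# and contains data; otherwise it prints "1" and exits with status 1, so a calling shell
# script can branch on the exit code.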