ODPSClient.py

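"""
ODPSClient: a thin wrapper around PyODPS for the "loghubods" project.

It covers SQL execution (results returned as pandas DataFrames or written to
CSV), tunnel-based record and Arrow-batch downloads from partitioned tables,
and a helper that calls the Aliyun DataWorks BFF API to list partition
metadata for a table.
"""
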
import json
import re

import requests
from odps import ODPS
from odps.tunnel import TableTunnel

from util import convert_util, date_util


class ODPSClient(object):
    def __init__(self, project="loghubods"):
        self.accessId = "LTAIWYUujJAm7CbH"
        self.accessSecret = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
        self.endpoint = "http://service.odps.aliyun.com/api"
        self.tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
        self.odps = ODPS(
            self.accessId,
            self.accessSecret,
            project,
            self.endpoint
        )

    def execute_sql(self, sql: str):
        hints = {
            'odps.sql.submit.mode': 'script'
        }
        # Submit in script mode and fetch the full result through the tunnel as a pandas DataFrame
        with self.odps.execute_sql(sql, hints=hints).open_reader(tunnel=True) as reader:
            pd_df = reader.to_pandas()
        return pd_df

    def execute_sql_file_result_save_file(self, sql_file: str, param: dict, filepath: str):
        # Read the SQL template and substitute ${key} placeholders with values from param
        with open(sql_file, "r") as f:
            sql = f.read()
        for key in param:
            sql = sql.replace(f"${{{key}}}", param[key])
        self.execute_sql_result_save_file(sql, filepath)

    def execute_sql_result_save_file(self, sql: str, filepath: str):
        result = self.execute_sql(sql)
        result.to_csv(filepath, index=False)

    def get_all_record(self, table: str, partition_spec: str) -> list:
        tunnel = TableTunnel(self.odps)
        download_session = tunnel.create_download_session(f"{table}", partition_spec=partition_spec)
        count = download_session.count
        print(f"Table {table}, partition {partition_spec}: {count} records in total")

        result = []
        # Read the records in batches of up to 1000
        with download_session.open_record_reader(0, count) as reader:
            batch = []
            for record in reader:
                batch.append(record)
                if len(batch) >= 1000:
                    result.extend(batch)
                    batch.clear()  # Clear the batch so the next records can be buffered
            # Append whatever is left in the final batch
            if batch:
                result.extend(batch)
        return result

    def process_all_records(self, table: str, partition_spec: str, func: callable) -> None:
        """
        Process all records in the given table partition, applying func to each record.

        :param table: table name
        :param partition_spec: partition spec
        :param func: function applied to each record
        """
        tunnel = TableTunnel(self.odps)
        download_session = tunnel.create_download_session(f"{table}", partition_spec=partition_spec)
        count = download_session.count
        print(f"Table {table}, partition {partition_spec}: {count} records in total")

        # Stream the records and apply func to each one
        with download_session.open_record_reader(0, count) as reader:
            for record in reader:
                func(record)

    def get_table(self, table: str):
        return self.odps.get_table(table)

    def get_download_session(self, table: str, dt: str):
        tunnel = TableTunnel(self.odps)
        return tunnel.create_download_session(f"{table}", partition_spec=f"dt={dt}")

    def get_all_record_batch(self, table: str, dt: str) -> list:
        tunnel = TableTunnel(self.odps)
        download_session = tunnel.create_download_session(f"{table}", partition_spec=f"dt={dt}")
        count = download_session.count
        print(f"Table {table}, partition dt={dt}: {count} records in total")

        result = []
        # Download the partition as Arrow record batches instead of individual records
        with download_session.open_arrow_reader(0, count) as reader:
            for batch in reader:
                result.append(batch)
        return result

    @classmethod
    def get_all_partition_info(cls, table_name: str, page_num=1, page_size=30, **kwargs) -> list[dict]:
        header = {
            "cookie": kwargs.get("cookie")
        }
        project = kwargs.get("project", "loghubods")
        url = f"https://bff-cn-hangzhou.data.aliyun.com/dma/listPartitions_2?pageSize={page_size}&pageNum={page_num}&entityGuid=odps.{project}.{table_name}&entityType=odps-table"
        print(f"Requesting the Aliyun API: {url}")

        response = requests.get(url, headers=header)
        resp_json = json.loads(response.text)
        result = []
        # Collapse partition names like dt=YYYYMMDD/hh=HH[/mm=MM] into YYYYMMDDHH[MM]
        dt_hh = re.compile(r'dt=(\d{8})/hh=(\d{2})')
        dt_hh_mm = re.compile(r'dt=(\d{8})/hh=(\d{2})/mm=(\d{2})')
        for datum in resp_json["data"]["data"]:
            s1 = dt_hh.search(datum['name'])
            s2 = dt_hh_mm.search(datum['name'])
            partition = datum['name']
            if s1:
                partition = f"{s1.group(1)}{s1.group(2)}"
            if s2:
                partition = f"{s2.group(1)}{s2.group(2)}{s2.group(3)}"
            item = {
                "表名": table_name,  # table name
                "name": datum["name"],
                "分区": partition,  # partition
                "数据量": datum["recordCount"],  # record count
                "数据大小": convert_util.byte_convert(datum['dataSize']),  # data size
                "创建时间": date_util.ts_cover_str(datum["gmtCreate"]),  # created at
                "更新时间": date_util.ts_cover_str(datum['gmtModified'])  # last modified
            }
            result.append(item)
        return result
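

# Usage sketch: a minimal driver for the client above. The table name,
# partition spec, and SQL used here are hypothetical placeholders, not
# objects known to exist in the project.
if __name__ == "__main__":
    client = ODPSClient(project="loghubods")

    # Ad-hoc query returned as a pandas DataFrame.
    df = client.execute_sql("SELECT 1 AS c;")
    print(df)

    # Stream one (hypothetical) partition record by record.
    client.process_all_records("some_table", "dt=20240101", lambda record: print(record))

    # Same partition as Arrow batches, useful for bulk conversion to pandas.
    batches = client.get_all_record_batch("some_table", "20240101")
    print(f"downloaded {len(batches)} arrow batches")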