ODPSClient.py

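"""Thin PyODPS wrapper used by this project: executes SQL and returns
pandas DataFrames, downloads partition data through the table tunnel
(record and Arrow readers), and lists partition metadata via the Aliyun
data BFF API."""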
import json
import re

import requests
from odps import ODPS
from odps.tunnel import TableTunnel

from util import convert_util, date_util


class ODPSClient(object):
    def __init__(self, project="loghubods"):
        # Access credentials and endpoints are hardcoded for this project.
        self.accessId = "LTAIWYUujJAm7CbH"
        self.accessSecret = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
        self.endpoint = "http://service.odps.aliyun.com/api"
        self.tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
        self.odps = ODPS(
            self.accessId,
            self.accessSecret,
            project,
            self.endpoint
        )

    def execute_sql(self, sql: str):
        # Run the SQL in script mode and fetch the full result as a DataFrame.
        hints = {
            'odps.sql.submit.mode': 'script'
        }
        with self.odps.execute_sql(sql, hints=hints).open_reader(tunnel=True) as reader:
            pd_df = reader.to_pandas()
        return pd_df

    def execute_sql_result_save_file(self, sql: str, filepath: str):
        result = self.execute_sql(sql)
        result.to_csv(filepath, index=False)

    def get_all_record(self, table: str, dt: str) -> list:
        tunnel = TableTunnel(self.odps)
        download_session = tunnel.create_download_session(table, partition_spec=f"dt={dt}")
        count = download_session.count
        print(f"Table {table}, partition dt={dt}: {count} rows")
        result = []
        with download_session.open_record_reader(0, download_session.count) as reader:
            for record in reader:
                result.append(record)
        return result

    def get_table(self, table: str):
        return self.odps.get_table(table)

    def get_download_session(self, table: str, dt: str):
        tunnel = TableTunnel(self.odps)
        return tunnel.create_download_session(table, partition_spec=f"dt={dt}")

    def get_all_record_batch(self, table: str, dt: str) -> list:
        tunnel = TableTunnel(self.odps)
        download_session = tunnel.create_download_session(table, partition_spec=f"dt={dt}")
        count = download_session.count
        print(f"Table {table}, partition dt={dt}: {count} rows")
        result = []
        # The Arrow reader yields record batches, which is faster for bulk export.
        with download_session.open_arrow_reader(0, download_session.count) as reader:
            for batch in reader:
                result.append(batch)
        return result

    @classmethod
    def get_all_partition_info(cls, table_name: str, page_num=1, page_size=30, **kwargs) -> list[dict]:
        header = {
            "cookie": kwargs.get("cookie")
        }
        project = kwargs.get("project", "loghubods")
        url = f"https://bff-cn-hangzhou.data.aliyun.com/dma/listPartitions_2?pageSize={page_size}&pageNum={page_num}&entityGuid=odps.{project}.{table_name}&entityType=odps-table"
        print(f"Requesting Aliyun endpoint: {url}")
        response = requests.get(url, headers=header)
        resp_json = json.loads(response.text)
        result = []
        dt_hh = re.compile(r'dt=(\d{8})/hh=(\d{2})')
        dt_hh_mm = re.compile(r'dt=(\d{8})/hh=(\d{2})/mm=(\d{2})')
        for datum in resp_json["data"]["data"]:
            s1 = dt_hh.search(datum['name'])
            s2 = dt_hh_mm.search(datum['name'])
            partition = datum['name']
            if s1:
                partition = f"{s1.group(1)}{s1.group(2)}"
            # The finer-grained dt/hh/mm match, when present, takes precedence.
            if s2:
                partition = f"{s2.group(1)}{s2.group(2)}{s2.group(3)}"
            item = {
                "表名": table_name,                                        # table name
                "name": datum["name"],
                "分区": partition,                                         # partition
                "数据量": datum["recordCount"],                            # record count
                "数据大小": convert_util.byte_convert(datum['dataSize']),  # data size
                "创建时间": date_util.ts_cover_str(datum["gmtCreate"]),    # created at
                "更新时间": date_util.ts_cover_str(datum['gmtModified'])   # updated at
            }
            result.append(item)
        return result
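

# A minimal usage sketch, not part of the original client: the table name,
# partition value, output path, and cookie below are placeholders to be
# replaced with real values from your own project.
if __name__ == "__main__":
    client = ODPSClient(project="loghubods")

    # Run a query and dump the result to CSV.
    client.execute_sql_result_save_file(
        "SELECT * FROM some_table WHERE dt = '20240101' LIMIT 10;",
        "/tmp/result.csv",
    )

    # Download every record of one day's partition through the tunnel.
    records = client.get_all_record("some_table", "20240101")
    print(f"downloaded {len(records)} records")

    # List partition metadata via the web API (needs a logged-in cookie).
    for info in ODPSClient.get_all_partition_info("some_table", cookie="<cookie>"):
        print(info)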