ODPSClient.py

import json
import re

import requests
from odps import ODPS
from odps.tunnel import TableTunnel

from util import convert_util, date_util
class ODPSClient(object):
    """Thin wrapper around the MaxCompute (ODPS) SDK and tunnel APIs."""

    def __init__(self, project="loghubods"):
        # Credentials and endpoints for the MaxCompute (ODPS) project.
        self.accessId = "LTAIWYUujJAm7CbH"
        self.accessSecret = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
        self.endpoint = "http://service.odps.aliyun.com/api"
        self.tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
        self.odps = ODPS(
            self.accessId,
            self.accessSecret,
            project,
            self.endpoint
        )
    def get_all_record(self, table: str, dt: str) -> list:
        """Download every record of partition dt=<dt> via the table tunnel."""
        tunnel = TableTunnel(self.odps)
        download_session = tunnel.create_download_session(f"{table}", partition_spec=f"dt={dt}")
        count = download_session.count
        print(f"Table {table}, partition dt={dt}: {count} records in total")
        result = []
        with download_session.open_record_reader(0, count) as reader:
            for record in reader:
                result.append(record)
        return result
    def get_table(self, table: str):
        return self.odps.get_table(table)

    def get_download_session(self, table: str, dt: str):
        tunnel = TableTunnel(self.odps)
        return tunnel.create_download_session(f"{table}", partition_spec=f"dt={dt}")
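    # --- Illustrative sketch (not part of the original file) ---
    # get_download_session() exposes the raw tunnel session so callers can
    # split one partition into ranges and read slices concurrently. The method
    # below is a hypothetical example of that pattern; the name read_slice and
    # its parameters are assumptions, not the original author's API.
    def read_slice(self, table: str, dt: str, start: int, count: int) -> list:
        download_session = self.get_download_session(table, dt)
        with download_session.open_record_reader(start, count) as reader:
            return [record for record in reader]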
    def get_all_record_batch(self, table: str, dt: str) -> list:
        """Download partition dt=<dt> as Arrow record batches via the table tunnel."""
        tunnel = TableTunnel(self.odps)
        download_session = tunnel.create_download_session(f"{table}", partition_spec=f"dt={dt}")
        count = download_session.count
        print(f"Table {table}, partition dt={dt}: {count} records in total")
        result = []
        with download_session.open_arrow_reader(0, count) as reader:
            for batch in reader:
                result.append(batch)
        return result
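    # --- Illustrative sketch (not part of the original file) ---
    # The Arrow batches returned by get_all_record_batch() can be combined into
    # a single pandas DataFrame. This helper is a hypothetical convenience
    # method; it assumes pyarrow-compatible record batches and that pyarrow and
    # pandas are installed, neither of which is imported by the original module.
    def get_all_record_batch_as_dataframe(self, table: str, dt: str):
        import pyarrow as pa
        batches = self.get_all_record_batch(table, dt)
        if not batches:
            return None
        return pa.Table.from_batches(batches).to_pandas()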
    @classmethod
    def get_all_partition_info(cls, table_name: str, page_num=1, page_size=30, **kwargs) -> list[dict]:
        """List partition metadata for a table via the Aliyun DataWorks BFF API."""
        header = {
            "cookie": kwargs.get("cookie")
        }
        project = kwargs.get("project", "loghubods")
        url = f"https://bff-cn-hangzhou.data.aliyun.com/dma/listPartitions_2?pageSize={page_size}&pageNum={page_num}&entityGuid=odps.{project}.{table_name}&entityType=odps-table"
        print(f"Requesting Aliyun API: {url}")
        response = requests.get(url, headers=header)
        resp_json = json.loads(response.text)
        result = []
        # Partition names look like dt=YYYYMMDD/hh=HH or dt=YYYYMMDD/hh=HH/mm=MM;
        # collapse them into a compact numeric partition key.
        dt_hh = re.compile(r'dt=(\d{8})/hh=(\d{2})')
        dt_hh_mm = re.compile(r'dt=(\d{8})/hh=(\d{2})/mm=(\d{2})')
        for datum in resp_json["data"]["data"]:
            s1 = dt_hh.search(datum['name'])
            s2 = dt_hh_mm.search(datum['name'])
            partition = datum['name']
            if s1:
                partition = f"{s1.group(1)}{s1.group(2)}"
            if s2:
                partition = f"{s2.group(1)}{s2.group(2)}{s2.group(3)}"
            item = {
                "表名": table_name,  # table name
                "name": datum["name"],
                "分区": partition,  # partition key
                "数据量": datum["recordCount"],  # record count
                "数据大小": convert_util.byte_convert(datum['dataSize']),  # data size
                "创建时间": date_util.ts_cover_str(datum["gmtCreate"]),  # created at
                "更新时间": date_util.ts_cover_str(datum['gmtModified'])  # updated at
            }
            result.append(item)
        return result
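
# --- Hypothetical usage sketch (not part of the original file) ---
# Table and partition names below are placeholders; the cookie required by
# get_all_partition_info must come from an authenticated Aliyun session.
if __name__ == "__main__":
    client = ODPSClient(project="loghubods")

    # Read a whole partition as pyodps records.
    records = client.get_all_record("example_table", "20240101")
    print(len(records))

    # Read the same partition as Arrow record batches.
    batches = client.get_all_record_batch("example_table", "20240101")
    print(len(batches))

    # List partition metadata (requires a valid DataWorks cookie).
    partitions = ODPSClient.get_all_partition_info("example_table", cookie="<your-cookie>")
    print(partitions[:3])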