12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
import json
import os
import re
from typing import Optional

import requests
from odps import ODPS
from odps.tunnel import TableTunnel

from util import convert_util, date_util
class ODPSClient(object):
    """Convenience wrapper around an ODPS (MaxCompute) project.

    Provides helpers to download a whole ``dt=<value>`` partition of a table
    through the tunnel API, and a class-level helper that lists partition
    metadata through an Aliyun web endpoint.
    """

    # NOTE(review): these credentials were committed in source and must be
    # treated as leaked. They are kept ONLY as a last-resort fallback so that
    # existing ``ODPSClient()`` callers keep working; rotate the key and
    # delete these two constants once every deployment supplies credentials
    # via constructor arguments or environment variables.
    _DEFAULT_ACCESS_ID = "LTAIWYUujJAm7CbH"
    _DEFAULT_ACCESS_SECRET = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"

    # Environment variables consulted before falling back to the defaults.
    _ENV_ACCESS_ID = "ALIBABA_CLOUD_ACCESS_KEY_ID"
    _ENV_ACCESS_SECRET = "ALIBABA_CLOUD_ACCESS_KEY_SECRET"

    # Partition-name patterns, compiled once instead of on every call.
    _DT_HH_RE = re.compile(r'dt=(\d{8})/hh=(\d{2})')
    _DT_HH_MM_RE = re.compile(r'dt=(\d{8})/hh=(\d{2})/mm=(\d{2})')

    def __init__(
        self,
        project="loghubods",
        access_id: Optional[str] = None,
        access_secret: Optional[str] = None,
    ):
        """Create the underlying ``ODPS`` entry object.

        Credential resolution order: explicit argument, then environment
        variable, then the legacy embedded default.

        Args:
            project: ODPS project name.
            access_id: Access key id; overrides environment and default.
            access_secret: Access key secret; overrides environment and default.
        """
        self.accessId = (
            access_id
            or os.environ.get(self._ENV_ACCESS_ID)
            or self._DEFAULT_ACCESS_ID
        )
        self.accessSecret = (
            access_secret
            or os.environ.get(self._ENV_ACCESS_SECRET)
            or self._DEFAULT_ACCESS_SECRET
        )
        self.endpoint = "http://service.odps.aliyun.com/api"
        # Stored for callers; no method of this class passes it to TableTunnel.
        self.tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
        self.odps = ODPS(
            self.accessId,
            self.accessSecret,
            project,
            self.endpoint,
        )

    def get_table(self, table: str):
        """Return the table object that ``self.odps.get_table`` yields for *table*."""
        return self.odps.get_table(table)

    def get_download_session(self, table: str, dt: str):
        """Open a tunnel download session for partition ``dt=<dt>`` of *table*."""
        tunnel = TableTunnel(self.odps)
        return tunnel.create_download_session(str(table), partition_spec=f"dt={dt}")

    def _start_download(self, table: str, dt: str):
        """Open a download session, print its record count, return ``(session, count)``."""
        session = self.get_download_session(table, dt)
        count = session.count
        print(f"表: {table} 中的分区 {dt}, 共有 {count} 条数据")
        return session, count

    def get_all_record(self, table: str, dt: str) -> list:
        """Download every record of partition ``dt=<dt>`` of *table*.

        The whole partition is materialised in memory.

        Returns:
            A list with one entry per record yielded by the record reader.
        """
        session, count = self._start_download(table, dt)
        with session.open_record_reader(0, count) as reader:
            return list(reader)

    def get_all_record_batch(self, table: str, dt: str) -> list:
        """Download partition ``dt=<dt>`` of *table* as a list of Arrow batches.

        Same as :meth:`get_all_record` but reads through the session's
        arrow reader, so each list element is one batch rather than one record.
        """
        session, count = self._start_download(table, dt)
        with session.open_arrow_reader(0, count) as reader:
            return list(reader)

    @staticmethod
    def _partition_label(name: str) -> str:
        """Condense a partition name into a compact digit string.

        ``dt=YYYYMMDD/hh=HH/mm=MM`` becomes ``YYYYMMDDHHMM``,
        ``dt=YYYYMMDD/hh=HH`` becomes ``YYYYMMDDHH``; any other name is
        returned unchanged.
        """
        # Try the most specific pattern first: every dt/hh/mm name also
        # matches the dt/hh pattern.
        for pattern in (ODPSClient._DT_HH_MM_RE, ODPSClient._DT_HH_RE):
            match = pattern.search(name)
            if match:
                return "".join(match.groups())
        return name

    @classmethod
    def get_all_partition_info(cls, table_name: str, page_num=1, page_size=30, **kwargs) -> list[dict]:
        """List partition metadata for *table_name* via the Aliyun web endpoint.

        Args:
            table_name: Table whose partitions are listed.
            page_num: Page number sent to the endpoint.
            page_size: Page size sent to the endpoint.
            **kwargs:
                cookie: Value sent as the ``cookie`` request header
                    (presumably a logged-in console session cookie; verify
                    against callers).
                project: ODPS project name, default ``"loghubods"``.
                timeout: Request timeout in seconds, default 30.

        Returns:
            One dict per partition with keys ``表名`` (table name), ``name``
            (raw partition name), ``分区`` (compact partition label),
            ``数据量`` (record count), ``数据大小`` (size, formatted by
            ``convert_util.byte_convert``), ``创建时间`` / ``更新时间``
            (create / modify time, formatted by ``date_util.ts_cover_str``).

        Raises:
            requests.HTTPError: If the endpoint answers with an error status.
            requests.Timeout: If the endpoint does not answer within *timeout*.
        """
        headers = {"cookie": kwargs.get("cookie")}
        project = kwargs.get("project", "loghubods")
        url = (
            "https://bff-cn-hangzhou.data.aliyun.com/dma/listPartitions_2"
            f"?pageSize={page_size}&pageNum={page_num}"
            f"&entityGuid=odps.{project}.{table_name}&entityType=odps-table"
        )
        print(f"请求的阿里云接口为: {url}")

        # A timeout prevents an unresponsive endpoint from blocking forever.
        response = requests.get(url, headers=headers, timeout=kwargs.get("timeout", 30))
        # Fail loudly on HTTP errors instead of trying to parse an error page.
        response.raise_for_status()
        partitions = response.json()["data"]["data"]

        return [
            {
                "表名": table_name,
                "name": datum["name"],
                "分区": cls._partition_label(datum["name"]),
                "数据量": datum["recordCount"],
                "数据大小": convert_util.byte_convert(datum['dataSize']),
                "创建时间": date_util.ts_cover_str(datum["gmtCreate"]),
                "更新时间": date_util.ts_cover_str(datum['gmtModified']),
            }
            for datum in partitions
        ]
|