123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
import json
import os
import re

import requests
from odps import ODPS
from odps.tunnel import TableTunnel

from util import convert_util, date_util
class ODPSClient(object):
    """Thin client wrapper around Alibaba MaxCompute (ODPS).

    Bundles three concerns used throughout the project:
      * SQL execution returning pandas DataFrames (script submit mode),
      * tunnel-based bulk download of ``dt=...`` partitions (record or Arrow),
      * partition-metadata lookup via the Aliyun DataWorks BFF HTTP API.
    """

    def __init__(self, project="loghubods"):
        """Open an ODPS connection for *project*.

        Credentials default to the historical hardcoded values but can be
        overridden with the ``ODPS_ACCESS_ID`` / ``ODPS_ACCESS_SECRET``
        environment variables.
        """
        # NOTE(security): fallback credentials are committed in source.
        # They should be removed once all deployments supply them through
        # the environment or a secret store.
        self.accessId = os.environ.get("ODPS_ACCESS_ID", "LTAIWYUujJAm7CbH")
        self.accessSecret = os.environ.get(
            "ODPS_ACCESS_SECRET", "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
        )
        self.endpoint = "http://service.odps.aliyun.com/api"
        self.tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
        self.odps = ODPS(
            self.accessId,
            self.accessSecret,
            project,
            self.endpoint
        )

    def execute_sql(self, sql: str):
        """Run *sql* in script submit mode and return a pandas DataFrame."""
        hints = {
            'odps.sql.submit.mode': 'script'
        }
        # tunnel=True streams the result set through the tunnel endpoint,
        # which is required for large results.
        with self.odps.execute_sql(sql, hints=hints).open_reader(tunnel=True) as reader:
            return reader.to_pandas()

    def execute_sql_result_save_file(self, sql: str, filepath: str):
        """Run *sql* and save the result to *filepath* as CSV (no index column)."""
        self.execute_sql(sql).to_csv(filepath, index=False)

    def get_table(self, table: str):
        """Return the ODPS table object for *table*."""
        return self.odps.get_table(table)

    def get_download_session(self, table: str, dt: str):
        """Create a tunnel download session for partition ``dt=<dt>`` of *table*."""
        tunnel = TableTunnel(self.odps)
        return tunnel.create_download_session(table, partition_spec=f"dt={dt}")

    def get_all_record(self, table: str, dt: str) -> list:
        """Download every record of partition ``dt=<dt>`` of *table*.

        Returns the records as a list; the whole partition is materialized
        in memory, so prefer :meth:`get_all_record_batch` for large data.
        """
        download_session = self.get_download_session(table, dt)
        count = download_session.count
        print(f"表: {table} 中的分区 {dt}, 共有 {count} 条数据")
        result = []
        with download_session.open_record_reader(0, count) as reader:
            for record in reader:
                result.append(record)
        return result

    def get_all_record_batch(self, table: str, dt: str) -> list:
        """Download partition ``dt=<dt>`` of *table* as a list of Arrow batches."""
        download_session = self.get_download_session(table, dt)
        count = download_session.count
        print(f"表: {table} 中的分区 {dt}, 共有 {count} 条数据")
        result = []
        with download_session.open_arrow_reader(0, count) as reader:
            for batch in reader:
                result.append(batch)
        return result

    @classmethod
    def get_all_partition_info(cls, table_name: str, page_num=1, page_size=30, **kwargs) -> list[dict]:
        """Fetch one page of partition metadata for *table_name*.

        Calls the Aliyun DataWorks BFF endpoint, authenticated by the
        session cookie passed via kwargs, and returns one dict per
        partition: table name, raw partition name, a compacted partition
        key (``YYYYMMDDHH[MM]`` when the name matches ``dt=/hh=[/mm=]``),
        record count, human-readable size and create/modify timestamps.

        Keyword args:
            cookie: session cookie for the BFF API (required for auth).
            project: ODPS project name, defaults to ``"loghubods"``.
        """
        header = {
            "cookie": kwargs.get("cookie")
        }
        project = kwargs.get("project", "loghubods")
        url = (
            f"https://bff-cn-hangzhou.data.aliyun.com/dma/listPartitions_2"
            f"?pageSize={page_size}&pageNum={page_num}"
            f"&entityGuid=odps.{project}.{table_name}&entityType=odps-table"
        )
        print(f"请求的阿里云接口为: {url}")
        response = requests.get(url, headers=header)
        resp_json = response.json()
        # Try the most specific layout first (dt/hh/mm), then dt/hh; fall
        # back to the raw partition name when neither pattern matches.
        dt_hh = re.compile(r'dt=(\d{8})/hh=(\d{2})')
        dt_hh_mm = re.compile(r'dt=(\d{8})/hh=(\d{2})/mm=(\d{2})')
        result = []
        for datum in resp_json["data"]["data"]:
            name = datum['name']
            match = dt_hh_mm.search(name) or dt_hh.search(name)
            partition = "".join(match.groups()) if match else name
            result.append({
                "表名": table_name,
                "name": name,
                "分区": partition,
                "数据量": datum["recordCount"],
                "数据大小": convert_util.byte_convert(datum['dataSize']),
                "创建时间": date_util.ts_cover_str(datum["gmtCreate"]),
                "更新时间": date_util.ts_cover_str(datum['gmtModified'])
            })
        return result
|