import json
import re
from typing import Callable

import requests
from odps import ODPS
from odps.tunnel import TableTunnel

from util import convert_util, date_util


class ODPSClient:
    def __init__(self, project="loghubods"):
        self.accessId = "LTAIWYUujJAm7CbH"
        self.accessSecret = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
        self.endpoint = "http://service.odps.aliyun.com/api"
        self.tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
        self.odps = ODPS(
            self.accessId,
            self.accessSecret,
            project,
            self.endpoint
        )

    def execute_sql(self, sql: str):
        """Execute a SQL statement and return the result as a pandas DataFrame."""
        hints = {
            "odps.sql.submit.mode": "script"
        }
        with self.odps.execute_sql(sql, hints=hints).open_reader(tunnel=True) as reader:
            pd_df = reader.to_pandas()
        return pd_df

    def execute_sql_file_result_save_file(self, sql_file: str, param: dict, filepath: str):
        """Read a SQL file, substitute ${key} placeholders from `param`, execute it and save the result."""
        with open(sql_file, "r") as f:
            sql = f.read()
        for key in param:
            sql = sql.replace(f"${{{key}}}", param[key])
        self.execute_sql_result_save_file(sql, filepath)

    def execute_sql_result_save_file(self, sql: str, filepath: str):
        """Execute a SQL statement and save the result as a CSV file."""
        result = self.execute_sql(sql)
        result.to_csv(filepath, index=False)

    def get_all_record(self, table: str, partition_spec: str) -> list:
        """Download all records of one partition into memory and return them as a list."""
        tunnel = TableTunnel(self.odps)
        download_session = tunnel.create_download_session(table, partition_spec=partition_spec)
        count = download_session.count
        print(f"Table {table}, partition {partition_spec}: {count} records in total")

        result = []
        # Read in batches of a fixed size.
        with download_session.open_record_reader(0, count) as reader:
            batch = []
            for record in reader:
                batch.append(record)
                if len(batch) >= 1000:
                    result.extend(batch)
                    batch.clear()  # Clear the batch so new records can be loaded.
            # Append whatever is left in the final batch.
            if batch:
                result.extend(batch)
        return result

    def process_all_records(self, table: str, partition_spec: str, func: Callable) -> None:
        """
        Process all records in the given table partition, applying `func` to each record.

        :param table: table name
        :param partition_spec: partition spec, e.g. "dt=20240101"
        :param func: function applied to each record
        """
        tunnel = TableTunnel(self.odps)
        download_session = tunnel.create_download_session(table, partition_spec=partition_spec)
        count = download_session.count
        print(f"Table {table}, partition {partition_spec}: {count} records in total")

        # Stream the records and apply `func` to each one.
        with download_session.open_record_reader(0, count) as reader:
            for record in reader:
                func(record)

    def get_table(self, table: str):
        return self.odps.get_table(table)

    def get_download_session(self, table: str, dt: str):
        tunnel = TableTunnel(self.odps)
        return tunnel.create_download_session(table, partition_spec=f"dt={dt}")

    def get_all_record_batch(self, table: str, dt: str) -> list:
        """Download one dt partition via the Arrow reader and return the record batches."""
        tunnel = TableTunnel(self.odps)
        download_session = tunnel.create_download_session(table, partition_spec=f"dt={dt}")
        count = download_session.count
        print(f"Table {table}, partition dt={dt}: {count} records in total")

        result = []
        with download_session.open_arrow_reader(0, count) as reader:
            for batch in reader:
                result.append(batch)
        return result

    @classmethod
    def get_all_partition_info(cls, table_name: str, page_num=1, page_size=30, **kwargs) -> list[dict]:
        """Query the Aliyun DataWorks BFF API for partition metadata of a table."""
        header = {
            "cookie": kwargs.get("cookie")
        }
        project = kwargs.get("project", "loghubods")
        url = (
            f"https://bff-cn-hangzhou.data.aliyun.com/dma/listPartitions_2"
            f"?pageSize={page_size}&pageNum={page_num}"
            f"&entityGuid=odps.{project}.{table_name}&entityType=odps-table"
        )
        print(f"Requesting Aliyun API: {url}")
        response = requests.get(url, headers=header)
        resp_json = json.loads(response.text)

        result = []
        dt_hh = re.compile(r"dt=(\d{8})/hh=(\d{2})")
        dt_hh_mm = re.compile(r"dt=(\d{8})/hh=(\d{2})/mm=(\d{2})")
        for datum in resp_json["data"]["data"]:
            s1 = dt_hh.search(datum["name"])
            s2 = dt_hh_mm.search(datum["name"])
            # Collapse "dt=YYYYMMDD/hh=HH[/mm=MM]" into a compact partition label.
            partition = datum["name"]
            if s1:
                partition = f"{s1.group(1)}{s1.group(2)}"
            if s2:
                partition = f"{s2.group(1)}{s2.group(2)}{s2.group(3)}"
            # Field names are kept in Chinese because downstream consumers key on them:
            # 表名 = table name, 分区 = partition, 数据量 = record count,
            # 数据大小 = data size, 创建时间 = created at, 更新时间 = last modified.
            item = {
                "表名": table_name,
                "name": datum["name"],
                "分区": partition,
                "数据量": datum["recordCount"],
                "数据大小": convert_util.byte_convert(datum["dataSize"]),
                "创建时间": date_util.ts_cover_str(datum["gmtCreate"]),
                "更新时间": date_util.ts_cover_str(datum["gmtModified"])
            }
            result.append(item)
        return result