123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- import json
- import re
- import requests
- from odps import ODPS
- from odps.tunnel import TableTunnel
- from util import convert_util, date_util
- class ODPSClient(object):
- def __init__(self, project="loghubods"):
- self.accessId = "LTAIWYUujJAm7CbH"
- self.accessSecret = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
- self.endpoint = "http://service.odps.aliyun.com/api"
- self.tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
- self.odps = ODPS(
- self.accessId,
- self.accessSecret,
- project,
- self.endpoint
- )
- def execute_sql(self, sql: str):
- hints = {
- 'odps.sql.submit.mode': 'script'
- }
- with self.odps.execute_sql(sql, hints=hints).open_reader(tunnel=True) as reader:
- pd_df = reader.to_pandas()
- return pd_df
- def execute_sql_file_result_save_fle(self, sql_file: str, param: dict, filepath: str):
- with open(sql_file, "r") as f:
- sql = f.read()
- for key in param:
- sql = sql.replace(f"${{{key}}}", param[key])
- self.execute_sql_result_save_file(sql, filepath)
- def execute_sql_result_save_file(self, sql: str, filepath: str):
- result = self.execute_sql(sql)
- result.to_csv(filepath, index=False)
- def get_all_record(self, table: str, partition_spec: str) -> list:
- tunnel = TableTunnel(self.odps)
- download_session = tunnel.create_download_session(f"{table}", partition_spec=partition_spec)
- count = download_session.count
- print(f"表: {table} 中的分区 {partition_spec}, 共有 {count} 条数据")
- result = []
- # 使用批量读取,并按指定大小读取记录
- with download_session.open_record_reader(0, count) as reader:
- batch = []
- for record in reader:
- batch.append(record)
- if len(batch) >= 1000:
- result.extend(batch)
- batch.clear() # 清空批次以便继续加载新的记录
- # 加入最后一批剩余的数据
- if batch:
- result.extend(batch)
- return result
- def process_all_records(self, table: str, partition_spec: str, func: callable) -> None:
- """
- 处理指定表和分区中的所有记录,对每条记录应用指定函数。
- :param table: 表名
- :param partition_spec: 分区规范
- :param func: 用于处理每条记录的函数
- """
- tunnel = TableTunnel(self.odps)
- download_session = tunnel.create_download_session(f"{table}", partition_spec=partition_spec)
- count = download_session.count
- print(f"表: {table} 中的分区 {partition_spec}, 共有 {count} 条数据")
- # 使用批量读取,并对每条记录应用 func
- with download_session.open_record_reader(0, count) as reader:
- for record in reader:
- func(record)
- def get_table(self, table: str):
- return self.odps.get_table(table)
- def get_download_session(self, table: str, dt: str):
- tunnel = TableTunnel(self.odps)
- return tunnel.create_download_session(f"{table}", partition_spec=f"dt={dt}")
- def get_all_record_batch(self, table: str, dt: str) -> list:
- tunnel = TableTunnel(self.odps)
- download_session = tunnel.create_download_session(f"{table}", partition_spec=f"dt={dt}")
- count = download_session.count
- print(f"表: {table} 中的分区 {dt}, 共有 {count} 条数据")
- result = []
- with download_session.open_arrow_reader(0, download_session.count) as reader:
- for batch in reader:
- result.append(batch)
- return result
- def get_all_partition(self, table: str, project="loghubods") -> list[str]:
- result = []
- for partition in self.odps.get_table(project=project, name=table).partitions:
- result.append(partition)
- return result
- @classmethod
- def get_all_partition_info(cls, table_name: str, page_num=1, page_size=30, **kwargs) -> list[dict]:
- header = {
- "cookie": kwargs.get("cookie")
- }
- project = kwargs.get("project", "loghubods")
- url = f"https://bff-cn-hangzhou.data.aliyun.com/dma/listPartitions_2?pageSize={page_size}&pageNum={page_num}&entityGuid=odps.{project}.{table_name}&entityType=odps-table"
- print(f"请求的阿里云接口为: {url}")
- response = requests.get(url, headers=header)
- resp_json = json.loads(response.text)
- result = []
- dt_hh = re.compile(r'dt=(\d{8})/hh=(\d{2})')
- dt_hh_mm = re.compile(r'dt=(\d{8})/hh=(\d{2})/mm=(\d{2})')
- for datum in resp_json["data"]["data"]:
- s1 = dt_hh.search(datum['name'])
- s2 = dt_hh_mm.search(datum['name'])
- partition = datum['name']
- if s1:
- partition = f"{s1.group(1)}{s1.group(2)}"
- if s2:
- partition = f"{s2.group(1)}{s2.group(2)}{s2.group(3)}"
- item = {
- "表名": table_name,
- "name": datum["name"],
- "分区": partition,
- "数据量": datum["recordCount"],
- "数据大小": convert_util.byte_convert(datum['dataSize']),
- "创建时间": date_util.ts_cover_str(datum["gmtCreate"]),
- "更新时间": date_util.ts_cover_str(datum['gmtModified'])
- }
- result.append(item)
- return result
|