| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- #!/usr/bin/env python
- # coding=utf-8
- import os
- import time
- import uuid
- import threading
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from odps import ODPS, options
- from odps.tunnel import TableTunnel
- from tqdm import tqdm
- import pyarrow as pa
- from pyarrow import csv as pa_csv
# Enable Instance Tunnel so SQL result downloads are not capped at 10k rows.
options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False

# ODPS connection profiles.
# SECURITY NOTE(review): access credentials were hard-coded in source. They
# can now be supplied via environment variables; the hard-coded values remain
# only as backward-compatible fallbacks and should be rotated and removed.
ODPS_CONFIGS = {
    "default": {
        "access_id": os.environ.get("ODPS_ACCESS_ID", "LTAIWYUujJAm7CbH"),
        "access_secret": os.environ.get("ODPS_ACCESS_SECRET", "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"),
        "project": "loghubods",
    },
    "piaoquan_api": {
        "access_id": os.environ.get("ODPS_PIAOQUAN_ACCESS_ID", "LTAI5tKyXxh7C6349c1wbwUX"),
        "access_secret": os.environ.get("ODPS_PIAOQUAN_ACCESS_SECRET", "H8doQDC20KugToRA3giERgRyRD1KR9"),
        "project": "piaoquan_api",
    },
}
class ODPSClient(object):
    """Thin wrapper around PyODPS for running SQL and exporting results to CSV."""

    def __init__(self, project="loghubods", config="default"):
        """
        Initialize the ODPS client.

        Args:
            project: project name (overrides the profile's default project)
            config: profile name, one of "default" or "piaoquan_api"
        """
        cfg = ODPS_CONFIGS.get(config, ODPS_CONFIGS["default"])
        self.accessId = cfg["access_id"]
        self.accessSecret = cfg["access_secret"]
        self.endpoint = "http://service.odps.aliyun.com/api"
        self.tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
        # Use the caller's project only if it differs from the default,
        # otherwise fall back to the project declared in the selected profile.
        actual_project = project if project != "loghubods" else cfg["project"]
        self.odps = ODPS(
            self.accessId,
            self.accessSecret,
            actual_project,
            self.endpoint,
        )

    def execute_sql(self, sql: str, print_logview: bool = True):
        """Execute SQL and return the full result set as a pandas DataFrame.

        Args:
            sql: SQL statement to run.
            print_logview: when True, print the instance's LogView URL.

        Returns:
            pandas.DataFrame with all result rows (tunnel reader, no row limit).
        """
        hints = {'odps.sql.submit.mode': 'script'}
        instance = self.odps.execute_sql(sql, hints=hints)
        if print_logview:
            print(f"LogView: {instance.get_logview_address()}")
        with instance.open_reader(tunnel=True, limit=False) as reader:
            pd_df = reader.to_pandas()
        return pd_df

    def execute_sql_result_save_file(self, sql: str, output_file: str):
        """Execute SQL and stream the result to a CSV file.

        Reads the result through the Arrow reader and writes CSV batch by
        batch with pyarrow, so the full result never has to fit in memory.

        Args:
            sql: SQL statement to run.
            output_file: path of the CSV file to create/overwrite.
        """
        hints = {'odps.sql.submit.mode': 'script'}
        start_time = time.time()
        instance = self.odps.execute_sql(sql, hints=hints)
        sql_time = time.time() - start_time
        print(f"LogView: {instance.get_logview_address()}")
        print(f"SQL 执行耗时: {sql_time:.1f}s")
        with instance.open_reader(tunnel=True, limit=False, arrow=True) as reader:
            total = reader.count
            # Download and write incrementally; pyarrow's CSV writer is much
            # faster than going through pandas.
            with open(output_file, 'wb') as f:
                first = True
                with tqdm(total=total, unit='行', desc='下载中') as pbar:
                    for batch in reader:
                        # Only the first batch carries the header row.
                        # (Renamed from `options`, which shadowed the
                        # module-level `options` imported from odps.)
                        write_options = pa_csv.WriteOptions(include_header=first)
                        pa_csv.write_csv(pa.Table.from_batches([batch]), f, write_options=write_options)
                        first = False
                        pbar.update(batch.num_rows)
        total_time = time.time() - start_time
        print(f"总耗时: {total_time:.1f}s")
        print(f"完成: {output_file}")

    def execute_sql_result_save_file_parallel(self, sql: str, output_file: str, workers: int = 4):
        """Execute SQL and save the result to CSV using parallel downloads.

        The query is materialized into a temporary table (LIFECYCLE 1, so
        ODPS reclaims it even if cleanup fails), the table is downloaded in
        up to `workers` row ranges concurrently through the table tunnel,
        and the per-shard CSV files are concatenated in order.

        Args:
            sql: SELECT statement producing the rows to export.
            output_file: path of the CSV file to create/overwrite.
            workers: number of download threads / row-range shards.
        """
        hints = {'odps.sql.submit.mode': 'script'}
        # Unique temporary table name for this download.
        tmp_table = f"tmp_download_{uuid.uuid4().hex[:8]}"
        create_sql = f"CREATE TABLE {tmp_table} LIFECYCLE 1 AS {sql}"
        start_time = time.time()
        # 1. Materialize the query into the temporary table.
        print(f"创建临时表: {tmp_table}")
        instance = self.odps.execute_sql(create_sql, hints=hints)
        print(f"LogView: {instance.get_logview_address()}")
        instance.wait_for_success()
        sql_time = time.time() - start_time
        print(f"SQL 执行耗时: {sql_time:.1f}s")
        tmp_files = {}
        try:
            # 2. Inspect the table and open a download session.
            table = self.odps.get_table(tmp_table)
            tunnel = TableTunnel(self.odps)
            download_session = tunnel.create_download_session(table.name)
            total = download_session.count
            print(f"总行数: {total}")
            if total == 0:
                # Empty result: write only the header row.
                with open(output_file, 'w') as f:
                    columns = [col.name for col in table.table_schema.columns]
                    f.write(','.join(columns) + '\n')
                print(f"完成: {output_file} (空表)")
                return
            # 3. Split the row range into one shard per worker.
            chunk_size = (total + workers - 1) // workers
            chunks = []
            for i in range(workers):
                start = i * chunk_size
                end = min((i + 1) * chunk_size, total)
                if start < end:
                    chunks.append((i, start, end - start))  # (index, start, count)
            print(f"并行下载: {len(chunks)} 个分片, {workers} 线程")
            # 4. Download shards concurrently into temp files next to the output.
            output_dir = os.path.dirname(output_file)
            tmp_prefix = os.path.join(output_dir, f".tmp_{os.path.basename(output_file)}_")
            pbar = tqdm(total=total, unit='行', desc='下载中')
            pbar_lock = threading.Lock()
            session_id = download_session.id

            def download_chunk(chunk_info):
                """Download one (index, start, count) shard into its own CSV file."""
                idx, start, count = chunk_info
                tmp_file = f"{tmp_prefix}{idx:04d}"
                # Re-attach to the shared session id so every thread reads
                # the same consistent snapshot of the table.
                session = tunnel.create_download_session(table.name, download_id=session_id)
                with session.open_arrow_reader(start, count) as reader:
                    batches = []
                    for batch in reader:
                        batches.append(batch)
                        with pbar_lock:  # tqdm updates are not thread-safe
                            pbar.update(batch.num_rows)
                if batches:
                    pa_csv.write_csv(pa.Table.from_batches(batches), tmp_file)
                return idx, tmp_file

            try:
                with ThreadPoolExecutor(max_workers=workers) as executor:
                    futures = [executor.submit(download_chunk, chunk) for chunk in chunks]
                    for future in as_completed(futures):
                        idx, tmp_file = future.result()
                        tmp_files[idx] = tmp_file
            finally:
                # Close the bar even if a worker raised, so the terminal
                # is left in a sane state.
                pbar.close()
            # 5. Concatenate the shards in index order, keeping one header.
            print("合并文件中...")
            with open(output_file, 'wb') as outf:
                for idx in range(len(chunks)):
                    tmp_file = tmp_files.pop(idx, None)
                    if tmp_file and os.path.exists(tmp_file):
                        with open(tmp_file, 'rb') as inf:
                            if idx > 0:
                                inf.readline()  # skip the shard's header line
                            outf.write(inf.read())
                        os.remove(tmp_file)
        finally:
            # 6. Remove any leftover shard files (only present after a
            # failure) and drop the temporary table.
            for tmp_file in tmp_files.values():
                if tmp_file and os.path.exists(tmp_file):
                    try:
                        os.remove(tmp_file)
                    except OSError:
                        pass  # best-effort cleanup; LIFECYCLE 1 covers the table
            print(f"删除临时表: {tmp_table}")
            self.odps.delete_table(tmp_table, if_exists=True)
        total_time = time.time() - start_time
        print(f"总耗时: {total_time:.1f}s")
        print(f"完成: {output_file}")
|