# odps_module.py
  1. #!/usr/bin/env python
  2. # coding=utf-8
  3. import os
  4. import time
  5. import uuid
  6. import threading
  7. from concurrent.futures import ThreadPoolExecutor, as_completed
  8. from odps import ODPS, options
  9. from odps.tunnel import TableTunnel
  10. from tqdm import tqdm
  11. import pyarrow as pa
  12. from pyarrow import csv as pa_csv
# Enable the Instance Tunnel so full query results can be downloaded
# (lifts the default 10,000-row limit on instance result readers).
options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False
  16. # ODPS 配置
  17. ODPS_CONFIGS = {
  18. "default": {
  19. "access_id": "LTAIWYUujJAm7CbH",
  20. "access_secret": "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P",
  21. "project": "loghubods",
  22. },
  23. "piaoquan_api": {
  24. "access_id": "LTAI5tKyXxh7C6349c1wbwUX",
  25. "access_secret": "H8doQDC20KugToRA3giERgRyRD1KR9",
  26. "project": "piaoquan_api",
  27. },
  28. }
  29. class ODPSClient(object):
  30. def __init__(self, project="loghubods", config="default"):
  31. """
  32. 初始化 ODPS 客户端
  33. Args:
  34. project: 项目名(可覆盖配置中的默认项目)
  35. config: 配置名,可选 "default" 或 "piaoquan_api"
  36. """
  37. cfg = ODPS_CONFIGS.get(config, ODPS_CONFIGS["default"])
  38. self.accessId = cfg["access_id"]
  39. self.accessSecret = cfg["access_secret"]
  40. self.endpoint = "http://service.odps.aliyun.com/api"
  41. self.tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
  42. # 如果指定了 project 且不是默认值,使用指定的;否则用配置中的
  43. actual_project = project if project != "loghubods" else cfg["project"]
  44. self.odps = ODPS(
  45. self.accessId,
  46. self.accessSecret,
  47. actual_project,
  48. self.endpoint
  49. )
  50. def execute_sql(self, sql: str, print_logview: bool = True):
  51. """执行 SQL 并返回 DataFrame"""
  52. hints = {'odps.sql.submit.mode': 'script'}
  53. instance = self.odps.execute_sql(sql, hints=hints)
  54. if print_logview:
  55. print(f"LogView: {instance.get_logview_address()}")
  56. with instance.open_reader(tunnel=True, limit=False) as reader:
  57. pd_df = reader.to_pandas()
  58. return pd_df
  59. def execute_sql_result_save_file(self, sql: str, output_file: str):
  60. """执行 SQL 并保存到文件(Arrow 直接写 CSV,速度最快)"""
  61. hints = {'odps.sql.submit.mode': 'script'}
  62. start_time = time.time()
  63. instance = self.odps.execute_sql(sql, hints=hints)
  64. sql_time = time.time() - start_time
  65. print(f"LogView: {instance.get_logview_address()}")
  66. print(f"SQL 执行耗时: {sql_time:.1f}s")
  67. with instance.open_reader(tunnel=True, limit=False, arrow=True) as reader:
  68. total = reader.count
  69. # 边下载边写入,用 pyarrow 直接写 CSV
  70. with open(output_file, 'wb') as f:
  71. first = True
  72. with tqdm(total=total, unit='行', desc='下载中') as pbar:
  73. for batch in reader:
  74. # pyarrow 写 CSV(比 pandas 快很多)
  75. options = pa_csv.WriteOptions(include_header=first)
  76. pa_csv.write_csv(pa.Table.from_batches([batch]), f, write_options=options)
  77. first = False
  78. pbar.update(batch.num_rows)
  79. total_time = time.time() - start_time
  80. print(f"总耗时: {total_time:.1f}s")
  81. print(f"完成: {output_file}")
  82. def execute_sql_result_save_file_parallel(self, sql: str, output_file: str, workers: int = 4):
  83. """执行 SQL 并保存到文件(多线程并行下载,速度最快)"""
  84. hints = {'odps.sql.submit.mode': 'script'}
  85. # 生成临时表名
  86. tmp_table = f"tmp_download_{uuid.uuid4().hex[:8]}"
  87. create_sql = f"CREATE TABLE {tmp_table} LIFECYCLE 1 AS {sql}"
  88. start_time = time.time()
  89. # 1. 创建临时表
  90. print(f"创建临时表: {tmp_table}")
  91. instance = self.odps.execute_sql(create_sql, hints=hints)
  92. print(f"LogView: {instance.get_logview_address()}")
  93. instance.wait_for_success()
  94. sql_time = time.time() - start_time
  95. print(f"SQL 执行耗时: {sql_time:.1f}s")
  96. try:
  97. # 2. 获取表信息
  98. table = self.odps.get_table(tmp_table)
  99. tunnel = TableTunnel(self.odps)
  100. download_session = tunnel.create_download_session(table.name)
  101. total = download_session.count
  102. print(f"总行数: {total}")
  103. if total == 0:
  104. # 空表,直接写入空 CSV
  105. with open(output_file, 'w') as f:
  106. columns = [col.name for col in table.table_schema.columns]
  107. f.write(','.join(columns) + '\n')
  108. print(f"完成: {output_file} (空表)")
  109. return
  110. # 3. 分段
  111. chunk_size = (total + workers - 1) // workers
  112. chunks = []
  113. for i in range(workers):
  114. start = i * chunk_size
  115. end = min((i + 1) * chunk_size, total)
  116. if start < end:
  117. chunks.append((i, start, end - start)) # (index, start, count)
  118. print(f"并行下载: {len(chunks)} 个分片, {workers} 线程")
  119. # 4. 多线程下载到临时文件(放在输出目录)
  120. output_dir = os.path.dirname(output_file)
  121. tmp_prefix = os.path.join(output_dir, f".tmp_{os.path.basename(output_file)}_")
  122. pbar = tqdm(total=total, unit='行', desc='下载中')
  123. pbar_lock = threading.Lock()
  124. session_id = download_session.id
  125. tmp_files = {}
  126. def download_chunk(chunk_info):
  127. idx, start, count = chunk_info
  128. tmp_file = f"{tmp_prefix}{idx:04d}"
  129. session = tunnel.create_download_session(table.name, download_id=session_id)
  130. with session.open_arrow_reader(start, count) as reader:
  131. batches = []
  132. for batch in reader:
  133. batches.append(batch)
  134. with pbar_lock:
  135. pbar.update(batch.num_rows)
  136. if batches:
  137. tbl = pa.Table.from_batches(batches)
  138. pa_csv.write_csv(tbl, tmp_file)
  139. return idx, tmp_file
  140. # 并行下载
  141. with ThreadPoolExecutor(max_workers=workers) as executor:
  142. futures = [executor.submit(download_chunk, chunk) for chunk in chunks]
  143. for future in as_completed(futures):
  144. idx, tmp_file = future.result()
  145. tmp_files[idx] = tmp_file
  146. pbar.close()
  147. # 按顺序合并
  148. print("合并文件中...")
  149. with open(output_file, 'wb') as outf:
  150. for idx in range(len(chunks)):
  151. tmp_file = tmp_files.get(idx)
  152. if tmp_file and os.path.exists(tmp_file):
  153. with open(tmp_file, 'rb') as inf:
  154. if idx > 0:
  155. inf.readline() # 跳过表头
  156. outf.write(inf.read())
  157. os.remove(tmp_file)
  158. finally:
  159. # 6. 删除临时表
  160. print(f"删除临时表: {tmp_table}")
  161. self.odps.delete_table(tmp_table, if_exists=True)
  162. total_time = time.time() - start_time
  163. print(f"总耗时: {total_time:.1f}s")
  164. print(f"完成: {output_file}")