1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
import logging
import os
from typing import Optional

import pandas as pd
from odps import ODPS
class ODPSApi:
    """Thin helper around a PyODPS ``ODPS`` entry object for common operations.

    Credentials are never stored in source code. When ``access_id`` /
    ``access_key`` are not passed explicitly they are read from the
    ``ALIBABA_CLOUD_ACCESS_KEY_ID`` / ``ALIBABA_CLOUD_ACCESS_KEY_SECRET``
    environment variables (falling back to the ``DEFAULT_*`` class
    attributes, which are ``None`` unless overridden).
    """

    # Environment variables consulted for credentials not passed explicitly.
    ACCESS_ID_ENV = "ALIBABA_CLOUD_ACCESS_KEY_ID"
    ACCESS_KEY_ENV = "ALIBABA_CLOUD_ACCESS_KEY_SECRET"

    # Default configuration.
    # SECURITY: an access-key pair used to be hard-coded here (and again in
    # the __init__ defaults). It was committed to source control and must be
    # rotated. The attributes remain, set to None, only so that code reading
    # them keeps working; supply credentials via arguments or the env vars.
    DEFAULT_ACCESS_ID = None
    DEFAULT_ACCESS_KEY = None
    DEFAULT_PROJECT = "loghubods"
    DEFAULT_ENDPOINT = "http://service.cn.maxcompute.aliyun.com/api"
    DEFAULT_LOG_FILE = None  # unused; kept so existing references still resolve

    _logger = logging.getLogger(__name__)

    def __init__(
        self,
        access_id: Optional[str] = None,
        access_key: Optional[str] = None,
        project: str = DEFAULT_PROJECT,
        endpoint: str = DEFAULT_ENDPOINT,
    ):
        """Store connection settings and try to create the ODPS client.

        Args:
            access_id: AccessKey ID. If None, read from ``ACCESS_ID_ENV``,
                then ``DEFAULT_ACCESS_ID``.
            access_key: AccessKey secret. If None, read from
                ``ACCESS_KEY_ENV``, then ``DEFAULT_ACCESS_KEY``.
            project: ODPS project name.
            endpoint: ODPS service endpoint URL.

        A failure to create the client does not raise. It is logged, and
        ``self.odps`` stays ``None``.
        """
        self.access_id = self._resolve_credential(
            access_id, self.ACCESS_ID_ENV, self.DEFAULT_ACCESS_ID
        )
        self.access_key = self._resolve_credential(
            access_key, self.ACCESS_KEY_ENV, self.DEFAULT_ACCESS_KEY
        )
        self.project = project
        self.endpoint = endpoint
        self.odps = None
        self.connect()

    @staticmethod
    def _resolve_credential(explicit, env_name, fallback):
        """Return ``explicit`` if given, else the env var ``env_name``, else ``fallback``."""
        if explicit is not None:
            return explicit
        return os.environ.get(env_name) or fallback

    def connect(self) -> bool:
        """Create the ``ODPS`` entry object from the stored settings.

        Returns:
            True if the client was constructed, False if construction raised.
            The failure is logged with its traceback instead of being silently
            discarded. On failure ``self.odps`` keeps its previous value.
        """
        try:
            self.odps = ODPS(
                self.access_id,
                self.access_key,
                project=self.project,
                endpoint=self.endpoint,
            )
        except Exception:
            self._logger.exception(
                "Failed to create ODPS client (project=%s, endpoint=%s)",
                self.project,
                self.endpoint,
            )
            return False
        return True

    def execute_sql(
        self,
        sql: str,
        max_wait_time: Optional[float] = 3600,
        tunnel: bool = True,
    ) -> Optional[pd.DataFrame]:
        """Run ``sql`` and return its result as a DataFrame.

        Args:
            sql: SQL statement to run.
            max_wait_time: Maximum number of seconds to wait for the job to
                finish. ``None`` means wait indefinitely. If the wait fails
                (including a timeout), a best-effort stop is sent to the job
                and ``None`` is returned.
            tunnel: Whether to download results through Tunnel (default True).

        Returns:
            A DataFrame with one row per result record (an empty DataFrame if
            there are no rows). Returns ``None`` if there is no connection or
            if any step failed; failures are logged.
        """
        if self.odps is None:
            self._logger.error("execute_sql called without an ODPS connection")
            return None
        try:
            # Submit asynchronously so that max_wait_time is actually
            # enforced. The previous synchronous call ignored it.
            instance = self.odps.run_sql(sql)
            try:
                instance.wait_for_success(timeout=max_wait_time)
            except Exception:
                self._stop_quietly(instance)
                raise
            with instance.open_reader(tunnel=tunnel) as reader:
                records = [dict(record) for record in reader]
        except Exception:
            self._logger.exception("ODPS SQL execution failed: %s", sql)
            return None
        # pd.DataFrame([]) is already an empty DataFrame, so no special case.
        return pd.DataFrame(records)

    def _stop_quietly(self, instance) -> None:
        """Best-effort stop of a still-running instance we are abandoning."""
        try:
            if not instance.is_terminated():
                instance.stop()
        except Exception:
            self._logger.warning(
                "Could not stop ODPS instance %s",
                getattr(instance, "id", instance),
                exc_info=True,
            )
|