1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- import logging
- import pandas as pd
- from odps import ODPS
class ODPSUtils:
    """Utility class wrapping common ODPS (MaxCompute) operations.

    Credentials are not stored in source. Pass ``access_id``/``access_key``
    explicitly, or set the ``ALIBABA_CLOUD_ACCESS_KEY_ID`` /
    ``ALIBABA_CLOUD_ACCESS_KEY_SECRET`` environment variables.
    """

    # Default configuration.
    # SECURITY: an access key pair used to be hard-coded here. Secrets must not
    # live in source control; that key should be treated as leaked and rotated.
    DEFAULT_ACCESS_ID = None
    DEFAULT_ACCESS_KEY = None
    DEFAULT_PROJECT = 'loghubods'
    DEFAULT_ENDPOINT = 'http://service.cn.maxcompute.aliyun.com/api'
    # Not consumed by this class; kept so existing references keep working.
    DEFAULT_LOG_LEVEL = logging.INFO
    DEFAULT_LOG_FILE = None

    # Environment variables consulted when credentials are not passed in.
    ACCESS_ID_ENV = 'ALIBABA_CLOUD_ACCESS_KEY_ID'
    ACCESS_KEY_ENV = 'ALIBABA_CLOUD_ACCESS_KEY_SECRET'

    _logger = logging.getLogger(__name__)

    def __init__(self,
                 access_id=None,
                 access_key=None,
                 project=DEFAULT_PROJECT,
                 endpoint=DEFAULT_ENDPOINT):
        """Store connection settings and try to create the ODPS entry object.

        Args:
            access_id: ODPS access key ID. If None, read from the
                ``ALIBABA_CLOUD_ACCESS_KEY_ID`` environment variable, falling
                back to ``DEFAULT_ACCESS_ID``.
            access_key: ODPS access key secret. If None, read from the
                ``ALIBABA_CLOUD_ACCESS_KEY_SECRET`` environment variable,
                falling back to ``DEFAULT_ACCESS_KEY``.
            project: ODPS project name.
            endpoint: ODPS service endpoint URL.

        Never raises on connection problems: if ``connect()`` fails,
        ``self.odps`` stays None and the error is logged.
        """
        import os

        if access_id is None:
            access_id = os.environ.get(self.ACCESS_ID_ENV, self.DEFAULT_ACCESS_ID)
        if access_key is None:
            access_key = os.environ.get(self.ACCESS_KEY_ENV, self.DEFAULT_ACCESS_KEY)

        self.access_id = access_id
        self.access_key = access_key
        self.project = project
        self.endpoint = endpoint

        # Initialize the ODPS connection (best-effort; see connect()).
        self.odps = None
        self.connect()

    def connect(self):
        """Create the ODPS entry object and store it in ``self.odps``.

        Returns:
            True if the entry object was created, False otherwise. On failure
            the error is logged and ``self.odps`` is left unchanged.
            NOTE(review): constructing the entry object may not contact the
            service, so True does not necessarily mean the credentials are
            valid — confirm against PyODPS behaviour.
        """
        if not self.access_id or not self.access_key:
            self._logger.error(
                'ODPS credentials missing: pass access_id/access_key or set %s/%s',
                self.ACCESS_ID_ENV, self.ACCESS_KEY_ENV)
            return False
        try:
            self.odps = ODPS(self.access_id, self.access_key,
                             project=self.project, endpoint=self.endpoint)
        except Exception:
            # Best-effort contract: callers check the return value / self.odps
            # rather than catching exceptions, so log instead of raising.
            self._logger.exception(
                'Failed to create ODPS connection (project=%s, endpoint=%s)',
                self.project, self.endpoint)
            return False
        return True

    def execute_sql(self, sql, max_wait_time=3600, tunnel=True):
        """Run ``sql`` and return its result as a pandas DataFrame.

        Args:
            sql: SQL statement to execute.
            max_wait_time: Intended maximum wait time in seconds.
                NOTE: currently not enforced; kept for API compatibility.
            tunnel: Whether to download results via Tunnel (default True).

        Returns:
            A DataFrame with one row per result record (an empty DataFrame
            when the query yields no rows), or None when there is no
            connection or the query/download fails. Failures are logged.
        """
        if not self.odps:
            self._logger.error('No ODPS connection; cannot execute SQL')
            return None
        try:
            with self.odps.execute_sql(sql).open_reader(tunnel=tunnel) as reader:
                # dict(record) needs each record to be a mapping or to iterate
                # as (column_name, value) pairs.
                records = [dict(record) for record in reader]
            return pd.DataFrame(records) if records else pd.DataFrame()
        except Exception:
            # Preserve the original contract (None on failure) but make it visible.
            self._logger.exception('ODPS SQL execution failed: %s', sql)
            return None
|