import logging

import pandas as pd
from odps import ODPS

logger = logging.getLogger(__name__)


class ODPSUtils:
    """Utility wrapper that bundles common ODPS (MaxCompute) operations."""

    # Default connection settings.
    # SECURITY NOTE(review): what appears to be an AccessKey ID/secret pair is
    # hard-coded here and therefore committed to source control. It should be
    # rotated and supplied via configuration or environment variables instead.
    # The values are kept only so existing callers relying on the defaults
    # continue to work.
    DEFAULT_ACCESS_ID = 'LTAIWYUujJAm7CbH'
    DEFAULT_ACCESS_KEY = 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P'
    DEFAULT_PROJECT = 'loghubods'
    DEFAULT_ENDPOINT = 'http://service.cn.maxcompute.aliyun.com/api'
    # Logging defaults. They are not consumed anywhere in this class; they are
    # kept for compatibility with code that may read them.
    DEFAULT_LOG_LEVEL = logging.INFO
    DEFAULT_LOG_FILE = None

    # Defaults reference the class constants above (class-body names are in
    # scope when default expressions are evaluated), so the values live in
    # one place instead of being duplicated in the signature.
    def __init__(self, access_id=DEFAULT_ACCESS_ID, access_key=DEFAULT_ACCESS_KEY,
                 project=DEFAULT_PROJECT, endpoint=DEFAULT_ENDPOINT):
        """
        Store connection settings and create the ODPS client.

        Args:
            access_id: ODPS AccessKey ID.
            access_key: ODPS AccessKey secret.
            project: ODPS project name.
            endpoint: ODPS service endpoint URL.

        If client creation fails, ``self.odps`` stays ``None`` and the error
        is logged; no exception is raised.
        """
        self.access_id = access_id
        self.access_key = access_key
        self.project = project
        self.endpoint = endpoint

        # Create the ODPS client; connect() leaves it as None on failure.
        self.odps = None
        self.connect()

    def connect(self):
        """
        Create the ODPS client from the stored settings.

        Returns:
            True if the client object was created, False otherwise. On
            failure the exception is logged and ``self.odps`` is left
            unchanged.
        """
        try:
            self.odps = ODPS(self.access_id, self.access_key,
                             project=self.project, endpoint=self.endpoint)
        except Exception:
            logger.exception("Failed to create ODPS client for project %s",
                             self.project)
            return False
        return True

    def execute_sql(self, sql, max_wait_time=3600, tunnel=True):
        """
        Run a SQL statement and return its result as a pandas DataFrame.

        Args:
            sql: SQL statement to execute.
            max_wait_time: Maximum wait time in seconds. NOTE: currently not
                applied; it is accepted for API compatibility.
                TODO: enforce a timeout on the ODPS instance.
            tunnel: Whether to download results through Tunnel (default True).

        Returns:
            A DataFrame with one row per result record. The DataFrame is empty
            if the query produced no rows. Returns None if there is no ODPS
            client or if execution fails; the error is logged.
        """
        if self.odps is None:
            logger.error("ODPS client is not initialized; cannot execute SQL")
            return None

        try:
            with self.odps.execute_sql(sql).open_reader(tunnel=tunnel) as reader:
                # Materialize each record as a dict of column name -> value.
                records = [dict(record) for record in reader]
            return pd.DataFrame(records) if records else pd.DataFrame()
        except Exception:
            logger.exception("Failed to execute SQL: %s", sql)
            return None