import logging
from typing import Optional

import pandas as pd
from odps import ODPS

logger = logging.getLogger(__name__)


class ODPSApi:
    """Utility class wrapping common ODPS (MaxCompute) operations."""

    # Default configuration.
    # NOTE(security): access credentials are hard-coded and committed to source
    # control here. They should be rotated and supplied via environment
    # variables or a secrets store; they are kept only so existing callers that
    # rely on the defaults keep working.
    DEFAULT_ACCESS_ID = "LTAIWYUujJAm7CbH"
    DEFAULT_ACCESS_KEY = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
    DEFAULT_PROJECT = "loghubods"
    DEFAULT_ENDPOINT = "http://service.cn.maxcompute.aliyun.com/api"
    # Not read anywhere in this class.
    DEFAULT_LOG_FILE = None

    def __init__(
        self,
        access_id=DEFAULT_ACCESS_ID,
        access_key=DEFAULT_ACCESS_KEY,
        project=DEFAULT_PROJECT,
        endpoint=DEFAULT_ENDPOINT,
    ):
        """Store the connection settings and create the ODPS client.

        Args:
            access_id: ODPS access key ID.
            access_key: ODPS access key secret.
            project: ODPS project name.
            endpoint: ODPS service endpoint URL.

        If creating the client fails, the error is logged, ``self.odps``
        stays ``None``, and ``execute_sql`` will return ``None``.
        """
        self.access_id = access_id
        self.access_key = access_key
        self.project = project
        self.endpoint = endpoint

        # Created by connect(); remains None if client construction fails.
        self.odps = None
        self.connect()

    def connect(self):
        """Create the ODPS client from the stored settings.

        Returns:
            True if the client was created; False if construction raised.
            On failure the exception is logged and ``self.odps`` is left
            unchanged.
        """
        try:
            self.odps = ODPS(
                self.access_id,
                self.access_key,
                project=self.project,
                endpoint=self.endpoint,
            )
        except Exception:
            # Best-effort by design: callers check the return value or
            # self.odps instead of catching, so log rather than re-raise.
            logger.exception(
                "Failed to create ODPS client for project %s", self.project
            )
            return False
        return True

    def execute_sql(self, sql, max_wait_time=3600, tunnel=True) -> Optional[pd.DataFrame]:
        """Run a SQL statement and return its result as a DataFrame.

        Args:
            sql: SQL statement to execute.
            max_wait_time: Intended maximum wait time in seconds.
                NOTE: currently unused; ``ODPS.execute_sql`` is called
                without any timeout.
            tunnel: Whether the result reader should download results via
                Tunnel (default True).

        Returns:
            A DataFrame with one row per result record (an empty DataFrame
            when the query yields no rows), or None if no client is
            available or executing/reading the query raised (the error is
            logged).
        """
        if not self.odps:
            logger.error("ODPS client is not initialized; cannot execute SQL")
            return None
        try:
            instance = self.odps.execute_sql(sql)
            with instance.open_reader(tunnel=tunnel) as reader:
                # Each record is converted to a plain dict keyed by column name.
                records = [dict(record) for record in reader]
            if records:
                return pd.DataFrame(records)
            return pd.DataFrame()
        except Exception:
            logger.exception("Failed to execute ODPS SQL: %s", sql)
            return None