123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- import logging
- import pandas as pd
- from odps import ODPS
class ODPSUtils:
    """Small helper around a PyODPS connection for running SQL queries.

    The constructor stores the connection settings and immediately attempts to
    connect. Failures never propagate to the caller: ``connect`` returns
    ``False`` and ``execute_sql`` returns ``None``. In both cases the
    underlying exception is logged so the cause is not lost.
    """

    # Default configuration.
    # NOTE(security): credentials are hard-coded in source. They are kept only
    # so that ``ODPSUtils()`` keeps working for existing callers; they should
    # be rotated and supplied from configuration or the environment instead.
    DEFAULT_ACCESS_ID = 'LTAIWYUujJAm7CbH'
    DEFAULT_ACCESS_KEY = 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P'
    DEFAULT_PROJECT = 'loghubods'
    DEFAULT_ENDPOINT = 'http://service.cn.maxcompute.aliyun.com/api'
    DEFAULT_LOG_LEVEL = logging.INFO
    DEFAULT_LOG_FILE = None

    # Shared logger used to report connection and query failures.
    _logger = logging.getLogger(__name__)

    def __init__(self,
                 access_id=DEFAULT_ACCESS_ID,
                 access_key=DEFAULT_ACCESS_KEY,
                 project=DEFAULT_PROJECT,
                 endpoint=DEFAULT_ENDPOINT):
        """Store the connection settings and try to connect.

        Args:
            access_id: ODPS access id.
            access_key: ODPS access key.
            project: ODPS project name.
            endpoint: ODPS service endpoint URL.
        """
        self.access_id = access_id
        self.access_key = access_key
        self.project = project
        self.endpoint = endpoint

        # ``odps`` stays ``None`` when the connection attempt fails, which is
        # what ``execute_sql`` checks before running a query.
        self.odps = None
        self.connect()

    def connect(self):
        """Create the ODPS client from the stored settings.

        Returns:
            ``True`` when the client object was created, ``False`` when its
            construction raised. On failure ``self.odps`` is left unchanged
            and the exception is logged.
        """
        try:
            self.odps = ODPS(
                self.access_id,
                self.access_key,
                project=self.project,
                endpoint=self.endpoint,
            )
        except Exception:
            # Keep the best-effort contract (return False), but record why.
            self._logger.exception("Failed to create ODPS connection")
            return False
        return True

    def execute_sql(self, sql, max_wait_time=3600, tunnel=True):
        """Run a SQL statement and return its result set as a DataFrame.

        Args:
            sql: SQL statement to execute.
            max_wait_time: Maximum wait time in seconds. Accepted for
                backward compatibility; the current implementation does not
                use it.
            tunnel: Passed through to ``open_reader(tunnel=...)``.

        Returns:
            A ``pandas.DataFrame`` with one row per record (an empty
            DataFrame when the query yields no records), or ``None`` when
            there is no connection or the query fails. Failures are logged.
        """
        if not self.odps:
            self._logger.error("execute_sql called without an ODPS connection")
            return None

        try:
            instance = self.odps.execute_sql(sql)
            with instance.open_reader(tunnel=tunnel) as reader:
                records = [dict(record) for record in reader]
            # Built inside the ``try`` so a conversion error is also reported
            # as ``None`` rather than raised, as callers already expect.
            return pd.DataFrame(records) if records else pd.DataFrame()
        except Exception:
            self._logger.exception("ODPS query failed")
            return None
# Example usage
if __name__ == "__main__":
    # Build a client using the class's built-in default connection settings.
    client = ODPSUtils()
    user_id = '7881300295218216'

    # Example 1: fetch the newest matching row for one user from a single
    # partition day (ordered by dt descending, limited to one row).
    # NOTE(review): the id is interpolated without quotes, so the SQL compares
    # the column against a numeric literal - confirm this matches its type.
    query = f"""
    SELECT * FROM third_party_user_date_version
    WHERE dt between '20250612' and '20250612' -- 添加分区条件
    and third_party_user_id = {user_id}
    and profile_data_v1 is not null
    order by dt desc
    limit 1
    """
    frame = client.execute_sql(query)

    # execute_sql returns None on failure and an empty frame for no rows;
    # either way there is nothing to preview.
    nothing_to_show = frame is None or frame.empty
    if not nothing_to_show:
        print("查询结果预览:")
        print(frame.head())
|