odps_utils.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
import logging
import os

import pandas as pd
from odps import ODPS
  4. class ODPSUtils:
  5. """ODPS操作工具类,封装常用的ODPS操作"""
  6. # 默认配置
  7. DEFAULT_ACCESS_ID = 'LTAIWYUujJAm7CbH'
  8. DEFAULT_ACCESS_KEY = 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P'
  9. DEFAULT_PROJECT = 'loghubods'
  10. DEFAULT_ENDPOINT = 'http://service.cn.maxcompute.aliyun.com/api'
  11. DEFAULT_LOG_LEVEL = logging.INFO
  12. DEFAULT_LOG_FILE = None
  13. def __init__(self,
  14. access_id='LTAIWYUujJAm7CbH',
  15. access_key='RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
  16. project='loghubods',
  17. endpoint='http://service.cn.maxcompute.aliyun.com/api'):
  18. """
  19. 初始化ODPS连接
  20. 参数:
  21. access_id: ODPS访问ID
  22. access_key: ODPS访问密钥
  23. project: ODPS项目名
  24. endpoint: ODPS服务地址
  25. log_level: 日志级别,默认为INFO
  26. log_file: 日志文件路径,默认为None(不写入文件)
  27. """
  28. # 使用默认值或用户提供的值
  29. self.access_id = access_id
  30. self.access_key = access_key
  31. self.project = project
  32. self.endpoint = endpoint
  33. # 初始化ODPS连接
  34. self.odps = None
  35. self.connect()
  36. def connect(self):
  37. """建立ODPS连接"""
  38. try:
  39. self.odps = ODPS(self.access_id, self.access_key,
  40. project=self.project, endpoint=self.endpoint)
  41. return True
  42. except Exception as e:
  43. return False
  44. def execute_sql(self, sql, max_wait_time=3600, tunnel=True):
  45. """
  46. 执行SQL查询并返回结果
  47. 参数:
  48. sql: SQL查询语句
  49. max_wait_time: 最大等待时间(秒)
  50. tunnel: 是否使用Tunnel下载结果,默认为True
  51. 返回:
  52. pandas DataFrame包含查询结果
  53. """
  54. if not self.odps:
  55. return None
  56. try:
  57. with self.odps.execute_sql(sql).open_reader(tunnel=tunnel) as reader:
  58. # 转换结果为DataFrame
  59. records = []
  60. for record in reader:
  61. records.append(dict(record))
  62. if records:
  63. df = pd.DataFrame(records)
  64. return df
  65. else:
  66. return pd.DataFrame()
  67. except Exception as e:
  68. return None
  69. # 示例用法
  70. if __name__ == "__main__":
  71. # 创建ODPS工具类实例
  72. odps_utils = ODPSUtils()
  73. third_party_user_id = '7881300295218216'
  74. # 示例1: 查询数据
  75. sql = f"""
  76. SELECT * FROM third_party_user_date_version
  77. WHERE dt between '20250612' and '20250612' -- 添加分区条件
  78. and third_party_user_id = {third_party_user_id}
  79. and profile_data_v1 is not null
  80. order by dt desc
  81. limit 1
  82. """
  83. result_df = odps_utils.execute_sql(sql)
  84. if result_df is not None and not result_df.empty:
  85. print("查询结果预览:")
  86. print(result_df.head())