odps_utils.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. import logging
  2. import pandas as pd
  3. from odps import ODPS
  4. class ODPSUtils:
  5. """ODPS操作工具类,封装常用的ODPS操作"""
  6. # 默认配置
  7. DEFAULT_ACCESS_ID = 'LTAIWYUujJAm7CbH'
  8. DEFAULT_ACCESS_KEY = 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P'
  9. DEFAULT_PROJECT = 'loghubods'
  10. DEFAULT_ENDPOINT = 'http://service.cn.maxcompute.aliyun.com/api'
  11. DEFAULT_LOG_LEVEL = logging.INFO
  12. DEFAULT_LOG_FILE = None
  13. def __init__(self,
  14. access_id='LTAIWYUujJAm7CbH',
  15. access_key='RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
  16. project='loghubods',
  17. endpoint='http://service.cn.maxcompute.aliyun.com/api'):
  18. """
  19. 初始化ODPS连接
  20. 参数:
  21. access_id: ODPS访问ID
  22. access_key: ODPS访问密钥
  23. project: ODPS项目名
  24. endpoint: ODPS服务地址
  25. log_level: 日志级别,默认为INFO
  26. log_file: 日志文件路径,默认为None(不写入文件)
  27. """
  28. # 使用默认值或用户提供的值
  29. self.access_id = access_id
  30. self.access_key = access_key
  31. self.project = project
  32. self.endpoint = endpoint
  33. # 初始化ODPS连接
  34. self.odps = None
  35. self.connect()
  36. def connect(self):
  37. """建立ODPS连接"""
  38. try:
  39. self.odps = ODPS(self.access_id, self.access_key,
  40. project=self.project, endpoint=self.endpoint)
  41. return True
  42. except Exception as e:
  43. return False
  44. def execute_sql(self, sql, max_wait_time=3600, tunnel=True):
  45. """
  46. 执行SQL查询并返回结果
  47. 参数:
  48. sql: SQL查询语句
  49. max_wait_time: 最大等待时间(秒)
  50. tunnel: 是否使用Tunnel下载结果,默认为True
  51. 返回:
  52. pandas DataFrame包含查询结果
  53. """
  54. if not self.odps:
  55. return None
  56. try:
  57. with self.odps.execute_sql(sql).open_reader(tunnel=tunnel) as reader:
  58. # 转换结果为DataFrame
  59. records = []
  60. for record in reader:
  61. records.append(dict(record))
  62. if records:
  63. df = pd.DataFrame(records)
  64. return df
  65. else:
  66. return pd.DataFrame()
  67. except Exception as e:
  68. return None