odps_api.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
import logging
import os
from typing import Optional

import pandas as pd
from odps import ODPS
  4. class ODPSApi:
  5. """ODPS操作工具类,封装常用的ODPS操作"""
  6. # 默认配置
  7. DEFAULT_ACCESS_ID = "LTAIWYUujJAm7CbH"
  8. DEFAULT_ACCESS_KEY = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
  9. DEFAULT_PROJECT = "loghubods"
  10. DEFAULT_ENDPOINT = "http://service.cn.maxcompute.aliyun.com/api"
  11. DEFAULT_LOG_FILE = None
  12. def __init__(
  13. self,
  14. access_id="LTAIWYUujJAm7CbH",
  15. access_key="RfSjdiWwED1sGFlsjXv0DlfTnZTG1P",
  16. project="loghubods",
  17. endpoint="http://service.cn.maxcompute.aliyun.com/api",
  18. ):
  19. """
  20. 初始化ODPS连接
  21. 参数:
  22. access_id: ODPS访问ID
  23. access_key: ODPS访问密钥
  24. project: ODPS项目名
  25. endpoint: ODPS服务地址
  26. log_level: 日志级别,默认为INFO
  27. log_file: 日志文件路径,默认为None(不写入文件)
  28. """
  29. # 使用默认值或用户提供的值
  30. self.access_id = access_id
  31. self.access_key = access_key
  32. self.project = project
  33. self.endpoint = endpoint
  34. # 初始化ODPS连接
  35. self.odps = None
  36. self.connect()
  37. def connect(self):
  38. """建立ODPS连接"""
  39. try:
  40. self.odps = ODPS(
  41. self.access_id,
  42. self.access_key,
  43. project=self.project,
  44. endpoint=self.endpoint,
  45. )
  46. return True
  47. except Exception as e:
  48. return False
  49. def execute_sql(self, sql, max_wait_time=3600, tunnel=True) -> Optional[pd.DataFrame]:
  50. """
  51. 执行SQL查询并返回结果
  52. 参数:
  53. sql: SQL查询语句
  54. max_wait_time: 最大等待时间(秒)
  55. tunnel: 是否使用Tunnel下载结果,默认为True
  56. 返回:
  57. pandas DataFrame包含查询结果
  58. """
  59. if not self.odps:
  60. return None
  61. try:
  62. with self.odps.execute_sql(sql).open_reader(tunnel=tunnel) as reader:
  63. # 转换结果为DataFrame
  64. records = []
  65. for record in reader:
  66. records.append(dict(record))
  67. if records:
  68. df = pd.DataFrame(records)
  69. return df
  70. else:
  71. return pd.DataFrame()
  72. except Exception as e:
  73. return None