# odps_service.py (1.2 KB)

import asyncio
import logging

from odps import ODPS
  3. class OdpsService:
  4. def __init__(self, access_id, secret_access_key, project, endpoint):
  5. self.odps_client = ODPS(access_id, secret_access_key, project, endpoint)
  6. async def execute_odps_query(self, query: str) -> bool:
  7. loop = asyncio.get_running_loop()
  8. def _execute():
  9. instance = self.odps_client.execute_sql(
  10. query,
  11. hints={"odps.sql.submit.mode": "script"}
  12. )
  13. instance.wait_for_success()
  14. try:
  15. await loop.run_in_executor(None, _execute)
  16. return True
  17. except Exception as e:
  18. print(f"[ODPS ERROR] {e}")
  19. return False
  20. async def read_from_odps(self, query: str):
  21. loop = asyncio.get_running_loop()
  22. def _read():
  23. with self.odps_client.execute_sql(query).open_reader() as reader:
  24. if reader:
  25. return [item for item in reader]
  26. return []
  27. try:
  28. return await loop.run_in_executor(None, _read)
  29. except Exception as e:
  30. print(f"[ODPS READ ERROR] {e}")
  31. return []