odps_service.py 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. import asyncio
  2. from odps import ODPS
  3. class OdpsService:
  4. def __init__(self, access_id, secret_access_key, project, endpoint):
  5. self.odps_client = ODPS(access_id, secret_access_key, project, endpoint)
  6. async def execute_odps_query(self, query: str) -> bool:
  7. loop = asyncio.get_running_loop()
  8. def _execute():
  9. instance = self.odps_client.execute_sql(
  10. query, hints={"odps.sql.submit.mode": "script"}
  11. )
  12. instance.wait_for_success()
  13. try:
  14. await loop.run_in_executor(None, _execute)
  15. return True
  16. except Exception as e:
  17. print(f"[ODPS ERROR] {e}")
  18. return False
  19. async def read_from_odps(self, query: str):
  20. loop = asyncio.get_running_loop()
  21. def _read():
  22. with self.odps_client.execute_sql(query).open_reader() as reader:
  23. if reader:
  24. return [item for item in reader]
  25. return []
  26. try:
  27. return await loop.run_in_executor(None, _read)
  28. except Exception as e:
  29. print(f"[ODPS READ ERROR] {e}")
  30. return []