| 123456789101112131415161718192021222324252627282930313233343536373839 |
- import asyncio
- from odps import ODPS
- class OdpsService:
- def __init__(self, access_id, secret_access_key, project, endpoint):
- self.odps_client = ODPS(access_id, secret_access_key, project, endpoint)
- async def execute_odps_query(self, query: str) -> bool:
- loop = asyncio.get_running_loop()
- def _execute():
- instance = self.odps_client.execute_sql(
- query,
- hints={"odps.sql.submit.mode": "script"}
- )
- instance.wait_for_success()
- try:
- await loop.run_in_executor(None, _execute)
- return True
- except Exception as e:
- print(f"[ODPS ERROR] {e}")
- return False
- async def read_from_odps(self, query: str):
- loop = asyncio.get_running_loop()
- def _read():
- with self.odps_client.execute_sql(query).open_reader() as reader:
- if reader:
- return [item for item in reader]
- return []
- try:
- return await loop.run_in_executor(None, _read)
- except Exception as e:
- print(f"[ODPS READ ERROR] {e}")
- return []
|