# -*- coding: utf-8 -*-
"""Asyncio client that mirrors a config server's namespaces into a local cache."""
import asyncio
import json
import logging
import socket

import aiohttp


class AsyncApolloClient:
    """Keep a local copy of remote configuration, refreshed by long polling.

    The client talks to three HTTP endpoints under ``config_server_url``:

    * ``/notifications/v2`` -- long poll that reports changed namespaces,
    * ``/configs/{appId}/{cluster}/{namespace}`` -- fetch read after a
      change notification,
    * ``/configfiles/json/{appId}/{cluster}/{namespace}`` -- fetch used
      on a cache miss when the caller opts in.

    Typical use: ``await client.start()``, then ``await client.get_value(...)``
    as needed, and finally ``await client.stop()``.
    """

    def __init__(
        self,
        app_id,
        cluster="default",
        config_server_url="http://localhost:8080",
        timeout=35,
        ip=None,
    ):
        """Create a client; no network request is made to the config server here.

        Args:
            app_id: Application id placed in request URLs and parameters.
            cluster: Cluster name placed in request URLs and parameters.
            config_server_url: Base URL of the config server (no trailing slash
                is added or stripped).
            timeout: Seconds used both as the long-poll request timeout and as
                the back-off sleep after a failed or unexpected poll.
            ip: Client IP sent as the ``ip`` query parameter. When falsy, it is
                detected through ``_init_ip``.
        """
        self.config_server_url = config_server_url
        self.appId = app_id
        self.cluster = cluster
        self.timeout = timeout
        # Set to True by the listener coroutine once its loop has exited.
        self.stopped = False
        self.ip = ip or self._init_ip()
        # namespace -> {key: value}
        self._cache = {}
        # namespace -> last notification id seen (-1 means "never polled").
        self._notification_map = {"application": -1}
        self._stop_event = asyncio.Event()
        # Strong reference to the background listener task (see start()).
        self._listener_task = None

    def _init_ip(self):
        """Return the local address the OS selects for reaching 8.8.8.8.

        Raises:
            OSError: If the socket cannot be created or connected.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 53))
            return s.getsockname()[0]

    async def get_value(
        self,
        key,
        default_val=None,
        namespace="application",
        auto_fetch_on_cache_miss=False,
    ):
        """Return the cached value for ``key`` in ``namespace``.

        A namespace seen for the first time is registered for notifications
        and populated with one long poll before the lookup.

        Args:
            key: Configuration key to look up.
            default_val: Value returned when the key cannot be found.
            namespace: Namespace to read from.
            auto_fetch_on_cache_miss: When true, a missing key triggers a
                fetch from the ``configfiles/json`` endpoint before giving up.

        Returns:
            The configured value, or ``default_val`` when the key is absent.
        """
        if namespace not in self._notification_map:
            self._notification_map[namespace] = -1
            logging.info("Add namespace '%s' to local notification map", namespace)

        if namespace not in self._cache:
            self._cache[namespace] = {}
            logging.info("Add namespace '%s' to local cache", namespace)
            # New namespace: poll once now so the lookup below can succeed.
            await self._long_poll()

        namespace_cache = self._cache[namespace]
        if key in namespace_cache:
            return namespace_cache[key]
        if auto_fetch_on_cache_miss:
            return await self._cached_http_get(key, default_val, namespace)
        return default_val

    async def start(self):
        """Populate an empty cache, then launch the background listener.

        Calling ``start`` again while the listener is still running does not
        create a second listener.
        """
        if not self._cache:
            await self._long_poll()

        running = self._listener_task
        if running is not None and not running.done():
            return

        # Keep a strong reference: the event loop only holds weak references
        # to tasks, so an unreferenced task may be garbage-collected mid-run.
        self._listener_task = asyncio.create_task(self._listener())

    async def stop(self):
        """Ask the listener to exit; it stops after its current poll returns."""
        logging.info("Stopping listener...")
        self._stop_event.set()

    async def _cached_http_get(self, key, default_val, namespace="application"):
        """Fetch ``namespace`` from the ``configfiles/json`` endpoint and look up ``key``.

        On HTTP 200 the response body replaces the cached namespace; on any
        other status the existing cache entry (or an empty dict) is used.

        Returns:
            The value for ``key``, or ``default_val`` when it is absent.
        """
        url = (
            f"{self.config_server_url}/configfiles/json/"
            f"{self.appId}/{self.cluster}/{namespace}?ip={self.ip}"
        )
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as r:
                if r.status == 200:
                    data = await r.json()
                    self._cache[namespace] = data
                    logging.info("Updated local cache for namespace %s", namespace)
                else:
                    data = self._cache.get(namespace, {})
        return data.get(key, default_val)

    async def _uncached_http_get(self, namespace="application"):
        """Fetch ``namespace`` from the ``configs`` endpoint into the cache.

        On HTTP 200 the ``configurations`` field of the response replaces the
        cached namespace. Any other status leaves the cache untouched and is
        logged as a warning.
        """
        url = (
            f"{self.config_server_url}/configs/"
            f"{self.appId}/{self.cluster}/{namespace}?ip={self.ip}"
        )
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as r:
                if r.status == 200:
                    data = await r.json()
                    self._cache[namespace] = data["configurations"]
                    logging.info(
                        "Updated local cache for namespace %s release key %s",
                        namespace,
                        data["releaseKey"],
                    )
                else:
                    logging.warning(
                        "Config fetch for namespace %s returned status %s",
                        namespace,
                        r.status,
                    )

    async def _long_poll(self):
        """Run one notification poll and refresh every namespace it reports.

        * HTTP 304: nothing changed; return immediately.
        * HTTP 200: for each reported namespace, re-fetch it and record the
          new notification id.
        * Any other status, or any exception: log a warning and sleep for
          ``self.timeout`` seconds before returning.
        """
        url = f"{self.config_server_url}/notifications/v2"
        notifications = [
            {"namespaceName": name, "notificationId": notification_id}
            for name, notification_id in self._notification_map.items()
        ]
        params = {
            "appId": self.appId,
            "cluster": self.cluster,
            "notifications": json.dumps(notifications, ensure_ascii=False),
        }
        # aiohttp deprecates bare numbers for ``timeout``; wrap it explicitly.
        request_timeout = aiohttp.ClientTimeout(total=self.timeout)

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    url, params=params, timeout=request_timeout
                ) as r:
                    if r.status == 304:
                        logging.debug("No change.")
                        return

                    if r.status == 200:
                        data = await r.json()
                        for entry in data:
                            ns = entry["namespaceName"]
                            nid = entry["notificationId"]
                            logging.info(
                                "%s has changes: notificationId=%s", ns, nid
                            )
                            await self._uncached_http_get(ns)
                            # NOTE(review): the id advances even when the
                            # fetch above returned a non-200 status, so that
                            # change is not retried -- confirm this is intended.
                            self._notification_map[ns] = nid
                    else:
                        logging.warning("Sleep due to unexpected status...")
                        await asyncio.sleep(self.timeout)
        except Exception as e:
            # Best-effort loop: log, back off, and let the caller poll again.
            logging.warning("Error during long poll: %s", e)
            await asyncio.sleep(self.timeout)

    async def _listener(self):
        """Poll repeatedly until ``stop`` is requested, then mark ``stopped``."""
        logging.info("Entering listener loop...")
        while not self._stop_event.is_set():
            await self._long_poll()
        logging.info("Listener stopped.")
        self.stopped = True