123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- # -*- coding: utf-8 -*-
- import json
- import logging
- import socket
- import asyncio
- import aiohttp
class AsyncApolloClient:
    """Asynchronous client for an HTTP configuration server.

    The endpoints used (``/configs``, ``/configfiles/json`` and
    ``/notifications/v2``) look like the Apollo config-service API -- confirm
    against the server you deploy.

    Configuration is cached per namespace in memory.  ``start()`` launches a
    background task that long-polls the notification endpoint and refreshes
    the cache for every namespace the server reports as changed.
    """

    def __init__(
        self,
        app_id,
        cluster="default",
        config_server_url="http://localhost:8080",
        timeout=35,
        ip=None,
    ):
        """Create a client; no network request is made here.

        Args:
            app_id: Application id inserted into request URLs and parameters.
            cluster: Cluster name inserted into request URLs and parameters.
            config_server_url: Base URL of the configuration server.
            timeout: Seconds allowed for one long-poll request; also the
                back-off delay after a failed or unexpected poll.
            ip: Client IP sent as the ``ip`` query parameter.  When falsy it
                is detected with ``_init_ip()``, which may raise ``OSError``.
        """
        self.config_server_url = config_server_url
        self.appId = app_id
        self.cluster = cluster
        self.timeout = timeout
        self.stopped = False
        self.ip = ip or self._init_ip()
        # namespace -> {key: value}
        self._cache = {}
        # namespace -> last notification id seen (-1 means "never polled")
        self._notification_map = {"application": -1}
        self._stop_event = asyncio.Event()
        # Strong reference to the background listener task (see start()).
        self._listener_task = None

    def _init_ip(self):
        """Return the local address of a UDP socket connected to 8.8.8.8:53.

        Socket errors (``OSError``) propagate to the caller.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 53))
            return probe.getsockname()[0]

    async def get_value(
        self,
        key,
        default_val=None,
        namespace="application",
        auto_fetch_on_cache_miss=False,
    ):
        """Return the cached value of ``key`` in ``namespace``.

        A namespace seen for the first time is registered for notifications,
        and one long poll is awaited to try to populate its cache.

        Args:
            key: Configuration key to look up.
            default_val: Value returned when the key cannot be found.
            namespace: Namespace that holds the key.
            auto_fetch_on_cache_miss: When true, a cache miss triggers a
                fetch from the ``/configfiles/json`` endpoint before falling
                back to ``default_val``.

        Returns:
            The configured value, or ``default_val``.
        """
        if namespace not in self._notification_map:
            self._notification_map[namespace] = -1
            logging.info(
                "Add namespace '%s' to local notification map", namespace
            )

        if namespace not in self._cache:
            self._cache[namespace] = {}
            logging.info("Add namespace '%s' to local cache", namespace)
            await self._long_poll()

        namespace_cache = self._cache[namespace]
        if key in namespace_cache:
            return namespace_cache[key]
        if auto_fetch_on_cache_miss:
            return await self._cached_http_get(key, default_val, namespace)
        return default_val

    async def start(self):
        """Populate an empty cache with one poll, then start the listener.

        The task object is stored on the instance because the event loop
        only keeps weak references to tasks: an unreferenced task may be
        garbage-collected before it finishes.  Calling ``start()`` while a
        listener is still running does not spawn a second one.
        """
        if not self._cache:
            await self._long_poll()

        running = self._listener_task
        if running is not None and not running.done():
            return
        self._listener_task = asyncio.create_task(self._listener())

    async def stop(self):
        """Ask the listener loop to exit.

        Only the stop flag is set here; the listener checks it between polls
        and sets ``self.stopped`` to True when it has actually finished.
        """
        logging.info("Stopping listener...")
        self._stop_event.set()

    async def _cached_http_get(self, key, default_val, namespace="application"):
        """Fetch ``namespace`` from ``/configfiles/json`` and look up ``key``.

        On HTTP 200 the response JSON replaces the cached namespace; on any
        other status the existing cache entry (or an empty dict) is used.

        Returns:
            The value of ``key``, or ``default_val`` when it is absent.
        """
        url = f"{self.config_server_url}/configfiles/json/{self.appId}/{self.cluster}/{namespace}?ip={self.ip}"
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as r:
                if r.status == 200:
                    data = await r.json()
                    self._cache[namespace] = data
                    logging.info(
                        "Updated local cache for namespace %s", namespace
                    )
                else:
                    data = self._cache.get(namespace, {})
        return data.get(key, default_val)

    async def _uncached_http_get(self, namespace="application"):
        """Refresh the cache for ``namespace`` from the ``/configs`` endpoint.

        On HTTP 200 the ``configurations`` member of the response replaces
        the cached namespace.  Any other status leaves the cache untouched
        and is logged as a warning instead of being dropped silently.
        """
        url = f"{self.config_server_url}/configs/{self.appId}/{self.cluster}/{namespace}?ip={self.ip}"
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as r:
                if r.status == 200:
                    data = await r.json()
                    self._cache[namespace] = data["configurations"]
                    logging.info(
                        "Updated local cache for namespace %s release key %s",
                        namespace,
                        data["releaseKey"],
                    )
                else:
                    logging.warning(
                        "Could not refresh namespace %s: HTTP status %s",
                        namespace,
                        r.status,
                    )

    async def _long_poll(self):
        """Run one long-poll round against ``/notifications/v2``.

        * 304: nothing changed, return immediately.
        * 200: refresh every namespace listed in the response and record its
          new notification id.
        * any other status, or any exception: log a warning and sleep for
          ``self.timeout`` seconds as a back-off.  Exceptions are not
          re-raised.
        """
        url = f"{self.config_server_url}/notifications/v2"
        notifications = [
            {"namespaceName": name, "notificationId": notification_id}
            for name, notification_id in self._notification_map.items()
        ]
        params = {
            "appId": self.appId,
            "cluster": self.cluster,
            "notifications": json.dumps(notifications, ensure_ascii=False),
        }
        try:
            # aiohttp deprecates bare numbers for ``timeout``; a ClientTimeout
            # with ``total`` set expresses the same limit explicitly.
            poll_timeout = aiohttp.ClientTimeout(total=self.timeout)
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    url, params=params, timeout=poll_timeout
                ) as r:
                    if r.status == 304:
                        logging.debug("No change.")
                        return

                    if r.status == 200:
                        data = await r.json()
                        for entry in data:
                            ns = entry["namespaceName"]
                            nid = entry["notificationId"]
                            logging.info(
                                "%s has changes: notificationId=%s", ns, nid
                            )
                            await self._uncached_http_get(ns)
                            self._notification_map[ns] = nid
                    else:
                        logging.warning("Sleep due to unexpected status...")
                        await asyncio.sleep(self.timeout)
        except Exception as e:
            logging.warning("Error during long poll: %s", e)
            await asyncio.sleep(self.timeout)

    async def _listener(self):
        """Long-poll repeatedly until ``stop()`` has been called.

        ``self.stopped`` is set in a ``finally`` clause so that it also
        becomes True when the task ends through cancellation.
        """
        logging.info("Entering listener loop...")
        try:
            while not self._stop_event.is_set():
                await self._long_poll()
        finally:
            logging.info("Listener stopped.")
            self.stopped = True
|