async_apollo_client.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. # -*- coding: utf-8 -*-
  2. import json
  3. import logging
  4. import socket
  5. import asyncio
  6. import aiohttp
  7. class AsyncApolloClient:
  8. def __init__(
  9. self,
  10. app_id,
  11. cluster="default",
  12. config_server_url="http://localhost:8080",
  13. timeout=35,
  14. ip=None,
  15. ):
  16. self.config_server_url = config_server_url
  17. self.appId = app_id
  18. self.cluster = cluster
  19. self.timeout = timeout
  20. self.stopped = False
  21. self.ip = ip or self._init_ip()
  22. self._cache = {}
  23. self._notification_map = {"application": -1}
  24. self._stop_event = asyncio.Event()
  25. def _init_ip(self):
  26. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  27. try:
  28. s.connect(("8.8.8.8", 53))
  29. return s.getsockname()[0]
  30. finally:
  31. s.close()
  32. async def get_value(
  33. self,
  34. key,
  35. default_val=None,
  36. namespace="application",
  37. auto_fetch_on_cache_miss=False,
  38. ):
  39. if namespace not in self._notification_map:
  40. self._notification_map[namespace] = -1
  41. logging.info(f"Add namespace '{namespace}' to local notification map")
  42. if namespace not in self._cache:
  43. self._cache[namespace] = {}
  44. logging.info(f"Add namespace '{namespace}' to local cache")
  45. await self._long_poll()
  46. if key in self._cache[namespace]:
  47. return self._cache[namespace][key]
  48. elif auto_fetch_on_cache_miss:
  49. return await self._cached_http_get(key, default_val, namespace)
  50. else:
  51. return default_val
  52. async def start(self):
  53. if len(self._cache) == 0:
  54. await self._long_poll()
  55. asyncio.create_task(self._listener())
  56. async def stop(self):
  57. logging.info("Stopping listener...")
  58. self._stop_event.set()
  59. async def _cached_http_get(self, key, default_val, namespace="application"):
  60. url = f"{self.config_server_url}/configfiles/json/{self.appId}/{self.cluster}/{namespace}?ip={self.ip}"
  61. async with aiohttp.ClientSession() as session:
  62. async with session.get(url) as r:
  63. if r.status == 200:
  64. data = await r.json()
  65. self._cache[namespace] = data
  66. logging.info(f"Updated local cache for namespace {namespace}")
  67. else:
  68. data = self._cache.get(namespace, {})
  69. return data.get(key, default_val)
  70. async def _uncached_http_get(self, namespace="application"):
  71. url = f"{self.config_server_url}/configs/{self.appId}/{self.cluster}/{namespace}?ip={self.ip}"
  72. async with aiohttp.ClientSession() as session:
  73. async with session.get(url) as r:
  74. if r.status == 200:
  75. data = await r.json()
  76. self._cache[namespace] = data["configurations"]
  77. logging.info(
  78. f"Updated local cache for namespace {namespace} release key {data['releaseKey']}"
  79. )
  80. async def _long_poll(self):
  81. url = f"{self.config_server_url}/notifications/v2"
  82. notifications = [
  83. {"namespaceName": k, "notificationId": v}
  84. for k, v in self._notification_map.items()
  85. ]
  86. params = {
  87. "appId": self.appId,
  88. "cluster": self.cluster,
  89. "notifications": json.dumps(notifications, ensure_ascii=False),
  90. }
  91. try:
  92. async with aiohttp.ClientSession() as session:
  93. async with session.get(url, params=params, timeout=self.timeout) as r:
  94. if r.status == 304:
  95. logging.debug("No change.")
  96. return
  97. if r.status == 200:
  98. data = await r.json()
  99. for entry in data:
  100. ns = entry["namespaceName"]
  101. nid = entry["notificationId"]
  102. logging.info(f"{ns} has changes: notificationId={nid}")
  103. await self._uncached_http_get(ns)
  104. self._notification_map[ns] = nid
  105. else:
  106. logging.warning("Sleep due to unexpected status...")
  107. await asyncio.sleep(self.timeout)
  108. except Exception as e:
  109. logging.warning(f"Error during long poll: {e}")
  110. await asyncio.sleep(self.timeout)
  111. async def _listener(self):
  112. logging.info("Entering listener loop...")
  113. while not self._stop_event.is_set():
  114. await self._long_poll()
  115. logging.info("Listener stopped.")
  116. self.stopped = True