apollo.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. # -*- coding: utf-8 -*-
  2. import json
  3. import logging
  4. import socket
  5. import asyncio
  6. import aiohttp
  7. from typing import Dict, Union
  8. from app.core.config.settings import ApolloConfig
  9. class AsyncApolloClient:
  10. def __init__(
  11. self,
  12. app_id,
  13. config_server_url="http://localhost:8080",
  14. cluster="default",
  15. timeout=35,
  16. ip=None,
  17. ):
  18. self.config_server_url = config_server_url
  19. self.appId = app_id
  20. self.cluster = cluster
  21. self.timeout = timeout
  22. self.stopped = False
  23. self.ip = ip or self._init_ip()
  24. self._cache = {}
  25. self._notification_map = {"application": -1}
  26. self._stop_event = asyncio.Event()
  27. @staticmethod
  28. def _init_ip():
  29. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  30. try:
  31. s.connect(("8.8.8.8", 53))
  32. return s.getsockname()[0]
  33. finally:
  34. s.close()
  35. async def get_value(
  36. self,
  37. key,
  38. default_val=None,
  39. namespace="application",
  40. auto_fetch_on_cache_miss=False,
  41. ):
  42. if namespace not in self._notification_map:
  43. self._notification_map[namespace] = -1
  44. logging.info(f"Add namespace '{namespace}' to local notification map")
  45. if namespace not in self._cache:
  46. self._cache[namespace] = {}
  47. logging.info(f"Add namespace '{namespace}' to local cache")
  48. await self._long_poll()
  49. if key in self._cache[namespace]:
  50. return self._cache[namespace][key]
  51. elif auto_fetch_on_cache_miss:
  52. return await self._cached_http_get(key, default_val, namespace)
  53. else:
  54. return default_val
  55. async def start(self):
  56. if len(self._cache) == 0:
  57. await self._long_poll()
  58. asyncio.create_task(self._listener())
  59. async def stop(self):
  60. logging.info("Stopping listener...")
  61. self._stop_event.set()
  62. async def _cached_http_get(self, key, default_val, namespace="application"):
  63. url = f"{self.config_server_url}/configfiles/json/{self.appId}/{self.cluster}/{namespace}?ip={self.ip}"
  64. async with aiohttp.ClientSession() as session:
  65. async with session.get(url) as r:
  66. if r.status == 200:
  67. data = await r.json()
  68. self._cache[namespace] = data
  69. logging.info(f"Updated local cache for namespace {namespace}")
  70. else:
  71. data = self._cache.get(namespace, {})
  72. return data.get(key, default_val)
  73. async def _uncached_http_get(self, namespace="application"):
  74. url = f"{self.config_server_url}/configs/{self.appId}/{self.cluster}/{namespace}?ip={self.ip}"
  75. async with aiohttp.ClientSession() as session:
  76. async with session.get(url) as r:
  77. if r.status == 200:
  78. data = await r.json()
  79. self._cache[namespace] = data["configurations"]
  80. logging.info(
  81. f"Updated local cache for namespace {namespace} release key {data['releaseKey']}"
  82. )
  83. async def _long_poll(self):
  84. url = f"{self.config_server_url}/notifications/v2"
  85. notifications = [
  86. {"namespaceName": k, "notificationId": v}
  87. for k, v in self._notification_map.items()
  88. ]
  89. params = {
  90. "appId": self.appId,
  91. "cluster": self.cluster,
  92. "notifications": json.dumps(notifications, ensure_ascii=False),
  93. }
  94. try:
  95. async with aiohttp.ClientSession() as session:
  96. async with session.get(url, params=params, timeout=self.timeout) as r:
  97. if r.status == 304:
  98. logging.debug("No change.")
  99. return
  100. if r.status == 200:
  101. data = await r.json()
  102. for entry in data:
  103. ns = entry["namespaceName"]
  104. nid = entry["notificationId"]
  105. logging.info(f"{ns} has changes: notificationId={nid}")
  106. await self._uncached_http_get(ns)
  107. self._notification_map[ns] = nid
  108. else:
  109. logging.warning("Sleep due to unexpected status...")
  110. await asyncio.sleep(self.timeout)
  111. except Exception as e:
  112. logging.warning(f"Error during long poll: {e}")
  113. await asyncio.sleep(self.timeout)
  114. async def _listener(self):
  115. logging.info("Entering listener loop...")
  116. while not self._stop_event.is_set():
  117. await self._long_poll()
  118. logging.info("Listener stopped.")
  119. self.stopped = True
  120. class AsyncApolloApi(AsyncApolloClient):
  121. def __init__(
  122. self, apollo_config: ApolloConfig, app_id: str | None, env: str | None
  123. ):
  124. if not app_id:
  125. app_id = apollo_config.app_id
  126. if not env:
  127. env = apollo_config.env
  128. server_url = apollo_config.apollo_map.get(app_id, {}).get(env)
  129. super().__init__(app_id, server_url)
  130. async def get_config_value(
  131. self, key: str, output_type: str = "json"
  132. ) -> Union[Dict, str]:
  133. match output_type:
  134. case "json":
  135. response = await self.get_value(key)
  136. return json.loads(response)
  137. case _:
  138. return await self.get_value(key)