async_apollo_api.py 1.1 KB

123456789101112131415161718192021222324252627282930
  1. import json
  2. from typing import Optional, Dict
  3. from applications.utils import AsyncApolloClient
  4. class AsyncApolloApi:
  5. def __init__(self, app_id="LongArticlesJob", env="pre"):
  6. match env:
  7. case "pre":
  8. config_server_url = "http://preapolloconfig-internal.piaoquantv.com/"
  9. case "dev":
  10. config_server_url = "https://devapolloconfig-internal.piaoquantv.com/"
  11. case "prod":
  12. config_server_url = "https://apolloconfig-internal.piaoquantv.com/"
  13. case _:
  14. raise ValueError("env must be 'pre' or 'dev' or 'prod'")
  15. self.apollo_connection = AsyncApolloClient(
  16. app_id=app_id, config_server_url=config_server_url
  17. )
  18. async def get_config_value(
  19. self, key: str, output_type: str = "json"
  20. ) -> Optional[Dict]:
  21. match output_type:
  22. case "json":
  23. response = await self.apollo_connection.get_value(key)
  24. return json.loads(response)
  25. case _:
  26. return await self.apollo_connection.get_value(key)