# mysql_manager.py (6.1 KB)
  1. from __future__ import annotations
  2. import json
  3. import os
  4. from typing import Any, Dict, Mapping, Optional
  5. from dotenv import load_dotenv
  6. from .mysql_client import MySQLClient
  7. from .types import MySQLConfig
  8. class MySQLClientManager:
  9. """
  10. Manage multiple MySQLClient instances by "source" name.
  11. This is designed for future multi-data-source requirements:
  12. - Register configs for different sources (e.g. "default", "analytics", "crawler")
  13. - Get a client by source whenever you need to query different DBs
  14. """
  15. def __init__(self, configs: Optional[Mapping[str, MySQLConfig]] = None):
  16. self._configs: Dict[str, MySQLConfig] = {}
  17. self._clients: Dict[str, MySQLClient] = {}
  18. if configs:
  19. for _, cfg in configs.items():
  20. self.register_source(cfg)
  21. def register_source(self, config: MySQLConfig) -> None:
  22. source = config.source or "default"
  23. self._configs[source] = config
  24. # Drop existing instance to ensure updated config takes effect.
  25. if source in self._clients:
  26. del self._clients[source]
  27. def get_client(self, source: str = "default") -> MySQLClient:
  28. if source not in self._configs:
  29. raise KeyError(f"MySQL source not registered: {source}")
  30. if source not in self._clients:
  31. self._clients[source] = MySQLClient(self._configs[source])
  32. return self._clients[source]
  33. @classmethod
  34. def from_env(cls, *, prefix: str = "MYSQL_") -> "MySQLClientManager":
  35. """
  36. Build a manager from environment variables for a single "default" source.
  37. Expected env vars (all optional except host/user/password/database if you want to connect):
  38. - MYSQL_HOST
  39. - MYSQL_PORT
  40. - MYSQL_USER
  41. - MYSQL_PASSWORD
  42. - MYSQL_DATABASE
  43. - MYSQL_CHARSET
  44. """
  45. def _get(name: str, default: str = "") -> str:
  46. return os.getenv(f"{prefix}{name}", default)
  47. host = _get("HOST", "127.0.0.1")
  48. port_str = _get("PORT", "3306")
  49. user = _get("USER", "")
  50. password = _get("PASSWORD", "")
  51. database = _get("DATABASE", "")
  52. charset = _get("CHARSET", "utf8mb4")
  53. try:
  54. port = int(port_str)
  55. except ValueError:
  56. port = 3306
  57. cfg = MySQLConfig(
  58. source="default",
  59. host=host,
  60. port=port,
  61. user=user,
  62. password=password,
  63. database=database,
  64. charset=charset,
  65. )
  66. return cls(configs={"default": cfg})
  67. @classmethod
  68. def from_env_sources_info(
  69. cls,
  70. *,
  71. env_var: str = "MYSQL_SOURCES_INFO",
  72. dotenv_path: Optional[str] = None,
  73. allow_fallback_single_source: bool = True,
  74. ) -> "MySQLClientManager":
  75. """
  76. Build a manager from a single JSON env var `MYSQL_SOURCES_INFO`.
  77. Expected JSON format:
  78. {
  79. "default": {"host": "...", "port": 3306, "user": "...", "password": "...", "database": "..."},
  80. "crawler": {...}
  81. }
  82. Notes:
  83. - If `password` is missing, it also accepts `passwd`.
  84. - Unknown keys inside each source are ignored.
  85. - If env var is missing and `allow_fallback_single_source=True`, it falls back to `from_env()`.
  86. """
  87. if dotenv_path is None:
  88. load_dotenv()
  89. else:
  90. load_dotenv(dotenv_path)
  91. raw = os.getenv(env_var, "").strip()
  92. if not raw:
  93. if allow_fallback_single_source:
  94. return cls.from_env()
  95. return cls()
  96. try:
  97. parsed = json.loads(raw)
  98. except json.JSONDecodeError as e:
  99. raise ValueError(f"{env_var} is not valid JSON: {e}") from e
  100. if not isinstance(parsed, dict):
  101. raise ValueError(f"{env_var} must be a JSON object, got: {type(parsed).__name__}")
  102. configs: Dict[str, MySQLConfig] = {}
  103. for source_key, cfg in parsed.items():
  104. if not isinstance(source_key, str) or not source_key:
  105. continue
  106. if not isinstance(cfg, dict):
  107. continue
  108. cfg_dict: Dict[str, Any] = dict(cfg)
  109. # Accept synonyms
  110. if "password" not in cfg_dict and "passwd" in cfg_dict:
  111. cfg_dict["password"] = cfg_dict.get("passwd")
  112. # Coerce port if present
  113. if "port" in cfg_dict:
  114. try:
  115. cfg_dict["port"] = int(cfg_dict["port"])
  116. except Exception:
  117. cfg_dict["port"] = 3306
  118. # Ensure source
  119. cfg_dict["source"] = source_key
  120. # Filter keys to MySQLConfig fields (ignore unknown keys for forward compatibility)
  121. allowed = {
  122. "source",
  123. "host",
  124. "port",
  125. "user",
  126. "password",
  127. "database",
  128. "charset",
  129. "connect_timeout",
  130. "read_timeout",
  131. "write_timeout",
  132. "autocommit",
  133. "use_pool",
  134. "pool_mincached",
  135. "pool_maxconnections",
  136. }
  137. kwargs = {k: v for k, v in cfg_dict.items() if k in allowed}
  138. configs[source_key] = MySQLConfig(**kwargs)
  139. return cls(configs=configs)
  140. _GLOBAL_MANAGER = MySQLClientManager()
  141. _GLOBAL_MANAGER_INITIALIZED = False
  142. def get_global_manager() -> MySQLClientManager:
  143. global _GLOBAL_MANAGER_INITIALIZED
  144. if _GLOBAL_MANAGER_INITIALIZED:
  145. return _GLOBAL_MANAGER
  146. _GLOBAL_MANAGER_INITIALIZED = True
  147. # If the user already registered sources, don't override.
  148. if getattr(_GLOBAL_MANAGER, "_configs", None):
  149. return _GLOBAL_MANAGER
  150. # Try JSON multi-source init from env.
  151. try:
  152. mgr = MySQLClientManager.from_env_sources_info(allow_fallback_single_source=False)
  153. # Copy configs into the singleton instance.
  154. for source, cfg in getattr(mgr, "_configs", {}).items():
  155. _GLOBAL_MANAGER.register_source(cfg)
  156. except Exception:
  157. # Keep the global manager empty if env is missing/invalid.
  158. pass
  159. return _GLOBAL_MANAGER