from urllib.parse import quote_plus

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Typed application configuration with built-in defaults.

    Fields are declared with defaults below; ``model_config`` additionally
    points at a UTF-8 ``.env`` file and ignores keys that do not match a
    declared field. Several string fields hold comma-separated values and
    are exposed as parsed lists through the properties at the bottom of
    the class.
    """

    # --- application / HTTP ---
    app_name: str = "denamd-server"
    app_env: str = "dev"
    app_host: str = "0.0.0.0"
    app_port: int = 8000
    # Comma-separated origins, or "*" alone; see ``cors_allow_origin_list``.
    cors_allow_origins: str = "*"
    scheduler_heartbeat_seconds: int = 3600

    # --- MySQL (combined into a URL by ``mysql_dsn``) ---
    mysql_host: str = ""
    mysql_port: int = 3306
    mysql_user: str = "root"
    mysql_password: str = ""
    mysql_database: str = ""

    # --- ODPS ---
    # NOTE(review): the access id/key defaults below are literal values
    # committed to source. If they are live credentials they should be
    # rotated and supplied through the environment instead — confirm.
    odps_access_id: str = "LTAI9EBa0bd5PrDa"
    odps_access_key: str = "vAalxds7YxhfOA2yVv8GziCg3Y87v5"
    odps_project: str = "loghubods"
    odps_endpoint: str = "http://service.odps.aliyun.com/api"

    # --- demand pool ---
    demand_pool_source_table: str = "dwd_multi_demand_pool_di"
    demand_pool_secondary_source_table: str = "dwd_demand_pool_di"
    demand_pool_secondary_strategy: str = "近期需求"
    demand_pool_secondary_default_ext_info: str = "{}"
    demand_pool_target_table: str = "multi_demand_pool_di"
    # Comma-separated; see ``demand_pool_initial_partition_list``.
    demand_pool_initial_partitions: str = "20260507,20260508,20260509"
    demand_pool_hourly_sync_enabled: bool = True
    demand_pool_hourly_sync_minute: int = 0
    demand_pool_daily_strategy_alert_enabled: bool = True
    demand_pool_daily_strategy_alert_hour: int = 10
    demand_pool_daily_strategy_alert_minute: int = 0

    # --- Feishu webhook ---
    feishu_webhook_url: str = ""
    feishu_webhook_timeout_seconds: int = 30
    # HTTPS certificate verification is off by default so that a self-signed
    # chain from the corporate proxy does not prevent messages from being
    # sent; set to true in production if strict verification is required.
    feishu_webhook_verify_ssl: bool = False

    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore",
    )

    @property
    def mysql_dsn(self) -> str:
        """Return the ``mysql+pymysql://`` URL built from the MySQL fields.

        User, password and database name are escaped with ``quote_plus``;
        host and port are inserted as-is.
        """
        user = quote_plus(self.mysql_user)
        password = quote_plus(self.mysql_password)
        database = quote_plus(self.mysql_database)
        location = f"{self.mysql_host}:{self.mysql_port}"
        return f"mysql+pymysql://{user}:{password}@{location}/{database}"

    @property
    def demand_pool_initial_partition_list(self) -> list[str]:
        """Return ``demand_pool_initial_partitions`` split on commas.

        Each entry is whitespace-stripped and empty entries are dropped.
        """
        pieces = (
            piece.strip()
            for piece in self.demand_pool_initial_partitions.split(",")
        )
        return [piece for piece in pieces if piece]

    @property
    def cors_allow_origin_list(self) -> list[str]:
        """Return ``cors_allow_origins`` as a list of origins.

        A value that is exactly ``"*"`` (ignoring surrounding whitespace)
        yields ``["*"]``; otherwise the value is split on commas, entries
        are whitespace-stripped, and empty entries are dropped.
        """
        raw = self.cors_allow_origins
        if raw.strip() != "*":
            candidates = (item.strip() for item in raw.split(","))
            return [item for item in candidates if item]
        return ["*"]


settings = Settings()