"""系统配置和白名单数据库操作 提供: - 白名单账户查询 - 系统配置读写 - 配置缓存机制(避免频繁查询数据库) """ import json import logging from typing import Any, Dict, List, Optional from datetime import datetime, timedelta from .connection import get_connection logger = logging.getLogger(__name__) # 配置缓存(5分钟过期) _config_cache: Dict[str, tuple[Any, datetime]] = {} _cache_ttl = timedelta(minutes=5) def get_whitelist_accounts() -> List[int]: """获取启用的白名单账户列表 Returns: List[int]: 账户ID列表,如 [80769799, 71305011] Raises: Exception: 数据库查询失败 """ conn = None try: conn = get_connection() with conn.cursor() as cursor: sql = """ SELECT account_id FROM account_whitelist WHERE enabled = TRUE ORDER BY id ASC """ cursor.execute(sql) rows = cursor.fetchall() account_ids = [row["account_id"] for row in rows] logger.info(f"从数据库读取白名单账户:{len(account_ids)} 个") return account_ids except Exception as e: logger.error(f"查询白名单账户失败: {e}") raise finally: if conn: conn.close() def get_system_config(key: str, default: Any = None, use_cache: bool = True) -> Any: """获取系统配置值 Args: key: 配置键,如 'execution_enabled', 'cron_schedule' default: 默认值(配置不存在时返回) use_cache: 是否使用缓存(默认True) Returns: Any: 配置值,根据 value_type 自动转换类型 Examples: >>> get_system_config('execution_enabled') # 返回 False (boolean) >>> get_system_config('cron_schedule') # 返回 '0 2 * * *' (string) >>> get_system_config('roi_low_factor') # 返回 '0.75' (string) """ # 检查缓存 if use_cache and key in _config_cache: value, cached_at = _config_cache[key] if datetime.now() - cached_at < _cache_ttl: logger.debug(f"从缓存读取配置: {key} = {value}") return value conn = None try: conn = get_connection() with conn.cursor() as cursor: sql = """ SELECT config_value, value_type FROM system_config WHERE config_key = %s AND enabled = TRUE """ cursor.execute(sql, (key,)) row = cursor.fetchone() if row is None: logger.warning(f"配置不存在: {key},使用默认值: {default}") return default value = row["config_value"] value_type = row["value_type"] # 类型转换 if value_type == "boolean": parsed_value = value.lower() in ("true", "1", "yes") elif value_type == "int": parsed_value = int(value) elif value_type == "json": parsed_value = json.loads(value) else: # string parsed_value = value # 更新缓存 _config_cache[key] = (parsed_value, datetime.now()) logger.debug(f"从数据库读取配置: {key} = {parsed_value}") return parsed_value except Exception as e: logger.error(f"查询系统配置失败: {key}, {e}") return default finally: if conn: conn.close() def update_system_config( key: str, value: Any, value_type: str = None, updated_by: str = None ) -> bool: """更新系统配置 Args: key: 配置键 value: 配置值 value_type: 值类型(string, boolean, int, json),不指定则自动推断 updated_by: 更新人 Returns: bool: 更新成功返回 True Examples: >>> update_system_config('execution_enabled', False) >>> update_system_config('cron_schedule', '0 3 * * *') """ # 自动推断类型 if value_type is None: if isinstance(value, bool): value_type = "boolean" elif isinstance(value, int): value_type = "int" elif isinstance(value, (dict, list)): value_type = "json" else: value_type = "string" # 序列化值 if value_type == "boolean": serialized_value = "true" if value else "false" elif value_type == "json": serialized_value = json.dumps(value, ensure_ascii=False) else: serialized_value = str(value) conn = None try: conn = get_connection() with conn.cursor() as cursor: sql = """ UPDATE system_config SET config_value = %s, value_type = %s, updated_by = %s, updated_at = CURRENT_TIMESTAMP WHERE config_key = %s """ affected_rows = cursor.execute( sql, (serialized_value, value_type, updated_by, key) ) if affected_rows == 0: logger.warning(f"配置键不存在,尝试插入: {key}") insert_sql = """ INSERT INTO system_config (config_key, config_value, value_type, updated_by) VALUES (%s, %s, %s, %s) """ cursor.execute(insert_sql, (key, serialized_value, value_type, updated_by)) # 清除缓存 if key in _config_cache: del _config_cache[key] logger.info(f"更新系统配置: {key} = {value} (by {updated_by})") return True except Exception as e: logger.error(f"更新系统配置失败: {key}, {e}") return False finally: if conn: conn.close() def get_all_system_configs() -> Dict[str, Any]: """获取所有启用的系统配置 Returns: Dict[str, Any]: 配置字典,key为配置键,value为配置值 Examples: >>> configs = get_all_system_configs() >>> print(configs) {'execution_enabled': False, 'cron_schedule': '0 2 * * *', ...} """ conn = None try: conn = get_connection() with conn.cursor() as cursor: sql = """ SELECT config_key, config_value, value_type FROM system_config WHERE enabled = TRUE ORDER BY id ASC """ cursor.execute(sql) rows = cursor.fetchall() configs = {} for row in rows: key = row["config_key"] value = row["config_value"] value_type = row["value_type"] # 类型转换 if value_type == "boolean": configs[key] = value.lower() in ("true", "1", "yes") elif value_type == "int": configs[key] = int(value) elif value_type == "json": configs[key] = json.loads(value) else: configs[key] = value logger.info(f"读取所有系统配置:{len(configs)} 项") return configs except Exception as e: logger.error(f"查询所有系统配置失败: {e}") return {} finally: if conn: conn.close() def clear_config_cache(): """清除配置缓存(用于测试或强制刷新)""" global _config_cache _config_cache = {} logger.info("配置缓存已清除") if __name__ == "__main__": # 测试配置读取 logging.basicConfig(level=logging.INFO) print("\n=== 测试白名单查询 ===") try: accounts = get_whitelist_accounts() print(f"白名单账户: {accounts}") except Exception as e: print(f"❌ 白名单查询失败: {e}") print("\n=== 测试系统配置查询 ===") try: execution_enabled = get_system_config("execution_enabled") cron_schedule = get_system_config("cron_schedule") roi_low_factor = get_system_config("roi_low_factor") print(f"执行开关: {execution_enabled} ({type(execution_enabled).__name__})") print(f"定时调度: {cron_schedule}") print(f"关停线系数: {roi_low_factor}") except Exception as e: print(f"❌ 系统配置查询失败: {e}") print("\n=== 测试所有配置查询 ===") try: all_configs = get_all_system_configs() for key, value in all_configs.items(): print(f" {key}: {value} ({type(value).__name__})") except Exception as e: print(f"❌ 所有配置查询失败: {e}")