| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- """系统配置和白名单数据库操作
- 提供:
- - 白名单账户查询
- - 系统配置读写
- - 配置缓存机制(避免频繁查询数据库)
- """
- import json
- import logging
- from typing import Any, Dict, List, Optional
- from datetime import datetime, timedelta
- from .connection import get_connection
- logger = logging.getLogger(__name__)
- # 配置缓存(5分钟过期)
- _config_cache: Dict[str, tuple[Any, datetime]] = {}
- _cache_ttl = timedelta(minutes=5)
- def get_whitelist_accounts() -> List[int]:
- """获取启用的白名单账户列表
- Returns:
- List[int]: 账户ID列表,如 [80769799, 71305011]
- Raises:
- Exception: 数据库查询失败
- """
- conn = None
- try:
- conn = get_connection()
- with conn.cursor() as cursor:
- sql = """
- SELECT account_id
- FROM account_whitelist
- WHERE enabled = TRUE
- ORDER BY id ASC
- """
- cursor.execute(sql)
- rows = cursor.fetchall()
- account_ids = [row["account_id"] for row in rows]
- logger.info(f"从数据库读取白名单账户:{len(account_ids)} 个")
- return account_ids
- except Exception as e:
- logger.error(f"查询白名单账户失败: {e}")
- raise
- finally:
- if conn:
- conn.close()
- def get_system_config(key: str, default: Any = None, use_cache: bool = True) -> Any:
- """获取系统配置值
- Args:
- key: 配置键,如 'execution_enabled', 'cron_schedule'
- default: 默认值(配置不存在时返回)
- use_cache: 是否使用缓存(默认True)
- Returns:
- Any: 配置值,根据 value_type 自动转换类型
- Examples:
- >>> get_system_config('execution_enabled') # 返回 False (boolean)
- >>> get_system_config('cron_schedule') # 返回 '0 2 * * *' (string)
- >>> get_system_config('roi_low_factor') # 返回 '0.75' (string)
- """
- # 检查缓存
- if use_cache and key in _config_cache:
- value, cached_at = _config_cache[key]
- if datetime.now() - cached_at < _cache_ttl:
- logger.debug(f"从缓存读取配置: {key} = {value}")
- return value
- conn = None
- try:
- conn = get_connection()
- with conn.cursor() as cursor:
- sql = """
- SELECT config_value, value_type
- FROM system_config
- WHERE config_key = %s AND enabled = TRUE
- """
- cursor.execute(sql, (key,))
- row = cursor.fetchone()
- if row is None:
- logger.warning(f"配置不存在: {key},使用默认值: {default}")
- return default
- value = row["config_value"]
- value_type = row["value_type"]
- # 类型转换
- if value_type == "boolean":
- parsed_value = value.lower() in ("true", "1", "yes")
- elif value_type == "int":
- parsed_value = int(value)
- elif value_type == "json":
- parsed_value = json.loads(value)
- else: # string
- parsed_value = value
- # 更新缓存
- _config_cache[key] = (parsed_value, datetime.now())
- logger.debug(f"从数据库读取配置: {key} = {parsed_value}")
- return parsed_value
- except Exception as e:
- logger.error(f"查询系统配置失败: {key}, {e}")
- return default
- finally:
- if conn:
- conn.close()
- def update_system_config(
- key: str, value: Any, value_type: str = None, updated_by: str = None
- ) -> bool:
- """更新系统配置
- Args:
- key: 配置键
- value: 配置值
- value_type: 值类型(string, boolean, int, json),不指定则自动推断
- updated_by: 更新人
- Returns:
- bool: 更新成功返回 True
- Examples:
- >>> update_system_config('execution_enabled', False)
- >>> update_system_config('cron_schedule', '0 3 * * *')
- """
- # 自动推断类型
- if value_type is None:
- if isinstance(value, bool):
- value_type = "boolean"
- elif isinstance(value, int):
- value_type = "int"
- elif isinstance(value, (dict, list)):
- value_type = "json"
- else:
- value_type = "string"
- # 序列化值
- if value_type == "boolean":
- serialized_value = "true" if value else "false"
- elif value_type == "json":
- serialized_value = json.dumps(value, ensure_ascii=False)
- else:
- serialized_value = str(value)
- conn = None
- try:
- conn = get_connection()
- with conn.cursor() as cursor:
- sql = """
- UPDATE system_config
- SET config_value = %s,
- value_type = %s,
- updated_by = %s,
- updated_at = CURRENT_TIMESTAMP
- WHERE config_key = %s
- """
- affected_rows = cursor.execute(
- sql, (serialized_value, value_type, updated_by, key)
- )
- if affected_rows == 0:
- logger.warning(f"配置键不存在,尝试插入: {key}")
- insert_sql = """
- INSERT INTO system_config (config_key, config_value, value_type, updated_by)
- VALUES (%s, %s, %s, %s)
- """
- cursor.execute(insert_sql, (key, serialized_value, value_type, updated_by))
- # 清除缓存
- if key in _config_cache:
- del _config_cache[key]
- logger.info(f"更新系统配置: {key} = {value} (by {updated_by})")
- return True
- except Exception as e:
- logger.error(f"更新系统配置失败: {key}, {e}")
- return False
- finally:
- if conn:
- conn.close()
- def get_all_system_configs() -> Dict[str, Any]:
- """获取所有启用的系统配置
- Returns:
- Dict[str, Any]: 配置字典,key为配置键,value为配置值
- Examples:
- >>> configs = get_all_system_configs()
- >>> print(configs)
- {'execution_enabled': False, 'cron_schedule': '0 2 * * *', ...}
- """
- conn = None
- try:
- conn = get_connection()
- with conn.cursor() as cursor:
- sql = """
- SELECT config_key, config_value, value_type
- FROM system_config
- WHERE enabled = TRUE
- ORDER BY id ASC
- """
- cursor.execute(sql)
- rows = cursor.fetchall()
- configs = {}
- for row in rows:
- key = row["config_key"]
- value = row["config_value"]
- value_type = row["value_type"]
- # 类型转换
- if value_type == "boolean":
- configs[key] = value.lower() in ("true", "1", "yes")
- elif value_type == "int":
- configs[key] = int(value)
- elif value_type == "json":
- configs[key] = json.loads(value)
- else:
- configs[key] = value
- logger.info(f"读取所有系统配置:{len(configs)} 项")
- return configs
- except Exception as e:
- logger.error(f"查询所有系统配置失败: {e}")
- return {}
- finally:
- if conn:
- conn.close()
- def clear_config_cache():
- """清除配置缓存(用于测试或强制刷新)"""
- global _config_cache
- _config_cache = {}
- logger.info("配置缓存已清除")
- if __name__ == "__main__":
- # 测试配置读取
- logging.basicConfig(level=logging.INFO)
- print("\n=== 测试白名单查询 ===")
- try:
- accounts = get_whitelist_accounts()
- print(f"白名单账户: {accounts}")
- except Exception as e:
- print(f"❌ 白名单查询失败: {e}")
- print("\n=== 测试系统配置查询 ===")
- try:
- execution_enabled = get_system_config("execution_enabled")
- cron_schedule = get_system_config("cron_schedule")
- roi_low_factor = get_system_config("roi_low_factor")
- print(f"执行开关: {execution_enabled} ({type(execution_enabled).__name__})")
- print(f"定时调度: {cron_schedule}")
- print(f"关停线系数: {roi_low_factor}")
- except Exception as e:
- print(f"❌ 系统配置查询失败: {e}")
- print("\n=== 测试所有配置查询 ===")
- try:
- all_configs = get_all_system_configs()
- for key, value in all_configs.items():
- print(f" {key}: {value} ({type(value).__name__})")
- except Exception as e:
- print(f"❌ 所有配置查询失败: {e}")
|