config.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. """系统配置和白名单数据库操作
  2. 提供:
  3. - 白名单账户查询
  4. - 系统配置读写
  5. - 配置缓存机制(避免频繁查询数据库)
  6. """
  7. import json
  8. import logging
  9. from typing import Any, Dict, List, Optional
  10. from datetime import datetime, timedelta
  11. from .connection import get_connection
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Config cache: maps a config key to a (parsed value, time cached) tuple.
# get_system_config treats an entry as fresh while it is younger than
# _cache_ttl (5 minutes).
_config_cache: Dict[str, tuple[Any, datetime]] = {}
_cache_ttl = timedelta(minutes=5)
  16. def get_whitelist_accounts() -> List[int]:
  17. """获取启用的白名单账户列表
  18. Returns:
  19. List[int]: 账户ID列表,如 [80769799, 71305011]
  20. Raises:
  21. Exception: 数据库查询失败
  22. """
  23. conn = None
  24. try:
  25. conn = get_connection()
  26. with conn.cursor() as cursor:
  27. sql = """
  28. SELECT account_id
  29. FROM account_whitelist
  30. WHERE enabled = TRUE
  31. ORDER BY id ASC
  32. """
  33. cursor.execute(sql)
  34. rows = cursor.fetchall()
  35. account_ids = [row["account_id"] for row in rows]
  36. logger.info(f"从数据库读取白名单账户:{len(account_ids)} 个")
  37. return account_ids
  38. except Exception as e:
  39. logger.error(f"查询白名单账户失败: {e}")
  40. raise
  41. finally:
  42. if conn:
  43. conn.close()
  44. def get_system_config(key: str, default: Any = None, use_cache: bool = True) -> Any:
  45. """获取系统配置值
  46. Args:
  47. key: 配置键,如 'execution_enabled', 'cron_schedule'
  48. default: 默认值(配置不存在时返回)
  49. use_cache: 是否使用缓存(默认True)
  50. Returns:
  51. Any: 配置值,根据 value_type 自动转换类型
  52. Examples:
  53. >>> get_system_config('execution_enabled') # 返回 False (boolean)
  54. >>> get_system_config('cron_schedule') # 返回 '0 2 * * *' (string)
  55. >>> get_system_config('roi_low_factor') # 返回 '0.75' (string)
  56. """
  57. # 检查缓存
  58. if use_cache and key in _config_cache:
  59. value, cached_at = _config_cache[key]
  60. if datetime.now() - cached_at < _cache_ttl:
  61. logger.debug(f"从缓存读取配置: {key} = {value}")
  62. return value
  63. conn = None
  64. try:
  65. conn = get_connection()
  66. with conn.cursor() as cursor:
  67. sql = """
  68. SELECT config_value, value_type
  69. FROM system_config
  70. WHERE config_key = %s AND enabled = TRUE
  71. """
  72. cursor.execute(sql, (key,))
  73. row = cursor.fetchone()
  74. if row is None:
  75. logger.warning(f"配置不存在: {key},使用默认值: {default}")
  76. return default
  77. value = row["config_value"]
  78. value_type = row["value_type"]
  79. # 类型转换
  80. if value_type == "boolean":
  81. parsed_value = value.lower() in ("true", "1", "yes")
  82. elif value_type == "int":
  83. parsed_value = int(value)
  84. elif value_type == "json":
  85. parsed_value = json.loads(value)
  86. else: # string
  87. parsed_value = value
  88. # 更新缓存
  89. _config_cache[key] = (parsed_value, datetime.now())
  90. logger.debug(f"从数据库读取配置: {key} = {parsed_value}")
  91. return parsed_value
  92. except Exception as e:
  93. logger.error(f"查询系统配置失败: {key}, {e}")
  94. return default
  95. finally:
  96. if conn:
  97. conn.close()
  98. def update_system_config(
  99. key: str, value: Any, value_type: str = None, updated_by: str = None
  100. ) -> bool:
  101. """更新系统配置
  102. Args:
  103. key: 配置键
  104. value: 配置值
  105. value_type: 值类型(string, boolean, int, json),不指定则自动推断
  106. updated_by: 更新人
  107. Returns:
  108. bool: 更新成功返回 True
  109. Examples:
  110. >>> update_system_config('execution_enabled', False)
  111. >>> update_system_config('cron_schedule', '0 3 * * *')
  112. """
  113. # 自动推断类型
  114. if value_type is None:
  115. if isinstance(value, bool):
  116. value_type = "boolean"
  117. elif isinstance(value, int):
  118. value_type = "int"
  119. elif isinstance(value, (dict, list)):
  120. value_type = "json"
  121. else:
  122. value_type = "string"
  123. # 序列化值
  124. if value_type == "boolean":
  125. serialized_value = "true" if value else "false"
  126. elif value_type == "json":
  127. serialized_value = json.dumps(value, ensure_ascii=False)
  128. else:
  129. serialized_value = str(value)
  130. conn = None
  131. try:
  132. conn = get_connection()
  133. with conn.cursor() as cursor:
  134. sql = """
  135. UPDATE system_config
  136. SET config_value = %s,
  137. value_type = %s,
  138. updated_by = %s,
  139. updated_at = CURRENT_TIMESTAMP
  140. WHERE config_key = %s
  141. """
  142. affected_rows = cursor.execute(
  143. sql, (serialized_value, value_type, updated_by, key)
  144. )
  145. if affected_rows == 0:
  146. logger.warning(f"配置键不存在,尝试插入: {key}")
  147. insert_sql = """
  148. INSERT INTO system_config (config_key, config_value, value_type, updated_by)
  149. VALUES (%s, %s, %s, %s)
  150. """
  151. cursor.execute(insert_sql, (key, serialized_value, value_type, updated_by))
  152. # 清除缓存
  153. if key in _config_cache:
  154. del _config_cache[key]
  155. logger.info(f"更新系统配置: {key} = {value} (by {updated_by})")
  156. return True
  157. except Exception as e:
  158. logger.error(f"更新系统配置失败: {key}, {e}")
  159. return False
  160. finally:
  161. if conn:
  162. conn.close()
  163. def get_all_system_configs() -> Dict[str, Any]:
  164. """获取所有启用的系统配置
  165. Returns:
  166. Dict[str, Any]: 配置字典,key为配置键,value为配置值
  167. Examples:
  168. >>> configs = get_all_system_configs()
  169. >>> print(configs)
  170. {'execution_enabled': False, 'cron_schedule': '0 2 * * *', ...}
  171. """
  172. conn = None
  173. try:
  174. conn = get_connection()
  175. with conn.cursor() as cursor:
  176. sql = """
  177. SELECT config_key, config_value, value_type
  178. FROM system_config
  179. WHERE enabled = TRUE
  180. ORDER BY id ASC
  181. """
  182. cursor.execute(sql)
  183. rows = cursor.fetchall()
  184. configs = {}
  185. for row in rows:
  186. key = row["config_key"]
  187. value = row["config_value"]
  188. value_type = row["value_type"]
  189. # 类型转换
  190. if value_type == "boolean":
  191. configs[key] = value.lower() in ("true", "1", "yes")
  192. elif value_type == "int":
  193. configs[key] = int(value)
  194. elif value_type == "json":
  195. configs[key] = json.loads(value)
  196. else:
  197. configs[key] = value
  198. logger.info(f"读取所有系统配置:{len(configs)} 项")
  199. return configs
  200. except Exception as e:
  201. logger.error(f"查询所有系统配置失败: {e}")
  202. return {}
  203. finally:
  204. if conn:
  205. conn.close()
  206. def clear_config_cache():
  207. """清除配置缓存(用于测试或强制刷新)"""
  208. global _config_cache
  209. _config_cache = {}
  210. logger.info("配置缓存已清除")
  211. if __name__ == "__main__":
  212. # 测试配置读取
  213. logging.basicConfig(level=logging.INFO)
  214. print("\n=== 测试白名单查询 ===")
  215. try:
  216. accounts = get_whitelist_accounts()
  217. print(f"白名单账户: {accounts}")
  218. except Exception as e:
  219. print(f"❌ 白名单查询失败: {e}")
  220. print("\n=== 测试系统配置查询 ===")
  221. try:
  222. execution_enabled = get_system_config("execution_enabled")
  223. cron_schedule = get_system_config("cron_schedule")
  224. roi_low_factor = get_system_config("roi_low_factor")
  225. print(f"执行开关: {execution_enabled} ({type(execution_enabled).__name__})")
  226. print(f"定时调度: {cron_schedule}")
  227. print(f"关停线系数: {roi_low_factor}")
  228. except Exception as e:
  229. print(f"❌ 系统配置查询失败: {e}")
  230. print("\n=== 测试所有配置查询 ===")
  231. try:
  232. all_configs = get_all_system_configs()
  233. for key, value in all_configs.items():
  234. print(f" {key}: {value} ({type(value).__name__})")
  235. except Exception as e:
  236. print(f"❌ 所有配置查询失败: {e}")