strategy_config_service.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. from typing import Any
  2. from app.strategies.config_store import (
  3. StrategyConfigRecord,
  4. fetch_all_configs,
  5. fetch_config_by_id,
  6. insert_config,
  7. update_config,
  8. )
  9. from app.strategies.registry import StrategyRegistry
  10. def _validate_experiment_fields(*, daily_write_limit: int, priority: int) -> None:
  11. if daily_write_limit < 0:
  12. raise ValueError("daily_write_limit 不能为负")
  13. if priority < 0:
  14. raise ValueError("priority 不能为负")
  15. def _record_to_dict(record: StrategyConfigRecord) -> dict[str, Any]:
  16. registered_ids = set(StrategyRegistry.registered_strategy_ids())
  17. return {
  18. "strategy_id": record.strategy_id,
  19. "name": record.name,
  20. "version": record.version,
  21. "params": record.params,
  22. "active": record.active,
  23. "daily_write_limit": record.daily_write_limit,
  24. "priority": record.priority,
  25. "registered": record.strategy_id in registered_ids,
  26. "create_time": record.create_time,
  27. "updated_time": record.updated_time,
  28. }
  29. def query_strategy_configs() -> dict[str, object]:
  30. records = fetch_all_configs()
  31. return {"items": [_record_to_dict(record) for record in records]}
  32. def query_available_strategies() -> dict[str, object]:
  33. configured_ids = {record.strategy_id for record in fetch_all_configs()}
  34. items = [
  35. item
  36. for item in StrategyRegistry.list_registered()
  37. if item["strategy_id"] not in configured_ids
  38. ]
  39. return {"items": items}
  40. def create_strategy_config(
  41. *,
  42. strategy_id: str,
  43. params: dict[str, Any] | None = None,
  44. active: bool = False,
  45. daily_write_limit: int = 0,
  46. priority: int = 0,
  47. ) -> dict[str, object]:
  48. normalized_id = strategy_id.strip()
  49. if not normalized_id:
  50. raise ValueError("strategy_id 不能为空")
  51. strategy = StrategyRegistry.get(normalized_id)
  52. if strategy is None:
  53. raise ValueError(f"未注册的策略: {normalized_id}")
  54. if fetch_config_by_id(normalized_id) is not None:
  55. raise ValueError(f"策略配置已存在: {normalized_id}")
  56. normalized_params = params if params is not None else {}
  57. if not isinstance(normalized_params, dict):
  58. raise ValueError("params 必须是 JSON 对象")
  59. _validate_experiment_fields(
  60. daily_write_limit=daily_write_limit,
  61. priority=priority,
  62. )
  63. if not strategy.validate_config(normalized_params):
  64. raise ValueError(f"策略参数校验失败: {normalized_id}")
  65. record = insert_config(
  66. strategy_id=normalized_id,
  67. name=strategy.name,
  68. version=strategy.version,
  69. params=normalized_params,
  70. active=active,
  71. daily_write_limit=daily_write_limit,
  72. priority=priority,
  73. )
  74. StrategyRegistry.reload_config(normalized_id)
  75. return _record_to_dict(record)
  76. def update_strategy_config(
  77. *,
  78. strategy_id: str,
  79. params: dict[str, Any] | None = None,
  80. active: bool | None = None,
  81. daily_write_limit: int | None = None,
  82. priority: int | None = None,
  83. ) -> dict[str, object]:
  84. normalized_id = strategy_id.strip()
  85. if not normalized_id:
  86. raise ValueError("strategy_id 不能为空")
  87. strategy = StrategyRegistry.get(normalized_id)
  88. if params is not None:
  89. if not isinstance(params, dict):
  90. raise ValueError("params 必须是 JSON 对象")
  91. if strategy is not None and not strategy.validate_config(params):
  92. raise ValueError(f"策略参数校验失败: {normalized_id}")
  93. if daily_write_limit is not None or priority is not None:
  94. existing = fetch_config_by_id(normalized_id)
  95. if existing is None:
  96. raise ValueError(f"strategy config not found: {normalized_id}")
  97. _validate_experiment_fields(
  98. daily_write_limit=daily_write_limit
  99. if daily_write_limit is not None
  100. else existing.daily_write_limit,
  101. priority=priority if priority is not None else existing.priority,
  102. )
  103. try:
  104. record = update_config(
  105. strategy_id=normalized_id,
  106. params=params,
  107. active=active,
  108. daily_write_limit=daily_write_limit,
  109. priority=priority,
  110. )
  111. except KeyError as exc:
  112. raise ValueError(str(exc)) from exc
  113. StrategyRegistry.reload_config(normalized_id)
  114. return _record_to_dict(record)
  115. def set_strategy_config_active(*, strategy_id: str, active: bool) -> dict[str, object]:
  116. return update_strategy_config(strategy_id=strategy_id, active=active)