| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- from typing import Any
- from app.strategies.config_store import (
- StrategyConfigRecord,
- fetch_all_configs,
- fetch_config_by_id,
- insert_config,
- update_config,
- )
- from app.strategies.registry import StrategyRegistry
- def _validate_experiment_fields(*, daily_write_limit: int, priority: int) -> None:
- if daily_write_limit < 0:
- raise ValueError("daily_write_limit 不能为负")
- if priority < 0:
- raise ValueError("priority 不能为负")
def _record_to_dict(record: StrategyConfigRecord) -> dict[str, Any]:
    """Serialize a stored strategy config record into a plain dict.

    Besides the record's own fields, the result carries a ``registered`` flag
    that is true when the record's ``strategy_id`` is among the ids returned
    by ``StrategyRegistry.registered_strategy_ids()``.
    """
    known_ids = set(StrategyRegistry.registered_strategy_ids())
    # Fields copied verbatim from the record, in output order.
    copied_fields = (
        "strategy_id",
        "name",
        "version",
        "params",
        "active",
        "daily_write_limit",
        "priority",
    )
    serialized: dict[str, Any] = {
        field_name: getattr(record, field_name) for field_name in copied_fields
    }
    # "registered" sits between the copied fields and the timestamps so the
    # key order of the resulting dict stays stable.
    serialized["registered"] = record.strategy_id in known_ids
    serialized["create_time"] = record.create_time
    serialized["updated_time"] = record.updated_time
    return serialized
def query_strategy_configs() -> dict[str, object]:
    """Return every stored strategy config, serialized, under the ``items`` key."""
    serialized_configs = []
    for stored in fetch_all_configs():
        serialized_configs.append(_record_to_dict(stored))
    return {"items": serialized_configs}
def query_available_strategies() -> dict[str, object]:
    """List registered strategies that do not have a stored config yet.

    Returns:
        A dict whose ``items`` list holds the entries from
        ``StrategyRegistry.list_registered()`` whose ``strategy_id`` is not
        present among the stored configs, in registry order.
    """
    already_configured = set()
    for stored in fetch_all_configs():
        already_configured.add(stored.strategy_id)

    unconfigured = []
    for entry in StrategyRegistry.list_registered():
        if entry["strategy_id"] in already_configured:
            continue
        unconfigured.append(entry)
    return {"items": unconfigured}
def create_strategy_config(
    *,
    strategy_id: str,
    params: dict[str, Any] | None = None,
    active: bool = False,
    daily_write_limit: int = 0,
    priority: int = 0,
) -> dict[str, object]:
    """Create and store a config for a registered strategy.

    Checks run in this order, and the first failure wins: blank id,
    unregistered strategy, config already stored, ``params`` not a dict,
    negative ``daily_write_limit``/``priority``, and finally the strategy's
    own ``validate_config``. After a successful insert the registry is asked
    to reload the config for this strategy.

    Args:
        strategy_id: Strategy identifier; surrounding whitespace is stripped.
        params: Strategy parameters; ``None`` is treated as an empty dict.
        active: Stored as the config's ``active`` value.
        daily_write_limit: Must not be negative.
        priority: Must not be negative.

    Returns:
        The newly inserted record serialized by ``_record_to_dict``.

    Raises:
        ValueError: If any of the checks above fails.
    """
    config_id = strategy_id.strip()
    if not config_id:
        raise ValueError("strategy_id 不能为空")

    implementation = StrategyRegistry.get(config_id)
    if implementation is None:
        raise ValueError(f"未注册的策略: {config_id}")

    already_stored = fetch_config_by_id(config_id)
    if already_stored is not None:
        raise ValueError(f"策略配置已存在: {config_id}")

    config_params = {} if params is None else params
    if not isinstance(config_params, dict):
        raise ValueError("params 必须是 JSON 对象")

    _validate_experiment_fields(
        daily_write_limit=daily_write_limit,
        priority=priority,
    )

    params_accepted = implementation.validate_config(config_params)
    if not params_accepted:
        raise ValueError(f"策略参数校验失败: {config_id}")

    # Name and version come from the registered strategy, not from the caller.
    new_row = {
        "strategy_id": config_id,
        "name": implementation.name,
        "version": implementation.version,
        "params": config_params,
        "active": active,
        "daily_write_limit": daily_write_limit,
        "priority": priority,
    }
    stored = insert_config(**new_row)
    StrategyRegistry.reload_config(config_id)
    return _record_to_dict(stored)
def update_strategy_config(
    *,
    strategy_id: str,
    params: dict[str, Any] | None = None,
    active: bool | None = None,
    daily_write_limit: int | None = None,
    priority: int | None = None,
) -> dict[str, object]:
    """Update a stored strategy config and reload it in the registry.

    Every optional argument is forwarded to ``update_config`` exactly as
    given (``None`` presumably means "leave unchanged" -- confirm against
    ``update_config``).

    Args:
        strategy_id: Strategy identifier; surrounding whitespace is stripped.
        params: New parameters. When given they must be a dict and, if the
            strategy is currently registered, must pass its
            ``validate_config``. Params for an unregistered strategy are not
            validated beyond the dict check.
        active: Forwarded to ``update_config``.
        daily_write_limit: New limit; must not be negative.
        priority: New priority; must not be negative.

    Returns:
        The updated record serialized by ``_record_to_dict``.

    Raises:
        ValueError: On a blank id, invalid ``params``, a negative
            ``daily_write_limit``/``priority``, a missing stored config when
            either of those two is supplied, or when ``update_config``
            raises ``KeyError``.
    """
    normalized_id = strategy_id.strip()
    if not normalized_id:
        raise ValueError("strategy_id 不能为空")

    strategy = StrategyRegistry.get(normalized_id)
    if params is not None:
        if not isinstance(params, dict):
            raise ValueError("params 必须是 JSON 对象")
        if strategy is not None and not strategy.validate_config(params):
            raise ValueError(f"策略参数校验失败: {normalized_id}")

    if daily_write_limit is not None or priority is not None:
        existing = fetch_config_by_id(normalized_id)
        if existing is None:
            raise ValueError(f"strategy config not found: {normalized_id}")
        # Validate the values the record will hold after the update: the
        # supplied one where given, the stored one otherwise.
        _validate_experiment_fields(
            daily_write_limit=(
                daily_write_limit
                if daily_write_limit is not None
                else existing.daily_write_limit
            ),
            priority=priority if priority is not None else existing.priority,
        )

    try:
        record = update_config(
            strategy_id=normalized_id,
            params=params,
            active=active,
            daily_write_limit=daily_write_limit,
            priority=priority,
        )
    except KeyError as exc:
        # str(KeyError("msg")) is the repr "'msg'" (wrapped in quotes). Use the
        # exception's single argument so the message is not quoted and matches
        # the style of the other ValueErrors raised here.
        detail = exc.args[0] if len(exc.args) == 1 else exc
        raise ValueError(str(detail)) from exc

    StrategyRegistry.reload_config(normalized_id)
    return _record_to_dict(record)
def set_strategy_config_active(*, strategy_id: str, active: bool) -> dict[str, object]:
    """Set only the ``active`` flag of a strategy config.

    Delegates to ``update_strategy_config`` with every other field left at
    its default, and returns that function's result.
    """
    updated = update_strategy_config(active=active, strategy_id=strategy_id)
    return updated
|