| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- from typing import Any
- from app.strategies.batch_date import today_yyyymmdd
- from app.strategies.base import GenerateContext
- from app.strategies.registry import ActiveStrategy, StrategyRegistry
def resolve_batch_date(batch_date: str | None = None) -> str:
    """Return the batch date used for strategy output, which is always today.

    The original note on this function says the batch date is pinned to the
    current day in YYYYMMDD form (Asia/Shanghai); the value itself is whatever
    ``today_yyyymmdd()`` returns.

    Args:
        batch_date: Optional caller-supplied date. It never changes the
            result: a non-blank value that differs from today is only
            reported on stdout as ignored.

    Returns:
        The string produced by ``today_yyyymmdd()``.
    """
    current_day = today_yyyymmdd()
    # Treat "not supplied" and "blank after stripping" the same way: no notice.
    requested = "" if batch_date is None else batch_date.strip()
    if requested and requested != current_day:
        print(
            f"[strategy] batch_date={requested} ignored, using today={current_day} as dt"
        )
    return current_day
def _build_context(active: ActiveStrategy, batch_date: str) -> GenerateContext:
    """Assemble the ``GenerateContext`` passed to a strategy for one batch date.

    Args:
        active: Active strategy entry; its ``config`` supplies the strategy
            id, name and params.
        batch_date: Batch date string stored on the context as-is.

    Returns:
        A ``GenerateContext`` whose ``window_start`` / ``window_end`` are read
        from the params (``None`` when the key is absent).
    """
    cfg = active.config
    # Build a new dict so the context does not hold the config's own object.
    options = dict(cfg.params)
    context_fields = {
        "batch_date": batch_date,
        "params": options,
        "strategy_id": cfg.strategy_id,
        "strategy_name": cfg.name,
        "window_start": options.get("window_start"),
        "window_end": options.get("window_end"),
    }
    return GenerateContext(**context_fields)
def _run_single_strategy(active: ActiveStrategy, batch_date: str) -> dict[str, Any]:
    """Run one active strategy for ``batch_date`` and summarise the outcome.

    The strategy is first asked whether it should be skipped. If not, it
    generates candidates and writes them via ``write_staging``.

    Args:
        active: Active strategy entry (config plus strategy implementation).
        batch_date: Batch date used to build the generation context.

    Returns:
        A summary dict. When skipped: ``status="skipped"`` with zero
        ``generated`` / ``written``, the skip ``reason``, and the skip
        decision's ``detail`` entries merged in last. Otherwise:
        ``status="ok"`` with the candidate count and the ``written`` /
        ``skipped_duplicates`` values reported by ``write_staging``
        (each defaulting to 0 when missing).
    """
    identifier = active.config.strategy_id
    ctx = _build_context(active, batch_date)
    strategy = active.strategy
    decision = strategy.should_skip(ctx)

    # Fields shared by both outcome shapes; kept first to preserve key order.
    header = {
        "strategy_id": identifier,
        "strategy_name": active.config.name,
    }

    if not decision.skip:
        produced = strategy.generate(ctx)
        staging_outcome = strategy.write_staging(context=ctx, candidates=produced)
        return {
            **header,
            "generated": len(produced),
            "written": staging_outcome.get("written", 0),
            "skipped_duplicates": staging_outcome.get("skipped_duplicates", 0),
            "status": "ok",
        }

    # Detail entries are merged last, so they take precedence on key clashes.
    return {
        **header,
        "generated": 0,
        "written": 0,
        "status": "skipped",
        "reason": decision.reason,
        **decision.detail,
    }
def run_strategy_generation(batch_date: str | None = None) -> dict[str, Any]:
    """Run strategy generation driven by strategy_config, writing to strategy_staging.

    - Configs are fully reloaded from the DB before every run (supports hot
      updates).
    - Only strategies that have a registered code implementation and
      active=true are executed.
    - Unrelated to experiments.

    Args:
        batch_date: Optional requested date; resolved via
            ``resolve_batch_date``.

    Returns:
        A report dict with the resolved ``batch_date``, ``config_count``,
        ``active_count``, ``success_count``, ``skipped_count``,
        ``error_count``, plus the per-strategy ``results``, the ``errors``
        collected from strategies that raised, and ``warnings`` for active
        configs that have no registered implementation.
    """
    dt = resolve_batch_date(batch_date)
    loaded_configs = StrategyRegistry.load_all_configs()

    notices = []
    for orphan in StrategyRegistry.get_unregistered_active_configs():
        notices.append(
            {
                "strategy_id": orphan.strategy_id,
                "name": orphan.name,
                "reason": "active in strategy_config but no code implementation registered",
            }
        )

    runnable = StrategyRegistry.get_active()
    outcomes: list[dict[str, Any]] = []
    failures: list[dict[str, str]] = []

    for entry in runnable:
        current_id = entry.config.strategy_id
        # One failing strategy must not abort the rest; record it and move on.
        try:
            outcome = _run_single_strategy(entry, dt)
        except Exception as exc:
            failures.append(
                {
                    "strategy_id": current_id,
                    "strategy_name": entry.config.name,
                    "error": str(exc),
                }
            )
        else:
            outcomes.append(outcome)

    def _count_with_status(wanted: str) -> int:
        # Tally finished runs whose reported status equals ``wanted``.
        return sum(1 for row in outcomes if row.get("status") == wanted)

    return {
        "batch_date": dt,
        "config_count": loaded_configs,
        "active_count": len(runnable),
        "success_count": _count_with_status("ok"),
        "skipped_count": _count_with_status("skipped"),
        "error_count": len(failures),
        "results": outcomes,
        "errors": failures,
        "warnings": notices,
    }
def generate_for_strategy(
    *,
    strategy_id: str,
    batch_date: str | None = None,
) -> dict[str, Any]:
    """Run generation for a single strategy identified by ``strategy_id``.

    Configs are reloaded (all of them, then the requested one) before the
    active list is searched.

    Args:
        strategy_id: Identifier matched against each active entry's
            ``config.strategy_id``.
        batch_date: Optional requested date; resolved via
            ``resolve_batch_date``.

    Returns:
        The single-strategy summary with ``batch_date`` set to the resolved
        date.

    Raises:
        KeyError: The strategy is not among the active ones and
            ``StrategyRegistry.get`` returns ``None`` for it.
        RuntimeError: The strategy is known to the registry but is not in
            the active list.
    """
    dt = resolve_batch_date(batch_date)
    StrategyRegistry.load_all_configs()
    StrategyRegistry.reload_config(strategy_id)

    for candidate in StrategyRegistry.get_active():
        if candidate.config.strategy_id == strategy_id:
            break
    else:
        # No active match: distinguish "unknown" from "known but inactive".
        if StrategyRegistry.get(strategy_id) is None:
            raise KeyError(f"strategy not registered: {strategy_id}")
        raise RuntimeError(f"strategy is not active: {strategy_id}")

    payload = dict(_run_single_strategy(candidate, dt))
    payload["batch_date"] = dt
    return payload
def generate_for_batch(batch_date: str | None = None) -> dict[str, Any]:
    """Backward-compatible entry point; equivalent to ``run_strategy_generation``.

    Args:
        batch_date: Forwarded unchanged.

    Returns:
        Whatever ``run_strategy_generation`` returns for ``batch_date``.
    """
    report = run_strategy_generation(batch_date=batch_date)
    return report
def list_active_strategies() -> list[dict[str, Any]]:
    """Reload all configs and describe every active strategy.

    Returns:
        One dict per entry from ``StrategyRegistry.get_active()``, holding the
        config's ``strategy_id``, ``name``, ``version`` and ``params``, plus
        the implementation's version under ``code_version``.
    """
    StrategyRegistry.load_all_configs()

    listing: list[dict[str, Any]] = []
    for entry in StrategyRegistry.get_active():
        cfg = entry.config
        listing.append(
            {
                "strategy_id": cfg.strategy_id,
                "name": cfg.name,
                "version": cfg.version,
                # The config's own params object is exposed, not a copy.
                "params": cfg.params,
                "code_version": entry.strategy.version,
            }
        )
    return listing
|