strategy_generate_service.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. from typing import Any
  2. from app.strategies.batch_date import today_yyyymmdd
  3. from app.strategies.base import GenerateContext
  4. from app.strategies.registry import ActiveStrategy, StrategyRegistry
  5. def resolve_batch_date(batch_date: str | None = None) -> str:
  6. """策略产出批次日固定为当天 YYYYMMDD(Asia/Shanghai)。"""
  7. today = today_yyyymmdd()
  8. if batch_date is not None:
  9. value = batch_date.strip()
  10. if value and value != today:
  11. print(
  12. f"[strategy] batch_date={value} ignored, using today={today} as dt"
  13. )
  14. return today
  15. def _build_context(active: ActiveStrategy, batch_date: str) -> GenerateContext:
  16. config = active.config
  17. params = dict(config.params)
  18. return GenerateContext(
  19. batch_date=batch_date,
  20. params=params,
  21. strategy_id=config.strategy_id,
  22. strategy_name=config.name,
  23. window_start=params.get("window_start"),
  24. window_end=params.get("window_end"),
  25. )
  26. def _run_single_strategy(active: ActiveStrategy, batch_date: str) -> dict[str, Any]:
  27. strategy_config_id = active.config.strategy_id
  28. context = _build_context(active, batch_date)
  29. skip_decision = active.strategy.should_skip(context)
  30. if skip_decision.skip:
  31. return {
  32. "strategy_id": strategy_config_id,
  33. "strategy_name": active.config.name,
  34. "generated": 0,
  35. "written": 0,
  36. "status": "skipped",
  37. "reason": skip_decision.reason,
  38. **skip_decision.detail,
  39. }
  40. candidates = active.strategy.generate(context)
  41. write_result = active.strategy.write_staging(context=context, candidates=candidates)
  42. return {
  43. "strategy_id": strategy_config_id,
  44. "strategy_name": active.config.name,
  45. "generated": len(candidates),
  46. "written": write_result.get("written", 0),
  47. "skipped_duplicates": write_result.get("skipped_duplicates", 0),
  48. "status": "ok",
  49. }
  50. def run_strategy_generation(batch_date: str | None = None) -> dict[str, Any]:
  51. """
  52. 根据 strategy_config 执行策略产出,写入 strategy_staging。
  53. - 每次执行前从 DB 全量刷新配置(支持热更新)
  54. - 仅运行「已注册代码实现 + active=true」的策略
  55. - 与实验(experiment)无关
  56. """
  57. resolved_batch_date = resolve_batch_date(batch_date)
  58. config_count = StrategyRegistry.load_all_configs()
  59. unregistered_active = StrategyRegistry.get_unregistered_active_configs()
  60. warnings = [
  61. {
  62. "strategy_id": item.strategy_id,
  63. "name": item.name,
  64. "reason": "active in strategy_config but no code implementation registered",
  65. }
  66. for item in unregistered_active
  67. ]
  68. active_list = StrategyRegistry.get_active()
  69. results: list[dict[str, Any]] = []
  70. errors: list[dict[str, str]] = []
  71. skipped_count = 0
  72. for active in active_list:
  73. strategy_id = active.config.strategy_id
  74. try:
  75. result = _run_single_strategy(active, resolved_batch_date)
  76. if result.get("status") == "skipped":
  77. skipped_count += 1
  78. results.append(result)
  79. except Exception as exc:
  80. errors.append(
  81. {
  82. "strategy_id": strategy_id,
  83. "strategy_name": active.config.name,
  84. "error": str(exc),
  85. }
  86. )
  87. return {
  88. "batch_date": resolved_batch_date,
  89. "config_count": config_count,
  90. "active_count": len(active_list),
  91. "success_count": sum(1 for item in results if item.get("status") == "ok"),
  92. "skipped_count": skipped_count,
  93. "error_count": len(errors),
  94. "results": results,
  95. "errors": errors,
  96. "warnings": warnings,
  97. }
  98. def generate_for_strategy(
  99. *,
  100. strategy_id: str,
  101. batch_date: str | None = None,
  102. ) -> dict[str, Any]:
  103. resolved_batch_date = resolve_batch_date(batch_date)
  104. StrategyRegistry.load_all_configs()
  105. StrategyRegistry.reload_config(strategy_id)
  106. active_list = StrategyRegistry.get_active()
  107. active = next((item for item in active_list if item.config.strategy_id == strategy_id), None)
  108. if active is None:
  109. registered = StrategyRegistry.get(strategy_id)
  110. if registered is None:
  111. raise KeyError(f"strategy not registered: {strategy_id}")
  112. raise RuntimeError(f"strategy is not active: {strategy_id}")
  113. result = _run_single_strategy(active, resolved_batch_date)
  114. return {
  115. **result,
  116. "batch_date": resolved_batch_date,
  117. }
  118. def generate_for_batch(batch_date: str | None = None) -> dict[str, Any]:
  119. """兼容旧调用,等价于 run_strategy_generation。"""
  120. return run_strategy_generation(batch_date)
  121. def list_active_strategies() -> list[dict[str, Any]]:
  122. StrategyRegistry.load_all_configs()
  123. return [
  124. {
  125. "strategy_id": item.config.strategy_id,
  126. "name": item.config.name,
  127. "version": item.config.version,
  128. "params": item.config.params,
  129. "code_version": item.strategy.version,
  130. }
  131. for item in StrategyRegistry.get_active()
  132. ]