base.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. from __future__ import annotations
  2. from abc import ABC, abstractmethod
  3. from dataclasses import dataclass, field
  4. from typing import Any
  5. @dataclass(frozen=True)
  6. class GenerateContext:
  7. """策略产出上下文。"""
  8. batch_date: str
  9. params: dict[str, Any]
  10. strategy_id: str
  11. strategy_name: str
  12. window_start: str | None = None
  13. window_end: str | None = None
  14. @dataclass(frozen=True)
  15. class StrategySkipDecision:
  16. """策略跳过决策,由各策略在 should_skip() 中自行返回。"""
  17. skip: bool
  18. reason: str = ""
  19. detail: dict[str, Any] = field(default_factory=dict)
  20. @dataclass
  21. class DemandCandidate:
  22. """
  23. 策略产出的候选需求。
  24. generate() 返回此结构;写入 strategy_staging 时映射为:
  25. - content -> demand_name
  26. - priority_score -> weight
  27. - extra -> extend
  28. """
  29. content: str
  30. confidence: float | None = None
  31. priority_score: float | None = None
  32. demand_id: str | None = None
  33. demand_type: str | None = None
  34. video_count: int | None = None
  35. video_list: list[Any] | None = None
  36. extra: dict[str, Any] = field(default_factory=dict)
  37. class BaseStrategy(ABC):
  38. """所有策略实现的统一接口。"""
  39. strategy_id: str
  40. name: str
  41. version: str
  42. @abstractmethod
  43. def validate_config(self, config: dict[str, Any]) -> bool:
  44. """校验策略参数,注册或热更新时调用。"""
  45. @abstractmethod
  46. def generate(self, context: GenerateContext) -> list[DemandCandidate]:
  47. """产出候选需求列表。"""
  48. def should_skip(self, context: GenerateContext) -> StrategySkipDecision:
  49. """各策略自行决定是否跳过本次产出,默认不跳过。"""
  50. return StrategySkipDecision(skip=False)
  51. def write_staging(
  52. self,
  53. *,
  54. context: GenerateContext,
  55. candidates: list[DemandCandidate],
  56. ) -> dict[str, Any]:
  57. """写入 strategy_staging,默认按 context.batch_date 整批替换。"""
  58. from app.strategies.staging_store import replace_staging_batch
  59. written = replace_staging_batch(
  60. strategy_config_id=self.strategy_id,
  61. strategy_name=self.name,
  62. batch_date=context.batch_date,
  63. candidates=candidates,
  64. )
  65. return {"written": written, "skipped_duplicates": 0}