Sfoglia il codice sorgente

需求系统初始化

xueyiming 1 settimana fa
parent
commit
7c1275212f
33 ha cambiato i file con 2188 aggiunte e 0 eliminazioni
  1. 14 0
      .env.example
  2. 42 0
      app/core/config.py
  3. 43 0
      app/db/supply_mysql.py
  4. 2 0
      app/main.py
  5. 22 0
      app/scheduler/jobs.py
  6. 14 0
      app/scheduler/manager.py
  7. 156 0
      app/services/strategy_generate_service.py
  8. 11 0
      app/strategies/__init__.py
  9. 84 0
      app/strategies/base.py
  10. 9 0
      app/strategies/batch_date.py
  11. 16 0
      app/strategies/bootstrap.py
  12. 138 0
      app/strategies/config_store.py
  13. 44 0
      app/strategies/impl/__init__.py
  14. 109 0
      app/strategies/impl/_monthly_base.py
  15. 81 0
      app/strategies/impl/_supply_gap_base.py
  16. 66 0
      app/strategies/impl/hot_event_strategy.py
  17. 122 0
      app/strategies/impl/lunar_same_period_strategy.py
  18. 10 0
      app/strategies/impl/monthly_segment_strategy.py
  19. 10 0
      app/strategies/impl/monthly_strategy.py
  20. 122 0
      app/strategies/impl/solar_same_period_strategy.py
  21. 14 0
      app/strategies/impl/supply_gap_segment_strategy.py
  22. 18 0
      app/strategies/impl/supply_gap_strategy.py
  23. 0 0
      app/strategies/odps/__init__.py
  24. 53 0
      app/strategies/odps/_date_utils.py
  25. 37 0
      app/strategies/odps/_utils.py
  26. 137 0
      app/strategies/odps/lunar_same_period_demands.py
  27. 214 0
      app/strategies/odps/monthly_demands.py
  28. 117 0
      app/strategies/odps/solar_same_period_demands.py
  29. 100 0
      app/strategies/registry.py
  30. 0 0
      app/strategies/sources/__init__.py
  31. 60 0
      app/strategies/sources/hot_content_sync_log.py
  32. 63 0
      app/strategies/sources/supply_demand_content.py
  33. 260 0
      app/strategies/staging_store.py

+ 14 - 0
.env.example

@@ -16,6 +16,14 @@ HOT_CONTENT_MYSQL_USER=
 HOT_CONTENT_MYSQL_PASSWORD=
 HOT_CONTENT_MYSQL_DATABASE=external_demand
 HOT_CONTENT_MYSQL_CHARSET=utf8mb4
+# 当下供需gap-分词来源库(demand_content 表)
+SUPPLY_MYSQL_HOST=
+SUPPLY_MYSQL_PORT=3306
+SUPPLY_MYSQL_USER=
+SUPPLY_MYSQL_PASSWORD=
+SUPPLY_MYSQL_DATABASE=content-deconstruction-supply
+SUPPLY_MYSQL_CHARSET=utf8mb4
+SUPPLY_DEMAND_CONTENT_TABLE=demand_content
 HOT_DEMAND_POOL_STRATEGY=新热事件
 HOT_CONTENT_WXINDEX_THRESHOLD=1000000
 ODPS_ACCESS_ID=
@@ -37,3 +45,9 @@ DEMAND_POOL_DAILY_STRATEGY_ALERT_MINUTE=0
 FEISHU_WEBHOOK_URL=
 FEISHU_WEBHOOK_TIMEOUT_SECONDS=30
 FEISHU_WEBHOOK_VERIFY_SSL=false
+STRATEGY_CONFIG_TABLE=strategy_config
+STRATEGY_STAGING_TABLE=strategy_staging
+STRATEGY_STAGING_HOURLY_GENERATE_ENABLED=true
+STRATEGY_STAGING_HOURLY_GENERATE_START_HOUR=3
+STRATEGY_STAGING_HOURLY_GENERATE_END_HOUR=23
+STRATEGY_STAGING_HOURLY_GENERATE_MINUTE=0

+ 42 - 0
app/core/config.py

@@ -22,6 +22,14 @@ class Settings(BaseSettings):
     hot_content_mysql_password: str = ""
     hot_content_mysql_database: str = ""
     hot_content_mysql_charset: str = "utf8mb4"
+    # 当下供需gap-分词来源库(demand_content 表)
+    supply_mysql_host: str = ""
+    supply_mysql_port: int = 3306
+    supply_mysql_user: str = ""
+    supply_mysql_password: str = ""
+    supply_mysql_database: str = ""
+    supply_mysql_charset: str = "utf8mb4"
+    supply_demand_content_table: str = "demand_content"
     odps_access_id: str = "LTAI9EBa0bd5PrDa"
     odps_access_key: str = "vAalxds7YxhfOA2yVv8GziCg3Y87v5"
     odps_project: str = "loghubods"
@@ -45,6 +53,12 @@ class Settings(BaseSettings):
     feishu_webhook_verify_ssl: bool = False
     hot_demand_pool_strategy: str = "新热事件"
     hot_content_wxindex_threshold: float = 1_000_000.0
+    strategy_config_table: str = "strategy_config"
+    strategy_staging_table: str = "strategy_staging"
+    strategy_staging_hourly_generate_enabled: bool = True
+    strategy_staging_hourly_generate_start_hour: int = 3
+    strategy_staging_hourly_generate_end_hour: int = 23
+    strategy_staging_hourly_generate_minute: int = 0
 
     model_config = SettingsConfigDict(
         env_file=".env",
@@ -75,6 +89,21 @@ class Settings(BaseSettings):
             f"{quote_plus(self.hot_content_mysql_database)}?charset={charset}"
         )
 
+    @property
+    def supply_mysql_configured(self) -> bool:
+        """Return True when both the supply MySQL host and database name are non-blank."""
+        host = self.supply_mysql_host.strip()
+        database = self.supply_mysql_database.strip()
+        return host != "" and database != ""
+
+    @property
+    def supply_mysql_dsn(self) -> str:
+        """Build the SQLAlchemy ``mysql+pymysql`` URL for the supply database.
+
+        User, password, database and charset are URL-quoted; host and port are
+        inserted verbatim. An empty charset falls back to ``utf8mb4``.
+        """
+        user = quote_plus(self.supply_mysql_user)
+        password = quote_plus(self.supply_mysql_password)
+        location = f"{self.supply_mysql_host}:{self.supply_mysql_port}"
+        database = quote_plus(self.supply_mysql_database)
+        charset = quote_plus(self.supply_mysql_charset or "utf8mb4")
+        return f"mysql+pymysql://{user}:{password}@{location}/{database}?charset={charset}"
+
     @property
     def demand_pool_initial_partition_list(self) -> list[str]:
         return [
@@ -83,6 +112,19 @@ class Settings(BaseSettings):
             if partition.strip()
         ]
 
+    @property
+    def strategy_staging_hourly_cron_hours(self) -> str:
+        """Return the cron ``hour`` expression for the hourly staging job.
+
+        The window never wraps past midnight. With the defaults (start=3,
+        end=23) the result is ``'3-23'``: the last run is at 23:00 and
+        nothing fires at 00:00.
+
+        Raises:
+            ValueError: if either hour is outside 0-23, or start > end.
+        """
+        first = self.strategy_staging_hourly_generate_start_hour
+        last = self.strategy_staging_hourly_generate_end_hour
+        for hour in (first, last):
+            if hour < 0 or hour > 23:
+                raise ValueError("strategy staging hourly hours must be 0-23")
+        if first > last:
+            raise ValueError("start_hour cannot be greater than end_hour")
+        return str(first) if first == last else f"{first}-{last}"
+
     @property
     def cors_allow_origin_list(self) -> list[str]:
         if self.cors_allow_origins.strip() == "*":

+ 43 - 0
app/db/supply_mysql.py

@@ -0,0 +1,43 @@
+from collections.abc import Generator
+
+from sqlalchemy import create_engine, text
+from sqlalchemy.engine import Engine
+from sqlalchemy.orm import Session, sessionmaker
+
+from app.core.config import settings
+
+_supply_engine: Engine | None = None
+_supply_session_local: sessionmaker[Session] | None = None
+
+
+def _ensure_supply_engine() -> sessionmaker[Session]:
+    """Return the module-level session factory for the supply DB, creating it on first use.
+
+    The configuration check runs on every call, before the cached factory is
+    consulted.
+
+    Raises:
+        RuntimeError: if ``settings.supply_mysql_configured`` is false.
+    """
+    global _supply_engine, _supply_session_local
+    if not settings.supply_mysql_configured:
+        raise RuntimeError(
+            "供需内容库未配置,请在 .env 中设置 SUPPLY_MYSQL_HOST、"
+            "SUPPLY_MYSQL_USER、SUPPLY_MYSQL_PASSWORD、SUPPLY_MYSQL_DATABASE"
+        )
+    if _supply_session_local is not None:
+        return _supply_session_local
+
+    engine_options = {"pool_pre_ping": True, "pool_recycle": 3600}
+    _supply_engine = create_engine(settings.supply_mysql_dsn, **engine_options)
+    _supply_session_local = sessionmaker(
+        bind=_supply_engine,
+        autoflush=False,
+        autocommit=False,
+    )
+    return _supply_session_local
+
+
+def SupplySessionLocal() -> Session:
+    """Open a new Session from the lazily created supply session factory."""
+    session_factory = _ensure_supply_engine()
+    return session_factory()
+
+
+def get_supply_db_session() -> Generator[Session, None, None]:
+    """Yield a supply-DB session and close it once the consumer is done, even on error."""
+    session = SupplySessionLocal()
+    try:
+        yield session
+    finally:
+        session.close()

+ 2 - 0
app/main.py

@@ -6,10 +6,12 @@ from fastapi.middleware.cors import CORSMiddleware
 from app.api.routes import router
 from app.core.config import settings
 from app.scheduler.manager import shutdown_scheduler, start_scheduler
+from app.strategies.bootstrap import init_strategy_system
 
 
 @asynccontextmanager
 async def lifespan(_: FastAPI):
+    init_strategy_system()
     start_scheduler()
     try:
         yield

+ 22 - 0
app/scheduler/jobs.py

@@ -2,6 +2,7 @@ from datetime import datetime
 from zoneinfo import ZoneInfo
 
 from app.services.demand_pool_strategy_daily_alert import run_daily_strategy_alert
+from app.services.strategy_generate_service import run_strategy_generation
 from app.sync.demand_pool_sync import run_full_sync, run_today_incremental_sync
 
 
@@ -31,3 +32,24 @@ def demand_pool_daily_strategy_alert_job(partition_dt: str | None = None) -> Non
         print(f"[scheduler] daily strategy alert failed: {exc}")
         raise
 
+
+def strategy_staging_hourly_generate_job(batch_date: str | None = None) -> None:
+    """Scheduler entry point: run strategy generation and print a summary.
+
+    Warnings and per-strategy errors are printed when present. Any exception
+    is printed and then re-raised.
+    """
+    print("[scheduler] start hourly strategy generation for strategy_staging")
+    try:
+        result = run_strategy_generation(batch_date)
+        summary = ", ".join(
+            (
+                f"batch_date={result['batch_date']}",
+                f"active={result['active_count']}",
+                f"success={result['success_count']}",
+                f"skipped={result.get('skipped_count', 0)}",
+                f"errors={result['error_count']}",
+            )
+        )
+        print(f"[scheduler] strategy generation done: {summary}")
+        for label in ("warnings", "errors"):
+            if result[label]:
+                print(f"[scheduler] strategy generation {label}: {result[label]}")
+    except Exception as exc:
+        print(f"[scheduler] strategy generation failed: {exc}")
+        raise
+

+ 14 - 0
app/scheduler/manager.py

@@ -11,6 +11,7 @@ from app.scheduler.jobs import (
     demand_pool_daily_strategy_alert_job,
     demand_pool_today_incremental_sync_job,
     heartbeat_job,
+    strategy_staging_hourly_generate_job,
 )
 
 # 与 scheduler 一致。APScheduler 3.x 在 add_job 里传入 CronTrigger 实例时,若未显式指定
@@ -53,6 +54,19 @@ def setup_jobs() -> None:
             max_instances=1,
             coalesce=True,
         )
+    if settings.strategy_staging_hourly_generate_enabled:
+        scheduler.add_job(
+            strategy_staging_hourly_generate_job,
+            trigger=CronTrigger(
+                minute=settings.strategy_staging_hourly_generate_minute,
+                hour=settings.strategy_staging_hourly_cron_hours,
+                timezone=_CRON_TZ,
+            ),
+            id="strategy_staging_hourly_generate_job",
+            replace_existing=True,
+            max_instances=1,
+            coalesce=True,
+        )
 
 
 def start_scheduler() -> None:

+ 156 - 0
app/services/strategy_generate_service.py

@@ -0,0 +1,156 @@
+from typing import Any
+
+from app.strategies.batch_date import today_yyyymmdd
+from app.strategies.base import GenerateContext
+from app.strategies.registry import ActiveStrategy, StrategyRegistry
+
+
+def resolve_batch_date(batch_date: str | None = None) -> str:
+    """Return today's batch date (YYYYMMDD); a differing ``batch_date`` is only logged, never used."""
+    today = today_yyyymmdd()
+    requested = "" if batch_date is None else batch_date.strip()
+    if requested and requested != today:
+        print(
+            f"[strategy] batch_date={requested} ignored, using today={today} as dt"
+        )
+    return today
+
+
+def _build_context(active: ActiveStrategy, batch_date: str) -> GenerateContext:
+    """Assemble the GenerateContext for one active strategy from its config record."""
+    cfg = active.config
+    # Shallow copy: top-level changes made to params during a run stay local.
+    run_params = dict(cfg.params)
+    window = {key: run_params.get(key) for key in ("window_start", "window_end")}
+    return GenerateContext(
+        batch_date=batch_date,
+        params=run_params,
+        strategy_id=cfg.strategy_id,
+        strategy_name=cfg.name,
+        **window,
+    )
+
+
+def _run_single_strategy(active: ActiveStrategy, batch_date: str) -> dict[str, Any]:
+    """Run one strategy for ``batch_date`` and return its result summary.
+
+    The strategy is first asked whether it wants to skip. If so, a
+    ``status="skipped"`` summary is returned together with the strategy's
+    own ``detail`` entries. Otherwise candidates are generated, written to
+    staging, and a ``status="ok"`` summary is returned.
+
+    Exceptions raised by the strategy propagate to the caller.
+    """
+    strategy = active.strategy
+    identity = {
+        "strategy_id": active.config.strategy_id,
+        "strategy_name": active.config.name,
+    }
+    context = _build_context(active, batch_date)
+
+    skip_decision = strategy.should_skip(context)
+    if skip_decision.skip:
+        # Spread the strategy-supplied detail FIRST so it can never overwrite
+        # the reserved keys below (notably "status", which callers count on).
+        return {
+            **skip_decision.detail,
+            **identity,
+            "generated": 0,
+            "written": 0,
+            "status": "skipped",
+            "reason": skip_decision.reason,
+        }
+
+    candidates = strategy.generate(context)
+    write_result = strategy.write_staging(context=context, candidates=candidates)
+    return {
+        **identity,
+        "generated": len(candidates),
+        "written": write_result.get("written", 0),
+        "skipped_duplicates": write_result.get("skipped_duplicates", 0),
+        "status": "ok",
+    }
+
+
+def run_strategy_generation(batch_date: str | None = None) -> dict[str, Any]:
+    """
+    Run the strategies configured in strategy_config and write their output to strategy_staging.
+
+    - Configs are fully reloaded from the DB before every run (supports hot updates).
+    - Only strategies that have a registered code implementation and ``active=true`` are run.
+    - Independent of experiments.
+
+    A failure in one strategy is recorded under ``errors`` and does not stop
+    the remaining strategies. Each error entry carries ``error_type`` (the
+    exception class name) next to ``error`` (``str(exc)``), because the bare
+    message of e.g. a ``KeyError`` is just the quoted key.
+
+    Returns a summary dict with counts, per-strategy ``results``, ``errors``
+    and ``warnings`` (active configs that have no registered implementation).
+    """
+    resolved_batch_date = resolve_batch_date(batch_date)
+    config_count = StrategyRegistry.load_all_configs()
+
+    unregistered_active = StrategyRegistry.get_unregistered_active_configs()
+    warnings = [
+        {
+            "strategy_id": item.strategy_id,
+            "name": item.name,
+            "reason": "active in strategy_config but no code implementation registered",
+        }
+        for item in unregistered_active
+    ]
+
+    active_list = StrategyRegistry.get_active()
+    results: list[dict[str, Any]] = []
+    errors: list[dict[str, str]] = []
+
+    for active in active_list:
+        try:
+            results.append(_run_single_strategy(active, resolved_batch_date))
+        except Exception as exc:
+            # Isolate failures per strategy; keep the exception class so the
+            # report stays readable when the message alone is ambiguous.
+            errors.append(
+                {
+                    "strategy_id": active.config.strategy_id,
+                    "strategy_name": active.config.name,
+                    "error": str(exc),
+                    "error_type": type(exc).__name__,
+                }
+            )
+
+    return {
+        "batch_date": resolved_batch_date,
+        "config_count": config_count,
+        "active_count": len(active_list),
+        "success_count": sum(1 for item in results if item.get("status") == "ok"),
+        "skipped_count": sum(1 for item in results if item.get("status") == "skipped"),
+        "error_count": len(errors),
+        "results": results,
+        "errors": errors,
+        "warnings": warnings,
+    }
+
+
+def generate_for_strategy(
+    *,
+    strategy_id: str,
+    batch_date: str | None = None,
+) -> dict[str, Any]:
+    """Run a single strategy by id and return its result plus the resolved batch date.
+
+    Raises:
+        KeyError: if no implementation is registered under ``strategy_id``.
+        RuntimeError: if the strategy is registered but not in the active list.
+    """
+    resolved_batch_date = resolve_batch_date(batch_date)
+    StrategyRegistry.load_all_configs()
+    StrategyRegistry.reload_config(strategy_id)
+
+    for candidate in StrategyRegistry.get_active():
+        if candidate.config.strategy_id == strategy_id:
+            outcome = _run_single_strategy(candidate, resolved_batch_date)
+            return {**outcome, "batch_date": resolved_batch_date}
+
+    if StrategyRegistry.get(strategy_id) is None:
+        raise KeyError(f"strategy not registered: {strategy_id}")
+    raise RuntimeError(f"strategy is not active: {strategy_id}")
+
+
+def generate_for_batch(batch_date: str | None = None) -> dict[str, Any]:
+    """Backward-compatible alias for older callers; equivalent to run_strategy_generation."""
+    return run_strategy_generation(batch_date)
+
+
+def list_active_strategies() -> list[dict[str, Any]]:
+    """Reload configs, then describe each active strategy (config fields plus code version)."""
+    StrategyRegistry.load_all_configs()
+    described: list[dict[str, Any]] = []
+    for entry in StrategyRegistry.get_active():
+        cfg = entry.config
+        described.append(
+            {
+                "strategy_id": cfg.strategy_id,
+                "name": cfg.name,
+                "version": cfg.version,
+                "params": cfg.params,
+                "code_version": entry.strategy.version,
+            }
+        )
+    return described

+ 11 - 0
app/strategies/__init__.py

@@ -0,0 +1,11 @@
+from app.strategies.base import BaseStrategy, DemandCandidate, GenerateContext, StrategySkipDecision
+from app.strategies.registry import ActiveStrategy, StrategyRegistry
+
+__all__ = [
+    "ActiveStrategy",
+    "BaseStrategy",
+    "DemandCandidate",
+    "GenerateContext",
+    "StrategyRegistry",
+    "StrategySkipDecision",
+]

+ 84 - 0
app/strategies/base.py

@@ -0,0 +1,84 @@
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+from typing import Any
+
+
+@dataclass(frozen=True)
+class GenerateContext:
+    """Immutable context handed to a strategy for one generation run."""
+
+    # Batch date of this run, as a string.
+    batch_date: str
+    # Strategy parameters.
+    params: dict[str, Any]
+    strategy_id: str
+    strategy_name: str
+    # Optional window bounds; None when not supplied.
+    window_start: str | None = None
+    window_end: str | None = None
+
+
+@dataclass(frozen=True)
+class StrategySkipDecision:
+    """Skip decision; each strategy returns one from its own should_skip()."""
+
+    # True means the strategy should not run for this context.
+    skip: bool
+    # Human-readable explanation; empty by default.
+    reason: str = ""
+    # Extra key/value data attached to the decision; empty by default.
+    detail: dict[str, Any] = field(default_factory=dict)
+
+
+@dataclass
+class DemandCandidate:
+    """
+    A candidate demand produced by a strategy.
+
+    generate() returns this structure; when written to strategy_staging it is mapped as:
+    - content -> demand_name
+    - priority_score -> weight
+    - extra -> extend
+    """
+
+    content: str
+    confidence: float | None = None
+    priority_score: float | None = None
+    demand_id: str | None = None
+    demand_type: str | None = None
+    video_count: int | None = None
+    video_list: list[Any] | None = None
+    extra: dict[str, Any] = field(default_factory=dict)
+
+
+class BaseStrategy(ABC):
+    """Common interface implemented by every strategy."""
+
+    strategy_id: str
+    name: str
+    version: str
+
+    @abstractmethod
+    def validate_config(self, config: dict[str, Any]) -> bool:
+        """Validate strategy params; called on registration or hot update."""
+
+    @abstractmethod
+    def generate(self, context: GenerateContext) -> list[DemandCandidate]:
+        """Produce the list of candidate demands."""
+
+    def should_skip(self, context: GenerateContext) -> StrategySkipDecision:
+        """Let a strategy opt out of this run; the default never skips."""
+        return StrategySkipDecision(skip=False)
+
+    def write_staging(
+        self,
+        *,
+        context: GenerateContext,
+        candidates: list[DemandCandidate],
+    ) -> dict[str, Any]:
+        """Write to strategy_staging; by default replaces the whole batch for context.batch_date."""
+        from app.strategies.staging_store import replace_staging_batch
+
+        return {
+            "written": replace_staging_batch(
+                strategy_config_id=self.strategy_id,
+                strategy_name=self.name,
+                batch_date=context.batch_date,
+                candidates=candidates,
+            ),
+            "skipped_duplicates": 0,
+        }

+ 9 - 0
app/strategies/batch_date.py

@@ -0,0 +1,9 @@
+from datetime import datetime
+from zoneinfo import ZoneInfo
+
+SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
+
+
+def today_yyyymmdd() -> str:
+    """Return the current date in Asia/Shanghai as ``YYYYMMDD`` (batch date / ODPS ``dt`` base date)."""
+    now_in_shanghai = datetime.now(tz=SHANGHAI_TZ)
+    return f"{now_in_shanghai:%Y%m%d}"

+ 16 - 0
app/strategies/bootstrap.py

@@ -0,0 +1,16 @@
+from app.strategies.impl import register_all
+from app.strategies.config_store import ensure_strategy_tables
+from app.strategies.registry import StrategyRegistry
+
+
+def init_strategy_system() -> None:
+    """Initialise the strategy subsystem at service start-up.
+
+    Ensures the strategy tables exist, loads configs, registers the strategy
+    implementations, then prints a one-line summary.
+    """
+    ensure_strategy_tables()
+    config_total = StrategyRegistry.load_all_configs()
+    register_all()
+    active_total = len(StrategyRegistry.get_active())
+    registered_total = len(StrategyRegistry.registered_strategy_ids())
+    print(
+        f"[strategy] initialized: registered={registered_total}, "
+        f"configs={config_total}, active={active_total}"
+    )

+ 138 - 0
app/strategies/config_store.py

@@ -0,0 +1,138 @@
+import json
+import re
+from dataclasses import dataclass
+from typing import Any
+
+from sqlalchemy import text
+
+from app.core.config import settings
+from app.db.mysql import SessionLocal
+
+IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
+
+
+def _safe_identifier(name: str) -> str:
+    """Return ``name`` unchanged if it is a plain SQL identifier.
+
+    The result is interpolated directly into SQL text, so only
+    ``[A-Za-z_][A-Za-z0-9_]*`` is accepted.
+
+    Raises:
+        ValueError: if ``name`` contains anything else.
+    """
+    # fullmatch, not match: with match() the pattern's trailing ``$`` also
+    # succeeds just before a final newline, so "tbl\n" would slip through.
+    if IDENTIFIER_RE.fullmatch(name) is None:
+        raise ValueError(f"invalid sql identifier: {name}")
+    return name
+
+
+@dataclass(frozen=True)
+class StrategyConfigRecord:
+    """One row of the strategy config table, with ``params`` already parsed to a dict."""
+
+    strategy_id: str
+    name: str
+    version: str
+    params: dict[str, Any]
+    active: bool
+
+
+def _parse_params(raw: object) -> dict[str, Any]:
+    """Normalise a ``params`` column value into a dict.
+
+    Accepts ``None``, an already-decoded dict, or JSON text as ``str`` /
+    ``bytes`` / ``bytearray`` (decoded as UTF-8). ``None``, blank text and
+    JSON ``null`` all yield ``{}``.
+
+    Raises:
+        TypeError: if ``raw`` has an unsupported type, or the JSON text
+            decodes to something other than an object.
+        json.JSONDecodeError: if the text is not valid JSON.
+    """
+    if raw is None:
+        return {}
+    if isinstance(raw, dict):
+        return raw
+    if isinstance(raw, (bytes, bytearray)):
+        raw = raw.decode("utf-8")
+    if not isinstance(raw, str):
+        raise TypeError(f"unsupported params type: {type(raw)!r}")
+
+    text_value = raw.strip()
+    if not text_value:
+        return {}
+
+    parsed = json.loads(text_value)
+    if parsed is None:
+        # JSON ``null`` means "no params", the same as a missing value.
+        return {}
+    if not isinstance(parsed, dict):
+        # Arrays/scalars would otherwise leak out of a function typed as
+        # returning a dict and fail later, far from the bad row.
+        raise TypeError(
+            f"params must be a JSON object, got {type(parsed).__name__}"
+        )
+    return parsed
+
+
+def ensure_strategy_tables() -> None:
+    """Create the strategy config and staging tables if they do not exist yet.
+
+    Table names come from settings and are validated by _safe_identifier
+    before being interpolated into the DDL. Both statements run in one
+    session, followed by a single commit.
+    """
+    config_table = _safe_identifier(settings.strategy_config_table)
+    staging_table = _safe_identifier(settings.strategy_staging_table)
+
+    # Config table: one row per strategy, unique on strategy_id.
+    config_sql = f"""
+    CREATE TABLE IF NOT EXISTS {config_table}
+    (
+        id           BIGINT       NOT NULL AUTO_INCREMENT COMMENT '自增id',
+        strategy_id  VARCHAR(64)  NOT NULL COMMENT '全局唯一id,英文名称',
+        name         VARCHAR(128) NOT NULL COMMENT '策略名称',
+        version      VARCHAR(32)  NOT NULL COMMENT '策略版本,用于追踪代码变更',
+        params       JSON         NULL COMMENT '策略运行参数,如阈值、窗口期等',
+        active       TINYINT(1)   NOT NULL DEFAULT 1 COMMENT '是否启用,0=不参与产出和合并',
+        create_time  TIMESTAMP    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+        updated_time TIMESTAMP    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
+        PRIMARY KEY (id),
+        UNIQUE KEY uk_strategy_id (strategy_id)
+    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='策略配置表'
+    """
+
+    # Staging table: strategy output rows, indexed by strategy, batch date and demand id.
+    # NOTE(review): `strategy` is VARCHAR(32) while strategy_config.name is
+    # VARCHAR(128); if the strategy name is what gets stored here, long names
+    # may not fit — confirm against the writer.
+    staging_sql = f"""
+    CREATE TABLE IF NOT EXISTS {staging_table}
+    (
+        id                 BIGINT        NOT NULL AUTO_INCREMENT COMMENT '自增id',
+        strategy_config_id VARCHAR(64)   NOT NULL COMMENT '来源策略ID,关联 strategy_config.strategy_id',
+        strategy           VARCHAR(32)   NULL COMMENT '策略名称',
+        demand_id          VARCHAR(64)   NULL COMMENT '需求id',
+        demand_name        VARCHAR(512)  NULL COMMENT '需求',
+        weight             DOUBLE        NULL COMMENT '需求权重',
+        `type`             VARCHAR(64)   NULL COMMENT '需求类型',
+        video_count        INT           NULL COMMENT '视频数量',
+        video_list         JSON          NULL COMMENT '视频列表',
+        extend             JSON          NULL COMMENT '拓展字段',
+        batch_date         VARCHAR(32)   NULL COMMENT '归属批次日期,e.g. 20250526',
+        create_time        TIMESTAMP     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+        PRIMARY KEY (id),
+        KEY idx_strategy_config_id (strategy_config_id),
+        KEY idx_batch_date (batch_date),
+        KEY idx_demand_id (demand_id)
+    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='策略暂存表'
+    """
+
+    with SessionLocal() as session:
+        session.execute(text(config_sql))
+        session.execute(text(staging_sql))
+        session.commit()
+
+
+def fetch_all_configs() -> list[StrategyConfigRecord]:
+    """Load every row of the strategy config table, ordered by strategy_id."""
+    config_table = _safe_identifier(settings.strategy_config_table)
+    query = text(
+        f"""
+        SELECT strategy_id, name, version, params, active
+        FROM {config_table}
+        ORDER BY strategy_id ASC
+        """
+    )
+    with SessionLocal() as session:
+        rows = session.execute(query).mappings().all()
+
+    # Rows are fully fetched above, so conversion happens after the session closes.
+    records: list[StrategyConfigRecord] = []
+    for row in rows:
+        records.append(
+            StrategyConfigRecord(
+                strategy_id=str(row["strategy_id"]),
+                name=str(row["name"]),
+                version=str(row["version"]),
+                params=_parse_params(row["params"]),
+                active=bool(row["active"]),
+            )
+        )
+    return records
+
+
+def fetch_config_by_id(strategy_id: str) -> StrategyConfigRecord | None:
+    """Load the config row for ``strategy_id``; return None when no row matches."""
+    config_table = _safe_identifier(settings.strategy_config_table)
+    query = text(
+        f"""
+        SELECT strategy_id, name, version, params, active
+        FROM {config_table}
+        WHERE strategy_id = :strategy_id
+        LIMIT 1
+        """
+    )
+    bind_values = {"strategy_id": strategy_id}
+    with SessionLocal() as session:
+        row = session.execute(query, bind_values).mappings().first()
+
+    if row is not None:
+        return StrategyConfigRecord(
+            strategy_id=str(row["strategy_id"]),
+            name=str(row["name"]),
+            version=str(row["version"]),
+            params=_parse_params(row["params"]),
+            active=bool(row["active"]),
+        )
+    return None

+ 44 - 0
app/strategies/impl/__init__.py

@@ -0,0 +1,44 @@
+import importlib
+import inspect
+import pkgutil
+
+from app.strategies.base import BaseStrategy
+
+
+def _iter_concrete_strategies() -> list[type[BaseStrategy]]:
+    """Import each public module of ``app.strategies.impl`` and collect its concrete strategies.
+
+    Modules whose name starts with ``_`` are not imported. A class is kept
+    only if it is defined in the module being scanned, is a non-abstract
+    BaseStrategy subclass, and has a non-blank ``strategy_id``.
+
+    Raises:
+        ValueError: if two collected classes share a ``strategy_id``.
+    """
+    import app.strategies.impl as impl_package
+
+    found: list[type[BaseStrategy]] = []
+    known_ids: set[str] = set()
+
+    public_module_names = (
+        info.name
+        for info in pkgutil.iter_modules(impl_package.__path__)
+        if not info.name.startswith("_")
+    )
+    for module_name in public_module_names:
+        module = importlib.import_module(f"{impl_package.__name__}.{module_name}")
+        for _, candidate in inspect.getmembers(module, inspect.isclass):
+            defined_here = candidate.__module__ == module.__name__
+            is_strategy = issubclass(candidate, BaseStrategy) and candidate is not BaseStrategy
+            if not (defined_here and is_strategy) or inspect.isabstract(candidate):
+                continue
+
+            strategy_id = getattr(candidate, "strategy_id", "").strip()
+            if strategy_id == "":
+                continue
+            if strategy_id in known_ids:
+                raise ValueError(f"duplicate strategy_id in impl package: {strategy_id}")
+
+            known_ids.add(strategy_id)
+            found.append(candidate)
+
+    return found
+
+
+def register_all() -> None:
+    """Scan the strategy modules under impl/ and register one instance per concrete strategy.
+
+    Modules and base classes whose module name starts with ``_`` are skipped.
+    """
+    from app.strategies.registry import StrategyRegistry
+
+    for strategy_cls in _iter_concrete_strategies():
+        instance = strategy_cls()
+        StrategyRegistry.register(instance)

+ 109 - 0
app/strategies/impl/_monthly_base.py

@@ -0,0 +1,109 @@
+from typing import Any
+
+from app.strategies.batch_date import today_yyyymmdd
+from app.strategies.base import (
+    BaseStrategy,
+    DemandCandidate,
+    GenerateContext,
+    StrategySkipDecision,
+)
+from app.strategies.odps.monthly_demands import query_monthly_demands
+from app.strategies.staging_store import count_staging_batch, has_staging_batch
+
+_NUMERIC_PARAM_KEYS = (
+    ("view_pv_count", ("view_pv_count",)),
+    ("month_total_pv_threshold", ("month_total_pv_threshold",)),
+    ("min_contribution_score", ("min_contribution_score", "贡献分")),
+    ("rov_avg", ("rov_avg",)),
+    ("min_frequency", ("min_frequency", "频次")),
+)
+
+
+class MonthlyStrategyBase(BaseStrategy):
+    """Shared logic for the monthly strategies (same ODPS query shape; only the strategy label differs)."""
+
+    strategy_label: str
+
+    def validate_config(self, config: dict[str, Any]) -> bool:
+        """Return True when every required numeric param can be resolved."""
+        try:
+            self._resolve_params(config)
+        except (TypeError, ValueError, KeyError):
+            return False
+        else:
+            return True
+
+    def should_skip(self, context: GenerateContext) -> StrategySkipDecision:
+        """Skip when strategy_staging already has this strategy's rows for the batch date (the ODPS query is expensive)."""
+        batch_key = {
+            "strategy_config_id": self.strategy_id,
+            "batch_date": context.batch_date,
+        }
+        if has_staging_batch(**batch_key):
+            return StrategySkipDecision(
+                skip=True,
+                reason="strategy_staging already has data for batch_date",
+                detail={"existing_count": count_staging_batch(**batch_key)},
+            )
+        return StrategySkipDecision(skip=False)
+
+    def generate(self, context: GenerateContext) -> list[DemandCandidate]:
+        """Query the monthly demands and map every row with a non-blank name to a candidate."""
+        thresholds = self._resolve_params(context.params)
+        rows = query_monthly_demands(
+            bizdate=today_yyyymmdd(),
+            strategy_label=self.strategy_label,
+            view_pv_count=thresholds["view_pv_count"],
+            month_total_pv_threshold=thresholds["month_total_pv_threshold"],
+            min_contribution_score=thresholds["min_contribution_score"],
+            rov_avg=thresholds["rov_avg"],
+            min_frequency=thresholds["min_frequency"],
+        )
+
+        produced: list[DemandCandidate] = []
+        for row in rows:
+            name = str(row.get("demand_name") or "").strip()
+            if name == "":
+                continue
+
+            raw_weight = row.get("weight")
+            score = None if raw_weight is None else float(raw_weight)
+            raw_count = row.get("video_count")
+            count = None if raw_count is None else int(raw_count)
+            raw_id = row.get("demand_id")
+
+            produced.append(
+                DemandCandidate(
+                    content=name,
+                    priority_score=score,
+                    demand_id=str(raw_id) if raw_id else None,
+                    demand_type=str(row.get("type") or "特征点"),
+                    video_count=count,
+                    video_list=row.get("video_list"),
+                )
+            )
+        return produced
+
+    @staticmethod
+    def _pick_param(config: dict[str, Any], keys: tuple[str, ...]) -> Any:
+        """Return the value of the first alias in ``keys`` present in ``config``; KeyError(keys[0]) if none."""
+        present = [key for key in keys if key in config]
+        if not present:
+            raise KeyError(keys[0])
+        return config[present[0]]
+
+    @classmethod
+    def _resolve_params(cls, config: dict[str, Any]) -> dict[str, int | float]:
+        """Resolve the five numeric params under their canonical names.
+
+        ``view_pv_count`` and ``min_frequency`` are cast with ``int``, the
+        rest with ``float``. Negative values raise ValueError.
+        """
+        integer_keys = ("view_pv_count", "min_frequency")
+        resolved: dict[str, int | float] = {}
+        for canonical, aliases in _NUMERIC_PARAM_KEYS:
+            caster = int if canonical in integer_keys else float
+            value = caster(cls._pick_param(config, aliases))
+            if value < 0:
+                raise ValueError(f"{canonical} 不能为负")
+            resolved[canonical] = value
+        return resolved

+ 81 - 0
app/strategies/impl/_supply_gap_base.py

@@ -0,0 +1,81 @@
+import hashlib
+from decimal import Decimal, ROUND_HALF_UP
+from typing import Any
+
+from app.core.config import settings
+from app.strategies.batch_date import today_yyyymmdd
+from app.strategies.base import (
+    BaseStrategy,
+    DemandCandidate,
+    GenerateContext,
+)
+from app.strategies.sources.supply_demand_content import fetch_demand_content_by_dt
+from app.strategies.staging_store import insert_staging_rows_skip_duplicates
+
+
+def build_supply_demand_id(*, strategy_name: str, demand_name: str, dt: str) -> str:
+    """Return the MD5 hex digest of ``strategy_name + demand_name.strip() + dt.strip()``.
+
+    The three parts are concatenated without a separator and encoded as
+    UTF-8; ``strategy_name`` is used as given (not stripped).
+    """
+    raw = f"{strategy_name}{demand_name.strip()}{dt.strip()}"
+    # The digest is only an identifier, not a security measure. Declaring
+    # that keeps md5 usable on FIPS-restricted builds; the digest itself is
+    # unchanged.
+    return hashlib.md5(raw.encode("utf-8"), usedforsecurity=False).hexdigest()
+
+
+def round_supply_score(value: object) -> float | None:
+    """Round ``value`` to four decimal places (half-up) via Decimal; None passes through."""
+    if value is None:
+        return None
+    four_places = Decimal("0.0001")
+    rounded = Decimal(str(value)).quantize(four_places, rounding=ROUND_HALF_UP)
+    return float(rounded)
+
+
+class SupplyGapStrategyBase(BaseStrategy):
+    """Current supply/demand gap family: sync from demand_content, inserting row by row and skipping duplicate demand_ids."""
+
+    def validate_config(self, config: dict[str, Any]) -> bool:
+        """Valid only when the supply MySQL source is configured and ``config`` is a dict."""
+        if settings.supply_mysql_configured:
+            return isinstance(config, dict)
+        return False
+
+    def build_demand_name(self, row: dict[str, Any]) -> str:
+        """Derive the demand name from a source row; subclasses must override."""
+        raise NotImplementedError
+
+    def generate(self, context: GenerateContext) -> list[DemandCandidate]:
+        """Fetch today's demand_content rows and turn each usable row into a candidate.
+
+        Rows whose demand name or date is blank after stripping are dropped.
+        """
+        today = today_yyyymmdd()
+        source_rows = fetch_demand_content_by_dt(today)
+
+        produced: list[DemandCandidate] = []
+        for row in source_rows:
+            # Fall back to the queried date when the row carries no dt.
+            row_dt = str(row.get("dt") or today).strip()
+            name = self.build_demand_name(row).strip()
+            if not (name and row_dt):
+                continue
+
+            raw_type = row.get("demand_type")
+            type_label = None if raw_type is None else str(raw_type).strip()
+
+            score = round_supply_score(row.get("score"))
+            demand_id = build_supply_demand_id(
+                strategy_name=self.name,
+                demand_name=name,
+                dt=row_dt,
+            )
+            produced.append(
+                DemandCandidate(
+                    content=name,
+                    priority_score=score,
+                    demand_id=demand_id,
+                    demand_type=type_label,
+                    extra={"batch_date": row_dt},
+                )
+            )
+        return produced
+
+    def write_staging(
+        self,
+        *,
+        context: GenerateContext,
+        candidates: list[DemandCandidate],
+    ) -> dict[str, Any]:
+        """Write via insert_staging_rows_skip_duplicates instead of the default whole-batch replace."""
+        write_summary = insert_staging_rows_skip_duplicates(
+            strategy_config_id=self.strategy_id,
+            strategy_name=self.name,
+            candidates=candidates,
+        )
+        return write_summary

+ 66 - 0
app/strategies/impl/hot_event_strategy.py

@@ -0,0 +1,66 @@
+from typing import Any
+
+from app.core.config import settings
+from app.strategies.batch_date import today_yyyymmdd
+from app.strategies.base import (
+    BaseStrategy,
+    DemandCandidate,
+    GenerateContext,
+)
+from app.strategies.odps._utils import round_weight
+from app.strategies.sources.hot_content_sync_log import fetch_hot_content_sync_log_by_partition
+from app.strategies.staging_store import insert_staging_rows_skip_duplicates
+
+
+class HotEventStrategy(BaseStrategy):
+    """新热事件:从 hot_content_odps_sync_log 同步至 strategy_staging。"""
+
+    strategy_id = "hot_event"
+    name = "新热事件"
+    version = "1.0.0"
+
+    def validate_config(self, config: dict[str, Any]) -> bool:
+        if not settings.hot_content_mysql_configured:
+            return False
+        return isinstance(config, dict)
+
+    def generate(self, context: GenerateContext) -> list[DemandCandidate]:
+        partition_dt = today_yyyymmdd()
+        source_strategy = str(context.params.get("source_strategy") or self.name).strip()
+        rows = fetch_hot_content_sync_log_by_partition(
+            partition_dt=partition_dt,
+            strategy=source_strategy,
+        )
+
+        candidates: list[DemandCandidate] = []
+        for row in rows:
+            partition = str(row.get("partition_dt") or partition_dt).strip()
+            demand_name = str(row.get("demand_name") or "").strip()
+            if not partition or not demand_name:
+                continue
+
+            demand_type = row.get("demand_type")
+            parsed_type = str(demand_type).strip() if demand_type else None
+
+            candidates.append(
+                DemandCandidate(
+                    content=demand_name,
+                    demand_id=str(row["demand_id"]),
+                    demand_type=parsed_type,
+                    priority_score=round_weight(row.get("weight")),
+                    extra={"batch_date": partition},
+                )
+            )
+        return candidates
+
+    def write_staging(
+        self,
+        *,
+        context: GenerateContext,
+        candidates: list[DemandCandidate],
+    ) -> dict[str, Any]:
+        return insert_staging_rows_skip_duplicates(
+            strategy_config_id=self.strategy_id,
+            strategy_name=self.name,
+            candidates=candidates,
+        )

+ 122 - 0
app/strategies/impl/lunar_same_period_strategy.py

@@ -0,0 +1,122 @@
+from typing import Any
+
+from app.strategies.batch_date import today_yyyymmdd
+from app.strategies.base import (
+    BaseStrategy,
+    DemandCandidate,
+    GenerateContext,
+    StrategySkipDecision,
+)
+from app.strategies.odps.lunar_same_period_demands import query_lunar_same_period_demands
+from app.strategies.staging_store import count_staging_batch, has_staging_batch
+
+# Required numeric parameters, as (canonical name, accepted config keys).
+# The config keys are tried in order; the first one present wins.
+_NUMERIC_PARAM_KEYS = (
+    ("view_pv_count", ("view_pv_count",)),
+    ("min_contribution_score", ("min_contribution_score", "贡献分")),
+    ("rov_avg", ("rov_avg",)),
+)
+
+# Optional integer parameters, as (canonical name, accepted config keys, default).
+_OPTIONAL_PARAM_KEYS = (
+    ("period_days", ("period_days",), 7),
+)
+
+
+class LunarSamePeriodStrategy(BaseStrategy):
+    """去年同期阴历:阴历减一年后转阳历,取 7 天视频窗口聚合实质元素 ROV。"""
+
+    strategy_id = "lunar_same_period_last_year"
+    name = "去年同期阴历"
+    version = "1.0.0"
+
+    def validate_config(self, config: dict[str, Any]) -> bool:
+        try:
+            self._resolve_params(config)
+        except (TypeError, ValueError, KeyError):
+            return False
+        return True
+
+    def should_skip(self, context: GenerateContext) -> StrategySkipDecision:
+        if not has_staging_batch(
+            strategy_config_id=self.strategy_id,
+            batch_date=context.batch_date,
+        ):
+            return StrategySkipDecision(skip=False)
+
+        existing_count = count_staging_batch(
+            strategy_config_id=self.strategy_id,
+            batch_date=context.batch_date,
+        )
+        return StrategySkipDecision(
+            skip=True,
+            reason="strategy_staging already has data for batch_date",
+            detail={"existing_count": existing_count},
+        )
+
+    def generate(self, context: GenerateContext) -> list[DemandCandidate]:
+        params = self._resolve_params(context.params)
+        rows = query_lunar_same_period_demands(
+            bizdate=today_yyyymmdd(),
+            period_days=params["period_days"],
+            view_pv_count=params["view_pv_count"],
+            min_contribution_score=params["min_contribution_score"],
+            rov_avg=params["rov_avg"],
+        )
+
+        candidates: list[DemandCandidate] = []
+        for row in rows:
+            demand_name = str(row.get("demand_name") or "").strip()
+            if not demand_name:
+                continue
+
+            weight = row.get("weight")
+            priority_score = float(weight) if weight is not None else None
+            video_count = row.get("video_count")
+            parsed_video_count = int(video_count) if video_count is not None else None
+
+            candidates.append(
+                DemandCandidate(
+                    content=demand_name,
+                    priority_score=priority_score,
+                    demand_id=str(row["demand_id"]) if row.get("demand_id") else None,
+                    demand_type=str(row.get("type") or "特征点"),
+                    video_count=parsed_video_count,
+                    video_list=row.get("video_list"),
+                )
+            )
+        return candidates
+
+    @staticmethod
+    def _pick_param(config: dict[str, Any], keys: tuple[str, ...]) -> Any:
+        for key in keys:
+            if key in config:
+                return config[key]
+        raise KeyError(keys[0])
+
+    @classmethod
+    def _resolve_params(cls, config: dict[str, Any]) -> dict[str, int | float]:
+        resolved: dict[str, int | float] = {}
+        for canonical, aliases in _NUMERIC_PARAM_KEYS:
+            raw = cls._pick_param(config, aliases)
+            if canonical == "view_pv_count":
+                value = int(raw)
+                if value < 0:
+                    raise ValueError(f"{canonical} 不能为负")
+                resolved[canonical] = value
+            else:
+                value = float(raw)
+                if value < 0:
+                    raise ValueError(f"{canonical} 不能为负")
+                resolved[canonical] = value
+
+        for canonical, aliases, default in _OPTIONAL_PARAM_KEYS:
+            raw = default
+            for key in aliases:
+                if key in config:
+                    raw = config[key]
+                    break
+            value = int(raw)
+            if value < 1:
+                raise ValueError(f"{canonical} 须 >= 1")
+            resolved[canonical] = value
+
+        return resolved

+ 10 - 0
app/strategies/impl/monthly_segment_strategy.py

@@ -0,0 +1,10 @@
+from app.strategies.impl._monthly_base import MonthlyStrategyBase
+
+
+class MonthlySegmentStrategy(MonthlyStrategyBase):
+    """逐月-分词策略:与逐月相同 ODPS 逻辑,strategy 标签为「逐月-分词」。"""
+
+    strategy_id = "monthly_segment"
+    name = "逐月-分词"
+    strategy_label = "逐月-分词"
+    version = "1.0.0"

+ 10 - 0
app/strategies/impl/monthly_strategy.py

@@ -0,0 +1,10 @@
+from app.strategies.impl._monthly_base import MonthlyStrategyBase
+
+
+class MonthlyStrategy(MonthlyStrategyBase):
+    """逐月策略:基于 ODPS 聚合实质元素 ROV,产出特征点需求。"""
+
+    strategy_id = "monthly"
+    name = "逐月"
+    strategy_label = "逐月"
+    version = "1.0.0"

+ 122 - 0
app/strategies/impl/solar_same_period_strategy.py

@@ -0,0 +1,122 @@
+from typing import Any
+
+from app.strategies.batch_date import today_yyyymmdd
+from app.strategies.base import (
+    BaseStrategy,
+    DemandCandidate,
+    GenerateContext,
+    StrategySkipDecision,
+)
+from app.strategies.odps.solar_same_period_demands import query_solar_same_period_demands
+from app.strategies.staging_store import count_staging_batch, has_staging_batch
+
+# Required numeric parameters, as (canonical name, accepted config keys).
+# The config keys are tried in order; the first one present wins.
+_NUMERIC_PARAM_KEYS = (
+    ("view_pv_count", ("view_pv_count",)),
+    ("min_contribution_score", ("min_contribution_score", "贡献分")),
+    ("rov_avg", ("rov_avg",)),
+)
+
+# Optional integer parameters, as (canonical name, accepted config keys, default).
+_OPTIONAL_PARAM_KEYS = (
+    ("period_days", ("period_days",), 7),
+)
+
+
+class SolarSamePeriodStrategy(BaseStrategy):
+    """去年同期阳历:取去年同日起到 +6 天(共 7 天)视频窗口聚合实质元素 ROV。"""
+
+    strategy_id = "solar_same_period_last_year"
+    name = "去年同期阳历"
+    version = "1.0.0"
+
+    def validate_config(self, config: dict[str, Any]) -> bool:
+        try:
+            self._resolve_params(config)
+        except (TypeError, ValueError, KeyError):
+            return False
+        return True
+
+    def should_skip(self, context: GenerateContext) -> StrategySkipDecision:
+        if not has_staging_batch(
+            strategy_config_id=self.strategy_id,
+            batch_date=context.batch_date,
+        ):
+            return StrategySkipDecision(skip=False)
+
+        existing_count = count_staging_batch(
+            strategy_config_id=self.strategy_id,
+            batch_date=context.batch_date,
+        )
+        return StrategySkipDecision(
+            skip=True,
+            reason="strategy_staging already has data for batch_date",
+            detail={"existing_count": existing_count},
+        )
+
+    def generate(self, context: GenerateContext) -> list[DemandCandidate]:
+        params = self._resolve_params(context.params)
+        rows = query_solar_same_period_demands(
+            bizdate=today_yyyymmdd(),
+            period_days=params["period_days"],
+            view_pv_count=params["view_pv_count"],
+            min_contribution_score=params["min_contribution_score"],
+            rov_avg=params["rov_avg"],
+        )
+
+        candidates: list[DemandCandidate] = []
+        for row in rows:
+            demand_name = str(row.get("demand_name") or "").strip()
+            if not demand_name:
+                continue
+
+            weight = row.get("weight")
+            priority_score = float(weight) if weight is not None else None
+            video_count = row.get("video_count")
+            parsed_video_count = int(video_count) if video_count is not None else None
+
+            candidates.append(
+                DemandCandidate(
+                    content=demand_name,
+                    priority_score=priority_score,
+                    demand_id=str(row["demand_id"]) if row.get("demand_id") else None,
+                    demand_type=str(row.get("type") or "特征点"),
+                    video_count=parsed_video_count,
+                    video_list=row.get("video_list"),
+                )
+            )
+        return candidates
+
+    @staticmethod
+    def _pick_param(config: dict[str, Any], keys: tuple[str, ...]) -> Any:
+        for key in keys:
+            if key in config:
+                return config[key]
+        raise KeyError(keys[0])
+
+    @classmethod
+    def _resolve_params(cls, config: dict[str, Any]) -> dict[str, int | float]:
+        resolved: dict[str, int | float] = {}
+        for canonical, aliases in _NUMERIC_PARAM_KEYS:
+            raw = cls._pick_param(config, aliases)
+            if canonical == "view_pv_count":
+                value = int(raw)
+                if value < 0:
+                    raise ValueError(f"{canonical} 不能为负")
+                resolved[canonical] = value
+            else:
+                value = float(raw)
+                if value < 0:
+                    raise ValueError(f"{canonical} 不能为负")
+                resolved[canonical] = value
+
+        for canonical, aliases, default in _OPTIONAL_PARAM_KEYS:
+            raw = default
+            for key in aliases:
+                if key in config:
+                    raw = config[key]
+                    break
+            value = int(raw)
+            if value < 1:
+                raise ValueError(f"{canonical} 须 >= 1")
+            resolved[canonical] = value
+
+        return resolved

+ 14 - 0
app/strategies/impl/supply_gap_segment_strategy.py

@@ -0,0 +1,14 @@
+from typing import Any
+
+from app.strategies.impl._supply_gap_base import SupplyGapStrategyBase
+
+
+class SupplyGapSegmentStrategy(SupplyGapStrategyBase):
+    """当下供需gap-分词:demand_name = name。"""
+
+    strategy_id = "supply_gap_segment"
+    name = "当下供需gap-分词"
+    version = "1.0.0"
+
+    def build_demand_name(self, row: dict[str, Any]) -> str:
+        return str(row.get("name") or "").strip()

+ 18 - 0
app/strategies/impl/supply_gap_strategy.py

@@ -0,0 +1,18 @@
+from typing import Any
+
+from app.strategies.impl._supply_gap_base import SupplyGapStrategyBase
+
+
+class SupplyGapStrategy(SupplyGapStrategyBase):
+    """当下供需gap:demand_name = merge_leve2 + 空格 + name。"""
+
+    strategy_id = "supply_gap"
+    name = "当下供需gap"
+    version = "1.0.0"
+
+    def build_demand_name(self, row: dict[str, Any]) -> str:
+        merge_leve2 = str(row.get("merge_leve2") or "").strip()
+        name = str(row.get("name") or "").strip()
+        if merge_leve2 and name:
+            return f"{merge_leve2} {name}"
+        return merge_leve2 or name

+ 0 - 0
app/strategies/odps/__init__.py


+ 53 - 0
app/strategies/odps/_date_utils.py

@@ -0,0 +1,53 @@
+import re
+from datetime import date, datetime, timedelta
+from zoneinfo import ZoneInfo
+
+# Time zone used to decide what "today" is.
+SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
+# A partition date is exactly eight digits (YYYYMMDD).
+_DATE_PARTITION_RE = re.compile(r"^\d{8}$")
+
+
+def today_shanghai_date() -> date:
+    return datetime.now(SHANGHAI_TZ).date()
+
+
+def same_day_last_year(value: date) -> date:
+    try:
+        return value.replace(year=value.year - 1)
+    except ValueError:
+        return date(value.year - 1, 2, 28)
+
+
+def partition_yyyymmdd(value: date) -> str:
+    return value.strftime("%Y%m%d")
+
+
+def parse_partition_date(value: str) -> date:
+    text = value.strip()
+    if not _DATE_PARTITION_RE.match(text):
+        raise ValueError(f"日期须为 YYYYMMDD 格式,当前为 {value!r}")
+    return datetime.strptime(text, "%Y%m%d").date()
+
+
+def resolve_solar_same_period_window(
+    *,
+    bizdate: str,
+    period_days: int = 7,
+) -> tuple[str, str, str]:
+    """
+    返回 (bizdate, start_date, end_date)。
+
+    - bizdate:当天 YYYYMMDD
+    - start_date:bizdate 去年同期(闰年 2/29 → 去年 2/28)
+    - end_date:start_date + (period_days - 1) 天;period_days=7 → +6 天
+    """
+    if period_days < 1:
+        raise ValueError("period_days 须 >= 1")
+
+    biz_dt = parse_partition_date(bizdate)
+    start_dt = same_day_last_year(biz_dt)
+    end_dt = start_dt + timedelta(days=period_days - 1)
+    return (
+        partition_yyyymmdd(biz_dt),
+        partition_yyyymmdd(start_dt),
+        partition_yyyymmdd(end_dt),
+    )

+ 37 - 0
app/strategies/odps/_utils.py

@@ -0,0 +1,37 @@
+import json
+from decimal import Decimal, ROUND_HALF_UP
+
+
+def normalize_scalar(value: object) -> object:
+    if isinstance(value, Decimal):
+        return float(value)
+    return value
+
+
+def round_weight(value: object) -> float | None:
+    if value is None:
+        return None
+    decimal_value = Decimal(str(value)).quantize(
+        Decimal("0.0001"),
+        rounding=ROUND_HALF_UP,
+    )
+    return float(decimal_value)
+
+
+def parse_video_list(value: object) -> list[str] | None:
+    if value is None:
+        return None
+    if isinstance(value, list):
+        return [str(item) for item in value]
+    if isinstance(value, str):
+        text = value.strip()
+        if not text:
+            return None
+        try:
+            parsed = json.loads(text)
+        except json.JSONDecodeError:
+            return [text]
+        if isinstance(parsed, list):
+            return [str(item) for item in parsed]
+        return [text]
+    return [str(value)]

+ 137 - 0
app/strategies/odps/lunar_same_period_demands.py

@@ -0,0 +1,137 @@
+"""去年同期阴历策略 ODPS 查询。"""
+
+from app.odps.client import get_odps_client
+from app.strategies.odps._date_utils import partition_yyyymmdd, today_shanghai_date
+from app.strategies.odps._utils import normalize_scalar, parse_video_list
+
+
+def build_lunar_same_period_sql(
+    *,
+    bizdate: str,
+    period_days: int,
+    view_pv_count: int,
+    min_contribution_score: float,
+    rov_avg: float,
+) -> str:
+    if period_days < 1:
+        raise ValueError("period_days 须 >= 1")
+    days_after = period_days - 1
+
+    return f"""
+SET odps.sql.type.system.odps2 = true
+;
+
+WITH base_date AS (
+    SELECT
+        lunar_to_solar(
+            lunar_add(
+                loghubods.solar_to_lunar('{bizdate}', 'yyyyMMdd'),
+                -1,
+                'Y'
+            )
+        ) AS start_date
+),
+date_params AS (
+    SELECT
+        start_date,
+        DATE_FORMAT(
+            CAST(DATE_ADD(TO_DATE(start_date, 'yyyyMMdd'), {int(days_after)}) AS TIMESTAMP),
+            'yyyyMMdd'
+        ) AS end_date
+    FROM base_date
+),
+cleaned_video_metrics AS (
+    SELECT
+        CAST(d.视频id AS STRING) AS vid,
+        d.rov_t0
+    FROM loghubods.video_dimension_detail_add_column d
+    WHERE d.dt >= (SELECT start_date FROM date_params)
+      AND d.dt <= (SELECT end_date FROM date_params)
+      AND COALESCE(d.`当日分发曝光pv`, 0) >= {int(view_pv_count)}
+),
+video_avg_metrics AS (
+    SELECT
+        vid,
+        AVG(CASE WHEN rov_t0 = 0 THEN NULL ELSE rov_t0 END) AS vid_avg_rov
+    FROM cleaned_video_metrics
+    GROUP BY vid
+),
+tag_vid_dedup AS (
+    SELECT DISTINCT
+        CAST(vid AS STRING) AS vid,
+        原始元素,
+        原始元素描述,
+        点类型,
+        元素维度,
+        短语,
+        选题,
+        `extend`
+    FROM loghubods.dwd_topic_decode_result_detail_di
+    WHERE dt = MAX_PT('loghubods.dwd_topic_decode_result_detail_di')
+      AND TRIM(原始元素) <> ''
+      AND 原始元素 IS NOT NULL
+      AND 元素维度 = '实质'
+      AND 贡献分 >= {float(min_contribution_score)}
+)
+SELECT
+    '去年同期阴历' AS strategy,
+    md5(CONCAT('去年同期阴历', t1.原始元素, '{bizdate}')) AS demand_id,
+    t1.原始元素 AS demand_name,
+    COALESCE(ROUND(AVG(t2.vid_avg_rov), 6), 0) AS weight,
+    '特征点' AS type,
+    COUNT(DISTINCT t1.vid) AS video_count,
+    COLLECT_SET(t1.vid) AS video_list,
+    '{{}}' AS extend
+FROM tag_vid_dedup t1
+LEFT JOIN video_avg_metrics t2
+    ON t1.vid = t2.vid
+GROUP BY t1.原始元素
+HAVING COALESCE(ROUND(AVG(t2.vid_avg_rov), 6), 0) >= {float(rov_avg)}
+ORDER BY weight DESC
+"""
+
+
+def query_lunar_same_period_demands(
+    *,
+    bizdate: str | None = None,
+    period_days: int = 7,
+    view_pv_count: int,
+    min_contribution_score: float,
+    rov_avg: float,
+) -> list[dict[str, object]]:
+    if bizdate is None:
+        bizdate = partition_yyyymmdd(today_shanghai_date())
+
+    sql = build_lunar_same_period_sql(
+        bizdate=bizdate,
+        period_days=period_days,
+        view_pv_count=view_pv_count,
+        min_contribution_score=min_contribution_score,
+        rov_avg=rov_avg,
+    )
+
+    odps_client = get_odps_client()
+    instance = odps_client.execute_sql(
+        sql,
+        hints={
+            "odps.sql.submit.mode": "script",
+            "odps.sql.decimal.odps2": "true",
+        },
+    )
+
+    rows: list[dict[str, object]] = []
+    with instance.open_reader(tunnel=True) as reader:
+        for record in reader:
+            rows.append(
+                {
+                    "strategy": record["strategy"],
+                    "demand_id": record["demand_id"],
+                    "demand_name": record["demand_name"],
+                    "weight": normalize_scalar(record["weight"]),
+                    "type": record["type"],
+                    "video_count": record["video_count"],
+                    "video_list": parse_video_list(record["video_list"]),
+                    "extend": record["extend"],
+                }
+            )
+    return rows

+ 214 - 0
app/strategies/odps/monthly_demands.py

@@ -0,0 +1,214 @@
+"""逐月策略 ODPS 查询。"""
+
+import re
+from datetime import datetime
+from zoneinfo import ZoneInfo
+
+from app.odps.client import get_odps_client
+from app.strategies.odps._utils import normalize_scalar, parse_video_list
+
+# Time zone used to derive the default bizdate.
+SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
+
+# A bizdate is exactly eight digits (YYYYMMDD).
+_DATE_PARTITION_RE = re.compile(r"^\d{8}$")
+
+# Element names filtered out of the monthly result (used in a NOT IN clause):
+# festivals, solar terms, greetings and generic blessing phrases.
+_EXCLUDED_ELEMENTS = (
+    "元旦", "腊八节", "小年", "除夕", "春节", "正月初一", "正月初二", "正月初三",
+    "正月初四", "正月初五", "情人节", "元宵节", "龙抬头", "妇女节", "植树节", "劳动节",
+    "母亲节", "儿童节", "端午节", "父亲节", "建党节", "建军节", "七夕节", "中元节",
+    "中秋节", "国庆节", "重阳节", "感恩节", "公祭日", "平安夜", "圣诞节", "小寒",
+    "大寒", "立春", "雨水", "惊蛰", "春分", "清明", "谷雨", "立夏", "小满", "芒种",
+    "夏至", "小暑", "大暑", "立秋", "处暑", "白露", "秋分", "寒露", "霜降", "立冬",
+    "小雪", "大雪", "冬至", "早上好", "中午好", "下午好", "晚上好", "晚安", "祝福",
+    "祝愿", "祝你", "祝贺", "祝大家", "祝您", "祝好运", "祝群主", "祝朋友",
+)
+
+
+def _validate_bizdate(bizdate: str) -> str:
+    value = bizdate.strip()
+    if not _DATE_PARTITION_RE.match(value):
+        raise ValueError(f"bizdate 须为 YYYYMMDD 格式,当前为 {value!r}")
+    return value
+
+
+def _sql_string_list(values: tuple[str, ...]) -> str:
+    return ", ".join(f"'{item}'" for item in values)
+
+
+def build_monthly_demands_sql(
+    *,
+    bizdate: str,
+    strategy_label: str,
+    view_pv_count: int,
+    month_total_pv_threshold: float,
+    min_contribution_score: float,
+    rov_avg: float,
+    min_frequency: int,
+) -> str:
+    """Build the ODPS SQL for the monthly strategies.
+
+    The query looks at the 360 days ending on the day before ``bizdate``,
+    buckets each day into a 30-day offset (``ym``), averages video ROV per
+    bucket, and sums the per-bucket averages for every element.
+
+    Args:
+        bizdate: business date as YYYYMMDD (surrounding whitespace ignored).
+        strategy_label: label written to the ``strategy`` column and mixed
+            into ``demand_id``; interpolated as-is, so it must be trusted text.
+        view_pv_count: minimum daily distribution exposure PV per video row.
+        month_total_pv_threshold: a video's PV sum in a bucket must exceed this.
+        min_contribution_score: minimum contribution score of an element.
+        rov_avg: minimum per-bucket average ROV for an element bucket to count.
+        min_frequency: minimum number of qualifying buckets per element.
+
+    Raises:
+        ValueError: ``bizdate`` is not YYYYMMDD or ``strategy_label`` is blank.
+    """
+    bizdate_value = _validate_bizdate(bizdate)
+    label = strategy_label.strip()
+    if not label:
+        raise ValueError("strategy_label cannot be empty")
+    excluded_sql = _sql_string_list(_EXCLUDED_ELEMENTS)
+
+    return f"""
+WITH biz_day AS (
+    SELECT TO_DATE(
+        CONCAT(
+            SUBSTR('{bizdate_value}', 1, 4), '-',
+            SUBSTR('{bizdate_value}', 5, 2), '-',
+            SUBSTR('{bizdate_value}', 7, 2)
+        )
+    ) AS biz_dt
+),
+yesterday AS (
+    SELECT DATE_SUB((SELECT biz_dt FROM biz_day), 1) AS yest
+),
+window_bounds AS (
+    SELECT
+        CAST((SELECT yest FROM yesterday) AS DATETIME) AS end_dt,
+        CAST(DATE_SUB((SELECT yest FROM yesterday), 359) AS DATETIME) AS start_dt
+),
+cleaned_video_metrics AS (
+    SELECT
+        CAST(视频id AS STRING) AS vid,
+        CAST(FLOOR(DATEDIFF(
+            (SELECT yest FROM yesterday),
+            TO_DATE(REGEXP_REPLACE(CAST(dt AS STRING), '-', ''), 'yyyyMMdd')
+        ) / 30) AS STRING) AS ym,
+        rov_t0,
+        COALESCE(`当日分发曝光pv`, 0) AS day_dist_pv
+    FROM loghubods.video_dimension_detail_add_column
+    WHERE TO_DATE(REGEXP_REPLACE(CAST(dt AS STRING), '-', ''), 'yyyyMMdd')
+        BETWEEN (SELECT start_dt FROM window_bounds) AND (SELECT end_dt FROM window_bounds)
+      AND COALESCE(`当日分发曝光pv`, 0) >= {int(view_pv_count)}
+),
+video_monthly_avg_metrics AS (
+    SELECT
+        ym,
+        vid,
+        AVG(CASE WHEN rov_t0 = 0 THEN NULL ELSE rov_t0 END) AS vid_avg_rov,
+        SUM(day_dist_pv) AS month_total_pv
+    FROM cleaned_video_metrics
+    GROUP BY ym, vid
+    HAVING SUM(day_dist_pv) > {float(month_total_pv_threshold)}
+),
+tag_vid_dedup AS (
+    SELECT DISTINCT
+        CAST(vid AS STRING) AS vid,
+        原始元素
+    FROM loghubods.dwd_topic_decode_result_detail_di
+    WHERE dt = MAX_PT('loghubods.dwd_topic_decode_result_detail_di')
+      AND 元素维度 = '实质'
+      AND 贡献分 >= {float(min_contribution_score)}
+),
+element_monthly_metrics AS (
+    SELECT
+        t1.原始元素,
+        t2.ym,
+        COALESCE(ROUND(AVG(t2.vid_avg_rov), 6), 0) AS month_avg_rov
+    FROM tag_vid_dedup t1
+    JOIN video_monthly_avg_metrics t2
+      ON t1.vid = t2.vid
+    GROUP BY t1.原始元素, t2.ym
+    HAVING COALESCE(ROUND(AVG(t2.vid_avg_rov), 6), 0) >= {float(rov_avg)}
+),
+element_total_rov AS (
+    SELECT
+        原始元素,
+        ROUND(SUM(month_avg_rov), 6) AS avg_rov
+    FROM element_monthly_metrics
+    GROUP BY 原始元素
+),
+element_vid_dedup AS (
+    SELECT DISTINCT
+        em.原始元素,
+        vm.vid
+    FROM element_monthly_metrics em
+    JOIN tag_vid_dedup tv
+      ON em.原始元素 = tv.原始元素
+    JOIN video_monthly_avg_metrics vm
+      ON tv.vid = vm.vid
+     AND em.ym = vm.ym
+),
+element_vid_stats AS (
+    SELECT
+        原始元素,
+        COUNT(DISTINCT vid) AS vid_count,
+        COLLECT_SET(vid) AS vid_list
+    FROM element_vid_dedup
+    GROUP BY 原始元素
+),
+element_freq AS (
+    SELECT
+        原始元素,
+        COUNT(1) AS 频次
+    FROM element_monthly_metrics
+    GROUP BY 原始元素
+)
+SELECT
+    '{label}' AS strategy,
+    md5(CONCAT('{label}', r.原始元素, '{bizdate_value}')) AS demand_id,
+    r.原始元素 AS demand_name,
+    r.avg_rov AS weight,
+    '特征点' AS type,
+    COALESCE(v.vid_count, 0) AS video_count,
+    v.vid_list AS video_list,
+    '{{}}' AS extend
+FROM element_total_rov r
+LEFT JOIN element_vid_stats v
+  ON r.原始元素 = v.原始元素
+LEFT JOIN element_freq f
+  ON r.原始元素 = f.原始元素
+WHERE r.原始元素 NOT IN ({excluded_sql})
+  AND COALESCE(f.频次, 0) >= {int(min_frequency)}
+ORDER BY weight DESC
+"""
+
+
+def query_monthly_demands(
+    *,
+    bizdate: str | None = None,
+    strategy_label: str = "逐月",
+    view_pv_count: int,
+    month_total_pv_threshold: float,
+    min_contribution_score: float,
+    rov_avg: float,
+    min_frequency: int,
+) -> list[dict[str, object]]:
+    if bizdate is None:
+        bizdate = datetime.now(SHANGHAI_TZ).strftime("%Y%m%d")
+    sql = build_monthly_demands_sql(
+        bizdate=bizdate,
+        strategy_label=strategy_label,
+        view_pv_count=view_pv_count,
+        month_total_pv_threshold=month_total_pv_threshold,
+        min_contribution_score=min_contribution_score,
+        rov_avg=rov_avg,
+        min_frequency=min_frequency,
+    )
+
+    odps_client = get_odps_client()
+    instance = odps_client.execute_sql(
+        sql,
+        hints={
+            "odps.sql.submit.mode": "script",
+            "odps.sql.decimal.odps2": "true",
+        },
+    )
+
+    rows: list[dict[str, object]] = []
+    with instance.open_reader(tunnel=True) as reader:
+        for record in reader:
+            rows.append(
+                {
+                    "strategy": record["strategy"],
+                    "demand_id": record["demand_id"],
+                    "demand_name": record["demand_name"],
+                    "weight": normalize_scalar(record["weight"]),
+                    "type": record["type"],
+                    "video_count": record["video_count"],
+                    "video_list": parse_video_list(record["video_list"]),
+                    "extend": record["extend"],
+                }
+            )
+    return rows

+ 117 - 0
app/strategies/odps/solar_same_period_demands.py

@@ -0,0 +1,117 @@
+"""去年同期阳历策略 ODPS 查询。"""
+
+from app.odps.client import get_odps_client
+from app.strategies.odps._date_utils import resolve_solar_same_period_window, today_shanghai_date, partition_yyyymmdd
+from app.strategies.odps._utils import normalize_scalar, parse_video_list
+
+
+def build_solar_same_period_sql(
+    *,
+    bizdate: str,
+    start_date: str,
+    end_date: str,
+    view_pv_count: int,
+    min_contribution_score: float,
+    rov_avg: float,
+) -> str:
+    """Build the ODPS SQL for the "same solar period last year" strategy.
+
+    Args:
+        bizdate: business date, mixed into ``demand_id``.
+        start_date: first ``dt`` partition of the video window (inclusive).
+        end_date: last ``dt`` partition of the video window (inclusive).
+        view_pv_count: minimum daily distribution exposure PV per video row.
+        min_contribution_score: minimum contribution score of an element.
+        rov_avg: minimum average ROV an element needs to be returned.
+
+    NOTE(review): the three date strings are interpolated into quoted SQL
+    literals without any check here; callers are expected to pass validated
+    YYYYMMDD values.
+    """
+    return f"""
+WITH cleaned_video_metrics AS (
+    SELECT
+        CAST(视频id AS STRING) AS vid,
+        rov_t0
+    FROM loghubods.video_dimension_detail_add_column
+    WHERE dt >= '{start_date}'
+      AND dt <= '{end_date}'
+      AND COALESCE(`当日分发曝光pv`, 0) >= {int(view_pv_count)}
+),
+video_avg_metrics AS (
+    SELECT
+        vid,
+        AVG(CASE WHEN rov_t0 = 0 THEN NULL ELSE rov_t0 END) AS vid_avg_rov
+    FROM cleaned_video_metrics
+    GROUP BY vid
+),
+tag_vid_dedup AS (
+    SELECT DISTINCT
+        CAST(vid AS STRING) AS vid,
+        原始元素,
+        原始元素描述,
+        点类型,
+        元素维度,
+        短语,
+        选题,
+        `extend`
+    FROM loghubods.dwd_topic_decode_result_detail_di
+    WHERE dt = MAX_PT('loghubods.dwd_topic_decode_result_detail_di')
+      AND TRIM(原始元素) <> ''
+      AND 原始元素 IS NOT NULL
+      AND 元素维度 = '实质'
+      AND 贡献分 >= {float(min_contribution_score)}
+)
+SELECT
+    '去年同期阳历' AS strategy,
+    md5(CONCAT('去年同期阳历', t1.原始元素, '{bizdate}')) AS demand_id,
+    t1.原始元素 AS demand_name,
+    COALESCE(ROUND(AVG(t2.vid_avg_rov), 6), 0) AS weight,
+    '特征点' AS type,
+    COUNT(DISTINCT t1.vid) AS video_count,
+    COLLECT_SET(t1.vid) AS video_list,
+    '{{}}' AS extend
+FROM tag_vid_dedup t1
+LEFT JOIN video_avg_metrics t2
+    ON t1.vid = t2.vid
+GROUP BY t1.原始元素
+HAVING COALESCE(ROUND(AVG(t2.vid_avg_rov), 6), 0) >= {float(rov_avg)}
+ORDER BY weight DESC
+"""
+
+
+def query_solar_same_period_demands(
+    *,
+    bizdate: str | None = None,
+    period_days: int = 7,
+    view_pv_count: int,
+    min_contribution_score: float,
+    rov_avg: float,
+) -> list[dict[str, object]]:
+    if bizdate is None:
+        bizdate = partition_yyyymmdd(today_shanghai_date())
+
+    bizdate_value, start_date, end_date = resolve_solar_same_period_window(
+        bizdate=bizdate,
+        period_days=period_days,
+    )
+    sql = build_solar_same_period_sql(
+        bizdate=bizdate_value,
+        start_date=start_date,
+        end_date=end_date,
+        view_pv_count=view_pv_count,
+        min_contribution_score=min_contribution_score,
+        rov_avg=rov_avg,
+    )
+
+    odps_client = get_odps_client()
+    instance = odps_client.execute_sql(
+        sql,
+        hints={
+            "odps.sql.submit.mode": "script",
+            "odps.sql.decimal.odps2": "true",
+        },
+    )
+
+    rows: list[dict[str, object]] = []
+    with instance.open_reader(tunnel=True) as reader:
+        for record in reader:
+            rows.append(
+                {
+                    "strategy": record["strategy"],
+                    "demand_id": record["demand_id"],
+                    "demand_name": record["demand_name"],
+                    "weight": normalize_scalar(record["weight"]),
+                    "type": record["type"],
+                    "video_count": record["video_count"],
+                    "video_list": parse_video_list(record["video_list"]),
+                    "extend": record["extend"],
+                }
+            )
+    return rows

+ 100 - 0
app/strategies/registry.py

@@ -0,0 +1,100 @@
+import threading
+from dataclasses import dataclass
+from typing import Any
+
+from app.strategies.base import BaseStrategy
+from app.strategies.config_store import StrategyConfigRecord, fetch_all_configs, fetch_config_by_id
+
+
+@dataclass(frozen=True)
+class ActiveStrategy:
+    """A registered strategy implementation paired with its config record."""
+
+    # Code implementation taken from the registry.
+    strategy: BaseStrategy
+    # Cached config record for the same strategy id.
+    config: StrategyConfigRecord
+
+
+class StrategyRegistry:
+    """策略注册表:代码实现与 DB 配置解耦,支持热更新。"""
+
+    _registry: dict[str, BaseStrategy] = {}
+    _config_cache: dict[str, StrategyConfigRecord] = {}
+    _lock = threading.RLock()
+
+    @classmethod
+    def register(cls, strategy: BaseStrategy) -> None:
+        strategy_id = strategy.strategy_id.strip()
+        if not strategy_id:
+            raise ValueError("strategy_id is required")
+
+        with cls._lock:
+            if strategy_id in cls._registry:
+                raise ValueError(f"strategy already registered: {strategy_id}")
+
+            cached = cls._config_cache.get(strategy_id)
+            if cached is not None and not strategy.validate_config(cached.params):
+                raise ValueError(f"invalid config for strategy: {strategy_id}")
+
+            cls._registry[strategy_id] = strategy
+
+    @classmethod
+    def get(cls, strategy_id: str) -> BaseStrategy | None:
+        with cls._lock:
+            return cls._registry.get(strategy_id)
+
+    @classmethod
+    def load_all_configs(cls) -> int:
+        records = fetch_all_configs()
+        with cls._lock:
+            cls._config_cache = {record.strategy_id: record for record in records}
+        return len(records)
+
+    @classmethod
+    def get_active(cls) -> list[ActiveStrategy]:
+        with cls._lock:
+            active: list[ActiveStrategy] = []
+            for strategy_id, strategy in cls._registry.items():
+                config = cls._config_cache.get(strategy_id)
+                if config is None or not config.active:
+                    continue
+                active.append(ActiveStrategy(strategy=strategy, config=config))
+            return active
+
+    @classmethod
+    def get_unregistered_active_configs(cls) -> list[StrategyConfigRecord]:
+        """DB 中 active=true 但未注册代码实现的策略。"""
+        with cls._lock:
+            return [
+                config
+                for strategy_id, config in cls._config_cache.items()
+                if config.active and strategy_id not in cls._registry
+            ]
+
+    @classmethod
+    def reload_config(cls, strategy_id: str) -> StrategyConfigRecord:
+        record = fetch_config_by_id(strategy_id)
+        if record is None:
+            raise KeyError(f"strategy config not found: {strategy_id}")
+
+        with cls._lock:
+            strategy = cls._registry.get(strategy_id)
+            if strategy is not None and not strategy.validate_config(record.params):
+                raise ValueError(f"invalid config for strategy: {strategy_id}")
+            cls._config_cache[strategy_id] = record
+            return record
+
+    @classmethod
+    def get_config(cls, strategy_id: str) -> StrategyConfigRecord | None:
+        with cls._lock:
+            return cls._config_cache.get(strategy_id)
+
+    @classmethod
+    def registered_strategy_ids(cls) -> list[str]:
+        with cls._lock:
+            return sorted(cls._registry.keys())
+
+    @classmethod
+    def validate_config_for_strategy(cls, strategy_id: str, config: dict[str, Any]) -> bool:
+        with cls._lock:
+            strategy = cls._registry.get(strategy_id)
+        if strategy is None:
+            raise KeyError(f"strategy not registered: {strategy_id}")
+        return strategy.validate_config(config)

+ 0 - 0
app/strategies/sources/__init__.py


+ 60 - 0
app/strategies/sources/hot_content_sync_log.py

@@ -0,0 +1,60 @@
+from typing import Any
+
+from sqlalchemy import text
+
+from app.core.config import settings
+from app.db.hot_content_mysql import HotContentSessionLocal
+
+
+def _hot_strategy_name(strategy: str | None = None) -> str:
+    """Resolve the strategy label used when querying the hot-content sync log.
+
+    Precedence: the explicit argument, then
+    ``settings.hot_demand_pool_strategy``, then the built-in default. The
+    chosen value is stripped; if stripping leaves an empty string the built-in
+    default is returned.
+    """
+    fallback = "新热事件"
+    chosen = strategy or settings.hot_demand_pool_strategy or fallback
+    stripped = chosen.strip()
+    if stripped:
+        return stripped
+    return fallback
+
+
+def fetch_hot_content_sync_log_by_partition(
+    *,
+    partition_dt: str,
+    strategy: str | None = None,
+) -> list[dict[str, Any]]:
+    """Load ``hot_content_odps_sync_log`` rows for one partition and strategy.
+
+    Args:
+        partition_dt: value matched against the ``partition_dt`` column.
+        strategy: strategy label; resolved through ``_hot_strategy_name``.
+
+    Returns:
+        One dict per kept row, in ``id`` ascending order. Rows whose
+        ``demand_id`` or ``demand_name`` is empty after stripping are dropped.
+        ``partition_dt`` and ``strategy`` fall back to the query arguments when
+        the row value is falsy; an empty ``demand_type`` becomes ``None``;
+        ``weight`` is passed through unchanged.
+    """
+    strategy_value = _hot_strategy_name(strategy)
+    bind_params = {
+        "partition_dt": partition_dt,
+        "strategy": strategy_value,
+    }
+    query = text(
+        """
+        SELECT
+            partition_dt,
+            strategy,
+            demand_id,
+            demand_name,
+            demand_type,
+            weight
+        FROM hot_content_odps_sync_log
+        WHERE partition_dt = :partition_dt
+          AND strategy = :strategy
+        ORDER BY id ASC
+        """
+    )
+    with HotContentSessionLocal() as session:
+        result = session.execute(query, bind_params)
+        rows = result.mappings().all()
+
+    def _clean(value: object) -> str:
+        # Falsy column values (NULL, "", 0) collapse to an empty string.
+        return str(value or "").strip()
+
+    results: list[dict[str, Any]] = []
+    for row in rows:
+        demand_id = _clean(row.get("demand_id"))
+        demand_name = _clean(row.get("demand_name"))
+        if not (demand_id and demand_name):
+            continue
+        record = {
+            "partition_dt": str(row.get("partition_dt") or partition_dt).strip(),
+            "strategy": str(row.get("strategy") or strategy_value).strip(),
+            "demand_id": demand_id,
+            "demand_name": demand_name,
+            "demand_type": _clean(row.get("demand_type")) or None,
+            "weight": row.get("weight"),
+        }
+        results.append(record)
+    return results

+ 63 - 0
app/strategies/sources/supply_demand_content.py

@@ -0,0 +1,63 @@
+import json
+from typing import Any
+
+from sqlalchemy import text
+
+from app.core.config import settings
+from app.db.supply_mysql import SupplySessionLocal
+from app.strategies.config_store import _safe_identifier
+
+
+def _parse_ext_data(raw: object) -> dict[str, Any]:
+    """Best-effort decode of an ``ext_data`` column value into a dict.
+
+    Args:
+        raw: the column value. Handled forms are ``None``, a dict, a JSON
+            string, or UTF-8 encoded JSON bytes.
+
+    Returns:
+        The dict itself when ``raw`` is already a dict; the decoded object
+        when ``raw`` holds a JSON object; otherwise ``{}``. Blank text,
+        undecodable bytes, malformed JSON, JSON that is not an object
+        (list, scalar, ``null``) and unsupported input types all yield ``{}``,
+        so a single bad row cannot abort the caller's whole fetch.
+    """
+    if isinstance(raw, dict):
+        return raw
+    if isinstance(raw, (bytes, bytearray)):
+        try:
+            raw = raw.decode("utf-8")
+        except UnicodeDecodeError:
+            return {}
+    if not isinstance(raw, str):
+        return {}
+
+    text_value = raw.strip()
+    if not text_value:
+        return {}
+    try:
+        parsed = json.loads(text_value)
+    except json.JSONDecodeError:
+        return {}
+    # Valid JSON is not necessarily an object; callers call ``.get`` on the
+    # result, so anything that is not a dict is treated as "no ext data".
+    return parsed if isinstance(parsed, dict) else {}
+
+
+def fetch_demand_content_by_dt(dt: str) -> list[dict[str, Any]]:
+    """Load demand-content rows for one ``dt`` from the supply database.
+
+    The table name comes from ``settings.supply_demand_content_table`` and is
+    passed through ``_safe_identifier`` before being interpolated into the
+    SQL. Rows with a NULL or blank ``name`` are excluded by the query, and the
+    result is ordered by ``score`` descending, then ``name`` ascending.
+
+    Returns:
+        One dict per row. ``ext_data`` is decoded with ``_parse_ext_data`` and
+        ``demand_type`` is taken from its ``"type"`` key; ``dt`` falls back to
+        the argument when the row value is falsy.
+    """
+    table = _safe_identifier(settings.supply_demand_content_table)
+    query = text(
+        f"""
+        SELECT
+            merge_leve2,
+            name,
+            reason,
+            suggestion,
+            ext_data,
+            score,
+            dt
+        FROM {table}
+        WHERE dt = :dt
+          AND name IS NOT NULL
+          AND TRIM(name) <> ''
+        ORDER BY score DESC, name ASC
+        """
+    )
+    with SupplySessionLocal() as session:
+        result = session.execute(query, {"dt": dt})
+        rows = result.mappings().all()
+
+    def _to_record(row: Any) -> dict[str, Any]:
+        # Decode ext_data first so demand_type can be read from it.
+        parsed = _parse_ext_data(row["ext_data"])
+        return {
+            "merge_leve2": row.get("merge_leve2"),
+            "name": str(row["name"]).strip(),
+            "reason": row.get("reason"),
+            "suggestion": row.get("suggestion"),
+            "ext_data": parsed,
+            "demand_type": parsed.get("type"),
+            "score": row.get("score"),
+            "dt": str(row.get("dt") or dt).strip(),
+        }
+
+    return [_to_record(row) for row in rows]

+ 260 - 0
app/strategies/staging_store.py

@@ -0,0 +1,260 @@
+import hashlib
+import json
+from typing import Any
+
+from sqlalchemy import text
+
+from app.core.config import settings
+from app.db.mysql import SessionLocal
+from app.strategies.base import DemandCandidate
+from app.strategies.config_store import _safe_identifier
+
+# Maximum number of rows handed to a single ``session.execute`` call when
+# ``replace_staging_batch`` bulk-inserts staging rows.
+BATCH_SIZE = 500
+
+
+def build_demand_id(*, strategy: str, demand_name: str, batch_date: str) -> str:
+    """Derive a demand id from strategy, demand name and batch date.
+
+    The three parts are concatenated without a separator (``demand_name`` is
+    stripped first) and the UTF-8 bytes are hashed with MD5.
+
+    Returns:
+        The 32-character lowercase hex digest.
+    """
+    key = "{}{}{}".format(strategy, demand_name.strip(), batch_date)
+    digest = hashlib.md5(key.encode("utf-8"))
+    return digest.hexdigest()
+
+
+def _serialize_json(value: object) -> str | None:
+    """Turn a value into the text stored in a JSON-ish column.
+
+    Dicts and lists are dumped as JSON with non-ASCII characters kept as-is;
+    ``None`` stays ``None``; anything else is converted with ``str``.
+    """
+    if isinstance(value, (dict, list)):
+        return json.dumps(value, ensure_ascii=False)
+    return None if value is None else str(value)
+
+
+def has_staging_batch(*, strategy_config_id: str, batch_date: str) -> bool:
+    """Report whether the staging table holds any row for this config and batch date.
+
+    The table name comes from ``settings.strategy_staging_table`` and is passed
+    through ``_safe_identifier`` before being interpolated into the SQL.
+    """
+    staging_table = _safe_identifier(settings.strategy_staging_table)
+    query = text(
+        f"""
+        SELECT 1
+        FROM {staging_table}
+        WHERE strategy_config_id = :strategy_config_id
+          AND batch_date = :batch_date
+        LIMIT 1
+        """
+    )
+    bind_params = {
+        "strategy_config_id": strategy_config_id,
+        "batch_date": batch_date,
+    }
+    with SessionLocal() as session:
+        first_row = session.execute(query, bind_params).first()
+    return first_row is not None
+
+
+def count_staging_batch(*, strategy_config_id: str, batch_date: str) -> int:
+    """Count staging rows for one strategy config and batch date.
+
+    Returns:
+        The ``COUNT(1)`` result as an int, or ``0`` if the query yields no row.
+    """
+    staging_table = _safe_identifier(settings.strategy_staging_table)
+    query = text(
+        f"""
+        SELECT COUNT(1) AS cnt
+        FROM {staging_table}
+        WHERE strategy_config_id = :strategy_config_id
+          AND batch_date = :batch_date
+        """
+    )
+    bind_params = {
+        "strategy_config_id": strategy_config_id,
+        "batch_date": batch_date,
+    }
+    with SessionLocal() as session:
+        result = session.execute(query, bind_params)
+        row = result.mappings().first()
+    return 0 if row is None else int(row["cnt"])
+
+
+def insert_staging_rows_skip_duplicates(
+    *,
+    strategy_config_id: str,
+    strategy_name: str,
+    candidates: list[DemandCandidate],
+) -> dict[str, int]:
+    """Insert candidates row by row; skip (never overwrite) an existing demand_id.
+
+    Each candidate's batch date is read from ``candidate.extra["batch_date"]``.
+    Candidates with blank content or a blank batch date are ignored and are
+    not counted. When a candidate carries no ``demand_id``, one is derived
+    with ``build_demand_id``. All inserts are committed together at the end.
+
+    Returns:
+        ``{"written": <rows inserted>, "skipped_duplicates": <rows skipped
+        because their demand_id was already present>}``.
+    """
+    staging_table = _safe_identifier(settings.strategy_staging_table)
+    exists_sql = text(
+        f"""
+        SELECT 1
+        FROM {staging_table}
+        WHERE demand_id = :demand_id
+        LIMIT 1
+        """
+    )
+    insert_sql = text(
+        f"""
+        INSERT INTO {staging_table}
+        (
+            strategy_config_id,
+            strategy,
+            demand_id,
+            demand_name,
+            weight,
+            `type`,
+            video_count,
+            video_list,
+            extend,
+            batch_date
+        )
+        VALUES
+        (
+            :strategy_config_id,
+            :strategy,
+            :demand_id,
+            :demand_name,
+            :weight,
+            :demand_type,
+            :video_count,
+            :video_list,
+            :extend,
+            :batch_date
+        )
+        """
+    )
+
+    counters = {"written": 0, "skipped_duplicates": 0}
+
+    with SessionLocal() as session:
+        for candidate in candidates:
+            name = candidate.content.strip()
+            if name == "":
+                continue
+
+            batch_date = str(candidate.extra.get("batch_date") or "").strip()
+            if batch_date == "":
+                continue
+
+            demand_id = candidate.demand_id
+            if not demand_id:
+                demand_id = build_demand_id(
+                    strategy=strategy_name,
+                    demand_name=name,
+                    batch_date=batch_date,
+                )
+
+            already_there = session.execute(exists_sql, {"demand_id": demand_id}).first()
+            if already_there is not None:
+                counters["skipped_duplicates"] += 1
+                continue
+
+            # batch_date has its own column, so it is left out of the extend payload.
+            extend = {
+                key: value
+                for key, value in candidate.extra.items()
+                if key != "batch_date"
+            }
+            if candidate.confidence is not None and "confidence" not in extend:
+                extend["confidence"] = candidate.confidence
+
+            row_params = {
+                "strategy_config_id": strategy_config_id,
+                "strategy": strategy_name,
+                "demand_id": demand_id,
+                "demand_name": name,
+                "weight": candidate.priority_score,
+                "demand_type": candidate.demand_type,
+                "video_count": candidate.video_count,
+                "video_list": _serialize_json(candidate.video_list),
+                "extend": _serialize_json(extend or None),
+                "batch_date": batch_date,
+            }
+            session.execute(insert_sql, row_params)
+            counters["written"] += 1
+
+        session.commit()
+
+    return counters
+
+
+def replace_staging_batch(
+    *,
+    strategy_config_id: str,
+    strategy_name: str,
+    batch_date: str,
+    candidates: list[DemandCandidate],
+) -> int:
+    """Replace all staging rows of one strategy config and batch date.
+
+    Existing rows matching ``strategy_config_id`` and ``batch_date`` are
+    deleted, then the candidates are inserted in chunks of ``BATCH_SIZE``
+    within the same session, followed by a single commit. Candidates with
+    blank content are dropped; a missing ``demand_id`` is derived with
+    ``build_demand_id``.
+
+    Returns:
+        The number of rows prepared for insertion.
+    """
+    staging_table = _safe_identifier(settings.strategy_staging_table)
+
+    delete_sql = text(
+        f"""
+        DELETE FROM {staging_table}
+        WHERE strategy_config_id = :strategy_config_id
+          AND batch_date = :batch_date
+        """
+    )
+    insert_sql = text(
+        f"""
+        INSERT INTO {staging_table}
+        (
+            strategy_config_id,
+            strategy,
+            demand_id,
+            demand_name,
+            weight,
+            `type`,
+            video_count,
+            video_list,
+            extend,
+            batch_date
+        )
+        VALUES
+        (
+            :strategy_config_id,
+            :strategy,
+            :demand_id,
+            :demand_name,
+            :weight,
+            :demand_type,
+            :video_count,
+            :video_list,
+            :extend,
+            :batch_date
+        )
+        """
+    )
+
+    # Build every parameter dict up front, before a session is opened.
+    rows: list[dict[str, Any]] = []
+    for candidate in candidates:
+        name = candidate.content.strip()
+        if name == "":
+            continue
+
+        extra = dict(candidate.extra)
+        if candidate.confidence is not None and "confidence" not in extra:
+            extra["confidence"] = candidate.confidence
+
+        demand_id = candidate.demand_id
+        if not demand_id:
+            demand_id = build_demand_id(
+                strategy=strategy_name,
+                demand_name=name,
+                batch_date=batch_date,
+            )
+
+        rows.append(
+            {
+                "strategy_config_id": strategy_config_id,
+                "strategy": strategy_name,
+                "demand_id": demand_id,
+                "demand_name": name,
+                "weight": candidate.priority_score,
+                "demand_type": candidate.demand_type,
+                "video_count": candidate.video_count,
+                "video_list": _serialize_json(candidate.video_list),
+                "extend": _serialize_json(extra) if extra else None,
+                "batch_date": batch_date,
+            }
+        )
+
+    batch_key = {
+        "strategy_config_id": strategy_config_id,
+        "batch_date": batch_date,
+    }
+    with SessionLocal() as session:
+        session.execute(delete_sql, batch_key)
+        offset = 0
+        while offset < len(rows):
+            session.execute(insert_sql, rows[offset : offset + BATCH_SIZE])
+            offset += BATCH_SIZE
+        session.commit()
+
+    return len(rows)