Kaynağa Gözat

增加需求去重和写入

xueyiming 5 gün önce
ebeveyn
işleme
b53929d719

+ 5 - 0
.env.example

@@ -61,3 +61,8 @@ VERTICAL_CATEGORY_EFFECT_TABLE=vertical_category_effect_di
 VERTICAL_CATEGORY_DAILY_SYNC_ENABLED=true
 VERTICAL_CATEGORY_DAILY_SYNC_HOUR=9
 VERTICAL_CATEGORY_DAILY_SYNC_MINUTE=30
+EXPERIMENT_DEMAND_POOL_WRITE_ENABLED=true
+EXPERIMENT_DEMAND_POOL_TARGET_TABLE=dwd_multi_demand_pool_di_tmp
+EXPERIMENT_DEMAND_POOL_WRITE_START_HOUR=3
+EXPERIMENT_DEMAND_POOL_WRITE_END_HOUR=23
+EXPERIMENT_DEMAND_POOL_WRITE_MINUTE=30

+ 27 - 0
README.md

@@ -69,6 +69,33 @@ npm run dev
 - 增量任务:`run_today_incremental_sync()`,每小时同步当天分区
 - 去重策略:ODPS 分区内先按 `demand_id` 去重,写入 MySQL 时使用唯一键 UPSERT
 
+## 实验需求池写入(strategy_staging → ODPS)
+
+- 逻辑文件:`app/sync/experiment_demand_pool_write.py`
+- 目标表:`loghubods.dwd_multi_demand_pool_di_tmp`(分区 `dt=yyyymmdd`)
+- 写入方式:PyODPS Tunnel **追加写入**(`open_writer`),语义等价于 `INSERT INTO ... PARTITION(dt=...)` 追加行,非 SQL 执行
+- 定时任务:`experiment_demand_pool_hourly_write_job`,**北京时间** 03:30 首次、之后每小时 `:30`、23:30 末次;写入当天 `dt` 分区
+- 手动触发:
+  - API:`POST /demand/api/v1/experiment/demand-pool/write?partition_dt=yyyymmdd`
+  - 脚本:`python scripts/run_experiment_demand_pool_write.py --partition-dt 20260611`
+
+### 策略配置实验字段(`strategy_config` 表独立列)
+
+| 字段 | 说明 |
+|------|------|
+| `daily_write_limit` | 每个策略每天最多写入条数;`0` 表示不限制,直到 staging 全部写完 |
+| `priority` | 跨策略去重优先级,**数值越小越优先**;**同优先级**策略之间 `demand_name` 不去重 |
+
+`params` JSON 仅保留策略运行阈值等业务参数。
+
+### 写入规则
+
+1. 仅处理 `active=true` 的策略
+2. 从 `strategy_staging` 按 `weight DESC` 选取未写入行
+3. `demand_id` 已在目标表当天分区存在则跳过
+4. 不同 priority 之间按 `demand_name` 去重:**先写入者优先**,后到的其他 priority 一律跳过(即使 priority 数值更小)
+5. 同 priority 之间不去重,可写入同名不同 `demand_id` 的需求
+
 ## 前端说明
 
 - 前端目录:`frontend`

+ 95 - 1
app/api/routes.py

@@ -2,7 +2,7 @@ from datetime import datetime
 from io import BytesIO
 from zoneinfo import ZoneInfo
 
-from fastapi import APIRouter, HTTPException, Query
+from fastapi import APIRouter, Body, HTTPException, Query
 from fastapi.responses import StreamingResponse
 
 from app.core.config import settings
@@ -23,7 +23,15 @@ from app.services.element_search_service import (
     query_same_period_last_year_lunar_element_demands,
     query_video_decode_url2_for_today,
 )
+from app.services.strategy_config_service import (
+    create_strategy_config,
+    query_available_strategies,
+    query_strategy_configs,
+    set_strategy_config_active,
+    update_strategy_config,
+)
 from app.services.vertical_category_tree_service import query_vertical_category_tree
+from app.sync.experiment_demand_pool_write import run_experiment_hourly_write
 from app.utils.excel_export import build_content_disposition, rows_to_excel_bytes
 
 router = APIRouter()
@@ -554,3 +562,89 @@ async def get_demand_pool_strategies(
         min_weight=min_weight,
         max_weight=max_weight,
     )
+
+
+@router.get("/strategy-configs")
+async def get_strategy_configs() -> dict[str, object]:
+    return query_strategy_configs()
+
+
+@router.get("/strategy-configs/available")
+async def get_available_strategy_configs() -> dict[str, object]:
+    return query_available_strategies()
+
+
+@router.post("/strategy-configs")
+async def post_strategy_config(
+    body: dict[str, object] = Body(...),
+) -> dict[str, object]:
+    try:
+        strategy_id = str(body.get("strategy_id") or "").strip()
+        params = body.get("params")
+        active = body.get("active", False)
+        if not isinstance(active, bool):
+            raise ValueError("active 必须是布尔值")
+        parsed_params = params if isinstance(params, dict) else {}
+        daily_write_limit = int(body.get("daily_write_limit", 0))
+        priority = int(body.get("priority", 0))
+        return create_strategy_config(
+            strategy_id=strategy_id,
+            params=parsed_params,
+            active=active,
+            daily_write_limit=daily_write_limit,
+            priority=priority,
+        )
+    except (ValueError, TypeError) as exc:
+        raise HTTPException(status_code=400, detail=str(exc)) from exc
+
+
+@router.put("/strategy-configs/{strategy_id}")
+async def put_strategy_config(
+    strategy_id: str,
+    body: dict[str, object] = Body(...),
+) -> dict[str, object]:
+    try:
+        params = body.get("params")
+        active = body.get("active")
+        parsed_params = params if isinstance(params, dict) else None
+        parsed_active = active if isinstance(active, bool) else None
+        parsed_daily_limit = (
+            int(body["daily_write_limit"]) if "daily_write_limit" in body else None
+        )
+        parsed_priority = int(body["priority"]) if "priority" in body else None
+        return update_strategy_config(
+            strategy_id=strategy_id,
+            params=parsed_params,
+            active=parsed_active,
+            daily_write_limit=parsed_daily_limit,
+            priority=parsed_priority,
+        )
+    except (ValueError, TypeError) as exc:
+        raise HTTPException(status_code=400, detail=str(exc)) from exc
+
+
+@router.patch("/strategy-configs/{strategy_id}/active")
+async def patch_strategy_config_active(
+    strategy_id: str,
+    body: dict[str, object] = Body(...),
+) -> dict[str, object]:
+    try:
+        active = body.get("active")
+        if not isinstance(active, bool):
+            raise ValueError("active 必须是布尔值")
+        return set_strategy_config_active(strategy_id=strategy_id, active=active)
+    except ValueError as exc:
+        raise HTTPException(status_code=400, detail=str(exc)) from exc
+
+
+@router.post("/experiment/demand-pool/write")
+async def post_experiment_demand_pool_write(
+    partition_dt: str | None = Query(
+        default=None,
+        description="业务日期 yyyymmdd;未传则取当天",
+    ),
+) -> dict[str, object]:
+    try:
+        return run_experiment_hourly_write(partition_dt)
+    except ValueError as exc:
+        raise HTTPException(status_code=400, detail=str(exc)) from exc

+ 23 - 0
app/core/config.py

@@ -59,6 +59,13 @@ class Settings(BaseSettings):
     strategy_staging_hourly_generate_start_hour: int = 3
     strategy_staging_hourly_generate_end_hour: int = 23
     strategy_staging_hourly_generate_minute: int = 0
+    # 实验系统:strategy_staging → ODPS 临时需求池表
+    # 北京时间 03:30 首次写入,之后每小时 :30 一次,23:30 为当天最后一次;分区 dt=当天
+    experiment_demand_pool_write_enabled: bool = True
+    experiment_demand_pool_target_table: str = "dwd_multi_demand_pool_di_tmp"
+    experiment_demand_pool_write_start_hour: int = 3
+    experiment_demand_pool_write_end_hour: int = 23
+    experiment_demand_pool_write_minute: int = 30
     substance_element_base_table: str = "substance_element_base"
     substance_element_effect_table: str = "substance_element_effect_di"
     substance_element_daily_sync_enabled: bool = True
@@ -135,6 +142,22 @@ class Settings(BaseSettings):
             return str(start)
         return f"{start}-{end}"
 
+    @property
+    def experiment_demand_pool_write_cron_hours(self) -> str:
+        """北京时间 hourly cron 小时段,配合 write_minute 使用。
+
+        默认 start=3、end=23、minute=30 → 03:30, 04:30, …, 23:30 各执行一次。
+        """
+        start = self.experiment_demand_pool_write_start_hour
+        end = self.experiment_demand_pool_write_end_hour
+        if not (0 <= start <= 23 and 0 <= end <= 23):
+            raise ValueError("experiment demand pool write hours must be 0-23")
+        if start > end:
+            raise ValueError("start_hour cannot be greater than end_hour")
+        if start == end:
+            return str(start)
+        return f"{start}-{end}"
+
     @property
     def cors_allow_origin_list(self) -> list[str]:
         if self.cors_allow_origins.strip() == "*":

+ 17 - 0
app/scheduler/jobs.py

@@ -4,6 +4,7 @@ from zoneinfo import ZoneInfo
 from app.services.demand_pool_strategy_daily_alert import run_daily_strategy_alert
 from app.services.strategy_generate_service import run_strategy_generation
 from app.sync.demand_pool_sync import run_full_sync, run_today_incremental_sync
+from app.sync.experiment_demand_pool_write import run_experiment_hourly_write
 from app.sync.substance_element_sync import sync_substance_elements
 from app.sync.vertical_category_sync import sync_vertical_categories
 
@@ -87,3 +88,19 @@ def strategy_staging_hourly_generate_job(batch_date: str | None = None) -> None:
         print(f"[scheduler] strategy generation failed: {exc}")
         raise
 
+
+def experiment_demand_pool_hourly_write_job(partition_dt: str | None = None) -> None:
+    print("[scheduler] start hourly experiment write to ODPS demand pool tmp")
+    try:
+        result = run_experiment_hourly_write(partition_dt)
+        print(
+            "[scheduler] experiment demand pool write done: "
+            f"partition_dt={result['partition_dt']}, "
+            f"selected={result['selected_count']}, "
+            f"written={result['written_count']}, "
+            f"existing={result['existing_count']}"
+        )
+    except Exception as exc:
+        print(f"[scheduler] experiment demand pool write failed: {exc}")
+        raise
+

+ 21 - 0
app/scheduler/manager.py

@@ -10,6 +10,7 @@ from app.core.config import settings
 from app.scheduler.jobs import (
     demand_pool_daily_strategy_alert_job,
     demand_pool_today_incremental_sync_job,
+    experiment_demand_pool_hourly_write_job,
     heartbeat_job,
     strategy_staging_hourly_generate_job,
     substance_element_daily_sync_job,
@@ -69,6 +70,26 @@ def setup_jobs() -> None:
             max_instances=1,
             coalesce=True,
         )
+    if settings.experiment_demand_pool_write_enabled:
+        scheduler.add_job(
+            experiment_demand_pool_hourly_write_job,
+            trigger=CronTrigger(
+                minute=settings.experiment_demand_pool_write_minute,
+                hour=settings.experiment_demand_pool_write_cron_hours,
+                timezone=_CRON_TZ,
+            ),
+            id="experiment_demand_pool_hourly_write_job",
+            replace_existing=True,
+            max_instances=1,
+            coalesce=True,
+        )
+        print(
+            "[scheduler] experiment demand pool write cron: "
+            f"Asia/Shanghai {settings.experiment_demand_pool_write_start_hour:02d}:"
+            f"{settings.experiment_demand_pool_write_minute:02d}-"
+            f"{settings.experiment_demand_pool_write_end_hour:02d}:"
+            f"{settings.experiment_demand_pool_write_minute:02d} hourly, dt=today"
+        )
     if settings.substance_element_daily_sync_enabled:
         scheduler.add_job(
             substance_element_daily_sync_job,

+ 139 - 0
app/services/strategy_config_service.py

@@ -0,0 +1,139 @@
+from typing import Any
+
+from app.strategies.config_store import (
+    StrategyConfigRecord,
+    fetch_all_configs,
+    fetch_config_by_id,
+    insert_config,
+    update_config,
+)
+from app.strategies.registry import StrategyRegistry
+
+
+def _validate_experiment_fields(*, daily_write_limit: int, priority: int) -> None:
+    if daily_write_limit < 0:
+        raise ValueError("daily_write_limit 不能为负")
+    if priority < 0:
+        raise ValueError("priority 不能为负")
+
+
+def _record_to_dict(record: StrategyConfigRecord) -> dict[str, Any]:
+    registered_ids = set(StrategyRegistry.registered_strategy_ids())
+    return {
+        "strategy_id": record.strategy_id,
+        "name": record.name,
+        "version": record.version,
+        "params": record.params,
+        "active": record.active,
+        "daily_write_limit": record.daily_write_limit,
+        "priority": record.priority,
+        "registered": record.strategy_id in registered_ids,
+        "create_time": record.create_time,
+        "updated_time": record.updated_time,
+    }
+
+
+def query_strategy_configs() -> dict[str, object]:
+    records = fetch_all_configs()
+    return {"items": [_record_to_dict(record) for record in records]}
+
+
+def query_available_strategies() -> dict[str, object]:
+    configured_ids = {record.strategy_id for record in fetch_all_configs()}
+    items = [
+        item
+        for item in StrategyRegistry.list_registered()
+        if item["strategy_id"] not in configured_ids
+    ]
+    return {"items": items}
+
+
+def create_strategy_config(
+    *,
+    strategy_id: str,
+    params: dict[str, Any] | None = None,
+    active: bool = False,
+    daily_write_limit: int = 0,
+    priority: int = 0,
+) -> dict[str, object]:
+    normalized_id = strategy_id.strip()
+    if not normalized_id:
+        raise ValueError("strategy_id 不能为空")
+
+    strategy = StrategyRegistry.get(normalized_id)
+    if strategy is None:
+        raise ValueError(f"未注册的策略: {normalized_id}")
+
+    if fetch_config_by_id(normalized_id) is not None:
+        raise ValueError(f"策略配置已存在: {normalized_id}")
+
+    normalized_params = params if params is not None else {}
+    if not isinstance(normalized_params, dict):
+        raise ValueError("params 必须是 JSON 对象")
+    _validate_experiment_fields(
+        daily_write_limit=daily_write_limit,
+        priority=priority,
+    )
+    if not strategy.validate_config(normalized_params):
+        raise ValueError(f"策略参数校验失败: {normalized_id}")
+
+    record = insert_config(
+        strategy_id=normalized_id,
+        name=strategy.name,
+        version=strategy.version,
+        params=normalized_params,
+        active=active,
+        daily_write_limit=daily_write_limit,
+        priority=priority,
+    )
+    StrategyRegistry.reload_config(normalized_id)
+    return _record_to_dict(record)
+
+
+def update_strategy_config(
+    *,
+    strategy_id: str,
+    params: dict[str, Any] | None = None,
+    active: bool | None = None,
+    daily_write_limit: int | None = None,
+    priority: int | None = None,
+) -> dict[str, object]:
+    normalized_id = strategy_id.strip()
+    if not normalized_id:
+        raise ValueError("strategy_id 不能为空")
+
+    strategy = StrategyRegistry.get(normalized_id)
+    if params is not None:
+        if not isinstance(params, dict):
+            raise ValueError("params 必须是 JSON 对象")
+        if strategy is not None and not strategy.validate_config(params):
+            raise ValueError(f"策略参数校验失败: {normalized_id}")
+
+    if daily_write_limit is not None or priority is not None:
+        existing = fetch_config_by_id(normalized_id)
+        if existing is None:
+            raise ValueError(f"strategy config not found: {normalized_id}")
+        _validate_experiment_fields(
+            daily_write_limit=daily_write_limit
+            if daily_write_limit is not None
+            else existing.daily_write_limit,
+            priority=priority if priority is not None else existing.priority,
+        )
+
+    try:
+        record = update_config(
+            strategy_id=normalized_id,
+            params=params,
+            active=active,
+            daily_write_limit=daily_write_limit,
+            priority=priority,
+        )
+    except KeyError as exc:
+        raise ValueError(str(exc)) from exc
+
+    StrategyRegistry.reload_config(normalized_id)
+    return _record_to_dict(record)
+
+
+def set_strategy_config_active(*, strategy_id: str, active: bool) -> dict[str, object]:
+    return update_strategy_config(strategy_id=strategy_id, active=active)

+ 162 - 18
app/strategies/config_store.py

@@ -24,6 +24,10 @@ class StrategyConfigRecord:
     version: str
     params: dict[str, Any]
     active: bool
+    daily_write_limit: int = 0
+    priority: int = 0
+    create_time: str | None = None
+    updated_time: str | None = None
 
 
 def _parse_params(raw: object) -> dict[str, Any]:
@@ -53,6 +57,8 @@ def ensure_strategy_tables() -> None:
         name         VARCHAR(128) NOT NULL COMMENT '策略名称',
         version      VARCHAR(32)  NOT NULL COMMENT '策略版本,用于追踪代码变更',
         params       JSON         NULL COMMENT '策略运行参数,如阈值、窗口期等',
+        daily_write_limit INT     NOT NULL DEFAULT 0 COMMENT '实验:每日写入上限,0=不限',
+        priority     INT          NOT NULL DEFAULT 0 COMMENT '实验:跨策略去重优先级,越小越优先',
         active       TINYINT(1)   NOT NULL DEFAULT 1 COMMENT '是否启用,0=不参与产出和合并',
         create_time  TIMESTAMP    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
         updated_time TIMESTAMP    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
@@ -86,14 +92,74 @@ def ensure_strategy_tables() -> None:
     with SessionLocal() as session:
         session.execute(text(config_sql))
         session.execute(text(staging_sql))
+        _migrate_strategy_config_columns(session, config_table)
         session.commit()
 
 
+def _migrate_strategy_config_columns(session, config_table: str) -> None:
+    rows = session.execute(
+        text(
+            """
+            SELECT COLUMN_NAME
+            FROM information_schema.COLUMNS
+            WHERE TABLE_SCHEMA = DATABASE()
+              AND TABLE_NAME = :table_name
+            """
+        ),
+        {"table_name": config_table},
+    ).all()
+    existing = {str(row[0]) for row in rows}
+    if "daily_write_limit" not in existing:
+        session.execute(
+            text(
+                f"""
+                ALTER TABLE {config_table}
+                ADD COLUMN daily_write_limit INT NOT NULL DEFAULT 0
+                COMMENT '实验:每日写入上限,0=不限'
+                """
+            )
+        )
+    if "priority" not in existing:
+        session.execute(
+            text(
+                f"""
+                ALTER TABLE {config_table}
+                ADD COLUMN priority INT NOT NULL DEFAULT 0
+                COMMENT '实验:跨策略去重优先级,越小越优先'
+                """
+            )
+        )
+
+
+def _format_timestamp(value: object) -> str | None:
+    if value is None:
+        return None
+    if hasattr(value, "isoformat"):
+        return value.isoformat(sep=" ", timespec="seconds")
+    return str(value)
+
+
+def _row_to_record(row: object) -> StrategyConfigRecord:
+    mapping = row._mapping if hasattr(row, "_mapping") else row
+    return StrategyConfigRecord(
+        strategy_id=str(mapping["strategy_id"]),
+        name=str(mapping["name"]),
+        version=str(mapping["version"]),
+        params=_parse_params(mapping["params"]),
+        active=bool(mapping["active"]),
+        daily_write_limit=int(mapping.get("daily_write_limit") or 0),
+        priority=int(mapping.get("priority") or 0),
+        create_time=_format_timestamp(mapping.get("create_time")),
+        updated_time=_format_timestamp(mapping.get("updated_time")),
+    )
+
+
 def fetch_all_configs() -> list[StrategyConfigRecord]:
     config_table = _safe_identifier(settings.strategy_config_table)
     query = text(
         f"""
-        SELECT strategy_id, name, version, params, active
+        SELECT strategy_id, name, version, params, active,
+               daily_write_limit, priority, create_time, updated_time
         FROM {config_table}
         ORDER BY strategy_id ASC
         """
@@ -101,23 +167,15 @@ def fetch_all_configs() -> list[StrategyConfigRecord]:
     with SessionLocal() as session:
         rows = session.execute(query).mappings().all()
 
-    return [
-        StrategyConfigRecord(
-            strategy_id=str(row["strategy_id"]),
-            name=str(row["name"]),
-            version=str(row["version"]),
-            params=_parse_params(row["params"]),
-            active=bool(row["active"]),
-        )
-        for row in rows
-    ]
+    return [_row_to_record(row) for row in rows]
 
 
 def fetch_config_by_id(strategy_id: str) -> StrategyConfigRecord | None:
     config_table = _safe_identifier(settings.strategy_config_table)
     query = text(
         f"""
-        SELECT strategy_id, name, version, params, active
+        SELECT strategy_id, name, version, params, active,
+               daily_write_limit, priority, create_time, updated_time
         FROM {config_table}
         WHERE strategy_id = :strategy_id
         LIMIT 1
@@ -129,10 +187,96 @@ def fetch_config_by_id(strategy_id: str) -> StrategyConfigRecord | None:
     if row is None:
         return None
 
-    return StrategyConfigRecord(
-        strategy_id=str(row["strategy_id"]),
-        name=str(row["name"]),
-        version=str(row["version"]),
-        params=_parse_params(row["params"]),
-        active=bool(row["active"]),
+    return _row_to_record(row)
+
+
+def insert_config(
+    *,
+    strategy_id: str,
+    name: str,
+    version: str,
+    params: dict[str, Any],
+    active: bool,
+    daily_write_limit: int = 0,
+    priority: int = 0,
+) -> StrategyConfigRecord:
+    config_table = _safe_identifier(settings.strategy_config_table)
+    query = text(
+        f"""
+        INSERT INTO {config_table}
+        (strategy_id, name, version, params, active, daily_write_limit, priority)
+        VALUES
+        (:strategy_id, :name, :version, CAST(:params AS JSON), :active,
+         :daily_write_limit, :priority)
+        """
+    )
+    with SessionLocal() as session:
+        session.execute(
+            query,
+            {
+                "strategy_id": strategy_id,
+                "name": name,
+                "version": version,
+                "params": json.dumps(params, ensure_ascii=False),
+                "active": int(active),
+                "daily_write_limit": daily_write_limit,
+                "priority": priority,
+            },
+        )
+        session.commit()
+
+    record = fetch_config_by_id(strategy_id)
+    if record is None:
+        raise RuntimeError(f"failed to load inserted strategy config: {strategy_id}")
+    return record
+
+
+def update_config(
+    *,
+    strategy_id: str,
+    params: dict[str, Any] | None = None,
+    active: bool | None = None,
+    daily_write_limit: int | None = None,
+    priority: int | None = None,
+) -> StrategyConfigRecord:
+    if (
+        params is None
+        and active is None
+        and daily_write_limit is None
+        and priority is None
+    ):
+        raise ValueError("至少需提供一项可更新字段")
+
+    config_table = _safe_identifier(settings.strategy_config_table)
+    set_clauses: list[str] = []
+    payload: dict[str, object] = {"strategy_id": strategy_id}
+    if params is not None:
+        set_clauses.append("params = CAST(:params AS JSON)")
+        payload["params"] = json.dumps(params, ensure_ascii=False)
+    if active is not None:
+        set_clauses.append("active = :active")
+        payload["active"] = int(active)
+    if daily_write_limit is not None:
+        set_clauses.append("daily_write_limit = :daily_write_limit")
+        payload["daily_write_limit"] = daily_write_limit
+    if priority is not None:
+        set_clauses.append("priority = :priority")
+        payload["priority"] = priority
+
+    query = text(
+        f"""
+        UPDATE {config_table}
+        SET {", ".join(set_clauses)}
+        WHERE strategy_id = :strategy_id
+        """
     )
+    with SessionLocal() as session:
+        result = session.execute(query, payload)
+        if result.rowcount == 0:
+            raise KeyError(f"strategy config not found: {strategy_id}")
+        session.commit()
+
+    record = fetch_config_by_id(strategy_id)
+    if record is None:
+        raise RuntimeError(f"failed to load updated strategy config: {strategy_id}")
+    return record

+ 15 - 0
app/strategies/registry.py

@@ -91,6 +91,21 @@ class StrategyRegistry:
         with cls._lock:
             return sorted(cls._registry.keys())
 
+    @classmethod
+    def list_registered(cls) -> list[dict[str, str]]:
+        with cls._lock:
+            return [
+                {
+                    "strategy_id": strategy.strategy_id,
+                    "name": strategy.name,
+                    "version": strategy.version,
+                }
+                for strategy in sorted(
+                    cls._registry.values(),
+                    key=lambda item: item.strategy_id,
+                )
+            ]
+
     @classmethod
     def validate_config_for_strategy(cls, strategy_id: str, config: dict[str, Any]) -> bool:
         with cls._lock:

+ 79 - 0
app/strategies/staging_store.py

@@ -1,5 +1,6 @@
 import hashlib
 import json
+from dataclasses import dataclass
 from typing import Any
 
 from sqlalchemy import text
@@ -12,6 +13,20 @@ from app.strategies.config_store import _safe_identifier
 BATCH_SIZE = 500
 
 
+@dataclass(frozen=True)
+class StagingRow:
+    strategy_config_id: str
+    strategy: str
+    demand_id: str
+    demand_name: str
+    weight: float | None
+    demand_type: str | None
+    video_count: int | None
+    video_list: str | None
+    extend: str | None
+    batch_date: str
+
+
 def build_demand_id(*, strategy: str, demand_name: str, batch_date: str) -> str:
     raw = f"{strategy}{demand_name.strip()}{batch_date}"
     return hashlib.md5(raw.encode("utf-8")).hexdigest()
@@ -258,3 +273,67 @@ def replace_staging_batch(
         session.commit()
 
     return len(rows)
+
+
+def fetch_staging_rows_for_batch(
+    *,
+    batch_date: str,
+    strategy_config_ids: list[str] | None = None,
+) -> list[StagingRow]:
+    staging_table = _safe_identifier(settings.strategy_staging_table)
+    filters = ["batch_date = :batch_date"]
+    payload: dict[str, object] = {"batch_date": batch_date}
+    if strategy_config_ids:
+        placeholders = ", ".join(f":sid_{index}" for index in range(len(strategy_config_ids)))
+        filters.append(f"strategy_config_id IN ({placeholders})")
+        for index, strategy_id in enumerate(strategy_config_ids):
+            payload[f"sid_{index}"] = strategy_id
+
+    query = text(
+        f"""
+        SELECT
+            strategy_config_id,
+            strategy,
+            demand_id,
+            demand_name,
+            weight,
+            `type` AS demand_type,
+            video_count,
+            video_list,
+            extend,
+            batch_date
+        FROM {staging_table}
+        WHERE {" AND ".join(filters)}
+        ORDER BY strategy_config_id ASC, weight DESC, id ASC
+        """
+    )
+    with SessionLocal() as session:
+        rows = session.execute(query, payload).mappings().all()
+
+    result: list[StagingRow] = []
+    for row in rows:
+        demand_id = str(row["demand_id"] or "").strip()
+        demand_name = str(row["demand_name"] or "").strip()
+        if not demand_id or not demand_name:
+            continue
+        weight = row["weight"]
+        parsed_weight = float(weight) if weight is not None else None
+        video_count = row["video_count"]
+        parsed_video_count = int(video_count) if video_count is not None else None
+        result.append(
+            StagingRow(
+                strategy_config_id=str(row["strategy_config_id"]),
+                strategy=str(row["strategy"]),
+                demand_id=demand_id,
+                demand_name=demand_name,
+                weight=parsed_weight,
+                demand_type=str(row["demand_type"]).strip()
+                if row["demand_type"] is not None
+                else None,
+                video_count=parsed_video_count,
+                video_list=str(row["video_list"]) if row["video_list"] is not None else None,
+                extend=str(row["extend"]) if row["extend"] is not None else None,
+                batch_date=str(row["batch_date"]),
+            )
+        )
+    return result

+ 330 - 0
app/sync/experiment_demand_pool_write.py

@@ -0,0 +1,330 @@
+"""实验系统:从 strategy_staging 增量写入 ODPS dwd_multi_demand_pool_di_tmp。"""
+
+from __future__ import annotations
+
+import json
+import re
+from collections import defaultdict
+from dataclasses import dataclass
+
+from app.core.config import settings
+from app.odps.client import get_odps_client
+from app.strategies.batch_date import today_yyyymmdd
+from app.strategies.config_store import StrategyConfigRecord, fetch_all_configs
+from app.strategies.registry import StrategyRegistry
+from app.strategies.staging_store import BATCH_SIZE, StagingRow, fetch_staging_rows_for_batch
+
+IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
+PARTITION_DT_RE = re.compile(r"^\d{8}$")
+_UNKNOWN_STRATEGY_PRIORITY = "__unknown__"
+
+
+def _safe_identifier(name: str) -> str:
+    if not IDENTIFIER_RE.match(name):
+        raise ValueError(f"invalid sql identifier: {name}")
+    return name
+
+
+@dataclass(frozen=True)
+class ExperimentStrategyContext:
+    strategy_id: str
+    strategy_name: str
+    priority: int
+    daily_limit: int
+    current_count: int
+    staging_rows: list[StagingRow]
+
+
+@dataclass(frozen=True)
+class ExistingPartitionState:
+    demand_ids: set[str]
+    strategy_counts: dict[str, int]
+    claimed_names: dict[str, set[int | str]]
+
+
+def _normalize_partition_dt(partition_dt: str | None) -> str:
+    value = (partition_dt or today_yyyymmdd()).strip()
+    if not PARTITION_DT_RE.match(value):
+        raise ValueError("partition_dt must be yyyymmdd")
+    return value
+
+
+def _parse_video_list_for_odps(raw: str | None) -> list[str] | None:
+    if raw is None:
+        return None
+    text_value = raw.strip()
+    if not text_value:
+        return None
+    try:
+        parsed = json.loads(text_value)
+    except json.JSONDecodeError:
+        return [text_value]
+    if isinstance(parsed, list):
+        return [str(item) for item in parsed]
+    return [text_value]
+
+
+def _qualified_target_table_name() -> str:
+    target_table = _safe_identifier(settings.experiment_demand_pool_target_table)
+    project = settings.odps_project.strip()
+    if not project:
+        return target_table
+    return f"{project}.{target_table}"
+
+
+def _build_strategy_priority_by_name(
+    configs: list[StrategyConfigRecord],
+) -> dict[str, int]:
+    """含 active / paused 全量配置,避免策略中途暂停后 Hive 占位 priority 丢失。"""
+    return {config.name: config.priority for config in configs}
+
+
+def _resolve_hive_row_priority(
+    strategy_name: str,
+    priority_by_name: dict[str, int],
+) -> int | str:
+    if not strategy_name or strategy_name not in priority_by_name:
+        return _UNKNOWN_STRATEGY_PRIORITY
+    return priority_by_name[strategy_name]
+
+
+def _select_writable_configs(
+    configs: list[StrategyConfigRecord],
+) -> list[StrategyConfigRecord]:
+    """与策略生成一致:仅 registered + active 的策略参与实验写入。"""
+    registered_ids = set(StrategyRegistry.registered_strategy_ids())
+    return [
+        config
+        for config in configs
+        if config.active and config.strategy_id in registered_ids
+    ]
+
+
+def _get_odps_target_table():
+    odps_client = get_odps_client()
+    target_table = _safe_identifier(settings.experiment_demand_pool_target_table)
+    if not odps_client.exist_table(target_table):
+        raise ValueError(f"ODPS 表不存在: {_qualified_target_table_name()}")
+    return odps_client.get_table(target_table)
+
+
+def _fetch_existing_partition_state(
+    partition_dt: str,
+    *,
+    strategy_priority_by_name: dict[str, int],
+) -> ExistingPartitionState:
+    table = _get_odps_target_table()
+    partition_spec = f"dt={partition_dt}"
+    if not table.exist_partition(partition_spec):
+        return ExistingPartitionState(
+            demand_ids=set(),
+            strategy_counts={},
+            claimed_names={},
+        )
+
+    demand_ids: set[str] = set()
+    strategy_counts: dict[str, int] = defaultdict(int)
+    claimed_names: dict[str, set[int | str]] = {}
+
+    with table.open_reader(partition=partition_spec) as reader:
+        for record in reader:
+            demand_id = str(record["demand_id"] or "").strip()
+            demand_name = str(record["demand_name"] or "").strip()
+            strategy_name = str(record["strategy"] or "").strip()
+            if demand_id:
+                demand_ids.add(demand_id)
+            if strategy_name:
+                strategy_counts[strategy_name] += 1
+            if not demand_name:
+                continue
+            priority = _resolve_hive_row_priority(strategy_name, strategy_priority_by_name)
+            if demand_name not in claimed_names:
+                claimed_names[demand_name] = {priority}
+            else:
+                claimed_names[demand_name].add(priority)
+
+    return ExistingPartitionState(
+        demand_ids=demand_ids,
+        strategy_counts=dict(strategy_counts),
+        claimed_names=claimed_names,
+    )
+
+
+def _build_strategy_contexts(
+    *,
+    configs: list[StrategyConfigRecord],
+    staging_rows: list[StagingRow],
+    strategy_counts: dict[str, int],
+) -> list[ExperimentStrategyContext]:
+    rows_by_strategy_id: dict[str, list[StagingRow]] = defaultdict(list)
+    for row in staging_rows:
+        rows_by_strategy_id[row.strategy_config_id].append(row)
+
+    contexts: list[ExperimentStrategyContext] = []
+    for config in configs:
+        if not config.active:
+            continue
+        contexts.append(
+            ExperimentStrategyContext(
+                strategy_id=config.strategy_id,
+                strategy_name=config.name,
+                priority=config.priority,
+                daily_limit=config.daily_write_limit,
+                current_count=strategy_counts.get(config.name, 0),
+                staging_rows=rows_by_strategy_id.get(config.strategy_id, []),
+            )
+        )
+    return contexts
+
+
+def select_rows_to_write(
+    *,
+    strategies: list[ExperimentStrategyContext],
+    existing_demand_ids: set[str],
+    claimed_names: dict[str, set[int | str]],
+) -> tuple[list[StagingRow], dict[str, int]]:
+    """跨策略选取待写入行。
+
+    - demand_id 已存在:跳过
+    - demand_name 已被其他 priority 写入:跳过(先写入者优先,高 priority 不可覆盖)
+    - 同 priority:demand_name 不去重
+    """
+    selected: list[StagingRow] = []
+    selected_counts: dict[str, int] = defaultdict(int)
+
+    ordered = sorted(strategies, key=lambda item: (item.priority, item.strategy_id))
+    for ctx in ordered:
+        remaining: int | None
+        if ctx.daily_limit > 0:
+            remaining = ctx.daily_limit - ctx.current_count - selected_counts[ctx.strategy_name]
+            if remaining <= 0:
+                continue
+        else:
+            remaining = None
+
+        candidates = sorted(
+            ctx.staging_rows,
+            key=lambda row: (-(row.weight or 0.0), row.demand_id),
+        )
+        for row in candidates:
+            if remaining is not None and remaining <= 0:
+                break
+            if row.demand_id in existing_demand_ids:
+                continue
+
+            claimed_priorities = claimed_names.get(row.demand_name)
+            if claimed_priorities is not None and ctx.priority not in claimed_priorities:
+                continue
+
+            if row.demand_name not in claimed_names:
+                claimed_names[row.demand_name] = {ctx.priority}
+            else:
+                claimed_names[row.demand_name].add(ctx.priority)
+
+            selected.append(row)
+            existing_demand_ids.add(row.demand_id)
+            selected_counts[ctx.strategy_name] += 1
+            if remaining is not None:
+                remaining -= 1
+
+    return selected, dict(selected_counts)
+
+
+def _staging_row_to_odps_record(row: StagingRow) -> tuple[object, ...]:
+    """字段顺序与 dwd_multi_demand_pool_di_tmp 非分区列一致。"""
+    weight = float(row.weight) if row.weight is not None else None
+    video_count = int(row.video_count) if row.video_count is not None else None
+    extend = row.extend.strip() if row.extend else None
+    return (
+        row.strategy,
+        row.demand_id,
+        row.demand_name,
+        weight,
+        row.demand_type,
+        video_count,
+        _parse_video_list_for_odps(row.video_list),
+        extend,
+    )
+
+
+def _write_rows_to_odps(*, partition_dt: str, rows: list[StagingRow]) -> int:
+    if not rows:
+        return 0
+
+    table = _get_odps_target_table()
+    partition_spec = f"dt={partition_dt}"
+    records = [_staging_row_to_odps_record(row) for row in rows]
+
+    # PyODPS Tunnel 追加写入,等价于 INSERT INTO ... PARTITION(dt=...) 追加行
+    with table.open_writer(partition=partition_spec, create_partition=True) as writer:
+        for start in range(0, len(records), BATCH_SIZE):
+            writer.write(records[start : start + BATCH_SIZE])
+
+    return len(records)
+
+
+def run_experiment_hourly_write(partition_dt: str | None = None) -> dict[str, object]:
+    """每小时执行:检查各策略当日写入量,不足则从 staging 继续补充。"""
+    StrategyRegistry.load_all_configs()
+    batch_date = _normalize_partition_dt(partition_dt)
+
+    configs = fetch_all_configs()
+    priority_by_name = _build_strategy_priority_by_name(configs)
+    writable_configs = _select_writable_configs(configs)
+
+    existing_state = _fetch_existing_partition_state(
+        batch_date,
+        strategy_priority_by_name=priority_by_name,
+    )
+    staging_rows = fetch_staging_rows_for_batch(
+        batch_date=batch_date,
+        strategy_config_ids=[config.strategy_id for config in writable_configs],
+    )
+    contexts = _build_strategy_contexts(
+        configs=writable_configs,
+        staging_rows=staging_rows,
+        strategy_counts=existing_state.strategy_counts,
+    )
+
+    claimed_names = {
+        name: set(priorities) for name, priorities in existing_state.claimed_names.items()
+    }
+    pending_ids = set(existing_state.demand_ids)
+    selected_rows, selected_counts = select_rows_to_write(
+        strategies=contexts,
+        existing_demand_ids=pending_ids,
+        claimed_names=claimed_names,
+    )
+    written = _write_rows_to_odps(partition_dt=batch_date, rows=selected_rows)
+    if written:
+        print(
+            "[experiment-write] appended "
+            f"{written} rows to {_qualified_target_table_name()} "
+            f"partition dt={batch_date}"
+        )
+
+    strategy_summary = []
+    for ctx in sorted(contexts, key=lambda item: (item.priority, item.strategy_id)):
+        strategy_summary.append(
+            {
+                "strategy_id": ctx.strategy_id,
+                "strategy_name": ctx.strategy_name,
+                "priority": ctx.priority,
+                "daily_limit": ctx.daily_limit,
+                "existing_count": ctx.current_count,
+                "selected_count": selected_counts.get(ctx.strategy_name, 0),
+                "staging_total": len(ctx.staging_rows),
+            }
+        )
+
+    return {
+        "partition_dt": batch_date,
+        "target_table": _qualified_target_table_name(),
+        "write_mode": "tunnel_append",
+        "staging_total": len(staging_rows),
+        "selected_count": len(selected_rows),
+        "written_count": written,
+        "existing_count": len(existing_state.demand_ids),
+        "writable_strategy_count": len(writable_configs),
+        "strategies": strategy_summary,
+    }

+ 7 - 1
frontend/src/DemandNavBar.tsx

@@ -1,7 +1,7 @@
 import "./demandNav.css";
 
 type DemandNavBarProps = {
-  active: "dashboard" | "summary";
+  active: "dashboard" | "summary" | "strategy-config";
 };
 
 export default function DemandNavBar({ active }: DemandNavBarProps) {
@@ -20,6 +20,12 @@ export default function DemandNavBar({ active }: DemandNavBarProps) {
         >
           需求汇总
         </a>
+        <a
+          href="/strategy-config.html"
+          className={`demand-nav-tab${active === "strategy-config" ? " demand-nav-tab--active" : ""}`}
+        >
+          需求策略配置
+        </a>
       </div>
     </nav>
   );

+ 574 - 0
frontend/src/StrategyConfigApp.tsx

@@ -0,0 +1,574 @@
+import { useCallback, useEffect, useMemo, useState } from "react";
+import {
+  Alert,
+  Button,
+  Form,
+  Input,
+  InputNumber,
+  Modal,
+  Select,
+  Space,
+  Switch,
+  Table,
+  Tag,
+  Typography,
+  message,
+} from "antd";
+import type { ColumnsType } from "antd/es/table";
+import { PlusOutlined, ReloadOutlined } from "@ant-design/icons";
+import EllipsisCell from "./EllipsisCell";
+
+const API_BASE_URL =
+  import.meta.env.VITE_API_BASE_URL ?? "/demand/api/v1";
+
+const getResolvedApiBaseUrl = () => {
+  if (API_BASE_URL.startsWith("http://") || API_BASE_URL.startsWith("https://")) {
+    return API_BASE_URL;
+  }
+  return new URL(API_BASE_URL, window.location.origin).toString();
+};
+
+type StrategyConfigItem = {
+  strategy_id: string;
+  name: string;
+  version: string;
+  params: Record<string, unknown>;
+  active: boolean;
+  daily_write_limit: number;
+  priority: number;
+  registered: boolean;
+  create_time: string | null;
+  updated_time: string | null;
+};
+
+type StrategyConfigResponse = {
+  items: StrategyConfigItem[];
+};
+
+type AvailableStrategyItem = {
+  strategy_id: string;
+  name: string;
+  version: string;
+};
+
+type AvailableStrategyResponse = {
+  items: AvailableStrategyItem[];
+};
+
+function formatParamsPreview(params: Record<string, unknown>): string {
+  const text = JSON.stringify(params ?? {}, null, 0);
+  if (!text || text === "{}") {
+    return "-";
+  }
+  return text.length > 80 ? `${text.slice(0, 80)}…` : text;
+}
+
+function formatExperimentNumber(value: number | null | undefined): string {
+  if (value === undefined || value === null) {
+    return "-";
+  }
+  return String(value);
+}
+
+const PARAMS_HINT = "策略运行阈值等参数,JSON 格式";
+const DAILY_LIMIT_HINT = "0 表示不限制,直到 staging 全部写入";
+const PRIORITY_HINT =
+  "数值越小同批次越先选取;同名需求一旦写入 Hive,其他 priority 不可再写,仅同 priority 可重复";
+
+function parseParamsJson(raw: string): Record<string, unknown> {
+  const trimmed = raw.trim();
+  if (!trimmed) {
+    return {};
+  }
+  const parsed: unknown = JSON.parse(trimmed);
+  if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
+    throw new Error("params 必须是 JSON 对象");
+  }
+  return parsed as Record<string, unknown>;
+}
+
+async function readErrorDetail(response: Response): Promise<string> {
+  const text = await response.text();
+  if (!text) {
+    return `HTTP ${response.status}`;
+  }
+  try {
+    const payload = JSON.parse(text) as { detail?: unknown };
+    if (typeof payload.detail === "string") {
+      return payload.detail;
+    }
+  } catch {
+    /* 非 JSON 响应 */
+  }
+  return text;
+}
+
+function compareStrategyConfigs(a: StrategyConfigItem, b: StrategyConfigItem): number {
+  if (a.active !== b.active) {
+    return a.active ? -1 : 1;
+  }
+  const priorityA = a.priority ?? 0;
+  const priorityB = b.priority ?? 0;
+  if (priorityA !== priorityB) {
+    return priorityA - priorityB;
+  }
+  return a.strategy_id.localeCompare(b.strategy_id);
+}
+
+export default function StrategyConfigApp() {
+  const [loading, setLoading] = useState(false);
+  const [error, setError] = useState("");
+  const [items, setItems] = useState<StrategyConfigItem[]>([]);
+  const [availableItems, setAvailableItems] = useState<AvailableStrategyItem[]>([]);
+  const [createOpen, setCreateOpen] = useState(false);
+  const [editOpen, setEditOpen] = useState(false);
+  const [editingItem, setEditingItem] = useState<StrategyConfigItem | null>(null);
+  const [createForm] = Form.useForm();
+  const [editForm] = Form.useForm();
+  const [submitting, setSubmitting] = useState(false);
+  const [togglingId, setTogglingId] = useState<string | null>(null);
+
+  const fetchConfigs = useCallback(async () => {
+    setLoading(true);
+    setError("");
+    try {
+      const resolvedBase = getResolvedApiBaseUrl();
+      const baseWithSlash = resolvedBase.endsWith("/")
+        ? resolvedBase
+        : `${resolvedBase}/`;
+      const url = new URL("strategy-configs", baseWithSlash);
+      const response = await fetch(url.toString(), {
+        method: "GET",
+        headers: { Accept: "application/json" },
+      });
+      if (!response.ok) {
+        throw new Error(await readErrorDetail(response));
+      }
+      const payload = (await response.json()) as StrategyConfigResponse;
+      setItems(payload.items ?? []);
+    } catch (queryError) {
+      setError(
+        queryError instanceof Error ? queryError.message : "加载策略配置失败",
+      );
+    } finally {
+      setLoading(false);
+    }
+  }, []);
+
+  const fetchAvailable = useCallback(async () => {
+    try {
+      const resolvedBase = getResolvedApiBaseUrl();
+      const baseWithSlash = resolvedBase.endsWith("/")
+        ? resolvedBase
+        : `${resolvedBase}/`;
+      const url = new URL("strategy-configs/available", baseWithSlash);
+      const response = await fetch(url.toString(), {
+        method: "GET",
+        headers: { Accept: "application/json" },
+      });
+      if (!response.ok) {
+        throw new Error(await readErrorDetail(response));
+      }
+      const payload = (await response.json()) as AvailableStrategyResponse;
+      setAvailableItems(payload.items ?? []);
+    } catch (queryError) {
+      message.error(
+        queryError instanceof Error ? queryError.message : "加载可添加策略失败",
+      );
+    }
+  }, []);
+
+  useEffect(() => {
+    void fetchConfigs();
+  }, [fetchConfigs]);
+
+  const sortedItems = useMemo(
+    () => [...items].sort(compareStrategyConfigs),
+    [items],
+  );
+
+  const openCreateModal = () => {
+    createForm.resetFields();
+    createForm.setFieldsValue({
+      paramsText: "{}",
+      daily_write_limit: 0,
+      priority: 0,
+      active: false,
+    });
+    setCreateOpen(true);
+    void fetchAvailable();
+  };
+
+  const openEditModal = (item: StrategyConfigItem) => {
+    setEditingItem(item);
+    editForm.setFieldsValue({
+      paramsText: JSON.stringify(item.params ?? {}, null, 2),
+      daily_write_limit: item.daily_write_limit ?? 0,
+      priority: item.priority ?? 0,
+      active: item.active,
+    });
+    setEditOpen(true);
+  };
+
+  const handleCreate = async () => {
+    try {
+      const values = await createForm.validateFields();
+      setSubmitting(true);
+      const params = parseParamsJson(String(values.paramsText ?? ""));
+      const resolvedBase = getResolvedApiBaseUrl();
+      const baseWithSlash = resolvedBase.endsWith("/")
+        ? resolvedBase
+        : `${resolvedBase}/`;
+      const url = new URL("strategy-configs", baseWithSlash);
+      const response = await fetch(url.toString(), {
+        method: "POST",
+        headers: {
+          Accept: "application/json",
+          "Content-Type": "application/json",
+        },
+        body: JSON.stringify({
+          strategy_id: values.strategy_id,
+          params,
+          daily_write_limit: Number(values.daily_write_limit ?? 0),
+          priority: Number(values.priority ?? 0),
+          active: Boolean(values.active),
+        }),
+      });
+      if (!response.ok) {
+        throw new Error(await readErrorDetail(response));
+      }
+      message.success("策略配置已添加");
+      setCreateOpen(false);
+      await fetchConfigs();
+    } catch (submitError) {
+      if (submitError instanceof Error) {
+        message.error(submitError.message);
+      }
+    } finally {
+      setSubmitting(false);
+    }
+  };
+
+  const handleEdit = async () => {
+    if (!editingItem) {
+      return;
+    }
+    try {
+      const values = await editForm.validateFields();
+      setSubmitting(true);
+      const params = parseParamsJson(String(values.paramsText ?? ""));
+      const resolvedBase = getResolvedApiBaseUrl();
+      const baseWithSlash = resolvedBase.endsWith("/")
+        ? resolvedBase
+        : `${resolvedBase}/`;
+      const url = new URL(`strategy-configs/${encodeURIComponent(editingItem.strategy_id)}`, baseWithSlash);
+      const response = await fetch(url.toString(), {
+        method: "PUT",
+        headers: {
+          Accept: "application/json",
+          "Content-Type": "application/json",
+        },
+        body: JSON.stringify({
+          params,
+          daily_write_limit: Number(values.daily_write_limit ?? 0),
+          priority: Number(values.priority ?? 0),
+          active: Boolean(values.active),
+        }),
+      });
+      if (!response.ok) {
+        throw new Error(await readErrorDetail(response));
+      }
+      message.success("策略配置已更新");
+      setEditOpen(false);
+      setEditingItem(null);
+      await fetchConfigs();
+    } catch (submitError) {
+      if (submitError instanceof Error) {
+        message.error(submitError.message);
+      }
+    } finally {
+      setSubmitting(false);
+    }
+  };
+
+  const handleToggleActive = async (item: StrategyConfigItem) => {
+    setTogglingId(item.strategy_id);
+    try {
+      const resolvedBase = getResolvedApiBaseUrl();
+      const baseWithSlash = resolvedBase.endsWith("/")
+        ? resolvedBase
+        : `${resolvedBase}/`;
+      const url = new URL(
+        `strategy-configs/${encodeURIComponent(item.strategy_id)}/active`,
+        baseWithSlash,
+      );
+      const response = await fetch(url.toString(), {
+        method: "PATCH",
+        headers: {
+          Accept: "application/json",
+          "Content-Type": "application/json",
+        },
+        body: JSON.stringify({ active: !item.active }),
+      });
+      if (!response.ok) {
+        throw new Error(await readErrorDetail(response));
+      }
+      message.success(item.active ? "策略已暂停" : "策略已开始");
+      await fetchConfigs();
+    } catch (toggleError) {
+      message.error(
+        toggleError instanceof Error ? toggleError.message : "操作失败",
+      );
+    } finally {
+      setTogglingId(null);
+    }
+  };
+
+  const columns: ColumnsType<StrategyConfigItem> = useMemo(
+    () => [
+      {
+        title: "策略 ID",
+        dataIndex: "strategy_id",
+        width: 220,
+        render: (value) => <EllipsisCell value={value} />,
+      },
+      {
+        title: "策略名称",
+        dataIndex: "name",
+        width: 160,
+        render: (value) => value ?? "-",
+      },
+      {
+        title: "版本",
+        dataIndex: "version",
+        width: 100,
+        render: (value) => value ?? "-",
+      },
+      {
+        title: "状态",
+        dataIndex: "active",
+        width: 100,
+        render: (active: boolean) => (
+          <Tag color={active ? "green" : "default"}>
+            {active ? "运行中" : "已暂停"}
+          </Tag>
+        ),
+      },
+      {
+        title: "代码注册",
+        dataIndex: "registered",
+        width: 110,
+        render: (registered: boolean) => (
+          <Tag color={registered ? "blue" : "orange"}>
+            {registered ? "已注册" : "未注册"}
+          </Tag>
+        ),
+      },
+      {
+        title: "日写入上限",
+        dataIndex: "daily_write_limit",
+        width: 110,
+        render: (value: number) => formatExperimentNumber(value),
+      },
+      {
+        title: "优先级",
+        dataIndex: "priority",
+        width: 90,
+        render: (value: number) => formatExperimentNumber(value),
+      },
+      {
+        title: "运行参数",
+        dataIndex: "params",
+        render: (params: Record<string, unknown>) => (
+          <EllipsisCell value={formatParamsPreview(params)} />
+        ),
+      },
+      {
+        title: "更新时间",
+        dataIndex: "updated_time",
+        width: 180,
+        render: (value) => value ?? "-",
+      },
+      {
+        title: "操作",
+        key: "actions",
+        width: 220,
+        fixed: "right",
+        render: (_, record) => (
+          <Space size="small">
+            <Button type="link" size="small" onClick={() => openEditModal(record)}>
+              修改
+            </Button>
+            <Button
+              type="link"
+              size="small"
+              loading={togglingId === record.strategy_id}
+              onClick={() => void handleToggleActive(record)}
+            >
+              {record.active ? "暂停" : "开始"}
+            </Button>
+          </Space>
+        ),
+      },
+    ],
+    [togglingId],
+  );
+
+  return (
+    <div className="page">
+      <div className="hero">
+        <Typography.Title level={2} className="hero-title">
+          需求策略配置
+        </Typography.Title>
+        <div className="hero-subtitle">
+          <Tag color="blue">策略管理</Tag>
+          <Tag color="cyan">参数配置</Tag>
+          <Tag color="green">启停控制</Tag>
+          <Tag color="purple">实验写入 ODPS</Tag>
+        </div>
+      </div>
+
+      <div className="dashboard-shell">
+        <div className="panel-sheet">
+          <section className="panel-section panel-section--table">
+            <div className="table-toolbar">
+              <Space wrap>
+                <Button
+                  type="primary"
+                  icon={<PlusOutlined />}
+                  onClick={openCreateModal}
+                  disabled={loading}
+                >
+                  添加策略
+                </Button>
+                <Button
+                  icon={<ReloadOutlined />}
+                  onClick={() => void fetchConfigs()}
+                  loading={loading}
+                >
+                  刷新
+                </Button>
+              </Space>
+            </div>
+
+            {error ? (
+              <Alert
+                type="error"
+                showIcon
+                message={error}
+                style={{ marginBottom: 16 }}
+              />
+            ) : null}
+
+            <Table<StrategyConfigItem>
+              rowKey="strategy_id"
+              loading={loading}
+              columns={columns}
+              dataSource={sortedItems}
+              pagination={false}
+              scroll={{ x: 1200 }}
+              locale={{ emptyText: "暂无策略配置,点击「添加策略」创建" }}
+            />
+          </section>
+        </div>
+      </div>
+
+      <Modal
+        title="添加策略配置"
+        open={createOpen}
+        onCancel={() => setCreateOpen(false)}
+        onOk={() => void handleCreate()}
+        confirmLoading={submitting}
+        okText="添加"
+        cancelText="取消"
+        destroyOnClose
+        width={640}
+      >
+        <Form form={createForm} layout="vertical">
+          <Form.Item
+            label="策略"
+            name="strategy_id"
+            rules={[{ required: true, message: "请选择策略" }]}
+          >
+            <Select
+              placeholder="选择要添加的策略"
+              options={availableItems.map((item) => ({
+                value: item.strategy_id,
+                label: `${item.name} (${item.strategy_id})`,
+              }))}
+              notFoundContent="暂无可添加策略"
+            />
+          </Form.Item>
+          <Form.Item
+            label="日写入上限"
+            name="daily_write_limit"
+            extra={DAILY_LIMIT_HINT}
+          >
+            <InputNumber min={0} precision={0} style={{ width: "100%" }} />
+          </Form.Item>
+          <Form.Item
+            label="优先级"
+            name="priority"
+            extra={PRIORITY_HINT}
+          >
+            <InputNumber min={0} precision={0} style={{ width: "100%" }} />
+          </Form.Item>
+          <Form.Item
+            label="运行参数 (JSON)"
+            name="paramsText"
+            rules={[{ required: true, message: "请输入运行参数" }]}
+            extra={PARAMS_HINT}
+          >
+            <Input.TextArea rows={8} placeholder="{}" />
+          </Form.Item>
+          <Form.Item label="创建后立即开始" name="active" valuePropName="checked">
+            <Switch />
+          </Form.Item>
+        </Form>
+      </Modal>
+
+      <Modal
+        title={editingItem ? `修改策略:${editingItem.name}` : "修改策略配置"}
+        open={editOpen}
+        onCancel={() => {
+          setEditOpen(false);
+          setEditingItem(null);
+        }}
+        onOk={() => void handleEdit()}
+        confirmLoading={submitting}
+        okText="保存"
+        cancelText="取消"
+        destroyOnClose
+        width={640}
+      >
+        <Form form={editForm} layout="vertical">
+          <Form.Item
+            label="日写入上限"
+            name="daily_write_limit"
+            extra={DAILY_LIMIT_HINT}
+          >
+            <InputNumber min={0} precision={0} style={{ width: "100%" }} />
+          </Form.Item>
+          <Form.Item
+            label="优先级"
+            name="priority"
+            extra={PRIORITY_HINT}
+          >
+            <InputNumber min={0} precision={0} style={{ width: "100%" }} />
+          </Form.Item>
+          <Form.Item
+            label="运行参数 (JSON)"
+            name="paramsText"
+            rules={[{ required: true, message: "请输入运行参数" }]}
+            extra={PARAMS_HINT}
+          >
+            <Input.TextArea rows={10} />
+          </Form.Item>
+          <Form.Item label="运行状态" name="active" valuePropName="checked">
+            <Switch checkedChildren="运行中" unCheckedChildren="已暂停" />
+          </Form.Item>
+        </Form>
+      </Modal>
+    </div>
+  );
+}

+ 15 - 0
frontend/src/strategy-config-main.tsx

@@ -0,0 +1,15 @@
+import React from "react";
+import ReactDOM from "react-dom/client";
+import "antd/dist/reset.css";
+import StrategyConfigApp from "./StrategyConfigApp";
+import DemandNavBar from "./DemandNavBar";
+import "./styles.css";
+
+ReactDOM.createRoot(document.getElementById("root")!).render(
+  <React.StrictMode>
+    <>
+      <DemandNavBar active="strategy-config" />
+      <StrategyConfigApp />
+    </>
+  </React.StrictMode>,
+);

+ 12 - 0
frontend/strategy-config.html

@@ -0,0 +1,12 @@
+<!doctype html>
+<html lang="zh-CN">
+  <head>
+    <meta charset="UTF-8" />
+    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
+    <title>需求策略配置</title>
+  </head>
+  <body>
+    <div id="root"></div>
+    <script type="module" src="/src/strategy-config-main.tsx"></script>
+  </body>
+</html>

+ 1 - 0
frontend/vite.config.ts

@@ -9,6 +9,7 @@ export default defineConfig({
       input: {
         main: resolve(__dirname, "index.html"),
         categoryTree: resolve(__dirname, "category-tree.html"),
+        strategyConfig: resolve(__dirname, "strategy-config.html"),
       },
     },
   },

+ 227 - 0
tests/test_experiment_demand_pool_write.py

@@ -0,0 +1,227 @@
+"""实验需求池写入逻辑单元测试。"""
+
+from app.strategies.staging_store import StagingRow
+from app.sync.experiment_demand_pool_write import (
+    ExperimentStrategyContext,
+    _staging_row_to_odps_record,
+    select_rows_to_write,
+)
+
+
+def _row(
+    *,
+    strategy_id: str,
+    strategy_name: str,
+    demand_id: str,
+    demand_name: str,
+    weight: float,
+) -> StagingRow:
+    return StagingRow(
+        strategy_config_id=strategy_id,
+        strategy=strategy_name,
+        demand_id=demand_id,
+        demand_name=demand_name,
+        weight=weight,
+        demand_type="特征点",
+        video_count=1,
+        video_list=None,
+        extend=None,
+        batch_date="20260611",
+    )
+
+
+def test_select_rows_respects_daily_limit() -> None:
+    contexts = [
+        ExperimentStrategyContext(
+            strategy_id="monthly",
+            strategy_name="逐月",
+            priority=10,
+            daily_limit=1,
+            current_count=0,
+            staging_rows=[
+                _row(
+                    strategy_id="monthly",
+                    strategy_name="逐月",
+                    demand_id="a1",
+                    demand_name="需求A",
+                    weight=2.0,
+                ),
+                _row(
+                    strategy_id="monthly",
+                    strategy_name="逐月",
+                    demand_id="a2",
+                    demand_name="需求B",
+                    weight=1.0,
+                ),
+            ],
+        )
+    ]
+    selected, counts = select_rows_to_write(
+        strategies=contexts,
+        existing_demand_ids=set(),
+        claimed_names={},
+    )
+    assert len(selected) == 1
+    assert selected[0].demand_id == "a1"
+    assert counts["逐月"] == 1
+
+
+def test_select_rows_same_priority_no_name_dedup() -> None:
+    contexts = [
+        ExperimentStrategyContext(
+            strategy_id="s1",
+            strategy_name="策略1",
+            priority=10,
+            daily_limit=0,
+            current_count=0,
+            staging_rows=[
+                _row(
+                    strategy_id="s1",
+                    strategy_name="策略1",
+                    demand_id="id1",
+                    demand_name="同名需求",
+                    weight=1.0,
+                )
+            ],
+        ),
+        ExperimentStrategyContext(
+            strategy_id="s2",
+            strategy_name="策略2",
+            priority=10,
+            daily_limit=0,
+            current_count=0,
+            staging_rows=[
+                _row(
+                    strategy_id="s2",
+                    strategy_name="策略2",
+                    demand_id="id2",
+                    demand_name="同名需求",
+                    weight=1.0,
+                )
+            ],
+        ),
+    ]
+    selected, _ = select_rows_to_write(
+        strategies=contexts,
+        existing_demand_ids=set(),
+        claimed_names={},
+    )
+    assert len(selected) == 2
+
+
+def test_select_rows_different_priority_name_dedup() -> None:
+    contexts = [
+        ExperimentStrategyContext(
+            strategy_id="high",
+            strategy_name="高优",
+            priority=5,
+            daily_limit=0,
+            current_count=0,
+            staging_rows=[
+                _row(
+                    strategy_id="high",
+                    strategy_name="高优",
+                    demand_id="id1",
+                    demand_name="同名需求",
+                    weight=1.0,
+                )
+            ],
+        ),
+        ExperimentStrategyContext(
+            strategy_id="low",
+            strategy_name="低优",
+            priority=20,
+            daily_limit=0,
+            current_count=0,
+            staging_rows=[
+                _row(
+                    strategy_id="low",
+                    strategy_name="低优",
+                    demand_id="id2",
+                    demand_name="同名需求",
+                    weight=1.0,
+                )
+            ],
+        ),
+    ]
+    selected, _ = select_rows_to_write(
+        strategies=contexts,
+        existing_demand_ids=set(),
+        claimed_names={},
+    )
+    assert len(selected) == 1
+    assert selected[0].demand_id == "id1"
+
+
+def test_higher_priority_blocked_if_lower_priority_already_in_hive() -> None:
+    contexts = [
+        ExperimentStrategyContext(
+            strategy_id="high",
+            strategy_name="高优",
+            priority=3,
+            daily_limit=0,
+            current_count=0,
+            staging_rows=[
+                _row(
+                    strategy_id="high",
+                    strategy_name="高优",
+                    demand_id="id_high",
+                    demand_name="同名需求",
+                    weight=1.0,
+                )
+            ],
+        )
+    ]
+    selected, _ = select_rows_to_write(
+        strategies=contexts,
+        existing_demand_ids=set(),
+        claimed_names={"同名需求": {5}},
+    )
+    assert len(selected) == 0
+
+
+def test_unknown_hive_strategy_blocks_all_priorities() -> None:
+    contexts = [
+        ExperimentStrategyContext(
+            strategy_id="high",
+            strategy_name="高优",
+            priority=3,
+            daily_limit=0,
+            current_count=0,
+            staging_rows=[
+                _row(
+                    strategy_id="high",
+                    strategy_name="高优",
+                    demand_id="id_high",
+                    demand_name="同名需求",
+                    weight=1.0,
+                )
+            ],
+        )
+    ]
+    selected, _ = select_rows_to_write(
+        strategies=contexts,
+        existing_demand_ids=set(),
+        claimed_names={"同名需求": {"__unknown__"}},
+    )
+    assert len(selected) == 0
+
+
+def test_staging_row_to_odps_record_field_order_and_types() -> None:
+    record = _staging_row_to_odps_record(
+        _row(
+            strategy_id="monthly",
+            strategy_name="逐月",
+            demand_id="abc",
+            demand_name="需求A",
+            weight=1.5,
+        )
+    )
+    assert record[0] == "逐月"
+    assert record[1] == "abc"
+    assert record[2] == "需求A"
+    assert record[3] == 1.5
+    assert record[4] == "特征点"
+    assert record[5] == 1
+    assert record[6] is None
+    assert record[7] is None