| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873 |
"""
安全护栏引擎 — auto_put_ad_mini
共 8 道护栏(编号 0~6,含 3.5);实际启用哪些、按什么顺序执行,由 _run_guardrails 中的 guardrails 列表决定:
0. ActionConsistencyGuardrail — 决策一致性自检(action ↔ pct ↔ ROI 方向对齐)
1. ColdStartGuardrail — 冷启动保护
2. DataFreshnessGuardrail — 数据新鲜度校验
3. BidBoundaryGuardrail — 出价边界钳位(0.05~1.00元)
3.5 BidRangeGuardrail — 调幅范围钳位(提价5-10%, 降价3-5%)
4. RateLimitGuardrail — 频率限制(每日次数/间隔/累计调幅)
5. DailyOpsCapGuardrail — 每日操作总量上限
6. DryRunGuardrail — 干运行模式
每道护栏输出:approved / blocked / modified
blocked = 阻止操作,modified = 自动修正参数后放行
"""
- import json
- import logging
- import sys
- from abc import ABC, abstractmethod
- from dataclasses import dataclass, field
- from datetime import datetime, timedelta
- from pathlib import Path
- from typing import Dict, List, Optional
- from zoneinfo import ZoneInfo
- import pandas as pd
- from agent.tools import tool
- from agent.tools.models import ToolContext, ToolResult
# Make the project root and this tools directory importable so the bare
# `config` import and the later `ad_decision` import resolve. The tools dir is
# inserted last, so it ends up ahead of the project root on sys.path.
_TOOLS_DIR = Path(__file__).resolve().parent
_MINI_DIR = _TOOLS_DIR.parent
for _import_root in (_MINI_DIR, _TOOLS_DIR):
    if str(_import_root) not in sys.path:
        sys.path.insert(0, str(_import_root))
del _import_root
- from config import (
- COLD_START_DAYS,
- CAUTIOUS_DAYS,
- BID_FLOOR_YUAN,
- BID_CEILING_YUAN,
- BID_UP_MIN_PCT,
- BID_UP_MAX_PCT,
- BID_DOWN_MIN_PCT,
- BID_DOWN_MAX_PCT,
- BID_UP_ROI_FACTOR,
- ROI_LOW_FACTOR,
- MAX_ADJUSTMENTS_PER_AD_PER_DAY,
- MIN_ADJUSTMENT_INTERVAL_HOURS,
- MAX_DAILY_CUMULATIVE_CHANGE_PCT,
- MAX_DAILY_OPS,
- DATA_FRESHNESS_MAX_HOURS,
- ADJUSTMENT_HISTORY_PATH,
- DRY_RUN_MODE,
- GUARDRAILS_ENABLED,
- DATA_DIR,
- TIMEZONE,
- )
logger = logging.getLogger(__name__)


# ═══════════════════════════════════════════
# Adjustment history persistence
# ═══════════════════════════════════════════
class AdjustmentHistory:
    """Per-ad adjustment history persisted to a single JSON file.

    File layout (as written by ``record_adjustment``)::

        {"<ad_id>": {"adjustments": [{"ts": "<iso>", "action": "...", "pct": 0.05}, ...],
                     "last_ts": "<iso>" | null}}

    Timestamps are naive ``datetime.now()`` ISO strings (server local time), and
    "today" in the query helpers is derived from the same clock. Each ad keeps
    only the last ``_RETENTION_DAYS`` of entries. The whole file is rewritten on
    every ``record_adjustment``.
    """

    # Entries older than this are pruned on write. 14 days rather than 7, because
    # the "persistently low ROI -> escalate to pause" check needs bid_down records
    # from about a week ago.
    _RETENTION_DAYS = 14

    def __init__(self, path: Path = ADJUSTMENT_HISTORY_PATH):
        self._path = path
        self._data: Dict[str, Dict] = {}
        self._load()

    @staticmethod
    def _ts(entry: Dict) -> str:
        """Return an adjustment entry's timestamp string ("" when missing or null)."""
        return str(entry.get("ts") or "")

    def _load(self):
        """Load history from disk; start empty if the file is unreadable or mis-shaped."""
        if not self._path.exists():
            return
        try:
            data = json.loads(self._path.read_text(encoding="utf-8"))
            if not isinstance(data, dict):
                # Valid JSON of the wrong shape (e.g. a list) would break every
                # `.get` below, so treat it like a corrupt file.
                raise ValueError(f"顶层应为 JSON 对象,实际为 {type(data).__name__}")
        except Exception as e:
            logger.warning("加载调整历史失败,使用空记录: %s", e)
            data = {}
        self._data = data

    def _save(self):
        """Persist history by writing a sibling temp file and renaming it over the target.

        The rename (``Path.replace`` / ``os.replace``) swaps the file in one step,
        so a crash mid-write leaves the previous file intact. Without this, a
        truncated JSON would be discarded by ``_load`` and the history lost.
        """
        self._path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = self._path.with_name(self._path.name + ".tmp")
        tmp_path.write_text(
            json.dumps(self._data, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        tmp_path.replace(self._path)

    def get_today_adjustments(self, ad_id: str) -> List[Dict]:
        """Return this ad's adjustment entries whose timestamp falls on today's date."""
        today = datetime.now().strftime("%Y-%m-%d")
        record = self._data.get(str(ad_id), {})
        return [a for a in record.get("adjustments", []) if self._ts(a).startswith(today)]

    def get_last_adjustment_ts(self, ad_id: str) -> Optional[datetime]:
        """Return the time of this ad's most recent adjustment, or None if unknown or unparseable."""
        last_ts = self._data.get(str(ad_id), {}).get("last_ts")
        if not last_ts:
            return None
        try:
            return datetime.fromisoformat(last_ts)
        except (TypeError, ValueError):
            return None

    def get_cumulative_pct_today(self, ad_id: str) -> float:
        """Return the sum of |pct| over this ad's adjustments today (a missing/null pct counts as 0)."""
        return sum(abs(a.get("pct") or 0) for a in self.get_today_adjustments(str(ad_id)))

    def record_adjustment(self, ad_id: str, action: str, pct: float):
        """Append one adjustment for *ad_id*, prune old entries, and save to disk."""
        ad_key = str(ad_id)
        now = datetime.now()
        now_iso = now.isoformat()
        record = self._data.setdefault(ad_key, {"adjustments": [], "last_ts": None})
        record.setdefault("adjustments", []).append({
            "ts": now_iso,
            "action": action,
            "pct": pct,
        })
        record["last_ts"] = now_iso
        # ISO strings of naive datetimes sort chronologically, so compare as strings.
        cutoff = (now - timedelta(days=self._RETENTION_DAYS)).isoformat()
        record["adjustments"] = [a for a in record["adjustments"] if self._ts(a) >= cutoff]
        self._save()

    def get_today_total_ops(self) -> int:
        """Return how many distinct ads have at least one adjustment recorded today."""
        today = datetime.now().strftime("%Y-%m-%d")
        return sum(
            1
            for record in self._data.values()
            if any(self._ts(a).startswith(today) for a in record.get("adjustments", []))
        )

    def get_last_bid_down_ts(self, ad_id: str) -> Optional[datetime]:
        """Return the time of this ad's latest bid_down (used by the low-ROI escalation check)."""
        adjustments = self._data.get(str(ad_id), {}).get("adjustments", [])
        bid_down_ts = [
            self._ts(a) for a in adjustments
            if a.get("action") == "bid_down" and self._ts(a)
        ]
        if not bid_down_ts:
            return None
        try:
            return datetime.fromisoformat(max(bid_down_ts))
        except ValueError:
            return None
# ═══════════════════════════════════════════
# Guardrail check result
# ═══════════════════════════════════════════
@dataclass
class GuardrailResult:
    """Outcome of one guardrail check for a single decision row.

    ``_run_guardrails`` reacts to ``status`` as follows:

    - "blocked": stop the chain; the action becomes ``modified_action`` (or "hold").
    - "modified": carry forward every ``modified_*`` field that is not None.
    - "approved": leave the row unchanged.
    """
    status: str # "approved" / "blocked" / "modified"
    reason: str  # human-readable explanation; "" for approvals
    modified_action: Optional[str] = None  # replacement action (e.g. "hold"); None keeps the current one
    modified_bid: Optional[float] = None  # replacement bid in yuan; None keeps the current one
    modified_change_pct: Optional[float] = None  # replacement change ratio (0.05 means +5%); None keeps the current one
# ═══════════════════════════════════════════
# Guardrail base class
# ═══════════════════════════════════════════
class Guardrail(ABC):
    """Abstract interface shared by every guardrail in the chain.

    Subclasses provide a display ``name`` (used as the ``[name]`` prefix in the
    combined guardrail reason) and a ``check`` that inspects one decision row.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Human-readable guardrail name."""
        ...

    @abstractmethod
    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        """Evaluate one decision row against this guardrail.

        Args:
            row: decision row with fields such as ``action``, ``ad_id`` and
                ``recommended_change_pct``.
            history: persisted adjustment history.

        Returns:
            GuardrailResult whose status is approved / blocked / modified.
        """
        ...
# ═══════════════════════════════════════════
# Guardrail 0: decision self-consistency (action <-> pct <-> ROI direction)
# ═══════════════════════════════════════════
class ActionConsistencyGuardrail(Guardrail):
    """Catch self-contradictory LLM decisions and downgrade them to a safe action.

    A. Direction consistency (action vs sign of ``recommended_change_pct``)
       A1: bid_down requires pct < 0, otherwise -> hold
       A2: bid_up requires pct > 0, otherwise -> hold
       A3: hold / observe require pct == 0, otherwise pct is zeroed
    B. ROI-level consistency (ad ROI vs channel P50)
       B1: bid_down while dynamic_roi_7d >= P50 x 1.05 -> hold
           (do not cut an ad that is already above channel efficiency)
       B2: bid_up while dynamic_roi_7d < P50 x 0.75 -> hold
           (do not add budget to an inefficient ad)
       B3: scale_up while dynamic_roi_7d < P50 x 0.90 -> observe
           (prove efficiency before scaling)
       B4: pause while fission rate > same-tier mean x 1.10 -> observe
           (strong fission suggests the ad still has potential)

    Inputs read from the row:
      - ``动态ROI_7日均值``: the ad's own 7-day dynamic ROI. It is expected to be
        merged into the decisions CSV upstream; this module does not inject it.
      - ``_channel_roi_p50``: channel P50, injected by ``_run_guardrails``.
      - ``_ad_fission`` / ``_tier_fission_mean``: injected by ``validate_decisions``.

    Each B rule is skipped when its inputs are missing or non-numeric. Older data
    without these columns therefore still goes through the A checks.
    """

    # Multipliers for the B rules (see class docstring).
    _B1_HIGH_ROI_FACTOR = 1.05
    _B2_LOW_ROI_FACTOR = 0.75
    _B3_SCALE_ROI_FACTOR = 0.90
    _B4_FISSION_FACTOR = 1.10

    @property
    def name(self) -> str:
        return "决策一致性"

    @staticmethod
    def _to_float(val) -> Optional[float]:
        """Convert *val* to float; None / NaN / pd.NA / "" / non-numeric give None."""
        # Only compare strings with "": `pd.NA == ""` yields NA, whose truth value raises.
        if val is None or (isinstance(val, str) and val == ""):
            return None
        try:
            f = float(val)
        except (ValueError, TypeError):
            return None
        return None if pd.isna(f) else f

    @staticmethod
    def _override(reason: str, action: str) -> GuardrailResult:
        """Build a "modified" result that switches to *action* and zeroes the bid change."""
        return GuardrailResult(
            status="modified",
            reason=reason,
            modified_action=action,
            modified_change_pct=0.0,
            modified_bid=None,
        )

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = str(row.get("action", "hold")).strip()
        # Normalise pct: None / NaN / non-numeric -> 0.
        pct = self._to_float(row.get("recommended_change_pct")) or 0.0

        # ==================== A: direction consistency ====================
        if action == "bid_down" and pct >= 0:
            return self._override(
                f"方向冲突:action=bid_down 但 pct={pct:+.2%}(应为负)→ 改 hold", "hold"
            )
        if action == "bid_up" and pct <= 0:
            return self._override(
                f"方向冲突:action=bid_up 但 pct={pct:+.2%}(应为正)→ 改 hold", "hold"
            )
        if action in ("hold", "observe") and abs(pct) > 1e-6:
            return self._override(
                f"维持类动作不应改出价:action={action} 但 pct={pct:+.2%} → pct 归零", action
            )

        # ==================== B1-B3: ROI level vs channel P50 ====================
        roi = self._to_float(row.get("动态ROI_7日均值"))
        p50 = self._to_float(row.get("_channel_roi_p50"))
        if roi is not None and p50 is not None and p50 > 0:
            if action == "bid_down":
                threshold = p50 * self._B1_HIGH_ROI_FACTOR
                if roi >= threshold:
                    return self._override(
                        f"ROI 水位冲突:动态ROI_7日={roi:.4f} ≥ 渠道P50 {p50:.4f}"
                        f"×{self._B1_HIGH_ROI_FACTOR:.2f}={threshold:.4f}(效率高于渠道水平),"
                        f"不应降价 → 改 hold",
                        "hold",
                    )
            elif action == "bid_up":
                threshold = p50 * self._B2_LOW_ROI_FACTOR
                if roi < threshold:
                    return self._override(
                        f"ROI 水位冲突:动态ROI_7日={roi:.4f} < 渠道P50 {p50:.4f}"
                        f"×{self._B2_LOW_ROI_FACTOR:.2f}={threshold:.4f}(低效广告),"
                        f"不应加价 → 改 hold",
                        "hold",
                    )
            elif action == "scale_up":
                threshold = p50 * self._B3_SCALE_ROI_FACTOR
                if roi < threshold:
                    return self._override(
                        f"ROI 水位冲突:动态ROI_7日={roi:.4f} < 渠道P50 {p50:.4f}"
                        f"×{self._B3_SCALE_ROI_FACTOR:.2f}={threshold:.4f}(扩量需先证明效率),"
                        f"不应扩量 → 改 observe",
                        "observe",
                    )

        # ==================== B4: fission vs pause conflict ====================
        # pause while the fission rate clearly beats the same-tier mean -> observe.
        if action == "pause":
            ad_fission = self._to_float(row.get("_ad_fission"))
            tier_fission_mean = self._to_float(row.get("_tier_fission_mean"))
            if (
                ad_fission is not None
                and tier_fission_mean is not None
                and tier_fission_mean > 0
                and ad_fission > tier_fission_mean * self._B4_FISSION_FACTOR
            ):
                return self._override(
                    f"裂变-关停冲突:裂变率={ad_fission:.2f} > 同类均值{tier_fission_mean:.2f}×1.10"
                    f"={tier_fission_mean * 1.10:.2f}(裂变优秀,广告质量有潜力),"
                    f"不应关停 → 改 observe",
                    "observe",
                )

        return GuardrailResult(status="approved", reason="")
# ═══════════════════════════════════════════
# Guardrail 1: cold-start protection
# ═══════════════════════════════════════════
class ColdStartGuardrail(Guardrail):
    """Protect young ads from risky operations.

    - age <= COLD_START_DAYS: block pause / bid_down / bid_up (full protection).
    - COLD_START_DAYS < age <= CAUTIOUS_DAYS: block pause / bid_down; only bid_up
      is allowed.
    - Older ads, and actions not listed above (e.g. observe), pass.
    - Rows whose age is unknown (None, NaN or non-numeric) pass, because they
      cannot be judged.
    """

    @property
    def name(self) -> str:
        return "冷启动保护"

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        if action == "hold":
            return GuardrailResult(status="approved", reason="")

        ad_age = row.get("ad_age_days")
        # NaN appears when ad_id was not found in the metrics age map.
        try:
            age = float(ad_age)
        except (TypeError, ValueError):
            return GuardrailResult(status="approved", reason="")
        if pd.isna(age):
            return GuardrailResult(status="approved", reason="")

        if age <= COLD_START_DAYS:
            # Cold-start period: no bid or pause changes at all.
            if action in ("pause", "bid_down", "bid_up"):
                return GuardrailResult(
                    status="blocked",
                    reason=f"冷启动期({ad_age}天 ≤ {COLD_START_DAYS}天),极度保护,禁止{action}",
                    modified_action="hold",
                )
        elif age <= CAUTIOUS_DAYS:
            # Early growth period: only bid increases are allowed.
            if action in ("pause", "bid_down"):
                return GuardrailResult(
                    status="blocked",
                    reason=f"早期成长期({ad_age}天,4-{CAUTIOUS_DAYS}天),仅允许提价,禁止{action}",
                    modified_action="hold",
                )
        return GuardrailResult(status="approved", reason="")
# ═══════════════════════════════════════════
# Guardrail 2: data freshness
# ═══════════════════════════════════════════
class DataFreshnessGuardrail(Guardrail):
    """Block every non-hold action when the decision data is too old.

    Age is the time from 00:00 of ``_data_date`` (YYYYMMDD, injected into each
    row by ``_run_guardrails``) in the configured ``TIMEZONE`` until now. Beyond
    ``DATA_FRESHNESS_MAX_HOURS`` the row is blocked to hold. A missing date
    passes. An unparseable date (or timezone) passes with a warning.
    """

    @property
    def name(self) -> str:
        return "数据新鲜度"

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        if action == "hold":
            return GuardrailResult(status="approved", reason="")

        data_date = row.get("_data_date")
        if not data_date:
            return GuardrailResult(status="approved", reason="")

        try:
            # Timezone-aware so overseas deployments measure in the business timezone.
            tz = ZoneInfo(TIMEZONE)
            data_start = datetime.strptime(str(data_date), "%Y%m%d").replace(tzinfo=tz)
        except ValueError as e:
            # Keep the original fail-open behaviour, but do not hide it.
            logger.warning("新鲜度校验跳过(数据日期/时区无法解析): data_date=%r, err=%s", data_date, e)
            return GuardrailResult(status="approved", reason="")

        # NOTE(review): age is counted from the START of the data day. For
        # "yesterday" data this is already ~24h plus the current hour of the
        # day; confirm DATA_FRESHNESS_MAX_HOURS was chosen with that reference.
        hours_old = (datetime.now(tz) - data_start).total_seconds() / 3600
        if hours_old > DATA_FRESHNESS_MAX_HOURS:
            return GuardrailResult(
                status="blocked",
                reason=f"数据已过期({hours_old:.0f}小时前,上限{DATA_FRESHNESS_MAX_HOURS}小时),阻止所有操作",
                modified_action="hold",
            )
        return GuardrailResult(status="approved", reason="")
# ═══════════════════════════════════════════
# Guardrail 3: bid boundary
# ═══════════════════════════════════════════
class BidBoundaryGuardrail(Guardrail):
    """Clamp bid_up / bid_down target bids into [BID_FLOOR_YUAN, BID_CEILING_YUAN].

    The clamped bid might move the price the opposite way to the requested
    action. For example, a bid_down on an ad already below the floor would be
    clamped *up* to the floor, and a bid_up on an ad above the ceiling would be
    clamped *down*. In those cases, and when the clamped bid equals the current
    bid, the action becomes ``hold`` instead of silently reversing direction.
    A missing or NaN recommended bid passes unchanged.
    """

    @property
    def name(self) -> str:
        return "出价边界"

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        if action not in ("bid_up", "bid_down"):
            return GuardrailResult(status="approved", reason="")

        raw_bid = row.get("recommended_bid")
        if raw_bid is None or raw_bid == "":
            return GuardrailResult(status="approved", reason="")
        recommended_bid = float(raw_bid)

        if recommended_bid < BID_FLOOR_YUAN:
            new_bid = BID_FLOOR_YUAN
            clamp_msg = f"出价{recommended_bid:.2f}元低于下限{BID_FLOOR_YUAN}元,钳位至{new_bid:.2f}元"
        elif recommended_bid > BID_CEILING_YUAN:
            new_bid = BID_CEILING_YUAN
            clamp_msg = f"出价{recommended_bid:.2f}元超过上限{BID_CEILING_YUAN}元,钳位至{new_bid:.2f}元"
        else:
            # In range (NaN also lands here: both comparisons are False).
            return GuardrailResult(status="approved", reason="")

        current_bid = float(row.get("current_bid", 0) or 0)
        if current_bid > 0:
            wrong_direction = (
                (action == "bid_up" and new_bid <= current_bid)
                or (action == "bid_down" and new_bid >= current_bid)
            )
            if wrong_direction:
                return GuardrailResult(
                    status="modified",
                    reason=f"{clamp_msg},但当前出价{current_bid:.2f}元,钳位后与{action}方向相反 → 改 hold",
                    modified_action="hold",
                    modified_change_pct=0.0,
                    modified_bid=None,
                )
            new_pct = (new_bid - current_bid) / current_bid
        else:
            # Unknown current bid: the change ratio cannot be derived.
            new_pct = 0

        return GuardrailResult(
            status="modified",
            reason=clamp_msg,
            modified_bid=new_bid,
            modified_change_pct=round(new_pct, 4),
        )
# ═══════════════════════════════════════════
# Guardrail 3.5: change-ratio range clamp
# ═══════════════════════════════════════════
class BidRangeGuardrail(Guardrail):
    """Clamp the requested change ratio into the configured band for its direction.

    bid_up   -> [BID_UP_MIN_PCT, BID_UP_MAX_PCT]
    bid_down -> [-BID_DOWN_MAX_PCT, -BID_DOWN_MIN_PCT] (ratios are negative)

    Deviations of at most 0.001 (0.1 percentage point) are tolerated. When a
    clamp happens, the target bid is recomputed from ``current_bid``; it is None
    when the current bid is unknown.
    """

    @property
    def name(self) -> str:
        return "调幅范围"

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        raw_pct = row.get("recommended_change_pct")
        if action not in ("bid_up", "bid_down") or raw_pct is None or raw_pct == "":
            return GuardrailResult(status="approved", reason="")

        requested = float(raw_pct)
        base_bid = float(row.get("current_bid", 0) or 0)

        if action == "bid_up":
            clamped = max(BID_UP_MIN_PCT, min(BID_UP_MAX_PCT, requested))
            message = (
                f"提价调幅从{requested*100:.1f}%钳位至{clamped*100:.1f}%"
                f"(范围{BID_UP_MIN_PCT*100:.0f}%-{BID_UP_MAX_PCT*100:.0f}%)"
            )
        else:
            # bid_down ratios are negative, so the band is mirrored below zero.
            clamped = min(-BID_DOWN_MIN_PCT, max(-BID_DOWN_MAX_PCT, requested))
            message = (
                f"降价调幅从{requested*100:.1f}%钳位至{clamped*100:.1f}%"
                f"(范围-{BID_DOWN_MAX_PCT*100:.0f}%~-{BID_DOWN_MIN_PCT*100:.0f}%)"
            )

        # Written as "> tolerance" so a NaN ratio falls through to approval.
        if abs(clamped - requested) > 0.001:
            target_bid = round(base_bid * (1 + clamped), 2) if base_bid > 0 else None
            return GuardrailResult(
                status="modified",
                reason=message,
                modified_change_pct=round(clamped, 4),
                modified_bid=target_bid,
            )
        return GuardrailResult(status="approved", reason="")
# ═══════════════════════════════════════════
# Guardrail 4: rate limiting
# ═══════════════════════════════════════════
class RateLimitGuardrail(Guardrail):
    """Per-ad rate limiting for bid_up / bid_down / pause.

    1. At most MAX_ADJUSTMENTS_PER_AD_PER_DAY recorded adjustments per ad per day.
    2. At least MIN_ADJUSTMENT_INTERVAL_HOURS since the ad's last adjustment.
    3. For bid changes, today's cumulative |pct| may not exceed
       MAX_DAILY_CUMULATIVE_CHANGE_PCT. The request is shrunk to the remaining
       budget, or blocked when no budget is left.
    """

    # A remaining budget at or below this counts as exhausted. This absorbs
    # float noise left by summing ratios (e.g. 0.15 - (0.10 + 0.05) is about -3e-17).
    _BUDGET_EPS = 1e-9

    @property
    def name(self) -> str:
        return "频率限制"

    @staticmethod
    def _parse_pct(val) -> float:
        """Coerce a change ratio to float; None / NaN / non-numeric give 0.0."""
        try:
            pct = float(val)
        except (TypeError, ValueError):
            return 0.0
        return 0.0 if pd.isna(pct) else pct

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        if action not in ("bid_up", "bid_down", "pause"):
            return GuardrailResult(status="approved", reason="")

        ad_id = str(row.get("ad_id", ""))

        # 1. Number of adjustments already made today.
        today_adj = history.get_today_adjustments(ad_id)
        if len(today_adj) >= MAX_ADJUSTMENTS_PER_AD_PER_DAY:
            return GuardrailResult(
                status="blocked",
                reason=f"今日已调整{len(today_adj)}次(上限{MAX_ADJUSTMENTS_PER_AD_PER_DAY}次)",
                modified_action="hold",
            )

        # 2. Minimum interval since the last adjustment.
        last_ts = history.get_last_adjustment_ts(ad_id)
        if last_ts is not None:
            # Match the stored timestamp's tz-awareness so naive and aware values never mix.
            hours_since = (datetime.now(last_ts.tzinfo) - last_ts).total_seconds() / 3600
            if hours_since < MIN_ADJUSTMENT_INTERVAL_HOURS:
                return GuardrailResult(
                    status="blocked",
                    reason=f"距上次调整仅{hours_since:.1f}小时(最小间隔{MIN_ADJUSTMENT_INTERVAL_HOURS}小时)",
                    modified_action="hold",
                )

        # 3. Daily cumulative change budget (bid changes only).
        if action in ("bid_up", "bid_down"):
            # NaN/None become 0 so an already-exhausted budget still blocks.
            change_pct = self._parse_pct(row.get("recommended_change_pct", 0))
            cumulative = history.get_cumulative_pct_today(ad_id)
            if cumulative + abs(change_pct) > MAX_DAILY_CUMULATIVE_CHANGE_PCT:
                remaining = MAX_DAILY_CUMULATIVE_CHANGE_PCT - cumulative
                if remaining <= self._BUDGET_EPS:
                    return GuardrailResult(
                        status="blocked",
                        reason=f"日累计调幅已达{cumulative*100:.1f}%(上限{MAX_DAILY_CUMULATIVE_CHANGE_PCT*100:.0f}%)",
                        modified_action="hold",
                    )
                # Shrink to the remaining budget. The direction follows the
                # action, not a possibly zero or mis-signed pct.
                new_pct = remaining if action == "bid_up" else -remaining
                current_bid = float(row.get("current_bid", 0) or 0)
                new_bid = round(current_bid * (1 + new_pct), 2) if current_bid > 0 else None
                return GuardrailResult(
                    status="modified",
                    reason=f"调幅从{abs(change_pct)*100:.1f}%缩减至{abs(new_pct)*100:.1f}%(日累计限制)",
                    modified_change_pct=round(new_pct, 4),
                    modified_bid=new_bid,
                )
        return GuardrailResult(status="approved", reason="")
# ═══════════════════════════════════════════
# Guardrail 5: daily total operations cap
# ═══════════════════════════════════════════
class DailyOpsCapGuardrail(Guardrail):
    """Cap how many ads may be operated on per day (MAX_DAILY_OPS).

    The count is the number of ads with an adjustment recorded today (from
    history) plus the rows this instance has approved so far. Use one instance
    per validation run. hold / observe rows change nothing, so they pass without
    consuming the budget.
    """

    # Pass-through actions that must not count as operations.
    _NON_OPS = ("hold", "observe")

    @property
    def name(self) -> str:
        return "每日操作上限"

    def __init__(self):
        self._approved_count = 0

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        if action in self._NON_OPS:
            return GuardrailResult(status="approved", reason="")

        total_ops = history.get_today_total_ops() + self._approved_count
        if total_ops >= MAX_DAILY_OPS:
            return GuardrailResult(
                status="blocked",
                reason=f"今日已操作{total_ops}个广告(上限{MAX_DAILY_OPS}个),请按ROI严重度排优先级",
                modified_action="hold",
            )
        self._approved_count += 1
        return GuardrailResult(status="approved", reason="")
# ═══════════════════════════════════════════
# Guardrail 6: dry-run mode
# ═══════════════════════════════════════════
class DryRunGuardrail(Guardrail):
    """Mark every non-hold decision as dry-run (status "modified", nothing else changed).

    Active when the global ``DRY_RUN_MODE`` is on, or when this instance is
    constructed with ``dry_run=True`` (a per-run override; defaults to off, so
    ``DryRunGuardrail()`` behaves exactly as before).
    """

    def __init__(self, dry_run: bool = False):
        self._force_dry_run = dry_run

    @property
    def name(self) -> str:
        return "干运行模式"

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        if action == "hold":
            return GuardrailResult(status="approved", reason="")
        if DRY_RUN_MODE or self._force_dry_run:
            return GuardrailResult(
                status="modified",
                reason="干运行模式:操作不会实际执行",
            )
        return GuardrailResult(status="approved", reason="")
# ═══════════════════════════════════════════
# Guardrail engine
# ═══════════════════════════════════════════
def _apply_guardrail_chain(
    row: pd.Series,
    guardrails: List[Guardrail],
    history: AdjustmentHistory,
) -> tuple:
    """Run one decision row through *guardrails* and return its final verdict.

    Each guardrail sees the row as amended by the guardrails before it. A
    "blocked" result stops the chain.

    Returns:
        (status, reason, final_action, final_bid). ``final_bid`` is kept only
        for bid_up / bid_down, otherwise it is None.
    """
    action = row.get("action", "hold")
    # An empty action cell (NaN) must not leak out as an unknown action; treat it as hold.
    if action is None or (not isinstance(action, str) and pd.isna(action)):
        action = "hold"
    if action == "hold":
        return "approved", "", "hold", None

    status = "approved"
    reasons: List[str] = []
    current_action = action
    current_bid = row.get("recommended_bid")
    current_change_pct = row.get("recommended_change_pct")

    for guardrail in guardrails:
        # Build the row this guardrail sees, with earlier modifications applied.
        check_row = row.copy()
        if current_bid is not None:
            check_row["recommended_bid"] = current_bid
        if current_change_pct is not None:
            check_row["recommended_change_pct"] = current_change_pct
        check_row["action"] = current_action

        result = guardrail.check(check_row, history)
        if result.status == "blocked":
            status = "blocked"
            reasons.append(f"[{guardrail.name}] {result.reason}")
            current_action = result.modified_action or "hold"
            break
        if result.status == "modified":
            status = "modified"
            reasons.append(f"[{guardrail.name}] {result.reason}")
            if result.modified_action:
                current_action = result.modified_action
            if result.modified_bid is not None:
                current_bid = result.modified_bid
            if result.modified_change_pct is not None:
                current_change_pct = result.modified_change_pct

    final_bid = current_bid if current_action in ("bid_up", "bid_down") else None
    return status, "; ".join(reasons), current_action, final_bid


def _run_guardrails(
    df: pd.DataFrame,
    data_date: str,
    dry_run: bool = False,
    channel_roi_p50: Optional[float] = None,
    guardrails: Optional[List[Guardrail]] = None,
) -> pd.DataFrame:
    """Run the guardrail chain over every decision row; *df* is modified in place and returned.

    Added columns:
      - guardrail_status: approved / blocked / modified
      - guardrail_reason: "[guardrail name] reason" entries joined by "; "
      - final_action: action after guardrail corrections
      - final_bid: bid after corrections (only for bid_up / bid_down)

    The temporary ``_data_date`` / ``_channel_roi_p50`` columns are injected for
    the guardrails and dropped again before returning.

    Args:
        df: decisions DataFrame.
        data_date: data date (YYYYMMDD), exposed to guardrails as ``_data_date``.
        dry_run: force dry-run. NOTE(review): the engine itself does not read
            this flag. It only takes effect through a dry-run guardrail in the
            chain, and the default chain is currently empty.
        channel_roi_p50: channel P50 (median 7-day dynamic ROI across ads), used by
            the ActionConsistencyGuardrail B rules. Not injected when missing or <= 0.
        guardrails: optional explicit chain. None uses the default chain, which
            is intentionally empty for now.
    """
    history = AdjustmentHistory()
    if guardrails is None:
        # Default chain temporarily disabled: testing raw LLM output with
        # trimmed fields, so every non-hold decision is approved as-is.
        guardrails = []

    # Inject the data date and channel P50 for the guardrails that read them.
    df["_data_date"] = data_date
    if channel_roi_p50 is not None and channel_roi_p50 > 0:
        df["_channel_roi_p50"] = channel_roi_p50

    statuses: List[str] = []
    reasons: List[str] = []
    final_actions: List = []
    final_bids: List = []
    for _, row in df.iterrows():
        status, reason, final_action, final_bid = _apply_guardrail_chain(row, guardrails, history)
        statuses.append(status)
        reasons.append(reason)
        final_actions.append(final_action)
        final_bids.append(final_bid)

    df["guardrail_status"] = statuses
    df["guardrail_reason"] = reasons
    df["final_action"] = final_actions
    df["final_bid"] = final_bids

    # Drop the temporary columns.
    df.drop(columns=["_data_date", "_channel_roi_p50"], inplace=True, errors="ignore")
    return df
# ═══════════════════════════════════════════
# Tool: validate decision safety
# ═══════════════════════════════════════════
def _latest_decisions_csv(reports_dir: Path) -> Optional[Path]:
    """Return the most recently modified ``llm_decisions_*.csv`` under *reports_dir*, or None."""
    candidates = list(reports_dir.glob("llm_decisions_*.csv"))
    if not candidates:
        return None
    # Pick by modification time, not by file name.
    return max(candidates, key=lambda p: p.stat().st_mtime)


def _enrich_from_metrics(df: pd.DataFrame, metrics_csv: Path) -> Optional[float]:
    """Fill guardrail inputs on *df* in place from the metrics CSV and return the channel ROI P50.

    Adds ``ad_age_days`` (when the column is absent), ``current_bid`` (when absent
    or all-empty), and ``_ad_fission`` / ``_tier_fission_mean`` for rule B4.
    Returns None when the metrics file is missing or has no ROI data. On any
    error the failure is logged and enrichment stops there; values computed
    before the error are kept.
    """
    channel_roi_p50: Optional[float] = None
    if not metrics_csv.exists():
        return None
    try:
        df_metrics = pd.read_csv(metrics_csv)

        # Ad age, only if the decisions CSV does not carry it.
        if "ad_age_days" not in df.columns and "create_time" in df_metrics.columns:
            from ad_decision import _calculate_ad_age_days
            df_metrics["ad_age_days"] = df_metrics["create_time"].apply(_calculate_ad_age_days)
            age_map = df_metrics.set_index("ad_id")["ad_age_days"].to_dict()
            df["ad_age_days"] = df["ad_id"].map(age_map)

        # Current bid, if missing or entirely empty.
        if (
            ("current_bid" not in df.columns or df["current_bid"].isna().all())
            and "bid_amount" in df_metrics.columns
        ):
            bid_map = df_metrics.set_index("ad_id")["bid_amount"].to_dict()
            df["current_bid"] = df["ad_id"].map(bid_map)

        # Channel P50: median of "动态ROI_7日均值" over all ads.
        if "动态ROI_7日均值" in df_metrics.columns:
            roi_series = df_metrics["动态ROI_7日均值"].dropna()
            if len(roi_series) > 0:
                channel_roi_p50 = float(roi_series.median())
                logger.info("护栏 B 系列启用:渠道P50=%.4f", channel_roi_p50)

        # Fission fields for rule B4: the ad's own rate and its audience-tier mean.
        if "T0裂变系数_7日均值" in df_metrics.columns and "audience_tier" in df_metrics.columns:
            fission_map = df_metrics.set_index("ad_id")["T0裂变系数_7日均值"].to_dict()
            df["_ad_fission"] = df["ad_id"].map(fission_map)
            tier_fission = df_metrics.groupby("audience_tier")["T0裂变系数_7日均值"].mean().to_dict()
            tier_map = df_metrics.set_index("ad_id")["audience_tier"].to_dict()
            df["_tier_fission_mean"] = df["ad_id"].map(
                lambda aid: tier_fission.get(tier_map.get(aid))
            )
            b4_ready = df["_ad_fission"].notna().sum()
            logger.info("护栏 B4 裂变字段已注入:%s 条广告有裂变数据", b4_ready)
    except Exception as e:
        logger.warning("读取 metrics 失败(护栏 B 系列将跳过): %s", e)
    return channel_roi_p50


@tool(description="验证决策安全性:冷启动保护、出价边界、频率限制、数据新鲜度")
async def validate_decisions(
    ctx: ToolContext = None,
    decisions_csv: str = "",
    end_date: str = "yesterday",
    dry_run: bool = False,
) -> ToolResult:
    """Run the guardrail chain over an LLM decisions CSV and save the validated result.

    Input: the ``llm_decisions_*.csv`` produced by apply_decisions. When
    *decisions_csv* is empty, the newest one (by mtime) under
    ``outputs/reports`` is used.
    Output: ``outputs/reports/validated_decisions_{end_date}.csv``, with the
    added columns guardrail_status / guardrail_reason / final_action / final_bid.
    Which guardrails actually run is decided by ``_run_guardrails``.

    Args:
        ctx: tool context (not used by this tool).
        decisions_csv: path of the decisions CSV (default: newest llm_decisions).
        end_date: data date as YYYYMMDD, or "yesterday" (computed in ``TIMEZONE``).
        dry_run: force dry-run mode for this call.

    Returns:
        ToolResult with a human-readable summary and counts in ``metadata``.
        Unexpected errors are logged and returned as a failure ToolResult rather
        than raised.
    """
    try:
        if not GUARDRAILS_ENABLED:
            return ToolResult(
                title="护栏已禁用",
                output="GUARDRAILS_ENABLED=False,跳过护栏验证",
            )

        if end_date == "yesterday":
            # Use the business timezone, the same clock DataFreshnessGuardrail
            # uses to age the data date, rather than the server's local time.
            end_date = (datetime.now(ZoneInfo(TIMEZONE)) - timedelta(days=1)).strftime("%Y%m%d")

        reports_dir = _MINI_DIR / "outputs" / "reports"
        if not decisions_csv:
            latest = _latest_decisions_csv(reports_dir)
            if latest is None:
                return ToolResult(title="validate_decisions", output="未找到决策 CSV")
            decisions_csv = str(latest)

        df = pd.read_csv(decisions_csv)
        if df.empty:
            return ToolResult(title="validate_decisions", output="决策数据为空")

        # Read metrics once: fill missing fields and compute the channel P50 for
        # the ActionConsistencyGuardrail B rules.
        channel_roi_p50 = _enrich_from_metrics(df, _MINI_DIR / "outputs" / "metrics_temp.csv")
        if channel_roi_p50 is None:
            logger.warning("未能计算渠道P50,ActionConsistencyGuardrail B 系列规则跳过")

        df = _run_guardrails(df, data_date=end_date, dry_run=dry_run, channel_roi_p50=channel_roi_p50)

        reports_dir.mkdir(parents=True, exist_ok=True)
        out_path = reports_dir / f"validated_decisions_{end_date}.csv"
        df.to_csv(out_path, index=False, encoding="utf-8-sig")

        # Guardrail outcome counts.
        total = len(df)
        approved = (df["guardrail_status"] == "approved").sum()
        blocked = (df["guardrail_status"] == "blocked").sum()
        modified = (df["guardrail_status"] == "modified").sum()

        # Final action distribution (observe included so the counts add up).
        final_pause = (df["final_action"] == "pause").sum()
        final_hold = (df["final_action"] == "hold").sum()
        final_bid_up = (df["final_action"] == "bid_up").sum()
        final_bid_down = (df["final_action"] == "bid_down").sum()
        final_observe = (df["final_action"] == "observe").sum()

        dry_run_active = DRY_RUN_MODE or dry_run
        output_lines = [
            f"护栏验证完成: {out_path}",
            "",
            "护栏结果:",
            f" approved: {approved} 个(直接通过)",
            f" modified: {modified} 个(参数修正后通过)",
            f" blocked: {blocked} 个(被拦截→hold)",
            "",
            "最终动作分布:",
            f" pause: {final_pause} 个",
            f" bid_down: {final_bid_down} 个",
            f" bid_up: {final_bid_up} 个",
            f" hold: {final_hold} 个",
            f" observe: {final_observe} 个",
        ]
        if dry_run_active:
            output_lines.append("")
            output_lines.append("⚠️ 当前为干运行模式(DRY_RUN),操作不会实际执行")

        return ToolResult(
            title=f"护栏验证({total}条,拦截{blocked})",
            output="\n".join(output_lines),
            metadata={
                "csv_path": str(out_path),
                "total": total,
                "approved": int(approved),
                "blocked": int(blocked),
                "modified": int(modified),
                "final_pause": int(final_pause),
                "final_hold": int(final_hold),
                "final_bid_up": int(final_bid_up),
                "final_bid_down": int(final_bid_down),
                "final_observe": int(final_observe),
                "dry_run": dry_run_active,
            },
        )
    except Exception as e:
        logger.error("validate_decisions 失败: %s", e, exc_info=True)
        return ToolResult(title="validate_decisions 失败", output=str(e))
|