"""
Safety guardrail engine for auto_put_ad_mini.

Guardrail classes defined in this module, in their intended order:

  0.   ActionConsistencyGuardrail — action / change-pct self-consistency
  1.   ColdStartGuardrail         — cold-start protection
  2.   DataFreshnessGuardrail     — data freshness check
  3.   BidBoundaryGuardrail       — absolute bid clamp [BID_FLOOR_YUAN, BID_CEILING_YUAN]
  3.5  BidRangeGuardrail          — change-range clamp for bid_up / bid_down
  4.   RateLimitGuardrail         — per-ad rate limit (daily count / interval / cumulative change)
  5.   DailyOpsCapGuardrail       — cap on the number of ads operated per day
  6.   DryRunGuardrail            — dry-run mode

Every guardrail returns one of: approved / blocked / modified.
blocked  = the operation is stopped;
modified = parameters are corrected automatically and the operation proceeds.

NOTE: the chain built in `_run_guardrails` is currently empty (all guardrails
are switched off there on purpose), so the classes above are defined but not
executed by `validate_decisions` at the moment.
"""

import json
import logging
import sys
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
from zoneinfo import ZoneInfo

import pandas as pd

from agent.tools import tool
from agent.tools.models import ToolContext, ToolResult

_MINI_DIR = Path(__file__).resolve().parent.parent
_TOOLS_DIR = Path(__file__).resolve().parent
if str(_MINI_DIR) not in sys.path:
    sys.path.insert(0, str(_MINI_DIR))
if str(_TOOLS_DIR) not in sys.path:
    sys.path.insert(0, str(_TOOLS_DIR))

from config import (
    COLD_START_DAYS, CAUTIOUS_DAYS,
    BID_FLOOR_YUAN, BID_CEILING_YUAN,
    BID_UP_MIN_PCT, BID_UP_MAX_PCT,
    BID_DOWN_MIN_PCT, BID_DOWN_MAX_PCT,
    BID_UP_ROI_FACTOR, ROI_LOW_FACTOR,
    MAX_ADJUSTMENTS_PER_AD_PER_DAY, MIN_ADJUSTMENT_INTERVAL_HOURS,
    MAX_DAILY_CUMULATIVE_CHANGE_PCT, MAX_DAILY_OPS,
    DATA_FRESHNESS_MAX_HOURS, ADJUSTMENT_HISTORY_PATH,
    DRY_RUN_MODE, GUARDRAILS_ENABLED,
    DATA_DIR, TIMEZONE,
)

logger = logging.getLogger(__name__)

# Helper columns injected into the decisions DataFrame for guardrail use only.
# They are removed again before the validated decisions are returned/saved.
_TEMP_COLUMNS = ["_data_date", "_channel_roi_p50", "_ad_fission", "_tier_fission_mean"]


def _to_float_or_none(val) -> Optional[float]:
    """Convert *val* to float; return None for None, "", NaN or non-numeric input."""
    if val is None or val == "":
        return None
    try:
        f = float(val)
        if pd.isna(f):
            return None
        return f
    except (ValueError, TypeError):
        return None


# ═══════════════════════════════════════════
# Adjustment history persistence
# ═══════════════════════════════════════════

class AdjustmentHistory:
    """Per-ad adjustment history, persisted as a JSON file.

    In-memory layout (as written by `record_adjustment`):
    ``{ad_id: {"adjustments": [{"ts", "action", "pct"}, ...], "last_ts": iso_str}}``.
    Timestamps are naive ``datetime.now().isoformat()`` strings.
    """

    def __init__(self, path: Path = ADJUSTMENT_HISTORY_PATH):
        self._path = path
        self._data: Dict[str, Dict] = {}
        self._load()

    def _load(self):
        """Load the history file; any failure falls back to an empty history."""
        if not self._path.exists():
            return
        try:
            loaded = json.loads(self._path.read_text(encoding="utf-8"))
        except Exception as e:
            logger.warning("加载调整历史失败,使用空记录: %s", e)
            self._data = {}
            return
        # A syntactically valid file whose top level is not an object would
        # break every later `.get(...)` lookup, so treat it as unreadable too.
        if not isinstance(loaded, dict):
            logger.warning(
                "加载调整历史失败,使用空记录: %s",
                f"顶层结构应为 JSON 对象,实际为 {type(loaded).__name__}",
            )
            self._data = {}
            return
        self._data = loaded

    def _save(self):
        """Write the whole history back to disk, creating parent directories."""
        self._path.parent.mkdir(parents=True, exist_ok=True)
        self._path.write_text(
            json.dumps(self._data, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

    @staticmethod
    def _parse_ts(raw) -> Optional[datetime]:
        """Parse an ISO timestamp string; return None when missing or malformed."""
        if not raw:
            return None
        try:
            return datetime.fromisoformat(raw)
        except (TypeError, ValueError):
            return None

    def get_today_adjustments(self, ad_id: str) -> List[Dict]:
        """Return the adjustment records of *ad_id* whose timestamp is today."""
        today = datetime.now().strftime("%Y-%m-%d")
        record = self._data.get(str(ad_id), {})
        adjustments = record.get("adjustments", [])
        return [a for a in adjustments if a.get("ts", "").startswith(today)]

    def get_last_adjustment_ts(self, ad_id: str) -> Optional[datetime]:
        """Return the time of the most recent adjustment of *ad_id*, if any."""
        record = self._data.get(str(ad_id), {})
        return self._parse_ts(record.get("last_ts"))

    def get_cumulative_pct_today(self, ad_id: str) -> float:
        """Return the sum of absolute change percentages applied to *ad_id* today."""
        today_adj = self.get_today_adjustments(str(ad_id))
        # `or 0` keeps a stored null pct from breaking abs().
        return sum(abs(a.get("pct") or 0) for a in today_adj)

    def record_adjustment(self, ad_id: str, action: str, pct: float):
        """Append one adjustment for *ad_id*, prune old records and persist."""
        ad_key = str(ad_id)
        now_dt = datetime.now()
        now = now_dt.isoformat()

        entry = self._data.setdefault(ad_key, {"adjustments": [], "last_ts": None})
        entry.setdefault("adjustments", []).append({
            "ts": now,
            "action": action,
            "pct": pct,
        })
        entry["last_ts"] = now

        # Keep only the last 14 days: the "persistent low ROI escalates to
        # pause" rule needs to look up bid_down records from ~7 days back.
        cutoff = (now_dt - timedelta(days=14)).isoformat()
        entry["adjustments"] = [
            a for a in entry["adjustments"]
            if a.get("ts", "") >= cutoff
        ]
        self._save()

    def get_today_total_ops(self) -> int:
        """Return how many distinct ads have at least one adjustment today."""
        today = datetime.now().strftime("%Y-%m-%d")
        return sum(
            1
            for record in self._data.values()
            if any(
                a.get("ts", "").startswith(today)
                for a in record.get("adjustments", [])
            )
        )

    def get_last_bid_down_ts(self, ad_id: str) -> Optional[datetime]:
        """Return the time of the latest ``bid_down`` of *ad_id*, if any.

        Used by the "persistent low ROI escalates to pause" decision.
        """
        record = self._data.get(str(ad_id), {})
        adjustments = record.get("adjustments", [])
        bid_downs = [a for a in adjustments if a.get("action") == "bid_down"]
        if not bid_downs:
            return None
        # ISO strings of the same format sort chronologically.
        last = max(bid_downs, key=lambda a: a.get("ts") or "")
        return self._parse_ts(last.get("ts"))


# ═══════════════════════════════════════════
# Guardrail check result
# ═══════════════════════════════════════════

@dataclass
class GuardrailResult:
    """Outcome of a single guardrail check."""
    status: str  # "approved" / "blocked" / "modified"
    reason: str
    modified_action: Optional[str] = None        # replacement action, if any
    modified_bid: Optional[float] = None         # replacement bid, if any
    modified_change_pct: Optional[float] = None  # replacement change pct, if any


# ═══════════════════════════════════════════
# Guardrail base class
# ═══════════════════════════════════════════

class Guardrail(ABC):
    """Abstract base class of all guardrails."""

    @property
    @abstractmethod
    def name(self) -> str:
        pass

    @abstractmethod
    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        """
        Check whether a single decision passes this guardrail.

        Args:
            row: decision row (contains action, ad_id, recommended_change_pct, ...)
            history: adjustment history

        Returns:
            GuardrailResult
        """
        pass


# ═══════════════════════════════════════════
# Guardrail 0: decision self-consistency (action <-> pct direction)
# ═══════════════════════════════════════════

class ActionConsistencyGuardrail(Guardrail):
    """Decision-consistency guardrail: catches self-contradictory LLM output.

    Rules implemented by `check`:

    A. Direction consistency (action <-> sign of pct)
      A1: action=bid_down requires pct < 0, otherwise -> hold
      A2: action=bid_up requires pct > 0, otherwise -> hold
      A3: action=hold/observe requires pct = 0, otherwise pct is reset to 0

    B4. Fission vs. pause conflict
      action=pause while row["_ad_fission"] > row["_tier_fission_mean"] x 1.10
      -> observe. Skipped when either value is missing or the mean is <= 0.

    NOTE(review): the original notes also describe ROI-level rules B1-B3
    (comparing row["动态ROI_7日均值"] against row["_channel_roi_p50"]); they are
    NOT implemented in `check` below — confirm whether they should be restored.
    """

    # Fission must exceed the tier mean by this factor to veto a pause.
    _FISSION_VETO_FACTOR = 1.10

    @property
    def name(self) -> str:
        return "决策一致性"

    @staticmethod
    def _to_float(val) -> Optional[float]:
        """Safe float conversion: None / NaN / empty string / non-numeric -> None."""
        return _to_float_or_none(val)

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = str(row.get("action", "hold")).strip()
        # Normalise pct (None / NaN / non-numeric -> 0).
        pct = self._to_float(row.get("recommended_change_pct")) or 0.0

        # ---------------- A series: direction consistency ----------------
        if action == "bid_down" and pct >= 0:
            return GuardrailResult(
                status="modified",
                reason=f"方向冲突:action=bid_down 但 pct={pct:+.2%}(应为负)→ 改 hold",
                modified_action="hold",
                modified_change_pct=0.0,
                modified_bid=None,
            )
        if action == "bid_up" and pct <= 0:
            return GuardrailResult(
                status="modified",
                reason=f"方向冲突:action=bid_up 但 pct={pct:+.2%}(应为正)→ 改 hold",
                modified_action="hold",
                modified_change_pct=0.0,
                modified_bid=None,
            )
        if action in ("hold", "observe") and abs(pct) > 1e-6:
            return GuardrailResult(
                status="modified",
                reason=f"维持类动作不应改出价:action={action} 但 pct={pct:+.2%} → pct 归零",
                modified_action=action,
                modified_change_pct=0.0,
                modified_bid=None,
            )

        # ---------------- B4: fission vs. pause conflict ----------------
        # A pause on an ad whose fission rate is clearly above its tier mean
        # is downgraded to observe.
        if action == "pause":
            ad_fission = self._to_float(row.get("_ad_fission"))
            tier_fission_mean = self._to_float(row.get("_tier_fission_mean"))
            if (
                ad_fission is not None
                and tier_fission_mean is not None
                and tier_fission_mean > 0
                and ad_fission > tier_fission_mean * 1.10
            ):
                return GuardrailResult(
                    status="modified",
                    reason=(
                        f"裂变-关停冲突:裂变率={ad_fission:.2f} > 同类均值{tier_fission_mean:.2f}×1.10"
                        f"={tier_fission_mean * 1.10:.2f}(裂变优秀,广告质量有潜力),"
                        f"不应关停 → 改 observe"
                    ),
                    modified_action="observe",
                    modified_change_pct=0.0,
                    modified_bid=None,
                )

        return GuardrailResult(status="approved", reason="")


# ═══════════════════════════════════════════
# Guardrail 1: cold-start protection
# ═══════════════════════════════════════════

class ColdStartGuardrail(Guardrail):
    """Cold-start protection based on ``ad_age_days``.

    - age <= COLD_START_DAYS: pause / bid_down / bid_up are blocked.
    - COLD_START_DAYS < age <= CAUTIOUS_DAYS: pause / bid_down are blocked
      (raising the bid is still allowed).
    - Missing age (None / NaN) or action == "hold": approved.
    """

    @property
    def name(self) -> str:
        return "冷启动保护"

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        ad_age = row.get("ad_age_days")

        # Ages filled via Series.map come back as NaN for unknown ads; treat
        # them like a missing age instead of relying on NaN comparisons.
        if action == "hold" or ad_age is None or pd.isna(ad_age):
            return GuardrailResult(status="approved", reason="")

        if ad_age <= COLD_START_DAYS:
            # Cold-start period: strongest protection.
            if action in ("pause", "bid_down", "bid_up"):
                return GuardrailResult(
                    status="blocked",
                    reason=f"冷启动期({ad_age}天 ≤ {COLD_START_DAYS}天),极度保护,禁止{action}",
                    modified_action="hold",
                )
        elif ad_age <= CAUTIOUS_DAYS:
            # Early growth period: only bid raises are allowed.
            if action in ("pause", "bid_down"):
                return GuardrailResult(
                    status="blocked",
                    reason=f"早期成长期({ad_age}天,4-{CAUTIOUS_DAYS}天),仅允许提价,禁止{action}",
                    modified_action="hold",
                )

        return GuardrailResult(status="approved", reason="")


# ═══════════════════════════════════════════
# Guardrail 2: data freshness
# ═══════════════════════════════════════════

class DataFreshnessGuardrail(Guardrail):
    """Blocks non-hold actions when the data is older than DATA_FRESHNESS_MAX_HOURS."""

    @property
    def name(self) -> str:
        return "数据新鲜度"

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        if action == "hold":
            return GuardrailResult(status="approved", reason="")

        data_date = row.get("_data_date")  # injected by _run_guardrails
        if data_date:
            try:
                data_dt = datetime.strptime(str(data_date), "%Y%m%d")
                # Compare timezone-aware values in the configured timezone.
                tz = ZoneInfo(TIMEZONE)
                now = datetime.now(tz)
                data_dt_aware = data_dt.replace(tzinfo=tz)
                # NOTE(review): age is measured from 00:00 of the data day, so
                # "yesterday" is already >= 24h old at today's midnight —
                # confirm DATA_FRESHNESS_MAX_HOURS is chosen with that in mind.
                hours_old = (now - data_dt_aware).total_seconds() / 3600
                if hours_old > DATA_FRESHNESS_MAX_HOURS:
                    return GuardrailResult(
                        status="blocked",
                        reason=f"数据已过期({hours_old:.0f}小时前,上限{DATA_FRESHNESS_MAX_HOURS}小时),阻止所有操作",
                        modified_action="hold",
                    )
            except ValueError:
                # Still best-effort (the decision is approved), but leave a
                # trace instead of skipping the check silently.
                logger.warning("数据日期无法解析,跳过新鲜度校验: %r", data_date)

        return GuardrailResult(status="approved", reason="")


# ═══════════════════════════════════════════
# Guardrail 3: bid boundary
# ═══════════════════════════════════════════

class BidBoundaryGuardrail(Guardrail):
    """Clamps the recommended bid into [BID_FLOOR_YUAN, BID_CEILING_YUAN]."""

    @property
    def name(self) -> str:
        return "出价边界"

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        if action not in ("bid_up", "bid_down"):
            return GuardrailResult(status="approved", reason="")

        recommended_bid = row.get("recommended_bid")
        if recommended_bid is None or recommended_bid == "":
            return GuardrailResult(status="approved", reason="")

        recommended_bid = float(recommended_bid)
        current_bid = float(row.get("current_bid", 0) or 0)

        if recommended_bid < BID_FLOOR_YUAN:
            new_bid = BID_FLOOR_YUAN
            reason = f"出价{recommended_bid:.2f}元低于下限{BID_FLOOR_YUAN}元,钳位至{new_bid:.2f}元"
        elif recommended_bid > BID_CEILING_YUAN:
            new_bid = BID_CEILING_YUAN
            reason = f"出价{recommended_bid:.2f}元超过上限{BID_CEILING_YUAN}元,钳位至{new_bid:.2f}元"
        else:
            # In range (a NaN bid also lands here: both comparisons are False).
            return GuardrailResult(status="approved", reason="")

        # Recompute the change pct against the current bid for the clamped value.
        new_pct = (new_bid - current_bid) / current_bid if current_bid > 0 else 0
        return GuardrailResult(
            status="modified",
            reason=reason,
            modified_bid=new_bid,
            modified_change_pct=round(new_pct, 4),
        )


# ═══════════════════════════════════════════
# Guardrail 3.5: change-range clamp
# ═══════════════════════════════════════════

class BidRangeGuardrail(Guardrail):
    """Clamps the change percentage.

    bid_up   -> [BID_UP_MIN_PCT, BID_UP_MAX_PCT]
    bid_down -> [-BID_DOWN_MAX_PCT, -BID_DOWN_MIN_PCT]
    """

    @property
    def name(self) -> str:
        return "调幅范围"

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        if action not in ("bid_up", "bid_down"):
            return GuardrailResult(status="approved", reason="")

        change_pct = row.get("recommended_change_pct")
        if change_pct is None or change_pct == "":
            return GuardrailResult(status="approved", reason="")

        change_pct = float(change_pct)
        current_bid = float(row.get("current_bid", 0) or 0)

        if action == "bid_up":
            clamped = max(BID_UP_MIN_PCT, min(BID_UP_MAX_PCT, change_pct))
            if abs(clamped - change_pct) > 0.001:
                new_bid = round(current_bid * (1 + clamped), 2) if current_bid > 0 else None
                return GuardrailResult(
                    status="modified",
                    reason=f"提价调幅从{change_pct*100:.1f}%钳位至{clamped*100:.1f}%(范围{BID_UP_MIN_PCT*100:.0f}%-{BID_UP_MAX_PCT*100:.0f}%)",
                    modified_change_pct=round(clamped, 4),
                    modified_bid=new_bid,
                )
        elif action == "bid_down":
            # change_pct is expected to be negative for a bid cut.
            clamped = min(-BID_DOWN_MIN_PCT, max(-BID_DOWN_MAX_PCT, change_pct))
            if abs(clamped - change_pct) > 0.001:
                new_bid = round(current_bid * (1 + clamped), 2) if current_bid > 0 else None
                return GuardrailResult(
                    status="modified",
                    reason=f"降价调幅从{change_pct*100:.1f}%钳位至{clamped*100:.1f}%(范围-{BID_DOWN_MAX_PCT*100:.0f}%~-{BID_DOWN_MIN_PCT*100:.0f}%)",
                    modified_change_pct=round(clamped, 4),
                    modified_bid=new_bid,
                )

        return GuardrailResult(status="approved", reason="")


# ═══════════════════════════════════════════
# Guardrail 4: rate limit
# ═══════════════════════════════════════════

class RateLimitGuardrail(Guardrail):
    """Per-ad rate limit: daily adjustment count, minimum interval, cumulative change."""

    @property
    def name(self) -> str:
        return "频率限制"

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        if action not in ("bid_up", "bid_down", "pause"):
            return GuardrailResult(status="approved", reason="")

        ad_id = str(row.get("ad_id", ""))

        # Number of adjustments already made today.
        today_adj = history.get_today_adjustments(ad_id)
        if len(today_adj) >= MAX_ADJUSTMENTS_PER_AD_PER_DAY:
            return GuardrailResult(
                status="blocked",
                reason=f"今日已调整{len(today_adj)}次(上限{MAX_ADJUSTMENTS_PER_AD_PER_DAY}次)",
                modified_action="hold",
            )

        # Time since the previous adjustment.
        last_ts = history.get_last_adjustment_ts(ad_id)
        if last_ts:
            hours_since = (datetime.now() - last_ts).total_seconds() / 3600
            if hours_since < MIN_ADJUSTMENT_INTERVAL_HOURS:
                return GuardrailResult(
                    status="blocked",
                    reason=f"距上次调整仅{hours_since:.1f}小时(最小间隔{MIN_ADJUSTMENT_INTERVAL_HOURS}小时)",
                    modified_action="hold",
                )

        # Cumulative change applied today.
        if action in ("bid_up", "bid_down"):
            # None / NaN / non-numeric are treated as 0 so that abs() cannot
            # fail and NaN cannot slip past the cap comparison.
            change_pct = _to_float_or_none(row.get("recommended_change_pct")) or 0.0

            cumulative = history.get_cumulative_pct_today(ad_id)
            if cumulative + abs(change_pct) > MAX_DAILY_CUMULATIVE_CHANGE_PCT:
                remaining = MAX_DAILY_CUMULATIVE_CHANGE_PCT - cumulative
                if remaining <= 0:
                    return GuardrailResult(
                        status="blocked",
                        reason=f"日累计调幅已达{cumulative*100:.1f}%(上限{MAX_DAILY_CUMULATIVE_CHANGE_PCT*100:.0f}%)",
                        modified_action="hold",
                    )
                # Shrink the change to whatever budget is left, keeping its sign.
                direction = 1 if change_pct > 0 else -1
                new_pct = direction * remaining
                current_bid = float(row.get("current_bid", 0) or 0)
                new_bid = round(current_bid * (1 + new_pct), 2) if current_bid > 0 else None
                return GuardrailResult(
                    status="modified",
                    reason=f"调幅从{abs(change_pct)*100:.1f}%缩减至{abs(new_pct)*100:.1f}%(日累计限制)",
                    modified_change_pct=round(new_pct, 4),
                    modified_bid=new_bid,
                )

        return GuardrailResult(status="approved", reason="")


# ═══════════════════════════════════════════
# Guardrail 5: daily operation cap
# ═══════════════════════════════════════════

class DailyOpsCapGuardrail(Guardrail):
    """Caps the number of ads operated per day at MAX_DAILY_OPS.

    The total is the count from the persisted history plus the number of
    non-hold decisions this instance has already approved in the current run.
    """

    def __init__(self):
        self._approved_count = 0

    @property
    def name(self) -> str:
        return "每日操作上限"

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        if action == "hold":
            return GuardrailResult(status="approved", reason="")

        total_ops = history.get_today_total_ops() + self._approved_count
        if total_ops >= MAX_DAILY_OPS:
            return GuardrailResult(
                status="blocked",
                reason=f"今日已操作{total_ops}个广告(上限{MAX_DAILY_OPS}个),请按ROI严重度排优先级",
                modified_action="hold",
            )

        self._approved_count += 1
        return GuardrailResult(status="approved", reason="")


# ═══════════════════════════════════════════
# Guardrail 6: dry-run mode
# ═══════════════════════════════════════════

class DryRunGuardrail(Guardrail):
    """Marks every non-hold decision as modified when DRY_RUN_MODE is on.

    Only the module-level DRY_RUN_MODE flag is consulted here.
    """

    @property
    def name(self) -> str:
        return "干运行模式"

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        if action == "hold":
            return GuardrailResult(status="approved", reason="")

        if DRY_RUN_MODE:
            return GuardrailResult(
                status="modified",
                reason="干运行模式:操作不会实际执行",
            )
        return GuardrailResult(status="approved", reason="")


# ═══════════════════════════════════════════
# Guardrail engine
# ═══════════════════════════════════════════

def _run_guardrails(
    df: pd.DataFrame,
    data_date: str,
    dry_run: bool = False,
    channel_roi_p50: Optional[float] = None,
) -> pd.DataFrame:
    """
    Run the guardrail chain over the decision DataFrame.

    The chain is currently empty (see the list below), so every row ends up
    "approved" with its recommended action/bid carried through unchanged.

    Columns added:
    - guardrail_status: approved / blocked / modified
    - guardrail_reason: explanation, "; "-joined across guardrails
    - final_action: action after guardrail corrections
    - final_bid: bid after guardrail corrections (None unless bid_up/bid_down)

    Args:
        df: decision DataFrame; it is modified in place and also returned
        data_date: data date (YYYYMMDD)
        dry_run: force dry-run; accepted for interface compatibility but not
            consumed by anything in this function at present
        channel_roi_p50: channel P50 (median of per-ad "动态ROI_7日均值");
            injected as the ``_channel_roi_p50`` helper column when > 0

    Returns:
        The same DataFrame with the four result columns added and all
        temporary helper columns removed.
    """
    history = AdjustmentHistory()

    guardrails: List[Guardrail] = [
        # All guardrails are switched off for now, to evaluate the raw LLM
        # output together with the trimmed-down input fields.
    ]

    # Inject helper columns for the guardrails.
    df["_data_date"] = data_date
    if channel_roi_p50 is not None and channel_roi_p50 > 0:
        df["_channel_roi_p50"] = channel_roi_p50

    statuses = []
    reasons = []
    final_actions = []
    final_bids = []

    for _, row in df.iterrows():
        action = row.get("action", "hold")

        # hold never needs checking.
        if action == "hold":
            statuses.append("approved")
            reasons.append("")
            final_actions.append("hold")
            final_bids.append(None)
            continue

        current_status = "approved"
        current_reasons = []
        current_action = action
        current_bid = row.get("recommended_bid")
        current_change_pct = row.get("recommended_change_pct")

        for guardrail in guardrails:
            # Each guardrail sees the decision as corrected by the previous ones.
            check_row = row.copy()
            if current_bid is not None:
                check_row["recommended_bid"] = current_bid
            if current_change_pct is not None:
                check_row["recommended_change_pct"] = current_change_pct
            check_row["action"] = current_action

            result = guardrail.check(check_row, history)

            if result.status == "blocked":
                # A block ends the chain for this row.
                current_status = "blocked"
                current_reasons.append(f"[{guardrail.name}] {result.reason}")
                current_action = result.modified_action or "hold"
                break
            elif result.status == "modified":
                current_status = "modified"
                current_reasons.append(f"[{guardrail.name}] {result.reason}")
                if result.modified_action:
                    current_action = result.modified_action
                if result.modified_bid is not None:
                    current_bid = result.modified_bid
                if result.modified_change_pct is not None:
                    current_change_pct = result.modified_change_pct

        statuses.append(current_status)
        reasons.append("; ".join(current_reasons))
        final_actions.append(current_action)
        final_bids.append(current_bid if current_action in ("bid_up", "bid_down") else None)

    df["guardrail_status"] = statuses
    df["guardrail_reason"] = reasons
    df["final_action"] = final_actions
    df["final_bid"] = final_bids

    # Remove every temporary helper column, including the fission columns
    # injected by validate_decisions, so none of them leaks into the output.
    df.drop(columns=_TEMP_COLUMNS, inplace=True, errors="ignore")

    return df


def _enrich_decisions_from_metrics(df: pd.DataFrame, metrics_csv: Path) -> Optional[float]:
    """Fill guardrail inputs on *df* from the metrics CSV (best effort).

    Adds, when the required metrics columns exist: ``ad_age_days`` (if absent),
    ``current_bid`` (if absent or all-NaN), and the ``_ad_fission`` /
    ``_tier_fission_mean`` helper columns. *df* is modified in place.

    Returns:
        The channel P50 (median of "动态ROI_7日均值"), or None when it could
        not be computed. A failure part-way through is logged and whatever
        was computed up to that point is kept.
    """
    channel_roi_p50: Optional[float] = None
    if not metrics_csv.exists():
        return channel_roi_p50

    try:
        df_metrics = pd.read_csv(metrics_csv)

        # Ad age, if the decisions do not carry it.
        if "ad_age_days" not in df.columns and "create_time" in df_metrics.columns:
            from ad_decision import _calculate_ad_age_days
            df_metrics["ad_age_days"] = df_metrics["create_time"].apply(_calculate_ad_age_days)
            age_map = df_metrics.set_index("ad_id")["ad_age_days"].to_dict()
            df["ad_age_days"] = df["ad_id"].map(age_map)

        # Current bid, if the decisions do not carry it.
        if ("current_bid" not in df.columns or df["current_bid"].isna().all()) \
                and "bid_amount" in df_metrics.columns:
            bid_map = df_metrics.set_index("ad_id")["bid_amount"].to_dict()
            df["current_bid"] = df["ad_id"].map(bid_map)

        # Channel P50: median of the per-ad 7-day dynamic ROI.
        if "动态ROI_7日均值" in df_metrics.columns:
            roi_series = df_metrics["动态ROI_7日均值"].dropna()
            if len(roi_series) > 0:
                channel_roi_p50 = float(roi_series.median())
                logger.info("护栏 B 系列启用:渠道P50=%.4f", channel_roi_p50)

        # Fission helper columns (used by the B4 rule).
        if "T0裂变系数_7日均值" in df_metrics.columns and "audience_tier" in df_metrics.columns:
            # Per-ad fission rate.
            fission_map = df_metrics.set_index("ad_id")["T0裂变系数_7日均值"].to_dict()
            df["_ad_fission"] = df["ad_id"].map(fission_map)
            # Mean fission rate of the ad's audience tier.
            tier_fission = df_metrics.groupby("audience_tier")["T0裂变系数_7日均值"].mean().to_dict()
            tier_map = df_metrics.set_index("ad_id")["audience_tier"].to_dict()
            df["_tier_fission_mean"] = df["ad_id"].map(
                lambda aid: tier_fission.get(tier_map.get(aid))
            )
            b4_ready = df["_ad_fission"].notna().sum()
            logger.info("护栏 B4 裂变字段已注入:%s 条广告有裂变数据", b4_ready)
    except Exception as e:
        # Deliberately best-effort: validation continues without these inputs.
        logger.warning("读取 metrics 失败(护栏 B 系列将跳过): %s", e)

    return channel_roi_p50


# ═══════════════════════════════════════════
# Tool: validate decision safety
# ═══════════════════════════════════════════

@tool(description="验证决策安全性:冷启动保护、出价边界、频率限制、数据新鲜度")
async def validate_decisions(
    ctx: ToolContext = None,
    decisions_csv: str = "",
    end_date: str = "yesterday",
    dry_run: bool = False,
) -> ToolResult:
    """
    Run the guardrail chain over every decision and save the result.

    Input: the llm_decisions CSV produced by apply_decisions.
    Output: validated_decisions_{date}.csv with the extra columns
    guardrail_status / guardrail_reason / final_action / final_bid.

    Args:
        decisions_csv: path of the decisions CSV (default: the most recently
            modified llm_decisions_*.csv under outputs/reports)
        end_date: end date as YYYYMMDD, or "yesterday"
        dry_run: force dry-run mode (reported in the output and metadata)

    Returns:
        ToolResult with a text summary and counts in ``metadata``. Any
        exception is logged and returned as a failure ToolResult instead of
        being raised.
    """
    try:
        if not GUARDRAILS_ENABLED:
            return ToolResult(
                title="护栏已禁用",
                output="GUARDRAILS_ENABLED=False,跳过护栏验证",
            )

        if end_date == "yesterday":
            # NOTE(review): this uses the host's local clock, while
            # DataFreshnessGuardrail uses TIMEZONE — confirm they agree on
            # hosts running in a different timezone.
            end_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")

        # Pick the latest decisions CSV by modification time, not by file name.
        if not decisions_csv:
            reports_dir = _MINI_DIR / "outputs" / "reports"
            candidates = sorted(
                reports_dir.glob("llm_decisions_*.csv"),
                key=lambda p: p.stat().st_mtime,
                reverse=True,
            )
            if not candidates:
                return ToolResult(title="validate_decisions", output="未找到决策 CSV")
            decisions_csv = str(candidates[0])

        df = pd.read_csv(decisions_csv)
        if df.empty:
            return ToolResult(title="validate_decisions", output="决策数据为空")

        # Read metrics once: fills missing fields and yields the channel P50.
        metrics_csv = _MINI_DIR / "outputs" / "metrics_temp.csv"
        channel_roi_p50 = _enrich_decisions_from_metrics(df, metrics_csv)

        if channel_roi_p50 is None:
            logger.warning("未能计算渠道P50,ActionConsistencyGuardrail B 系列规则跳过")

        # Run the guardrail chain.
        df = _run_guardrails(df, data_date=end_date, dry_run=dry_run,
                             channel_roi_p50=channel_roi_p50)

        # Save the validated decisions.
        reports_dir = _MINI_DIR / "outputs" / "reports"
        reports_dir.mkdir(parents=True, exist_ok=True)
        out_path = reports_dir / f"validated_decisions_{end_date}.csv"
        df.to_csv(out_path, index=False, encoding="utf-8-sig")

        # Guardrail outcome counts.
        total = len(df)
        approved = (df["guardrail_status"] == "approved").sum()
        blocked = (df["guardrail_status"] == "blocked").sum()
        modified = (df["guardrail_status"] == "modified").sum()

        # Final action counts.
        final_pause = (df["final_action"] == "pause").sum()
        final_hold = (df["final_action"] == "hold").sum()
        final_bid_up = (df["final_action"] == "bid_up").sum()
        final_bid_down = (df["final_action"] == "bid_down").sum()

        output_lines = [
            f"护栏验证完成: {out_path}",
            "",
            f"护栏结果:",
            f" approved: {approved} 个(直接通过)",
            f" modified: {modified} 个(参数修正后通过)",
            f" blocked: {blocked} 个(被拦截→hold)",
            "",
            f"最终动作分布:",
            f" pause: {final_pause} 个",
            f" bid_down: {final_bid_down} 个",
            f" bid_up: {final_bid_up} 个",
            f" hold: {final_hold} 个",
        ]

        if DRY_RUN_MODE or dry_run:
            output_lines.append("")
            output_lines.append("⚠️ 当前为干运行模式(DRY_RUN),操作不会实际执行")

        return ToolResult(
            title=f"护栏验证({total}条,拦截{blocked})",
            output="\n".join(output_lines),
            metadata={
                "csv_path": str(out_path),
                "total": total,
                "approved": int(approved),
                "blocked": int(blocked),
                "modified": int(modified),
                "final_pause": int(final_pause),
                "final_hold": int(final_hold),
                "final_bid_up": int(final_bid_up),
                "final_bid_down": int(final_bid_down),
                "dry_run": DRY_RUN_MODE or dry_run,
            },
        )

    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.error("validate_decisions 失败: %s", e, exc_info=True)
        return ToolResult(title="validate_decisions 失败", output=str(e))