| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660 |
- """
- 安全护栏引擎 — auto_put_ad_mini
- 6 道护栏按顺序执行:
- 1. ColdStartGuardrail — 冷启动保护
- 2. DataFreshnessGuardrail — 数据新鲜度校验
- 3. BidBoundaryGuardrail — 出价边界钳位
- 4. RateLimitGuardrail — 频率限制(每日次数/间隔/累计调幅)
- 5. DailyOpsCapGuardrail — 每日操作总量上限
- 6. DryRunGuardrail — 干运行模式
- 每道护栏输出:approved / blocked / modified
- blocked = 阻止操作,modified = 自动修正参数后放行
- """
- import json
- import logging
- import sys
- from abc import ABC, abstractmethod
- from dataclasses import dataclass, field
- from datetime import datetime, timedelta
- from pathlib import Path
- from typing import Dict, List, Optional
- import pandas as pd
- from agent.tools import tool
- from agent.tools.models import ToolContext, ToolResult
- _MINI_DIR = Path(__file__).resolve().parent.parent
- _TOOLS_DIR = Path(__file__).resolve().parent
- if str(_MINI_DIR) not in sys.path:
- sys.path.insert(0, str(_MINI_DIR))
- if str(_TOOLS_DIR) not in sys.path:
- sys.path.insert(0, str(_TOOLS_DIR))
- from config import (
- COLD_START_DAYS,
- CAUTIOUS_DAYS,
- BID_FLOOR_YUAN,
- BID_CEILING_YUAN,
- MAX_ADJUSTMENTS_PER_AD_PER_DAY,
- MIN_ADJUSTMENT_INTERVAL_HOURS,
- MAX_DAILY_CUMULATIVE_CHANGE_PCT,
- MAX_DAILY_OPS,
- DATA_FRESHNESS_MAX_HOURS,
- ADJUSTMENT_HISTORY_PATH,
- DRY_RUN_MODE,
- GUARDRAILS_ENABLED,
- DATA_DIR,
- )
- logger = logging.getLogger(__name__)
- # ═══════════════════════════════════════════
- # 调整历史持久化
- # ═══════════════════════════════════════════
class AdjustmentHistory:
    """Per-ad adjustment history persisted to a JSON file.

    On-disk layout, as written by ``record_adjustment``::

        {"<ad_id>": {"adjustments": [{"ts": <iso>, "action": str, "pct": float}, ...],
                     "last_ts": <iso> | null}}

    Timestamps are naive local-time ISO strings from ``datetime.now()``;
    "today" checks compare their ``YYYY-MM-DD`` prefix.
    """

    def __init__(self, path: Path = ADJUSTMENT_HISTORY_PATH):
        self._path = path
        self._data: Dict[str, Dict] = {}
        self._load()

    def _load(self):
        """Read the history file; any read/parse problem falls back to an empty record."""
        if not self._path.exists():
            return
        try:
            loaded = json.loads(self._path.read_text(encoding="utf-8"))
        except Exception as e:
            logger.warning("加载调整历史失败,使用空记录: %s", e)
            self._data = {}
            return
        if isinstance(loaded, dict):
            self._data = loaded
        else:
            # Every lookup below assumes a top-level object keyed by ad id.
            logger.warning(
                "加载调整历史失败,使用空记录: %s",
                f"unexpected top-level JSON type {type(loaded).__name__}",
            )
            self._data = {}

    def _save(self):
        """Write the history atomically: dump to a sibling temp file, then rename it over the target."""
        self._path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = self._path.with_name(self._path.name + ".tmp")
        tmp_path.write_text(
            json.dumps(self._data, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        # Writing the target directly can leave a truncated file if interrupted;
        # _load would then discard it and silently reset every rate limit.
        tmp_path.replace(self._path)

    def get_today_adjustments(self, ad_id: str) -> List[Dict]:
        """Return this ad's adjustment records whose timestamp falls on today's date."""
        today = datetime.now().strftime("%Y-%m-%d")
        record = self._data.get(str(ad_id), {})
        return [a for a in record.get("adjustments", []) if a.get("ts", "").startswith(today)]

    def get_last_adjustment_ts(self, ad_id: str) -> Optional[datetime]:
        """Return the time of this ad's most recent adjustment, or None if absent/unparseable."""
        last_ts = self._data.get(str(ad_id), {}).get("last_ts")
        if not last_ts:
            return None
        try:
            return datetime.fromisoformat(last_ts)
        except ValueError:
            return None

    def get_cumulative_pct_today(self, ad_id: str) -> float:
        """Return the sum of |pct| over this ad's adjustments recorded today."""
        return sum(abs(a.get("pct", 0)) for a in self.get_today_adjustments(str(ad_id)))

    def record_adjustment(self, ad_id: str, action: str, pct: float):
        """Append an adjustment for ``ad_id``, prune its records older than 7 days, and save."""
        now = datetime.now()
        now_iso = now.isoformat()
        record = self._data.setdefault(str(ad_id), {"adjustments": [], "last_ts": None})
        # setdefault also repairs a record loaded from disk that lacks "adjustments".
        adjustments = record.setdefault("adjustments", [])
        adjustments.append({
            "ts": now_iso,
            "action": action,
            "pct": pct,
        })
        record["last_ts"] = now_iso
        # Keep only the last 7 days (ISO strings of the same format sort chronologically).
        cutoff = (now - timedelta(days=7)).isoformat()
        record["adjustments"] = [a for a in adjustments if a.get("ts", "") >= cutoff]
        self._save()

    def get_today_total_ops(self) -> int:
        """Return the number of distinct ads with at least one adjustment today."""
        today = datetime.now().strftime("%Y-%m-%d")
        return sum(
            1
            for record in self._data.values()
            if any(a.get("ts", "").startswith(today) for a in record.get("adjustments", []))
        )
- # ═══════════════════════════════════════════
- # 护栏检查结果
- # ═══════════════════════════════════════════
@dataclass
class GuardrailResult:
    """Outcome of running one guardrail against one decision row.

    ``status`` is one of ``"approved"``, ``"blocked"`` or ``"modified"``.
    The ``modified_*`` fields carry overrides for the decision; ``None``
    means "leave that field unchanged".
    """

    status: str  # "approved" | "blocked" | "modified"
    reason: str  # human-readable explanation; "" when nothing to report
    modified_action: Optional[str] = field(default=None)  # replacement action, e.g. "hold"
    modified_bid: Optional[float] = field(default=None)  # replacement bid (yuan)
    modified_change_pct: Optional[float] = field(default=None)  # replacement fractional change
- # ═══════════════════════════════════════════
- # 护栏基类
- # ═══════════════════════════════════════════
class Guardrail(ABC):
    """Abstract interface shared by every guardrail in the chain."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Display name of this guardrail."""
        ...

    @abstractmethod
    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        """Evaluate a single decision against this guardrail.

        Args:
            row: One decision row (carries fields such as ``action``, ``ad_id``
                and ``recommended_change_pct``).
            history: Adjustment history records.

        Returns:
            A GuardrailResult saying whether the decision is approved,
            blocked, or modified.
        """
        ...
- # ═══════════════════════════════════════════
- # 护栏 1: 冷启动保护
- # ═══════════════════════════════════════════
class ColdStartGuardrail(Guardrail):
    """Cold-start protection keyed on ``ad_age_days``.

    * age <= COLD_START_DAYS: pause / bid_down / bid_up are all blocked.
    * COLD_START_DAYS < age <= CAUTIOUS_DAYS: pause / bid_down are blocked,
      bid_up is allowed.
    * Older ads, ``hold`` rows and rows with unknown age are approved.
    """

    @property
    def name(self) -> str:
        return "冷启动保护"

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        ad_age = row.get("ad_age_days")
        # Unknown age (None, or NaN from an empty CSV cell / failed metrics lookup)
        # is approved explicitly; NaN used to reach the same result only because
        # every NaN comparison is False.
        # NOTE(review): this is fail-open for a safety guard -- confirm intended.
        if action == "hold" or ad_age is None or pd.isna(ad_age):
            return GuardrailResult(status="approved", reason="")

        if ad_age <= COLD_START_DAYS:
            # Cold-start window: freeze every bid/pause operation.
            if action in ("pause", "bid_down", "bid_up"):
                return GuardrailResult(
                    status="blocked",
                    reason=f"冷启动期({ad_age}天 ≤ {COLD_START_DAYS}天),极度保护,禁止{action}",
                    modified_action="hold",
                )
        elif ad_age <= CAUTIOUS_DAYS:
            # Early-growth window: only bid increases may go through.
            if action in ("pause", "bid_down"):
                return GuardrailResult(
                    status="blocked",
                    reason=f"早期成长期({ad_age}天,{COLD_START_DAYS + 1}-{CAUTIOUS_DAYS}天),仅允许提价,禁止{action}",
                    modified_action="hold",
                )
        return GuardrailResult(status="approved", reason="")
- # ═══════════════════════════════════════════
- # 护栏 2: 数据新鲜度
- # ═══════════════════════════════════════════
class DataFreshnessGuardrail(Guardrail):
    """Data-freshness check: block every non-hold action when the data is stale.

    ``_data_date`` (injected into each row by the engine, format ``YYYYMMDD``)
    names the day the metrics cover. That day's data is only complete at the
    following midnight, so age is measured from ``data_date + 1 day``; data
    older than DATA_FRESHNESS_MAX_HOURS is treated as expired.
    """

    @property
    def name(self) -> str:
        return "数据新鲜度"

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        if row.get("action", "hold") == "hold":
            return GuardrailResult(status="approved", reason="")
        data_date = row.get("_data_date")  # injected by the engine
        if not data_date:
            # No date supplied: nothing to validate against.
            return GuardrailResult(status="approved", reason="")
        try:
            day_start = datetime.strptime(str(data_date), "%Y%m%d")
        except ValueError:
            # Best-effort check: keep approving, but no longer silently.
            logger.warning("无法解析数据日期 %r(期望 YYYYMMDD),跳过数据新鲜度校验", data_date)
            return GuardrailResult(status="approved", reason="")
        # Measured from the start of the day, yesterday's data (the tool's default
        # end_date) is already 24h+ old at midnight, so a limit of about a day was
        # exceeded within hours and blocked nearly every run.
        data_complete_at = day_start + timedelta(days=1)
        hours_old = (datetime.now() - data_complete_at).total_seconds() / 3600
        if hours_old > DATA_FRESHNESS_MAX_HOURS:
            return GuardrailResult(
                status="blocked",
                reason=f"数据已过期({hours_old:.0f}小时前,上限{DATA_FRESHNESS_MAX_HOURS}小时),阻止所有操作",
                modified_action="hold",
            )
        return GuardrailResult(status="approved", reason="")
- # ═══════════════════════════════════════════
- # 护栏 3: 出价边界
- # ═══════════════════════════════════════════
class BidBoundaryGuardrail(Guardrail):
    """Bid-boundary check: clamp bid_up/bid_down targets into [BID_FLOOR_YUAN, BID_CEILING_YUAN].

    When the current bid is known and the clamped target no longer moves in
    the action's direction, the decision is blocked to hold. Example: a
    bid_down whose target is lifted to the floor while the current bid
    already sits at or below it. Without this, it would be executed as a
    no-op or as a move the wrong way.
    """

    @property
    def name(self) -> str:
        return "出价边界"

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        if action not in ("bid_up", "bid_down"):
            return GuardrailResult(status="approved", reason="")
        raw_bid = row.get("recommended_bid")
        # None, "" and NaN (an empty CSV cell) all mean "no target bid given".
        if raw_bid is None or raw_bid == "" or pd.isna(raw_bid):
            return GuardrailResult(status="approved", reason="")
        recommended_bid = float(raw_bid)
        current_bid = float(row.get("current_bid", 0) or 0)
        if pd.isna(current_bid):
            current_bid = 0.0  # unknown: direction and pct cannot be computed

        if recommended_bid < BID_FLOOR_YUAN:
            new_bid = BID_FLOOR_YUAN
            reason = f"出价{recommended_bid:.2f}元低于下限{BID_FLOOR_YUAN}元,钳位至{new_bid:.2f}元"
        elif recommended_bid > BID_CEILING_YUAN:
            new_bid = BID_CEILING_YUAN
            reason = f"出价{recommended_bid:.2f}元超过上限{BID_CEILING_YUAN}元,钳位至{new_bid:.2f}元"
        else:
            return GuardrailResult(status="approved", reason="")

        if current_bid <= 0:
            # No reference bid: clamp only, as before.
            return GuardrailResult(
                status="modified",
                reason=reason,
                modified_bid=new_bid,
                modified_change_pct=0,
            )

        wrong_direction = new_bid <= current_bid if action == "bid_up" else new_bid >= current_bid
        if wrong_direction:
            return GuardrailResult(
                status="blocked",
                reason=f"{reason},但当前出价为{current_bid:.2f}元,钳位后不再是{action},改为hold",
                modified_action="hold",
            )
        new_pct = (new_bid - current_bid) / current_bid
        return GuardrailResult(
            status="modified",
            reason=reason,
            modified_bid=new_bid,
            modified_change_pct=round(new_pct, 4),
        )
- # ═══════════════════════════════════════════
- # 护栏 4: 频率限制
- # ═══════════════════════════════════════════
class RateLimitGuardrail(Guardrail):
    """Per-ad rate limits: adjustments per day, minimum interval, daily cumulative change."""

    # Absolute tolerance for float sums of fractional changes (0.1 + 0.2 > 0.3).
    _PCT_TOLERANCE = 1e-9

    @property
    def name(self) -> str:
        return "频率限制"

    @staticmethod
    def _to_pct(value) -> float:
        """Coerce a change-pct cell to float; None, NaN and unparseable values become 0."""
        try:
            pct = float(value)
        except (TypeError, ValueError):
            return 0.0
        return 0.0 if pd.isna(pct) else pct

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        if action not in ("bid_up", "bid_down", "pause"):
            return GuardrailResult(status="approved", reason="")
        ad_id = str(row.get("ad_id", ""))

        # 1) Adjustments already made today.
        today_adj = history.get_today_adjustments(ad_id)
        if len(today_adj) >= MAX_ADJUSTMENTS_PER_AD_PER_DAY:
            return GuardrailResult(
                status="blocked",
                reason=f"今日已调整{len(today_adj)}次(上限{MAX_ADJUSTMENTS_PER_AD_PER_DAY}次)",
                modified_action="hold",
            )

        # 2) Minimum interval since the last adjustment.
        last_ts = history.get_last_adjustment_ts(ad_id)
        if last_ts:
            hours_since = (datetime.now() - last_ts).total_seconds() / 3600
            if hours_since < MIN_ADJUSTMENT_INTERVAL_HOURS:
                return GuardrailResult(
                    status="blocked",
                    reason=f"距上次调整仅{hours_since:.1f}小时(最小间隔{MIN_ADJUSTMENT_INTERVAL_HOURS}小时)",
                    modified_action="hold",
                )

        # 3) Daily cumulative |change| -- only bid changes carry a pct.
        if action not in ("bid_up", "bid_down"):
            return GuardrailResult(status="approved", reason="")
        # NaN/None used to slip past this check because NaN comparisons are False.
        change_pct = self._to_pct(row.get("recommended_change_pct", 0))
        cumulative = history.get_cumulative_pct_today(ad_id)
        if cumulative + abs(change_pct) <= MAX_DAILY_CUMULATIVE_CHANGE_PCT + self._PCT_TOLERANCE:
            return GuardrailResult(status="approved", reason="")

        remaining = MAX_DAILY_CUMULATIVE_CHANGE_PCT - cumulative
        if remaining <= self._PCT_TOLERANCE:
            return GuardrailResult(
                status="blocked",
                reason=f"日累计调幅已达{cumulative*100:.1f}%(上限{MAX_DAILY_CUMULATIVE_CHANGE_PCT*100:.0f}%)",
                modified_action="hold",
            )

        current_bid = float(row.get("current_bid", 0) or 0)
        if pd.isna(current_bid) or current_bid <= 0:
            # Without a current bid the reduced bid cannot be computed; letting the
            # unreduced recommended bid through would break the daily cap.
            return GuardrailResult(
                status="blocked",
                reason=f"调幅{abs(change_pct)*100:.1f}%超出日累计剩余额度{remaining*100:.1f}%,且当前出价未知,无法缩减调幅",
                modified_action="hold",
            )

        # Direction follows the action so a sign-inconsistent pct cannot flip the move.
        direction = 1 if action == "bid_up" else -1
        new_pct = direction * remaining
        new_bid = round(current_bid * (1 + new_pct), 2)
        return GuardrailResult(
            status="modified",
            reason=f"调幅从{abs(change_pct)*100:.1f}%缩减至{abs(new_pct)*100:.1f}%(日累计限制)",
            modified_change_pct=round(new_pct, 4),
            modified_bid=new_bid,
        )
- # ═══════════════════════════════════════════
- # 护栏 5: 每日操作总量上限
- # ═══════════════════════════════════════════
class DailyOpsCapGuardrail(Guardrail):
    """Daily operations cap: at most MAX_DAILY_OPS distinct ads may be operated per day.

    The count is the ads the history shows as adjusted today plus the ads
    newly approved by this instance. An ad that is already counted uses no
    extra slot: either it was adjusted earlier today, or it was approved
    earlier in this run. The instance is stateful, so build a fresh one for
    each run.
    """

    @property
    def name(self) -> str:
        return "每日操作上限"

    def __init__(self):
        # Number of not-yet-counted ads approved by this instance.
        self._approved_count = 0
        # ad_id strings approved by this instance, for de-duplication.
        self._approved_ads = set()

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        if action == "hold":
            return GuardrailResult(status="approved", reason="")
        ad_id = str(row.get("ad_id", ""))
        # Re-operating an already-counted ad does not add a distinct ad.
        # An empty id cannot be de-duplicated, so it always counts as new.
        if ad_id and (ad_id in self._approved_ads or history.get_today_adjustments(ad_id)):
            return GuardrailResult(status="approved", reason="")
        total_ops = history.get_today_total_ops() + self._approved_count
        if total_ops >= MAX_DAILY_OPS:
            return GuardrailResult(
                status="blocked",
                reason=f"今日已操作{total_ops}个广告(上限{MAX_DAILY_OPS}个),请按ROI严重度排优先级",
                modified_action="hold",
            )
        self._approved_count += 1
        if ad_id:
            self._approved_ads.add(ad_id)
        return GuardrailResult(status="approved", reason="")
- # ═══════════════════════════════════════════
- # 护栏 6: 干运行模式
- # ═══════════════════════════════════════════
class DryRunGuardrail(Guardrail):
    """Dry-run marker: flag every non-hold action as "will not actually execute".

    The engine adds this guardrail only when dry-run is active (config
    DRY_RUN_MODE or the per-call ``dry_run`` flag), so it is enabled by
    default. It must not re-check DRY_RUN_MODE itself: doing so ignored the
    per-call flag, and nothing got marked.
    """

    def __init__(self, enabled: bool = True):
        self._enabled = enabled

    @property
    def name(self) -> str:
        return "干运行模式"

    def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
        action = row.get("action", "hold")
        if action == "hold" or not self._enabled:
            return GuardrailResult(status="approved", reason="")
        return GuardrailResult(
            status="modified",
            reason="干运行模式:操作不会实际执行",
        )
- # ═══════════════════════════════════════════
- # 护栏引擎
- # ═══════════════════════════════════════════
def _apply_guardrail_chain(
    row: pd.Series,
    guardrails: List[Guardrail],
    history: AdjustmentHistory,
) -> tuple:
    """Run one non-hold decision row through ``guardrails`` in order.

    Each guardrail sees the action / bid / change-pct as modified by the
    guardrails before it; the first ``blocked`` result stops the chain.

    Returns:
        ``(status, reason, final_action, final_bid)``; ``final_bid`` is None
        unless the final action is bid_up / bid_down.
    """
    status = "approved"
    reasons: List[str] = []
    action = row.get("action", "hold")
    bid = row.get("recommended_bid")
    change_pct = row.get("recommended_change_pct")
    for guardrail in guardrails:
        # Fresh copy per guardrail, carrying the overrides accumulated so far.
        check_row = row.copy()
        if bid is not None:
            check_row["recommended_bid"] = bid
        if change_pct is not None:
            check_row["recommended_change_pct"] = change_pct
        check_row["action"] = action
        result = guardrail.check(check_row, history)
        if result.status == "blocked":
            status = "blocked"
            reasons.append(f"[{guardrail.name}] {result.reason}")
            action = result.modified_action or "hold"
            break
        if result.status == "modified":
            status = "modified"
            reasons.append(f"[{guardrail.name}] {result.reason}")
            if result.modified_action:
                action = result.modified_action
            if result.modified_bid is not None:
                bid = result.modified_bid
            if result.modified_change_pct is not None:
                change_pct = result.modified_change_pct
    final_bid = bid if action in ("bid_up", "bid_down") else None
    return status, "; ".join(reasons), action, final_bid


def _run_guardrails(
    df: pd.DataFrame,
    data_date: str,
    dry_run: bool = False,
) -> pd.DataFrame:
    """Run the guardrail chain over every decision row of ``df`` (mutated in place and returned).

    Adds the columns:
        - guardrail_status: approved / blocked / modified
        - guardrail_reason: "; "-joined "[guardrail name] reason" entries
        - final_action: action after guardrail overrides
        - final_bid: bid after overrides (None unless final action is bid_up/bid_down)

    Args:
        df: Decision rows (``action``, ``ad_id``, ``recommended_bid``, ...).
        data_date: ``YYYYMMDD`` date of the underlying data, used by the freshness check.
        dry_run: Add the dry-run guardrail even when DRY_RUN_MODE is off.
    """
    history = AdjustmentHistory()
    guardrails: List[Guardrail] = [
        ColdStartGuardrail(),
        DataFreshnessGuardrail(),
        BidBoundaryGuardrail(),
        RateLimitGuardrail(),
        DailyOpsCapGuardrail(),  # stateful: one instance per run
    ]
    if dry_run or DRY_RUN_MODE:
        guardrails.append(DryRunGuardrail())

    # Temporary column read by DataFreshnessGuardrail.
    df["_data_date"] = data_date
    try:
        outcomes = []
        for _, row in df.iterrows():
            action = row.get("action", "hold")
            # A missing action (NaN / empty CSV cell) is treated as hold; otherwise
            # it would run the chain and consume daily-ops slots.
            if action == "hold" or action == "" or pd.isna(action):
                outcomes.append(("approved", "", "hold", None))
            else:
                outcomes.append(_apply_guardrail_chain(row, guardrails, history))
        df["guardrail_status"] = [o[0] for o in outcomes]
        df["guardrail_reason"] = [o[1] for o in outcomes]
        df["final_action"] = [o[2] for o in outcomes]
        df["final_bid"] = [o[3] for o in outcomes]
    finally:
        # Drop the temp column even if a guardrail raises.
        df.drop(columns=["_data_date"], inplace=True, errors="ignore")
    return df
- # ═══════════════════════════════════════════
- # 工具:验证决策安全性
- # ═══════════════════════════════════════════
def _fill_from_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """Backfill ``ad_age_days`` and ``current_bid`` from ``outputs/metrics_temp.csv``.

    - ``ad_age_days`` is added only when the column is absent. It is derived
      from ``create_time`` via ``ad_decision._calculate_ad_age_days``.
    - ``current_bid`` is filled from ``bid_amount`` wherever the column is
      missing or a cell is NaN; existing values are kept.

    The metrics file is read at most once.
    """
    need_age = "ad_age_days" not in df.columns
    need_bid = "current_bid" not in df.columns or df["current_bid"].isna().any()
    metrics_csv = _MINI_DIR / "outputs" / "metrics_temp.csv"
    if (need_age or need_bid) and metrics_csv.exists():
        df_metrics = pd.read_csv(metrics_csv)
        if need_age and "create_time" in df_metrics.columns:
            from ad_decision import _calculate_ad_age_days
            df_metrics["ad_age_days"] = df_metrics["create_time"].apply(_calculate_ad_age_days)
            age_map = df_metrics.set_index("ad_id")["ad_age_days"].to_dict()
            df["ad_age_days"] = df["ad_id"].map(age_map)
        if need_bid and "bid_amount" in df_metrics.columns:
            bid_map = df_metrics.set_index("ad_id")["bid_amount"].to_dict()
            looked_up = df["ad_id"].map(bid_map)
            if "current_bid" in df.columns:
                # Fill only the gaps; previously partial NaNs were left as-is.
                df["current_bid"] = df["current_bid"].fillna(looked_up)
            else:
                df["current_bid"] = looked_up
    if "ad_age_days" not in df.columns:
        # ColdStartGuardrail approves rows without an age, so surface this.
        logger.warning("决策数据缺少 ad_age_days 且无法从 metrics_temp.csv 补充,冷启动保护将放行所有决策")
    return df


@tool(description="验证决策安全性:冷启动保护、出价边界、频率限制、数据新鲜度")
async def validate_decisions(
    ctx: ToolContext,
    decisions_csv: str = "",
    end_date: str = "yesterday",
    dry_run: bool = False,
) -> ToolResult:
    """
    Run the 6-guardrail chain over every decision.

    Input: the llm_decisions CSV produced by apply_decisions.
    Output: validated_decisions_{date}.csv with the added columns
    guardrail_status / guardrail_reason / final_action / final_bid.

    Args:
        decisions_csv: Decision CSV path (defaults to the most recently modified llm_decisions CSV)
        end_date: End date as YYYYMMDD, or "yesterday"
        dry_run: Force dry-run mode
    """
    try:
        if not GUARDRAILS_ENABLED:
            return ToolResult(
                title="护栏已禁用",
                output="GUARDRAILS_ENABLED=False,跳过护栏验证",
            )
        if end_date == "yesterday":
            end_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")

        reports_dir = _MINI_DIR / "outputs" / "reports"
        # Pick the latest decision CSV by modification time, not by file name.
        if not decisions_csv:
            candidates = sorted(
                reports_dir.glob("llm_decisions_*.csv"),
                key=lambda p: p.stat().st_mtime,
                reverse=True,
            )
            if not candidates:
                return ToolResult(title="validate_decisions", output="未找到决策 CSV")
            decisions_csv = str(candidates[0])

        df = pd.read_csv(decisions_csv)
        if df.empty:
            return ToolResult(title="validate_decisions", output="决策数据为空")

        df = _fill_from_metrics(df)
        df = _run_guardrails(df, data_date=end_date, dry_run=dry_run)

        reports_dir.mkdir(parents=True, exist_ok=True)
        out_path = reports_dir / f"validated_decisions_{end_date}.csv"
        df.to_csv(out_path, index=False, encoding="utf-8-sig")

        total = len(df)
        status = df["guardrail_status"]
        approved = int((status == "approved").sum())
        blocked = int((status == "blocked").sum())
        modified = int((status == "modified").sum())
        final = df["final_action"]
        final_pause = int((final == "pause").sum())
        final_hold = int((final == "hold").sum())
        final_bid_up = int((final == "bid_up").sum())
        final_bid_down = int((final == "bid_down").sum())
        is_dry_run = DRY_RUN_MODE or dry_run

        output_lines = [
            f"护栏验证完成: {out_path}",
            "",
            "护栏结果:",
            f" approved: {approved} 个(直接通过)",
            f" modified: {modified} 个(参数修正后通过)",
            f" blocked: {blocked} 个(被拦截→hold)",
            "",
            "最终动作分布:",
            f" pause: {final_pause} 个",
            f" bid_down: {final_bid_down} 个",
            f" bid_up: {final_bid_up} 个",
            f" hold: {final_hold} 个",
        ]
        if is_dry_run:
            output_lines.append("")
            output_lines.append("⚠️ 当前为干运行模式(DRY_RUN),操作不会实际执行")

        return ToolResult(
            title=f"护栏验证({total}条,拦截{blocked})",
            output="\n".join(output_lines),
            metadata={
                "csv_path": str(out_path),
                "total": total,
                "approved": approved,
                "blocked": blocked,
                "modified": modified,
                "final_pause": final_pause,
                "final_hold": final_hold,
                "final_bid_up": final_bid_up,
                "final_bid_down": final_bid_down,
                "dry_run": is_dry_run,
            },
        )
    except Exception as e:
        logger.error("validate_decisions 失败: %s", e, exc_info=True)
        return ToolResult(title="validate_decisions 失败", output=str(e))
|