# guardrails.py
  1. """
  2. 安全护栏引擎 — auto_put_ad_mini
  3. 7 道护栏按顺序执行:
  4. 1. ColdStartGuardrail — 冷启动保护
  5. 2. DataFreshnessGuardrail — 数据新鲜度校验
  6. 3. BidBoundaryGuardrail — 出价边界钳位(0.05~1.00元)
  7. 3.5 BidRangeGuardrail — 调幅范围钳位(提价5-10%, 降价3-5%)
  8. 4. RateLimitGuardrail — 频率限制(每日次数/间隔/累计调幅)
  9. 5. DailyOpsCapGuardrail — 每日操作总量上限
  10. 6. DryRunGuardrail — 干运行模式
  11. 每道护栏输出:approved / blocked / modified
  12. blocked = 阻止操作,modified = 自动修正参数后放行
  13. """
  14. import json
  15. import logging
  16. import sys
  17. from abc import ABC, abstractmethod
  18. from dataclasses import dataclass, field
  19. from datetime import datetime, timedelta
  20. from pathlib import Path
  21. from typing import Dict, List, Optional
  22. from zoneinfo import ZoneInfo
  23. import pandas as pd
  24. from agent.tools import tool
  25. from agent.tools.models import ToolContext, ToolResult
  26. _MINI_DIR = Path(__file__).resolve().parent.parent
  27. _TOOLS_DIR = Path(__file__).resolve().parent
  28. if str(_MINI_DIR) not in sys.path:
  29. sys.path.insert(0, str(_MINI_DIR))
  30. if str(_TOOLS_DIR) not in sys.path:
  31. sys.path.insert(0, str(_TOOLS_DIR))
  32. from config import (
  33. COLD_START_DAYS,
  34. CAUTIOUS_DAYS,
  35. BID_FLOOR_YUAN,
  36. BID_CEILING_YUAN,
  37. BID_UP_MIN_PCT,
  38. BID_UP_MAX_PCT,
  39. BID_DOWN_MIN_PCT,
  40. BID_DOWN_MAX_PCT,
  41. BID_UP_ROI_FACTOR,
  42. ROI_LOW_FACTOR,
  43. MAX_ADJUSTMENTS_PER_AD_PER_DAY,
  44. MIN_ADJUSTMENT_INTERVAL_HOURS,
  45. MAX_DAILY_CUMULATIVE_CHANGE_PCT,
  46. MAX_DAILY_OPS,
  47. DATA_FRESHNESS_MAX_HOURS,
  48. ADJUSTMENT_HISTORY_PATH,
  49. DRY_RUN_MODE,
  50. GUARDRAILS_ENABLED,
  51. DATA_DIR,
  52. TIMEZONE,
  53. )
  54. logger = logging.getLogger(__name__)
  55. # ═══════════════════════════════════════════
  56. # 调整历史持久化
  57. # ═══════════════════════════════════════════
  58. class AdjustmentHistory:
  59. """广告调整历史记录(JSON 文件持久化)。"""
  60. def __init__(self, path: Path = ADJUSTMENT_HISTORY_PATH):
  61. self._path = path
  62. self._data: Dict[str, Dict] = {}
  63. self._load()
  64. def _load(self):
  65. if self._path.exists():
  66. try:
  67. self._data = json.loads(self._path.read_text(encoding="utf-8"))
  68. except Exception as e:
  69. logger.warning("加载调整历史失败,使用空记录: %s", e)
  70. self._data = {}
  71. def _save(self):
  72. self._path.parent.mkdir(parents=True, exist_ok=True)
  73. self._path.write_text(
  74. json.dumps(self._data, ensure_ascii=False, indent=2),
  75. encoding="utf-8",
  76. )
  77. def get_today_adjustments(self, ad_id: str) -> List[Dict]:
  78. """获取某广告今天的调整记录。"""
  79. today = datetime.now().strftime("%Y-%m-%d")
  80. record = self._data.get(str(ad_id), {})
  81. adjustments = record.get("adjustments", [])
  82. return [a for a in adjustments if a.get("ts", "").startswith(today)]
  83. def get_last_adjustment_ts(self, ad_id: str) -> Optional[datetime]:
  84. """获取某广告最后一次调整的时间。"""
  85. record = self._data.get(str(ad_id), {})
  86. last_ts = record.get("last_ts")
  87. if last_ts:
  88. try:
  89. return datetime.fromisoformat(last_ts)
  90. except ValueError:
  91. return None
  92. return None
  93. def get_cumulative_pct_today(self, ad_id: str) -> float:
  94. """获取某广告今天的累计调幅绝对值。"""
  95. today_adj = self.get_today_adjustments(str(ad_id))
  96. return sum(abs(a.get("pct", 0)) for a in today_adj)
  97. def record_adjustment(self, ad_id: str, action: str, pct: float):
  98. """记录一次调整。"""
  99. ad_key = str(ad_id)
  100. now = datetime.now().isoformat()
  101. if ad_key not in self._data:
  102. self._data[ad_key] = {"adjustments": [], "last_ts": None}
  103. self._data[ad_key]["adjustments"].append({
  104. "ts": now,
  105. "action": action,
  106. "pct": pct,
  107. })
  108. self._data[ad_key]["last_ts"] = now
  109. # 只保留最近 14 天的记录(扩展:支持"持续低ROI升级关停"需查7天前降价记录)
  110. cutoff = (datetime.now() - timedelta(days=14)).isoformat()
  111. self._data[ad_key]["adjustments"] = [
  112. a for a in self._data[ad_key]["adjustments"]
  113. if a.get("ts", "") >= cutoff
  114. ]
  115. self._save()
  116. def get_today_total_ops(self) -> int:
  117. """获取今天已操作的广告总数。"""
  118. today = datetime.now().strftime("%Y-%m-%d")
  119. count = 0
  120. for ad_key, record in self._data.items():
  121. adjustments = record.get("adjustments", [])
  122. if any(a.get("ts", "").startswith(today) for a in adjustments):
  123. count += 1
  124. return count
  125. def get_last_bid_down_ts(self, ad_id: str) -> Optional[datetime]:
  126. """获取某广告最近一次 bid_down 的时间(用于"持续低ROI升级关停"判断)。"""
  127. record = self._data.get(str(ad_id), {})
  128. adjustments = record.get("adjustments", [])
  129. bid_downs = [a for a in adjustments if a.get("action") == "bid_down"]
  130. if bid_downs:
  131. last = max(bid_downs, key=lambda a: a.get("ts", ""))
  132. try:
  133. return datetime.fromisoformat(last["ts"])
  134. except ValueError:
  135. return None
  136. return None
  137. # ═══════════════════════════════════════════
  138. # 护栏检查结果
  139. # ═══════════════════════════════════════════
  140. @dataclass
  141. class GuardrailResult:
  142. """单个护栏的检查结果。"""
  143. status: str # "approved" / "blocked" / "modified"
  144. reason: str
  145. modified_action: Optional[str] = None
  146. modified_bid: Optional[float] = None
  147. modified_change_pct: Optional[float] = None
  148. # ═══════════════════════════════════════════
  149. # 护栏基类
  150. # ═══════════════════════════════════════════
  151. class Guardrail(ABC):
  152. """护栏基类。"""
  153. @property
  154. @abstractmethod
  155. def name(self) -> str:
  156. pass
  157. @abstractmethod
  158. def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
  159. """
  160. 检查单个决策是否通过护栏。
  161. Args:
  162. row: 决策行(包含 action, ad_id, recommended_change_pct 等)
  163. history: 调整历史记录
  164. Returns:
  165. GuardrailResult
  166. """
  167. pass
  168. # ═══════════════════════════════════════════
  169. # 护栏 0: 决策一致性自检(action ↔ pct ↔ ROI 方向对齐)
  170. # ═══════════════════════════════════════════
  171. class ActionConsistencyGuardrail(Guardrail):
  172. """决策一致性护栏:拦住 LLM 的自相矛盾输出。
  173. 分两个子系列:
  174. A. 方向一致性(action ↔ pct 符号)
  175. A1: action=bid_down 必须 pct < 0(否则改 hold)
  176. A2: action=bid_up 必须 pct > 0
  177. A3: action=hold/observe 必须 pct = 0
  178. B. ROI 水位一致性(action ↔ 广告 ROI vs 渠道P50)
  179. B1: bid_down 且 dynamic_roi_7d >= 渠道P50 × 1.05 → 改 hold(Top 1 实测事故)
  180. B2: bid_up 且 dynamic_roi_7d < 渠道P50 × 0.75 → 改 hold(低效广告不能加码)
  181. B3: scale_up 且 dynamic_roi_7d < 渠道P50 × 0.90 → 改 observe(扩量先证明效率)
  182. 依赖:
  183. - row["动态ROI_7日均值"]:广告自身 ROI(由 ad_decision 合并到 decisions CSV)
  184. - row["_channel_roi_p50"]:渠道P50(由 _run_guardrails 从 metrics 算后注入)
  185. 任一值缺失时 B 系列自动跳过(降级为纯 A 系列),保证旧数据仍可通过。
  186. """
  187. @property
  188. def name(self) -> str:
  189. return "决策一致性"
  190. @staticmethod
  191. def _to_float(val) -> Optional[float]:
  192. """安全转 float:None/NaN/空串/非数字 → None。"""
  193. if val is None or val == "":
  194. return None
  195. try:
  196. f = float(val)
  197. if pd.isna(f):
  198. return None
  199. return f
  200. except (ValueError, TypeError):
  201. return None
  202. def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
  203. action = str(row.get("action", "hold")).strip()
  204. # pct 标准化(None/NaN/非数字 → 0)
  205. pct = self._to_float(row.get("recommended_change_pct")) or 0.0
  206. # ==================== A 系列:方向一致性 ====================
  207. if action == "bid_down" and pct >= 0:
  208. return GuardrailResult(
  209. status="modified",
  210. reason=f"方向冲突:action=bid_down 但 pct={pct:+.2%}(应为负)→ 改 hold",
  211. modified_action="hold",
  212. modified_change_pct=0.0,
  213. modified_bid=None,
  214. )
  215. if action == "bid_up" and pct <= 0:
  216. return GuardrailResult(
  217. status="modified",
  218. reason=f"方向冲突:action=bid_up 但 pct={pct:+.2%}(应为正)→ 改 hold",
  219. modified_action="hold",
  220. modified_change_pct=0.0,
  221. modified_bid=None,
  222. )
  223. if action in ("hold", "observe") and abs(pct) > 1e-6:
  224. return GuardrailResult(
  225. status="modified",
  226. reason=f"维持类动作不应改出价:action={action} 但 pct={pct:+.2%} → pct 归零",
  227. modified_action=action,
  228. modified_change_pct=0.0,
  229. modified_bid=None,
  230. )
  231. # ==================== B4: 裂变-关停冲突 ====================
  232. # pause 但裂变率显著优于同类(>1.10×)→ 改 observe(裂变好说明广告质量有潜力)
  233. if action == "pause":
  234. ad_fission = self._to_float(row.get("_ad_fission"))
  235. tier_fission_mean = self._to_float(row.get("_tier_fission_mean"))
  236. if (
  237. ad_fission is not None
  238. and tier_fission_mean is not None
  239. and tier_fission_mean > 0
  240. and ad_fission > tier_fission_mean * 1.10
  241. ):
  242. return GuardrailResult(
  243. status="modified",
  244. reason=(
  245. f"裂变-关停冲突:裂变率={ad_fission:.2f} > 同类均值{tier_fission_mean:.2f}×1.10"
  246. f"={tier_fission_mean * 1.10:.2f}(裂变优秀,广告质量有潜力),"
  247. f"不应关停 → 改 observe"
  248. ),
  249. modified_action="observe",
  250. modified_change_pct=0.0,
  251. modified_bid=None,
  252. )
  253. return GuardrailResult(status="approved", reason="")
  254. # ═══════════════════════════════════════════
  255. # 护栏 1: 冷启动保护
  256. # ═══════════════════════════════════════════
  257. class ColdStartGuardrail(Guardrail):
  258. """冷启动保护:0-4天不做负向操作,4-7天仅允许小幅降价。"""
  259. @property
  260. def name(self) -> str:
  261. return "冷启动保护"
  262. def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
  263. action = row.get("action", "hold")
  264. ad_age = row.get("ad_age_days")
  265. if ad_age is None or action == "hold":
  266. return GuardrailResult(status="approved", reason="")
  267. # 冷启动期(≤3天):极度保护,禁止所有操作
  268. if ad_age <= COLD_START_DAYS:
  269. if action in ("pause", "bid_down", "bid_up"):
  270. return GuardrailResult(
  271. status="blocked",
  272. reason=f"冷启动期({ad_age}天 ≤ {COLD_START_DAYS}天),极度保护,禁止{action}",
  273. modified_action="hold",
  274. )
  275. # 早期成长期(4-7天):仅允许提价
  276. elif ad_age <= CAUTIOUS_DAYS:
  277. # 早期成长期(4-7天):仅允许提价
  278. if action in ("pause", "bid_down"):
  279. return GuardrailResult(
  280. status="blocked",
  281. reason=f"早期成长期({ad_age}天,4-{CAUTIOUS_DAYS}天),仅允许提价,禁止{action}",
  282. modified_action="hold",
  283. )
  284. return GuardrailResult(status="approved", reason="")
  285. # ═══════════════════════════════════════════
  286. # 护栏 2: 数据新鲜度
  287. # ═══════════════════════════════════════════
  288. class DataFreshnessGuardrail(Guardrail):
  289. """数据新鲜度校验:数据超过 26 小时视为过期。"""
  290. @property
  291. def name(self) -> str:
  292. return "数据新鲜度"
  293. def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
  294. action = row.get("action", "hold")
  295. if action == "hold":
  296. return GuardrailResult(status="approved", reason="")
  297. data_date = row.get("_data_date") # 由工具注入
  298. if data_date:
  299. try:
  300. data_dt = datetime.strptime(str(data_date), "%Y%m%d")
  301. # 使用时区感知的 datetime(支持海外部署)
  302. now = datetime.now(ZoneInfo(TIMEZONE))
  303. # data_dt 需要本地化到相同时区
  304. data_dt_aware = data_dt.replace(tzinfo=ZoneInfo(TIMEZONE))
  305. hours_old = (now - data_dt_aware).total_seconds() / 3600
  306. if hours_old > DATA_FRESHNESS_MAX_HOURS:
  307. return GuardrailResult(
  308. status="blocked",
  309. reason=f"数据已过期({hours_old:.0f}小时前,上限{DATA_FRESHNESS_MAX_HOURS}小时),阻止所有操作",
  310. modified_action="hold",
  311. )
  312. except ValueError:
  313. pass
  314. return GuardrailResult(status="approved", reason="")
  315. # ═══════════════════════════════════════════
  316. # 护栏 3: 出价边界
  317. # ═══════════════════════════════════════════
  318. class BidBoundaryGuardrail(Guardrail):
  319. """出价边界检查:钳位到 [BID_FLOOR, BID_CEILING]。"""
  320. @property
  321. def name(self) -> str:
  322. return "出价边界"
  323. def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
  324. action = row.get("action", "hold")
  325. if action not in ("bid_up", "bid_down"):
  326. return GuardrailResult(status="approved", reason="")
  327. recommended_bid = row.get("recommended_bid")
  328. if recommended_bid is None or recommended_bid == "":
  329. return GuardrailResult(status="approved", reason="")
  330. recommended_bid = float(recommended_bid)
  331. current_bid = float(row.get("current_bid", 0) or 0)
  332. if recommended_bid < BID_FLOOR_YUAN:
  333. new_bid = BID_FLOOR_YUAN
  334. new_pct = (new_bid - current_bid) / current_bid if current_bid > 0 else 0
  335. return GuardrailResult(
  336. status="modified",
  337. reason=f"出价{recommended_bid:.2f}元低于下限{BID_FLOOR_YUAN}元,钳位至{new_bid:.2f}元",
  338. modified_bid=new_bid,
  339. modified_change_pct=round(new_pct, 4),
  340. )
  341. elif recommended_bid > BID_CEILING_YUAN:
  342. new_bid = BID_CEILING_YUAN
  343. new_pct = (new_bid - current_bid) / current_bid if current_bid > 0 else 0
  344. return GuardrailResult(
  345. status="modified",
  346. reason=f"出价{recommended_bid:.2f}元超过上限{BID_CEILING_YUAN}元,钳位至{new_bid:.2f}元",
  347. modified_bid=new_bid,
  348. modified_change_pct=round(new_pct, 4),
  349. )
  350. return GuardrailResult(status="approved", reason="")
  351. # ═══════════════════════════════════════════
  352. # 护栏 3.5: 调幅范围钳位
  353. # ═══════════════════════════════════════════
  354. class BidRangeGuardrail(Guardrail):
  355. """调幅范围钳位:bid_up 钳位到 [5%, 10%],bid_down 钳位到 [-5%, -3%]。"""
  356. @property
  357. def name(self) -> str:
  358. return "调幅范围"
  359. def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
  360. action = row.get("action", "hold")
  361. if action not in ("bid_up", "bid_down"):
  362. return GuardrailResult(status="approved", reason="")
  363. change_pct = row.get("recommended_change_pct")
  364. if change_pct is None or change_pct == "":
  365. return GuardrailResult(status="approved", reason="")
  366. change_pct = float(change_pct)
  367. current_bid = float(row.get("current_bid", 0) or 0)
  368. if action == "bid_up":
  369. # 提价:钳位到 [BID_UP_MIN_PCT, BID_UP_MAX_PCT]
  370. clamped = max(BID_UP_MIN_PCT, min(BID_UP_MAX_PCT, change_pct))
  371. if abs(clamped - change_pct) > 0.001:
  372. new_bid = round(current_bid * (1 + clamped), 2) if current_bid > 0 else None
  373. return GuardrailResult(
  374. status="modified",
  375. reason=f"提价调幅从{change_pct*100:.1f}%钳位至{clamped*100:.1f}%(范围{BID_UP_MIN_PCT*100:.0f}%-{BID_UP_MAX_PCT*100:.0f}%)",
  376. modified_change_pct=round(clamped, 4),
  377. modified_bid=new_bid,
  378. )
  379. elif action == "bid_down":
  380. # 降价:change_pct 应为负数,钳位到 [-BID_DOWN_MAX_PCT, -BID_DOWN_MIN_PCT]
  381. clamped = min(-BID_DOWN_MIN_PCT, max(-BID_DOWN_MAX_PCT, change_pct))
  382. if abs(clamped - change_pct) > 0.001:
  383. new_bid = round(current_bid * (1 + clamped), 2) if current_bid > 0 else None
  384. return GuardrailResult(
  385. status="modified",
  386. reason=f"降价调幅从{change_pct*100:.1f}%钳位至{clamped*100:.1f}%(范围-{BID_DOWN_MAX_PCT*100:.0f}%~-{BID_DOWN_MIN_PCT*100:.0f}%)",
  387. modified_change_pct=round(clamped, 4),
  388. modified_bid=new_bid,
  389. )
  390. return GuardrailResult(status="approved", reason="")
  391. # ═══════════════════════════════════════════
  392. # 护栏 4: 频率限制
  393. # ═══════════════════════════════════════════
  394. class RateLimitGuardrail(Guardrail):
  395. """频率限制:每日次数/间隔/累计调幅。"""
  396. @property
  397. def name(self) -> str:
  398. return "频率限制"
  399. def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
  400. action = row.get("action", "hold")
  401. if action not in ("bid_up", "bid_down", "pause"):
  402. return GuardrailResult(status="approved", reason="")
  403. ad_id = str(row.get("ad_id", ""))
  404. # 今日已调整次数
  405. today_adj = history.get_today_adjustments(ad_id)
  406. if len(today_adj) >= MAX_ADJUSTMENTS_PER_AD_PER_DAY:
  407. return GuardrailResult(
  408. status="blocked",
  409. reason=f"今日已调整{len(today_adj)}次(上限{MAX_ADJUSTMENTS_PER_AD_PER_DAY}次)",
  410. modified_action="hold",
  411. )
  412. # 距上次调整间隔
  413. last_ts = history.get_last_adjustment_ts(ad_id)
  414. if last_ts:
  415. hours_since = (datetime.now() - last_ts).total_seconds() / 3600
  416. if hours_since < MIN_ADJUSTMENT_INTERVAL_HOURS:
  417. return GuardrailResult(
  418. status="blocked",
  419. reason=f"距上次调整仅{hours_since:.1f}小时(最小间隔{MIN_ADJUSTMENT_INTERVAL_HOURS}小时)",
  420. modified_action="hold",
  421. )
  422. # 日累计调幅
  423. if action in ("bid_up", "bid_down"):
  424. change_pct = row.get("recommended_change_pct", 0)
  425. if isinstance(change_pct, str):
  426. try:
  427. change_pct = float(change_pct)
  428. except ValueError:
  429. change_pct = 0
  430. cumulative = history.get_cumulative_pct_today(ad_id)
  431. if cumulative + abs(change_pct) > MAX_DAILY_CUMULATIVE_CHANGE_PCT:
  432. remaining = MAX_DAILY_CUMULATIVE_CHANGE_PCT - cumulative
  433. if remaining <= 0:
  434. return GuardrailResult(
  435. status="blocked",
  436. reason=f"日累计调幅已达{cumulative*100:.1f}%(上限{MAX_DAILY_CUMULATIVE_CHANGE_PCT*100:.0f}%)",
  437. modified_action="hold",
  438. )
  439. else:
  440. # 修正调幅
  441. direction = 1 if change_pct > 0 else -1
  442. new_pct = direction * remaining
  443. current_bid = float(row.get("current_bid", 0) or 0)
  444. new_bid = round(current_bid * (1 + new_pct), 2) if current_bid > 0 else None
  445. return GuardrailResult(
  446. status="modified",
  447. reason=f"调幅从{abs(change_pct)*100:.1f}%缩减至{abs(new_pct)*100:.1f}%(日累计限制)",
  448. modified_change_pct=round(new_pct, 4),
  449. modified_bid=new_bid,
  450. )
  451. return GuardrailResult(status="approved", reason="")
  452. # ═══════════════════════════════════════════
  453. # 护栏 5: 每日操作总量上限
  454. # ═══════════════════════════════════════════
  455. class DailyOpsCapGuardrail(Guardrail):
  456. """每日操作总量上限:单日最多操作 N 个广告。"""
  457. @property
  458. def name(self) -> str:
  459. return "每日操作上限"
  460. def __init__(self):
  461. self._approved_count = 0
  462. def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
  463. action = row.get("action", "hold")
  464. if action == "hold":
  465. return GuardrailResult(status="approved", reason="")
  466. total_ops = history.get_today_total_ops() + self._approved_count
  467. if total_ops >= MAX_DAILY_OPS:
  468. return GuardrailResult(
  469. status="blocked",
  470. reason=f"今日已操作{total_ops}个广告(上限{MAX_DAILY_OPS}个),请按ROI严重度排优先级",
  471. modified_action="hold",
  472. )
  473. self._approved_count += 1
  474. return GuardrailResult(status="approved", reason="")
  475. # ═══════════════════════════════════════════
  476. # 护栏 6: 干运行模式
  477. # ═══════════════════════════════════════════
  478. class DryRunGuardrail(Guardrail):
  479. """干运行模式:全部标记为 dry_run。"""
  480. @property
  481. def name(self) -> str:
  482. return "干运行模式"
  483. def check(self, row: pd.Series, history: AdjustmentHistory) -> GuardrailResult:
  484. action = row.get("action", "hold")
  485. if action == "hold":
  486. return GuardrailResult(status="approved", reason="")
  487. if DRY_RUN_MODE:
  488. return GuardrailResult(
  489. status="modified",
  490. reason="干运行模式:操作不会实际执行",
  491. )
  492. return GuardrailResult(status="approved", reason="")
  493. # ═══════════════════════════════════════════
  494. # 护栏引擎
  495. # ═══════════════════════════════════════════
  496. def _run_guardrails(
  497. df: pd.DataFrame,
  498. data_date: str,
  499. dry_run: bool = False,
  500. channel_roi_p50: Optional[float] = None,
  501. ) -> pd.DataFrame:
  502. """
  503. 对决策 DataFrame 执行 2 道护栏检查。
  504. 新增列:
  505. - guardrail_status: approved / blocked / modified
  506. - guardrail_reason: 护栏说明
  507. - final_action: 护栏修正后的最终动作
  508. - final_bid: 护栏修正后的最终出价
  509. Args:
  510. df: 决策 DataFrame
  511. data_date: 数据日期(YYYYMMDD)
  512. dry_run: 是否强制干运行
  513. channel_roi_p50: 渠道P50(全体广告动态ROI_7日均值的中位数),
  514. 用于 ActionConsistencyGuardrail B 系列规则。缺省时 B 系列跳过。
  515. """
  516. history = AdjustmentHistory()
  517. guardrails = [
  518. # 暂时全部关闭,测试 LLM 裸输出 + 字段精简的效果
  519. ]
  520. # 注入数据日期 + 渠道P50(给 ActionConsistencyGuardrail B 系列用)
  521. df["_data_date"] = data_date
  522. if channel_roi_p50 is not None and channel_roi_p50 > 0:
  523. df["_channel_roi_p50"] = channel_roi_p50
  524. statuses = []
  525. reasons = []
  526. final_actions = []
  527. final_bids = []
  528. for _, row in df.iterrows():
  529. action = row.get("action", "hold")
  530. current_status = "approved"
  531. current_reasons = []
  532. current_action = action
  533. current_bid = row.get("recommended_bid")
  534. current_change_pct = row.get("recommended_change_pct")
  535. if action == "hold":
  536. statuses.append("approved")
  537. reasons.append("")
  538. final_actions.append("hold")
  539. final_bids.append(None)
  540. continue
  541. for guardrail in guardrails:
  542. # 构建可变行用于护栏检查
  543. check_row = row.copy()
  544. if current_bid is not None:
  545. check_row["recommended_bid"] = current_bid
  546. if current_change_pct is not None:
  547. check_row["recommended_change_pct"] = current_change_pct
  548. check_row["action"] = current_action
  549. result = guardrail.check(check_row, history)
  550. if result.status == "blocked":
  551. current_status = "blocked"
  552. current_reasons.append(f"[{guardrail.name}] {result.reason}")
  553. current_action = result.modified_action or "hold"
  554. break
  555. elif result.status == "modified":
  556. current_status = "modified"
  557. current_reasons.append(f"[{guardrail.name}] {result.reason}")
  558. if result.modified_action:
  559. current_action = result.modified_action
  560. if result.modified_bid is not None:
  561. current_bid = result.modified_bid
  562. if result.modified_change_pct is not None:
  563. current_change_pct = result.modified_change_pct
  564. statuses.append(current_status)
  565. reasons.append("; ".join(current_reasons))
  566. final_actions.append(current_action)
  567. final_bids.append(current_bid if current_action in ("bid_up", "bid_down") else None)
  568. df["guardrail_status"] = statuses
  569. df["guardrail_reason"] = reasons
  570. df["final_action"] = final_actions
  571. df["final_bid"] = final_bids
  572. # 清理临时列
  573. df.drop(columns=["_data_date", "_channel_roi_p50"], inplace=True, errors="ignore")
  574. return df
  575. # ═══════════════════════════════════════════
  576. # 工具:验证决策安全性
  577. # ═══════════════════════════════════════════
  578. @tool(description="验证决策安全性:冷启动保护、出价边界、频率限制、数据新鲜度")
  579. async def validate_decisions(
  580. ctx: ToolContext = None,
  581. decisions_csv: str = "",
  582. end_date: str = "yesterday",
  583. dry_run: bool = False,
  584. ) -> ToolResult:
  585. """
  586. 对每个决策执行 6 道护栏检查。
  587. 输入:apply_decisions 输出的 llm_decisions CSV
  588. 输出:validated_decisions_{date}.csv,新增列 guardrail_status / guardrail_reason / final_action / final_bid
  589. Args:
  590. decisions_csv: 决策 CSV 路径(默认最新的 llm_decisions)
  591. end_date: 结束日期
  592. dry_run: 是否强制干运行模式
  593. """
  594. try:
  595. if not GUARDRAILS_ENABLED:
  596. return ToolResult(
  597. title="护栏已禁用",
  598. output="GUARDRAILS_ENABLED=False,跳过护栏验证",
  599. )
  600. if end_date == "yesterday":
  601. end_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
  602. # 自动查找最新决策 CSV(按修改时间排序,而非文件名)
  603. if not decisions_csv:
  604. reports_dir = _MINI_DIR / "outputs" / "reports"
  605. candidates = sorted(
  606. reports_dir.glob("llm_decisions_*.csv"),
  607. key=lambda p: p.stat().st_mtime, # 按修改时间排序
  608. reverse=True
  609. )
  610. if not candidates:
  611. return ToolResult(title="validate_decisions", output="未找到决策 CSV")
  612. decisions_csv = str(candidates[0])
  613. df = pd.read_csv(decisions_csv)
  614. if df.empty:
  615. return ToolResult(title="validate_decisions", output="决策数据为空")
  616. # 读取一次 metrics 做补字段和渠道P50(供 ActionConsistencyGuardrail B 系列用)
  617. metrics_csv = _MINI_DIR / "outputs" / "metrics_temp.csv"
  618. channel_roi_p50: Optional[float] = None
  619. if metrics_csv.exists():
  620. try:
  621. df_metrics = pd.read_csv(metrics_csv)
  622. # 补充广告年龄(如果缺失)
  623. if "ad_age_days" not in df.columns and "create_time" in df_metrics.columns:
  624. from ad_decision import _calculate_ad_age_days
  625. df_metrics["ad_age_days"] = df_metrics["create_time"].apply(_calculate_ad_age_days)
  626. age_map = df_metrics.set_index("ad_id")["ad_age_days"].to_dict()
  627. df["ad_age_days"] = df["ad_id"].map(age_map)
  628. # 补充当前出价(如果缺失)
  629. if ("current_bid" not in df.columns or df["current_bid"].isna().all()) \
  630. and "bid_amount" in df_metrics.columns:
  631. bid_map = df_metrics.set_index("ad_id")["bid_amount"].to_dict()
  632. df["current_bid"] = df["ad_id"].map(bid_map)
  633. # 计算渠道P50:全体广告"动态ROI_7日均值"的中位数
  634. if "动态ROI_7日均值" in df_metrics.columns:
  635. roi_series = df_metrics["动态ROI_7日均值"].dropna()
  636. if len(roi_series) > 0:
  637. channel_roi_p50 = float(roi_series.median())
  638. logger.info(f"护栏 B 系列启用:渠道P50={channel_roi_p50:.4f}")
  639. # 注入裂变字段(供 B4 护栏使用)
  640. if "T0裂变系数_7日均值" in df_metrics.columns and "audience_tier" in df_metrics.columns:
  641. # ad_fission: 每条广告自身的裂变率
  642. fission_map = df_metrics.set_index("ad_id")["T0裂变系数_7日均值"].to_dict()
  643. df["_ad_fission"] = df["ad_id"].map(fission_map)
  644. # tier_fission_mean: 按人群包分组的同类均值
  645. tier_fission = df_metrics.groupby("audience_tier")["T0裂变系数_7日均值"].mean().to_dict()
  646. tier_map = df_metrics.set_index("ad_id")["audience_tier"].to_dict()
  647. df["_tier_fission_mean"] = df["ad_id"].map(
  648. lambda aid: tier_fission.get(tier_map.get(aid))
  649. )
  650. b4_ready = df["_ad_fission"].notna().sum()
  651. logger.info(f"护栏 B4 裂变字段已注入:{b4_ready} 条广告有裂变数据")
  652. except Exception as e:
  653. logger.warning(f"读取 metrics 失败(护栏 B 系列将跳过): {e}")
  654. if channel_roi_p50 is None:
  655. logger.warning("未能计算渠道P50,ActionConsistencyGuardrail B 系列规则跳过")
  656. # 运行护栏链
  657. df = _run_guardrails(df, data_date=end_date, dry_run=dry_run, channel_roi_p50=channel_roi_p50)
  658. # 保存验证结果
  659. reports_dir = _MINI_DIR / "outputs" / "reports"
  660. reports_dir.mkdir(parents=True, exist_ok=True)
  661. out_path = reports_dir / f"validated_decisions_{end_date}.csv"
  662. df.to_csv(out_path, index=False, encoding="utf-8-sig")
  663. # 统计
  664. total = len(df)
  665. approved = (df["guardrail_status"] == "approved").sum()
  666. blocked = (df["guardrail_status"] == "blocked").sum()
  667. modified = (df["guardrail_status"] == "modified").sum()
  668. # 最终动作统计
  669. final_pause = (df["final_action"] == "pause").sum()
  670. final_hold = (df["final_action"] == "hold").sum()
  671. final_bid_up = (df["final_action"] == "bid_up").sum()
  672. final_bid_down = (df["final_action"] == "bid_down").sum()
  673. output_lines = [
  674. f"护栏验证完成: {out_path}",
  675. "",
  676. f"护栏结果:",
  677. f" approved: {approved} 个(直接通过)",
  678. f" modified: {modified} 个(参数修正后通过)",
  679. f" blocked: {blocked} 个(被拦截→hold)",
  680. "",
  681. f"最终动作分布:",
  682. f" pause: {final_pause} 个",
  683. f" bid_down: {final_bid_down} 个",
  684. f" bid_up: {final_bid_up} 个",
  685. f" hold: {final_hold} 个",
  686. ]
  687. if DRY_RUN_MODE or dry_run:
  688. output_lines.append("")
  689. output_lines.append("⚠️ 当前为干运行模式(DRY_RUN),操作不会实际执行")
  690. return ToolResult(
  691. title=f"护栏验证({total}条,拦截{blocked})",
  692. output="\n".join(output_lines),
  693. metadata={
  694. "csv_path": str(out_path),
  695. "total": total,
  696. "approved": int(approved),
  697. "blocked": int(blocked),
  698. "modified": int(modified),
  699. "final_pause": int(final_pause),
  700. "final_hold": int(final_hold),
  701. "final_bid_up": int(final_bid_up),
  702. "final_bid_down": int(final_bid_down),
  703. "dry_run": DRY_RUN_MODE or dry_run,
  704. },
  705. )
  706. except Exception as e:
  707. logger.error("validate_decisions 失败: %s", e, exc_info=True)
  708. return ToolResult(title="validate_decisions 失败", output=str(e))