"""
Execution engine — auto_put_ad_mini.

Responsibilities:
1. Load the decisions that passed guardrail validation.
2. Classify them by autonomy tier (Tier 1 automatic / Tier 2 approval /
   Tier 3 high-value).
3. Call the Tencent Ads API to carry out the operations.
4. Write an audit log (JSONL).
5. Check the effect after execution.

Depends on:
- the low-level HTTP functions in tools/ad_api.py
- AdjustmentHistory in tools/guardrails.py
"""

import asyncio
import json
import logging
import math
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional

import pandas as pd

from agent.tools import tool
from agent.tools.models import ToolContext, ToolResult

_MINI_DIR = Path(__file__).resolve().parent.parent
_TOOLS_DIR = Path(__file__).resolve().parent
if str(_MINI_DIR) not in sys.path:
    sys.path.insert(0, str(_MINI_DIR))
if str(_TOOLS_DIR) not in sys.path:
    sys.path.insert(0, str(_TOOLS_DIR))

from config import (
    EXECUTION_ENABLED,
    EXECUTION_LOG_DIR,
    API_QPS_LIMIT,
    API_MAX_RETRIES,
    TIER1_MAX_CHANGE_PCT,
    TIER3_MIN_DAILY_SPEND,
    FEEDBACK_CHECK_HOURS,
    DRY_RUN_MODE,
    IM_ENABLED,
)

logger = logging.getLogger(__name__)

# Audit statuses written for operations whose API call succeeded: "success"
# for Tier 1 and "approved_success" for approved Tier 2/3 operations.
_SUCCESS_STATUSES = ("success", "approved_success")


# ═══════════════════════════════════════════
# Small conversion helpers
# ═══════════════════════════════════════════

def _to_float(value: Any, default: float = 0.0) -> float:
    """Coerce *value* to float; return *default* for None, NaN or junk."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return default
    return default if math.isnan(number) else number


def _to_int(value: Any, default: int = 0) -> int:
    """Coerce *value* to int; return *default* for None, NaN, "" or junk."""
    if value is None:
        return default
    if isinstance(value, float) and math.isnan(value):
        return default
    try:
        return int(value or 0)
    except (TypeError, ValueError):
        return default


def _bid_to_fen(raw_bid: Any) -> Optional[int]:
    """
    Convert a bid (the CSV value that is multiplied by 100) to integer fen.

    Returns None when the value is missing, NaN, infinite or not numeric.
    The product is rounded rather than truncated: 1.15 * 100 evaluates to
    114.99999999999999 in binary floating point and must become 115.
    """
    try:
        amount = float(raw_bid)
    except (TypeError, ValueError):
        return None
    if math.isnan(amount) or math.isinf(amount):
        return None
    return int(round(amount * 100))


def _snapshot(state: Optional[Dict]) -> Optional[Dict]:
    """Reduce an ad state dict to the two fields kept in the audit log."""
    if not state:
        return None
    return {
        "bid_amount": state.get("bid_amount"),
        "status": state.get("configured_status"),
    }


# ═══════════════════════════════════════════
# QPS token-bucket rate limiter
# ═══════════════════════════════════════════

class TokenBucket:
    """Simple token-bucket rate limiter for asyncio callers."""

    def __init__(self, rate: float = API_QPS_LIMIT):
        """
        Args:
            rate: tokens added per second; must be positive.

        Raises:
            ValueError: if *rate* is zero or negative (the refill maths would
                otherwise divide by zero or never make progress).
        """
        if rate <= 0:
            raise ValueError(f"rate must be positive, got {rate!r}")
        self._rate = rate
        # The bucket must be able to hold one whole token, otherwise a rate
        # below 1 could never satisfy acquire() and it would loop forever.
        self._capacity = max(1.0, rate)
        self._tokens = self._capacity
        self._last_refill = time.monotonic()

    async def acquire(self):
        """Take one token, sleeping until one is available."""
        while True:
            now = time.monotonic()
            elapsed = now - self._last_refill
            self._tokens = min(self._capacity, self._tokens + elapsed * self._rate)
            self._last_refill = now
            if self._tokens >= 1:
                self._tokens -= 1
                return
            # Sleep exactly as long as the missing fraction takes to refill.
            await asyncio.sleep((1 - self._tokens) / self._rate)


# ═══════════════════════════════════════════
# Tencent Ads executor
# ═══════════════════════════════════════════

class TencentAdExecutor:
    """
    Tencent Ads API executor.

    Reuses the low-level HTTP functions of ad_api.py and adds:
    - QPS token-bucket rate limiting
    - exponential-backoff retries
    - state snapshots taken around an operation
    """

    def __init__(self):
        self._bucket = TokenBucket(rate=API_QPS_LIMIT)

    async def get_ad_state(self, ad_id: int, account_id: int) -> Optional[Dict]:
        """
        Fetch the current state of one ad group.

        Returns:
            The first item of the API response list, or None when the list is
            empty or anything fails (failures are logged, never raised).
        """
        try:
            from ad_api import _get, _check

            await self._bucket.acquire()
            resp = _get("/adgroups/get", {
                "account_id": account_id,
                "filtering": {"adgroup_id_list": [ad_id]},
                "page": 1,
                "page_size": 1,
            })
            data = _check(resp, "get_ad_state")
            items = data.get("list", [])
            return items[0] if items else None
        except Exception as e:
            logger.warning("获取广告 %s 状态失败: %s", ad_id, e)
            return None

    async def _update_adgroup(
        self,
        op_name: str,
        body: Dict[str, Any],
        *,
        log_wait: bool = False,
    ) -> Dict:
        """
        POST *body* to /adgroups/update with rate limiting and retries.

        Args:
            op_name: operation label passed to ``_check`` and used in logs.
            body: request payload.
            log_wait: include the back-off delay in the retry log line.

        Returns:
            ``{"code": 0, "message": "success"}`` on success, otherwise
            ``{"code": -1, "message": <last error>}``. Never returns None.
        """
        from ad_api import _post, _check

        # Always try at least once so a non-positive API_MAX_RETRIES cannot
        # make the loop fall through and return None to the caller.
        attempts = max(1, API_MAX_RETRIES)
        for attempt in range(attempts):
            try:
                await self._bucket.acquire()
                resp = _post("/adgroups/update", body)
                _check(resp, op_name)
                return {"code": 0, "message": "success"}
            except Exception as e:
                if attempt >= attempts - 1:
                    return {"code": -1, "message": str(e)}
                wait = 2 ** attempt  # exponential back-off: 1s, 2s, 4s, ...
                if log_wait:
                    logger.warning("%s 重试 %d/%d (等待%ds): %s",
                                   op_name, attempt + 1, API_MAX_RETRIES, wait, e)
                else:
                    logger.warning("%s 重试 %d/%d: %s",
                                   op_name, attempt + 1, API_MAX_RETRIES, e)
                await asyncio.sleep(wait)
        return {"code": -1, "message": "no attempt made"}  # defensive only

    async def update_bid(self, ad_id: int, account_id: int, bid_amount_fen: int) -> Dict:
        """Update the bid of an ad group; *bid_amount_fen* is in fen."""
        return await self._update_adgroup(
            "update_bid",
            {
                "account_id": account_id,
                "adgroup_id": ad_id,
                "bid_amount": bid_amount_fen,
            },
            log_wait=True,
        )

    async def pause_ad(self, ad_id: int, account_id: int) -> Dict:
        """Pause an ad group by setting its configured status to suspended."""
        return await self._update_adgroup(
            "pause_ad",
            {
                "account_id": account_id,
                "adgroup_id": ad_id,
                "configured_status": "AD_STATUS_SUSPEND",
            },
        )


# ═══════════════════════════════════════════
# Audit log
# ═══════════════════════════════════════════

class AuditLogger:
    """Append-only JSONL audit log, one file per construction date."""

    def __init__(self, log_dir: Path = EXECUTION_LOG_DIR):
        """Create *log_dir* if needed and fix the file name to today's date."""
        self._log_dir = log_dir
        self._log_dir.mkdir(parents=True, exist_ok=True)
        today = datetime.now().strftime("%Y%m%d")
        self._path = self._log_dir / f"exec_{today}.jsonl"

    def log(self, entry: Dict):
        """
        Append *entry* as one JSON line.

        Note: a local-time ISO timestamp is written into ``entry["ts"]``,
        i.e. the caller's dict is modified in place.
        """
        entry["ts"] = datetime.now().isoformat()
        with open(self._path, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry, ensure_ascii=False) + "\n")

    @property
    def path(self) -> Path:
        """Path of the JSONL file this logger appends to."""
        return self._path


# ═══════════════════════════════════════════
# Autonomy tier classification
# ═══════════════════════════════════════════

def _classify_tier(row: pd.Series) -> int:
    """
    Classify one decision row into an autonomy tier.

    Tier 0 (no operation): hold, observe, creative_adjust, scale_up — these
        are advisory only and never reach the API.
    Tier 3 (high value): ``cost_7d_avg >= TIER3_MIN_DAILY_SPEND``.
    Tier 2 (needs approval): pause, or
        ``abs(recommended_change_pct) > TIER1_MAX_CHANGE_PCT``.
    Tier 1 (automatic): everything else (small bid changes).

    Missing, NaN or unparseable numeric fields are treated as 0.
    """
    action = row.get("final_action", row.get("action", "hold"))

    # Tier 0: advisory actions (observe = watch; creative_adjust / scale_up
    # have to be carried out by a human).
    if action in ("hold", "observe", "creative_adjust", "scale_up"):
        return 0

    cost_7d_avg = _to_float(row.get("cost_7d_avg", 0))
    change_pct = _to_float(row.get("recommended_change_pct", 0))

    # Tier 3 is checked first so a high-value ad is never downgraded.
    if cost_7d_avg >= TIER3_MIN_DAILY_SPEND:
        return 3

    if action == "pause" or abs(change_pct) > TIER1_MAX_CHANGE_PCT:
        return 2

    return 1


# ═══════════════════════════════════════════
# Execution helpers
# ═══════════════════════════════════════════

def _audit_unexecuted(
    audit: AuditLogger,
    row: pd.Series,
    status: str,
    note: Optional[str] = None,
) -> None:
    """Write an audit entry for a Tier 2/3 row that is not being executed."""
    entry: Dict[str, Any] = {
        "ad_id": int(row["ad_id"]),
        "account_id": _to_int(row.get("account_id", 0)),
        "action": row.get("final_action", row.get("action")),
        "tier": int(row.get("tier", 2)),
        "execution_status": status,
    }
    if note is not None:
        entry["note"] = note
    entry["source"] = row.get("source", "")
    audit.log(entry)


async def _execute_row(
    executor: TencentAdExecutor,
    audit: AuditLogger,
    history: Any,
    row: pd.Series,
    *,
    ad_id: int,
    account_id: int,
    action: Any,
    tier: int,
    status_prefix: str = "",
) -> str:
    """
    Execute one decision row against the API and audit the outcome.

    Args:
        history: object exposing ``record_adjustment(ad_id, action, pct)``.
        status_prefix: prepended to the audited execution status
            ("approved_" for Tier 2/3 rows).

    Returns:
        "success" or "failed" for an API call that was made, "skipped" when
        the action is unsupported or the row carries no usable bid.
    """
    pre_state = await executor.get_ad_state(ad_id, account_id)

    if action == "pause":
        result = await executor.pause_ad(ad_id, account_id)
    elif action in ("bid_up", "bid_down"):
        bid_fen = _bid_to_fen(row.get("final_bid", row.get("recommended_bid")))
        if bid_fen is None:
            # Covers None, "", and the NaN pandas produces for empty cells.
            audit.log({
                "ad_id": ad_id,
                "action": action,
                "tier": tier,
                "execution_status": "skipped",
                "reason": "无出价数据",
            })
            return "skipped"
        result = await executor.update_bid(ad_id, account_id, bid_fen)
    else:
        return "skipped"

    api_code = result.get("code", -1)
    exec_status = "success" if api_code == 0 else "failed"

    post_state = None
    if exec_status == "success":
        change_pct = _to_float(row.get("recommended_change_pct", 0))
        history.record_adjustment(str(ad_id), action, change_pct)
        post_state = await executor.get_ad_state(ad_id, account_id)

    audit.log({
        "ad_id": ad_id,
        "account_id": account_id,
        "action": action,
        "tier": tier,
        "pre_state": _snapshot(pre_state),
        "post_state": _snapshot(post_state),
        "api_code": api_code,
        "api_message": result.get("message", ""),
        "execution_status": f"{status_prefix}{exec_status}",
        "source": row.get("source", ""),
    })
    return exec_status


# ═══════════════════════════════════════════
# Tool: execute validated decisions
# ═══════════════════════════════════════════

@tool(description="执行已验证的决策:分级自治 → API调用 → 审计日志")
async def execute_decisions(
    ctx: ToolContext,
    validated_csv: str = "",
    approval_mode: str = "tiered",
) -> ToolResult:
    """
    Execute the decisions that passed guardrail validation.

    Pipeline:
    1. Load the validated decisions CSV (guardrail_status approved/modified).
    2. Classify every row by autonomy tier.
    3. Tier 1: execute directly.
    4. Tier 2/3: when IM is enabled, request approval and execute only the
       approved ads; otherwise just record them as pending.
    5. Write an audit log entry for every handled row.

    Args:
        validated_csv: path of the guardrail-validated CSV; when empty the
            newest ``validated_decisions_*.csv`` in outputs/reports is used.
        approval_mode: only "tiered" is implemented. Any other value is
            logged and handled as "tiered".

    Returns:
        A ToolResult with a text summary and counters in ``metadata``. Any
        exception is caught and reported as a failure ToolResult.
    """
    try:
        if not EXECUTION_ENABLED:
            return ToolResult(
                title="执行引擎未启用",
                output="EXECUTION_ENABLED=False,跳过执行。修改 config.py 启用。",
            )

        if DRY_RUN_MODE:
            return ToolResult(
                title="干运行模式",
                output="DRY_RUN_MODE=True,操作不会实际执行。修改 config.py 关闭干运行。",
            )

        if approval_mode != "tiered":
            # Surface the fact that the parameter has no effect instead of
            # silently behaving differently from what the caller asked for.
            logger.warning("approval_mode=%r 尚未实现,按 tiered 分级处理", approval_mode)

        # Locate the validated CSV.
        if not validated_csv:
            reports_dir = _MINI_DIR / "outputs" / "reports"
            candidates = sorted(reports_dir.glob("validated_decisions_*.csv"), reverse=True)
            if not candidates:
                return ToolResult(title="execute_decisions", output="未找到验证后的决策 CSV")
            validated_csv = str(candidates[0])

        df = pd.read_csv(validated_csv)
        if df.empty:
            return ToolResult(title="execute_decisions", output="决策数据为空")

        # Only approved / modified rows are executable (blocked rows are not).
        df_exec = df[df["guardrail_status"].isin(["approved", "modified"])]
        # Row-level code falls back from final_action to action, so the
        # filter must accept either column.
        action_col = "final_action" if "final_action" in df_exec.columns else "action"
        if action_col in df_exec.columns:
            df_exec = df_exec[df_exec[action_col] != "hold"]
        # Copy after filtering so adding the tier column is not a write to a
        # slice of another frame.
        df_exec = df_exec.copy()

        if df_exec.empty:
            return ToolResult(
                title="无需执行的操作",
                output="所有决策要么是 hold,要么被护栏拦截",
            )

        df_exec["tier"] = df_exec.apply(_classify_tier, axis=1)

        executor = TencentAdExecutor()
        audit = AuditLogger()

        # AdjustmentHistory from guardrails records every applied change.
        from guardrails import AdjustmentHistory
        history = AdjustmentHistory()

        executed = 0
        failed = 0
        pending_approval = 0
        approved_executed = 0
        approved_failed = 0
        rejected_count = 0
        timeout_count = 0

        df_tier1 = df_exec[df_exec["tier"] == 1]
        df_tier2_3 = df_exec[df_exec["tier"] >= 2]
        tier_summary = {t: int((df_exec["tier"] == t).sum()) for t in (1, 2, 3)}

        # ═══ Phase 1: execute Tier 1 automatically ═══
        for _, row in df_tier1.iterrows():
            outcome = await _execute_row(
                executor, audit, history, row,
                ad_id=int(row["ad_id"]),
                account_id=_to_int(row.get("account_id", 0)),
                action=row.get("final_action", row.get("action")),
                tier=1,
            )
            if outcome == "success":
                executed += 1
            elif outcome == "failed":
                failed += 1

        # ═══ Phase 2: Tier 2/3 — approval, then execution ═══
        if not df_tier2_3.empty:
            if IM_ENABLED:
                # Blocking approval: send_approval_request(wait_for_reply=True)
                logger.info("Tier 2/3 共 %d 个操作,发送 IM 审批并等待...", len(df_tier2_3))
                from im_approval import send_approval_request

                approval_result = await send_approval_request(
                    ctx=ctx,
                    validated_csv=validated_csv,
                    wait_for_reply=True,
                )
                # Missing metadata is treated as a timeout.
                meta = approval_result.metadata or {}
                approval_status = meta.get("status", "timeout")
                approved_ids = meta.get("approved_ids", []) or []
                rejected_ids = meta.get("rejected_ids", []) or []

                if approval_status == "timeout":
                    # Timed out: every Tier 2/3 row is recorded as timeout.
                    timeout_count = len(df_tier2_3)
                    for _, row in df_tier2_3.iterrows():
                        _audit_unexecuted(audit, row, "timeout")
                else:
                    approved_set = {int(x) for x in approved_ids}
                    rejected_set = {int(x) for x in rejected_ids}

                    for _, row in df_tier2_3.iterrows():
                        ad_id = int(row["ad_id"])

                        if ad_id in rejected_set:
                            rejected_count += 1
                            _audit_unexecuted(audit, row, "rejected")
                            continue

                        if ad_id not in approved_set:
                            # Neither approved nor rejected (left out of a
                            # partial approval).
                            pending_approval += 1
                            _audit_unexecuted(audit, row, "pending_approval")
                            continue

                        # Approved → execute.
                        outcome = await _execute_row(
                            executor, audit, history, row,
                            ad_id=ad_id,
                            account_id=_to_int(row.get("account_id", 0)),
                            action=row.get("final_action", row.get("action")),
                            tier=int(row.get("tier", 2)),
                            status_prefix="approved_",
                        )
                        if outcome == "success":
                            approved_executed += 1
                        elif outcome == "failed":
                            # Kept apart from the Tier 1 failure counter so
                            # the summary does not misattribute failures.
                            approved_failed += 1
            else:
                # IM disabled: Tier 2/3 rows are recorded, never executed.
                logger.info("IM 未启用,Tier 2/3 共 %d 个操作仅记录不执行", len(df_tier2_3))
                pending_approval = len(df_tier2_3)
                for _, row in df_tier2_3.iterrows():
                    _audit_unexecuted(
                        audit, row, "pending_approval", note="IM未启用,操作仅记录",
                    )

        output_lines = [
            f"执行完成,审计日志: {audit.path}",
            "",
            "执行结果:",
            f" Tier 1 自动执行: {executed} 个成功 / {failed} 个失败",
        ]
        if IM_ENABLED and not df_tier2_3.empty:
            output_lines.extend([
                f" Tier 2/3 审批后执行: {approved_executed} 个成功 / {approved_failed} 个失败",
                f" 审批拒绝: {rejected_count} 个",
                f" 审批超时: {timeout_count} 个",
            ])
        if pending_approval > 0:
            output_lines.append(f" 待审批(未执行): {pending_approval} 个")
        output_lines.extend([
            "",
            "自治级别分布:",
            f" Tier 1 (自动): {tier_summary.get(1, 0)} 个",
            f" Tier 2 (审批): {tier_summary.get(2, 0)} 个",
            f" Tier 3 (高价值): {tier_summary.get(3, 0)} 个",
        ])

        return ToolResult(
            title=f"执行完成(自动{executed}/审批通过{approved_executed}/拒绝{rejected_count}/超时{timeout_count})",
            output="\n".join(output_lines),
            metadata={
                "audit_log": str(audit.path),
                "tier1_executed": executed,
                "tier1_failed": failed,
                "approved_executed": approved_executed,
                "approved_failed": approved_failed,
                "rejected": rejected_count,
                "timeout": timeout_count,
                "pending_approval": pending_approval,
                "tier_summary": tier_summary,
            },
        )

    except Exception as e:
        logger.error("execute_decisions 失败: %s", e, exc_info=True)
        return ToolResult(title="execute_decisions 失败", output=str(e))


# ═══════════════════════════════════════════
# Tool: post-execution effect check
# ═══════════════════════════════════════════

def _read_audit_entries(log_path: str) -> List[Dict]:
    """Read a JSONL audit log, skipping (and logging) lines that do not parse."""
    entries: List[Dict] = []
    with open(log_path, "r", encoding="utf-8") as f:
        for line_no, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                entries.append(json.loads(line))
            except json.JSONDecodeError as e:
                # One corrupt line must not hide every other executed action.
                logger.warning("审计日志 %s 第 %d 行无法解析,已跳过: %s", log_path, line_no, e)
    return entries


def _entry_time(entry: Dict) -> Optional[datetime]:
    """Parse the entry's ``ts`` field; None when it is absent or invalid."""
    try:
        return datetime.fromisoformat(entry["ts"])
    except (KeyError, TypeError, ValueError):
        return None


@tool(description="执行后效果检查:对比操作前后广告表现")
async def check_execution_feedback(
    ctx: ToolContext,
    execution_log_path: str = "",
    hours_after: int = FEEDBACK_CHECK_HOURS,
) -> ToolResult:
    """
    Read an execution log, fetch the current ad state through the API and
    compare it with what the executed operation should have produced.

    Args:
        execution_log_path: JSONL execution log; when empty the newest
            ``exec_*.jsonl`` in EXECUTION_LOG_DIR is used.
        hours_after: only operations older than this many hours are checked.

    Returns:
        A ToolResult with counts of effective / ineffective / unknown
        operations and per-operation details in ``metadata``. Any exception
        is caught and reported as a failure ToolResult.
    """
    try:
        # Locate the newest execution log.
        if not execution_log_path:
            log_dir = EXECUTION_LOG_DIR
            if not log_dir.exists():
                return ToolResult(title="check_execution_feedback", output="无执行日志目录")
            candidates = sorted(log_dir.glob("exec_*.jsonl"), reverse=True)
            if not candidates:
                return ToolResult(title="check_execution_feedback", output="无执行日志")
            execution_log_path = str(candidates[0])

        entries = _read_audit_entries(execution_log_path)
        if not entries:
            return ToolResult(title="check_execution_feedback", output="执行日志为空")

        # Keep successful executions (Tier 1 and approved Tier 2/3) that are
        # older than the waiting period.
        cutoff = datetime.now() - timedelta(hours=hours_after)
        check_entries = []
        for e in entries:
            if e.get("execution_status") not in _SUCCESS_STATUSES:
                continue
            executed_at = _entry_time(e)
            if executed_at is not None and executed_at < cutoff:
                check_entries.append(e)

        if not check_entries:
            return ToolResult(
                title="check_execution_feedback",
                output=f"无需检查(没有超过{hours_after}小时前的成功操作)",
            )

        executor = TencentAdExecutor()
        results = []

        for entry in check_entries:
            ad_id = entry.get("ad_id")
            account_id = entry.get("account_id", 0)
            action = entry.get("action")
            pre_state = entry.get("pre_state", {})

            current_state = await executor.get_ad_state(ad_id, account_id)

            result = {
                "ad_id": ad_id,
                "action": action,
                "executed_at": entry.get("ts"),
                "pre_bid": pre_state.get("bid_amount") if pre_state else None,
                "pre_status": pre_state.get("status") if pre_state else None,
                "current_bid": current_state.get("bid_amount") if current_state else None,
                "current_status": current_state.get("configured_status") if current_state else None,
            }

            # "effective" stays unset (counted as unknown) when the current
            # state or the logged post-state is unavailable.
            if action == "pause":
                result["effective"] = (
                    current_state.get("configured_status") == "AD_STATUS_SUSPEND"
                    if current_state else None
                )
            elif action in ("bid_up", "bid_down"):
                post_state = entry.get("post_state", {})
                if post_state:
                    expected_bid = post_state.get("bid_amount")
                    actual_bid = current_state.get("bid_amount") if current_state else None
                    result["expected_bid"] = expected_bid
                    result["effective"] = (
                        (actual_bid == expected_bid) if actual_bid is not None else None
                    )

            results.append(result)

        effective = sum(1 for r in results if r.get("effective") is True)
        ineffective = sum(1 for r in results if r.get("effective") is False)
        unknown = sum(1 for r in results if r.get("effective") is None)

        output_lines = [
            f"效果检查完成({len(results)} 个操作)",
            "",
            f" 有效: {effective} 个",
            f" 无效/被覆盖: {ineffective} 个",
            f" 未知: {unknown} 个",
        ]

        if ineffective > 0:
            output_lines.append("")
            output_lines.append("⚠️ 以下操作可能未生效或被覆盖:")
            for r in results:
                if r.get("effective") is False:
                    output_lines.append(
                        f" - 广告{r['ad_id']}: {r['action']} "
                        f"(执行于 {r['executed_at']})"
                    )

        return ToolResult(
            title=f"效果检查(有效{effective}/无效{ineffective})",
            output="\n".join(output_lines),
            metadata={
                "total": len(results),
                "effective": effective,
                "ineffective": ineffective,
                "unknown": unknown,
                "details": results,
            },
        )

    except Exception as e:
        logger.error("check_execution_feedback 失败: %s", e, exc_info=True)
        return ToolResult(title="check_execution_feedback 失败", output=str(e))