| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699 |
- """
- 执行引擎 — auto_put_ad_mini
- 职责:
- 1. 加载护栏验证后的决策
- 2. 按自治级别分类(Tier 1 自动 / Tier 2 审批 / Tier 3 阻断)
- 3. 调用腾讯广告 API 执行操作
- 4. 记录审计日志(JSONL)
- 5. 执行后效果检查
- 依赖:
- - tools/ad_api.py 的底层 HTTP 函数
- - tools/guardrails.py 的 AdjustmentHistory
- """
- import asyncio
- import json
- import logging
- import sys
- import time
- from datetime import datetime, timedelta
- from pathlib import Path
- from typing import Any, Dict, List, Optional
- import pandas as pd
- from agent.tools import tool
- from agent.tools.models import ToolContext, ToolResult
# Resolve this file's directory and its parent, then put both on sys.path so
# that the bare-name imports made by this module resolve against them
# (presumably where config / ad_api / guardrails live -- confirm layout).
# The parent is inserted first, so this file's directory ends up ahead of it.
_TOOLS_DIR = Path(__file__).resolve().parent
_MINI_DIR = _TOOLS_DIR.parent
for _import_root in (str(_MINI_DIR), str(_TOOLS_DIR)):
    if _import_root not in sys.path:
        sys.path.insert(0, _import_root)
del _import_root
- from config import (
- EXECUTION_ENABLED,
- EXECUTION_LOG_DIR,
- API_QPS_LIMIT,
- API_MAX_RETRIES,
- TIER1_MAX_CHANGE_PCT,
- TIER3_MIN_DAILY_SPEND,
- FEEDBACK_CHECK_HOURS,
- DRY_RUN_MODE,
- IM_ENABLED,
- )
- logger = logging.getLogger(__name__)
- # ═══════════════════════════════════════════
- # QPS 令牌桶限流
- # ═══════════════════════════════════════════
class TokenBucket:
    """Minimal async token-bucket rate limiter.

    Tokens are refilled continuously at ``rate`` tokens per second, up to a
    capacity of ``max(1, rate)``.  Each :meth:`acquire` call consumes one
    token and sleeps when none is available.
    """

    def __init__(self, rate: float = API_QPS_LIMIT):
        """Create a bucket that starts full.

        Args:
            rate: Refill rate in tokens per second; must be positive.

        Raises:
            ValueError: If ``rate`` is zero or negative (the wait computation
                divides by it, and a negative rate would never refill).
        """
        if rate <= 0:
            raise ValueError(f"rate must be positive, got {rate!r}")
        self._rate = rate
        # The bucket must be able to hold at least one whole token; capping it
        # at ``rate`` would make acquire() spin forever whenever rate < 1.
        self._capacity = max(1.0, float(rate))
        self._tokens = self._capacity
        self._last_refill = time.monotonic()

    async def acquire(self) -> None:
        """Take one token, sleeping until one becomes available."""
        while True:
            now = time.monotonic()
            elapsed = now - self._last_refill
            self._last_refill = now
            # Credit the tokens earned since the last refill, capped at capacity.
            self._tokens = min(self._capacity, self._tokens + elapsed * self._rate)
            if self._tokens >= 1:
                self._tokens -= 1
                return
            # Sleep just long enough for the missing fraction of a token.
            await asyncio.sleep((1 - self._tokens) / self._rate)
- # ═══════════════════════════════════════════
- # 腾讯广告执行器
- # ═══════════════════════════════════════════
class TencentAdExecutor:
    """Tencent Ads API executor.

    Wraps the low-level HTTP helpers imported from ``ad_api`` and adds:

    - QPS limiting (one token-bucket token per request),
    - retries with exponential backoff for write operations,
    - a read helper used to snapshot an ad's state around an operation.

    NOTE(review): ``_get`` / ``_post`` are called without ``await``, so they
    appear to be synchronous and would block the event loop while a request
    is in flight -- confirm against ``ad_api``.
    """

    def __init__(self):
        self._bucket = TokenBucket(rate=API_QPS_LIMIT)

    async def get_ad_state(self, ad_id: int, account_id: int) -> Optional[Dict]:
        """Fetch the current state of one ad group.

        Best effort: any exception (including a failed ``ad_api`` import) is
        logged as a warning and reported as ``None``.

        Returns:
            The first item of the response's ``list``, or ``None`` when the
            list is empty/missing or the request failed.
        """
        try:
            from ad_api import _get, _check

            await self._bucket.acquire()
            resp = _get("/adgroups/get", {
                "account_id": account_id,
                "filtering": {"adgroup_id_list": [ad_id]},
                "page": 1,
                "page_size": 1,
            })
            data = _check(resp, "get_ad_state")
            items = data.get("list") or []
            return items[0] if items else None
        except Exception as e:
            logger.warning("获取广告 %s 状态失败: %s", ad_id, e)
            return None

    async def _update_adgroup(
        self,
        op: str,
        ad_id: int,
        account_id: int,
        fields: Dict,
        log_wait: bool,
    ) -> Dict:
        """POST ``/adgroups/update`` with rate limiting and exponential backoff.

        Args:
            op: Operation name, passed to ``_check`` and used in retry logs.
            ad_id: Ad group id sent as ``adgroup_id``.
            account_id: Advertiser account id.
            fields: Extra body fields describing the change.
            log_wait: Whether the retry log line includes the wait time
                (keeps each operation's existing log line format).

        Returns:
            ``{"code": 0, "message": "success"}`` on success, otherwise
            ``{"code": -1, "message": <last error text>}``.
        """
        from ad_api import _post, _check

        body = {"account_id": account_id, "adgroup_id": ad_id, **fields}
        # Always try at least once: with API_MAX_RETRIES <= 0 the loop would
        # not run at all and the method would implicitly return None.
        attempts = max(1, API_MAX_RETRIES)
        last_error = ""
        for attempt in range(attempts):
            try:
                await self._bucket.acquire()
                resp = _post("/adgroups/update", body)
                _check(resp, op)
                return {"code": 0, "message": "success"}
            except Exception as e:
                last_error = str(e)
                if attempt >= attempts - 1:
                    break
                wait = 2 ** attempt  # 1s, 2s, 4s, ...
                if log_wait:
                    logger.warning(
                        "%s 重试 %d/%d (等待%ds): %s",
                        op, attempt + 1, attempts, wait, e,
                    )
                else:
                    logger.warning("%s 重试 %d/%d: %s", op, attempt + 1, attempts, e)
                await asyncio.sleep(wait)
        return {"code": -1, "message": last_error}

    async def update_bid(self, ad_id: int, account_id: int, bid_amount_fen: int) -> Dict:
        """Set the ad group's bid; ``bid_amount_fen`` is the amount in fen.

        Returns a dict with ``code`` (0 on success, -1 on failure) and
        ``message``.
        """
        return await self._update_adgroup(
            "update_bid",
            ad_id,
            account_id,
            {"bid_amount": bid_amount_fen},
            log_wait=True,
        )

    async def pause_ad(self, ad_id: int, account_id: int) -> Dict:
        """Pause the ad group by setting ``configured_status`` to suspended.

        Returns a dict with ``code`` (0 on success, -1 on failure) and
        ``message``.
        """
        return await self._update_adgroup(
            "pause_ad",
            ad_id,
            account_id,
            {"configured_status": "AD_STATUS_SUSPEND"},
            log_wait=False,
        )
- # ═══════════════════════════════════════════
- # 审计日志
- # ═══════════════════════════════════════════
class AuditLogger:
    """Append-only JSONL audit log, one file per calendar day.

    The file name (``exec_YYYYMMDD.jsonl``) is fixed when the logger is
    created, using the local date at that moment.
    """

    def __init__(self, log_dir: Path = EXECUTION_LOG_DIR):
        """Create ``log_dir`` if needed and pick today's log file.

        Args:
            log_dir: Directory for the log files; a ``str`` is accepted too.
        """
        self._log_dir = Path(log_dir)
        self._log_dir.mkdir(parents=True, exist_ok=True)
        today = datetime.now().strftime("%Y%m%d")
        self._path = self._log_dir / f"exec_{today}.jsonl"

    @staticmethod
    def _json_default(value: Any) -> Any:
        """Serialize values ``json`` does not know natively.

        Scalars exposing ``.item()`` (as numpy/pandas scalars do) are
        unwrapped; anything else is stored as ``str(value)``.  An audit line
        with a stringified field is preferable to losing the record of an
        operation that has already been executed.
        """
        try:
            return value.item()
        except (AttributeError, TypeError, ValueError):
            return str(value)

    def log(self, entry: Dict) -> None:
        """Append ``entry`` plus a ``ts`` timestamp as one JSON line.

        The caller's dict is left untouched; ``ts`` is the local time in ISO
        format and replaces any ``ts`` already present in ``entry``.
        """
        record = dict(entry)
        record["ts"] = datetime.now().isoformat()
        line = json.dumps(record, ensure_ascii=False, default=self._json_default)
        with open(self._path, "a", encoding="utf-8") as f:
            f.write(line + "\n")

    @property
    def path(self) -> Path:
        """Path of the JSONL file this logger appends to."""
        return self._path
- # ═══════════════════════════════════════════
- # 自治级别分类
- # ═══════════════════════════════════════════
def _classify_tier(row: pd.Series) -> int:
    """Map one decision row to its autonomy tier.

    The action is read from ``final_action``, falling back to ``action`` and
    then to ``"hold"``.

    Returns:
        0 -- no API operation: hold, observe, creative_adjust, scale_up.
        3 -- ``cost_7d_avg`` is at or above ``TIER3_MIN_DAILY_SPEND``.
        2 -- the action is ``pause``, or the absolute
             ``recommended_change_pct`` exceeds ``TIER1_MAX_CHANGE_PCT``.
        1 -- everything else (small bid change).

    A ``recommended_change_pct`` given as a string that cannot be parsed is
    treated as 0.
    """
    fallback_action = row.get("action", "hold")
    decided = row.get("final_action", fallback_action)

    # Advice-only actions never reach the API, whatever the spend.
    if decided in ("hold", "observe", "creative_adjust", "scale_up"):
        return 0

    daily_cost = float(row.get("cost_7d_avg", 0) or 0)

    delta = row.get("recommended_change_pct", 0)
    if isinstance(delta, str):
        try:
            delta = float(delta)
        except ValueError:
            delta = 0

    # The high-spend check takes precedence over the action / change size.
    if daily_cost >= TIER3_MIN_DAILY_SPEND:
        return 3

    needs_approval = decided == "pause" or abs(delta) > TIER1_MAX_CHANGE_PCT
    return 2 if needs_approval else 1
- # ═══════════════════════════════════════════
- # 工具:执行已验证的决策
- # ═══════════════════════════════════════════
def _decision_account_id(row: pd.Series) -> int:
    """Return the row's ``account_id`` as int; missing/NaN/empty becomes 0."""
    raw = row.get("account_id", 0)
    if raw is None or pd.isna(raw):
        return 0
    return int(raw or 0)


def _decision_change_pct(row: pd.Series) -> Any:
    """Return ``recommended_change_pct``; an unparsable string becomes 0."""
    change_pct = row.get("recommended_change_pct", 0)
    if isinstance(change_pct, str):
        try:
            change_pct = float(change_pct)
        except ValueError:
            change_pct = 0
    return change_pct


def _bid_to_fen(raw_bid: Any) -> Optional[int]:
    """Convert a CSV bid value to an integer amount in fen (value * 100).

    Returns ``None`` when the value is missing (None, "", NaN), not numeric,
    or infinite, so the caller can skip the row instead of raising.
    """
    if raw_bid is None or raw_bid == "":
        return None
    try:
        bid = float(raw_bid)
    except (TypeError, ValueError):
        return None
    if bid != bid or bid in (float("inf"), float("-inf")):
        return None
    # Round instead of truncating: 1.15 * 100 == 114.99999999999999 in binary
    # floating point, which int() alone would turn into 114.
    return int(round(bid * 100))


def _state_snapshot(state: Optional[Dict]) -> Optional[Dict]:
    """Reduce an ad state to the two fields kept in the audit log."""
    if not state:
        return None
    return {
        "bid_amount": state.get("bid_amount"),
        "status": state.get("configured_status"),
    }


def _approval_audit_entry(row: pd.Series, status: str, note: Optional[str] = None) -> Dict:
    """Build the audit entry for a Tier 2/3 row that was not executed."""
    entry = {
        "ad_id": int(row["ad_id"]),
        "account_id": _decision_account_id(row),
        "action": row.get("final_action", row.get("action")),
        "tier": int(row.get("tier", 2)),
        "execution_status": status,
    }
    if note is not None:
        entry["note"] = note
    entry["source"] = row.get("source", "")
    return entry


async def _apply_decision(
    executor: "TencentAdExecutor",
    audit: "AuditLogger",
    history: Any,
    row: pd.Series,
    tier: int,
    status_prefix: str = "",
) -> Optional[bool]:
    """Execute one decision row through the API and write its audit entry.

    Returns:
        True when the API call succeeded, False when it failed, and None when
        nothing was attempted (unsupported action, or no usable bid value).
    """
    action = row.get("final_action", row.get("action"))
    ad_id = int(row["ad_id"])
    account_id = _decision_account_id(row)

    # Snapshot before touching the ad so the audit log can show the change.
    pre_state = await executor.get_ad_state(ad_id, account_id)

    if action == "pause":
        result = await executor.pause_ad(ad_id, account_id)
    elif action in ("bid_up", "bid_down"):
        bid_fen = _bid_to_fen(row.get("final_bid", row.get("recommended_bid")))
        if bid_fen is None:
            audit.log({
                "ad_id": ad_id,
                "action": action,
                "tier": tier,
                "execution_status": "skipped",
                "reason": "无出价数据",
            })
            return None
        result = await executor.update_bid(ad_id, account_id, bid_fen)
    else:
        return None

    result = result or {}  # tolerate an executor that returned nothing
    api_code = result.get("code", -1)
    succeeded = api_code == 0
    exec_status = "success" if succeeded else "failed"

    post_state = None
    if succeeded:
        history.record_adjustment(str(ad_id), action, _decision_change_pct(row))
        post_state = await executor.get_ad_state(ad_id, account_id)

    audit.log({
        "ad_id": ad_id,
        "account_id": account_id,
        "action": action,
        "tier": tier,
        "pre_state": _state_snapshot(pre_state),
        "post_state": _state_snapshot(post_state),
        "api_code": api_code,
        "api_message": result.get("message", ""),
        "execution_status": f"{status_prefix}{exec_status}",
        "source": row.get("source", ""),
    })
    return succeeded


@tool(description="执行已验证的决策:分级自治 → API调用 → 审计日志")
async def execute_decisions(
    ctx: ToolContext,
    validated_csv: str = "",
    approval_mode: str = "tiered",
) -> ToolResult:
    """Execute decisions that passed guardrail validation.

    Pipeline:
        1. Load the validated-decisions CSV and keep rows whose
           ``guardrail_status`` is ``approved`` or ``modified`` and whose
           ``final_action`` is not ``hold``.
        2. Classify every row into an autonomy tier; Tier 0 rows are ignored.
        3. Tier 1 rows are executed immediately.
        4. Tier 2/3 rows are sent for IM approval (blocking) when
           ``IM_ENABLED``; approved ads are executed, the rest are logged as
           rejected / pending / timeout.  Without IM they are only logged.
        5. Every outcome is appended to the JSONL audit log.

    Nothing is executed when ``EXECUTION_ENABLED`` is false or
    ``DRY_RUN_MODE`` is true.

    Args:
        ctx: Tool context, forwarded to the approval request.
        validated_csv: Path of the validated CSV; when empty, the last
            ``validated_decisions_*.csv`` (by name) under ``outputs/reports``.
        approval_mode: Documented as "auto" / "tiered" / "manual".
            NOTE(review): this value is not read anywhere in the function;
            the tiered flow above is always used.  Wiring it up depends on
            how ``im_approval`` selects rows -- confirm before relying on it.

    Returns:
        A ToolResult with a summary; ``metadata`` holds the counters and the
        audit log path.  Any exception is logged and reported in the result.
    """
    try:
        if not EXECUTION_ENABLED:
            return ToolResult(
                title="执行引擎未启用",
                output="EXECUTION_ENABLED=False,跳过执行。修改 config.py 启用。",
            )
        if DRY_RUN_MODE:
            return ToolResult(
                title="干运行模式",
                output="DRY_RUN_MODE=True,操作不会实际执行。修改 config.py 关闭干运行。",
            )

        # Locate the validated CSV when the caller did not name one.
        if not validated_csv:
            reports_dir = _MINI_DIR / "outputs" / "reports"
            candidates = sorted(reports_dir.glob("validated_decisions_*.csv"), reverse=True)
            if not candidates:
                return ToolResult(title="execute_decisions", output="未找到验证后的决策 CSV")
            validated_csv = str(candidates[0])

        df = pd.read_csv(validated_csv)
        if df.empty:
            return ToolResult(title="execute_decisions", output="决策数据为空")

        # Only approved / modified rows (never blocked ones), minus plain holds.
        df_exec = df[df["guardrail_status"].isin(["approved", "modified"])].copy()
        df_exec = df_exec[df_exec["final_action"] != "hold"]
        if df_exec.empty:
            return ToolResult(
                title="无需执行的操作",
                output="所有决策要么是 hold,要么被护栏拦截",
            )

        df_exec["tier"] = df_exec.apply(_classify_tier, axis=1)

        executor = TencentAdExecutor()
        audit = AuditLogger()
        # AdjustmentHistory records each successful operation.
        from guardrails import AdjustmentHistory
        history = AdjustmentHistory()

        executed = 0            # Tier 1 successes
        failed = 0              # Tier 1 failures
        approved_executed = 0   # Tier 2/3 successes after approval
        approved_failed = 0     # Tier 2/3 failures after approval
        pending_approval = 0
        rejected_count = 0
        timeout_count = 0

        df_tier1 = df_exec[df_exec["tier"] == 1]
        df_tier2_3 = df_exec[df_exec["tier"] >= 2]
        tier_summary = {t: int((df_exec["tier"] == t).sum()) for t in (1, 2, 3)}

        # Phase 1: Tier 1 is executed without approval.
        for _, row in df_tier1.iterrows():
            outcome = await _apply_decision(executor, audit, history, row, tier=1)
            if outcome is True:
                executed += 1
            elif outcome is False:
                failed += 1

        # Phase 2: Tier 2/3 needs approval before execution.
        if not df_tier2_3.empty:
            if IM_ENABLED:
                # Blocking approval: wait for the reply before continuing.
                logger.info("Tier 2/3 共 %d 个操作,发送 IM 审批并等待...", len(df_tier2_3))
                from im_approval import send_approval_request
                approval_result = await send_approval_request(
                    ctx=ctx,
                    validated_csv=validated_csv,
                    wait_for_reply=True,
                )
                approval_meta = approval_result.metadata or {}
                approval_status = approval_meta.get("status", "timeout")

                if approval_status == "timeout":
                    # No reply in time: nothing is executed.
                    timeout_count = len(df_tier2_3)
                    for _, row in df_tier2_3.iterrows():
                        audit.log(_approval_audit_entry(row, "timeout"))
                else:
                    approved_set = {int(x) for x in approval_meta.get("approved_ids") or []}
                    rejected_set = {int(x) for x in approval_meta.get("rejected_ids") or []}
                    for _, row in df_tier2_3.iterrows():
                        ad_id = int(row["ad_id"])
                        if ad_id in rejected_set:
                            rejected_count += 1
                            audit.log(_approval_audit_entry(row, "rejected"))
                            continue
                        if ad_id not in approved_set:
                            # Neither approved nor rejected (partial reply).
                            pending_approval += 1
                            audit.log(_approval_audit_entry(row, "pending_approval"))
                            continue
                        outcome = await _apply_decision(
                            executor,
                            audit,
                            history,
                            row,
                            tier=int(row.get("tier", 2)),
                            status_prefix="approved_",
                        )
                        if outcome is True:
                            approved_executed += 1
                        elif outcome is False:
                            approved_failed += 1
            else:
                # Without IM there is no approval channel: record only.
                logger.info("IM 未启用,Tier 2/3 共 %d 个操作仅记录不执行", len(df_tier2_3))
                pending_approval = len(df_tier2_3)
                for _, row in df_tier2_3.iterrows():
                    audit.log(
                        _approval_audit_entry(row, "pending_approval", note="IM未启用,操作仅记录")
                    )

        output_lines = [
            f"执行完成,审计日志: {audit.path}",
            "",
            "执行结果:",
            f" Tier 1 自动执行: {executed} 个成功 / {failed} 个失败",
        ]
        if IM_ENABLED and not df_tier2_3.empty:
            output_lines.extend([
                f" Tier 2/3 审批后执行: {approved_executed} 个成功 / {approved_failed} 个失败",
                f" 审批拒绝: {rejected_count} 个",
                f" 审批超时: {timeout_count} 个",
            ])
        if pending_approval > 0:
            output_lines.append(f" 待审批(未执行): {pending_approval} 个")
        output_lines.extend([
            "",
            "自治级别分布:",
            f" Tier 1 (自动): {tier_summary.get(1, 0)} 个",
            f" Tier 2 (审批): {tier_summary.get(2, 0)} 个",
            f" Tier 3 (高价值): {tier_summary.get(3, 0)} 个",
        ])

        return ToolResult(
            title=f"执行完成(自动{executed}/审批通过{approved_executed}/拒绝{rejected_count}/超时{timeout_count})",
            output="\n".join(output_lines),
            metadata={
                "audit_log": str(audit.path),
                "tier1_executed": executed,
                "tier1_failed": failed,
                "approved_executed": approved_executed,
                "approved_failed": approved_failed,
                "rejected": rejected_count,
                "timeout": timeout_count,
                "pending_approval": pending_approval,
                "tier_summary": tier_summary,
            },
        )
    except Exception as e:
        logger.error("execute_decisions 失败: %s", e, exc_info=True)
        return ToolResult(title="execute_decisions 失败", output=str(e))
- # ═══════════════════════════════════════════
- # 工具:执行后效果检查
- # ═══════════════════════════════════════════
@tool(description="执行后效果检查:对比操作前后广告表现")
async def check_execution_feedback(
    ctx: ToolContext,
    execution_log_path: str = "",
    hours_after: int = FEEDBACK_CHECK_HOURS,
) -> ToolResult:
    """Compare logged operations with the ads' current state.

    Reads a JSONL execution log, keeps the successfully executed operations
    that are older than ``hours_after`` hours, fetches each ad's current
    state through the API and reports whether the change is still in place:

    - ``pause``: effective when the current ``configured_status`` is
      ``AD_STATUS_SUSPEND``.
    - ``bid_up`` / ``bid_down``: effective when the current ``bid_amount``
      equals the ``bid_amount`` logged in ``post_state``.

    Operations that cannot be judged (state unavailable, no ``post_state``,
    other actions) are counted as unknown.

    Args:
        ctx: Tool context (not used by this function).
        execution_log_path: JSONL log to read; when empty, the last
            ``exec_*.jsonl`` (by name) in ``EXECUTION_LOG_DIR``.
        hours_after: Minimum age, in hours, of the operations to check.

    Returns:
        A ToolResult with a summary; ``metadata["details"]`` holds the
        per-operation results.  Any exception is logged and reported in the
        result.
    """
    try:
        # Locate the most recent execution log when none was given.
        if not execution_log_path:
            log_dir = EXECUTION_LOG_DIR
            if not log_dir.exists():
                return ToolResult(title="check_execution_feedback", output="无执行日志目录")
            candidates = sorted(log_dir.glob("exec_*.jsonl"), reverse=True)
            if not candidates:
                return ToolResult(title="check_execution_feedback", output="无执行日志")
            execution_log_path = str(candidates[0])

        # Read the log; one damaged line must not hide every other entry.
        entries = []
        with open(execution_log_path, "r", encoding="utf-8") as f:
            for line_no, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    parsed = json.loads(line)
                except json.JSONDecodeError as exc:
                    logger.warning("执行日志第 %d 行解析失败,已跳过: %s", line_no, exc)
                    continue
                if isinstance(parsed, dict):
                    entries.append(parsed)
        if not entries:
            return ToolResult(title="check_execution_feedback", output="执行日志为空")

        # Tier 1 executions are logged as "success"; executions that went
        # through approval are logged as "approved_success".
        success_statuses = ("success", "approved_success")
        cutoff = datetime.now() - timedelta(hours=hours_after)
        check_entries = []
        for e in entries:
            if e.get("execution_status") not in success_statuses:
                continue
            try:
                is_due = datetime.fromisoformat(e["ts"]) < cutoff
            except (KeyError, TypeError, ValueError):
                # Missing or unusable timestamp: the age cannot be judged.
                continue
            if is_due:
                check_entries.append(e)

        if not check_entries:
            return ToolResult(
                title="check_execution_feedback",
                output=f"无需检查(没有超过{hours_after}小时前的成功操作)",
            )

        # Fetch the current state of every ad and compare.
        executor = TencentAdExecutor()
        results = []
        for entry in check_entries:
            ad_id = entry.get("ad_id")
            account_id = entry.get("account_id", 0)
            action = entry.get("action")
            pre_state = entry.get("pre_state") or {}
            current_state = await executor.get_ad_state(ad_id, account_id)

            result = {
                "ad_id": ad_id,
                "action": action,
                "executed_at": entry.get("ts"),
                "pre_bid": pre_state.get("bid_amount") if pre_state else None,
                "pre_status": pre_state.get("status") if pre_state else None,
                "current_bid": current_state.get("bid_amount") if current_state else None,
                "current_status": current_state.get("configured_status") if current_state else None,
            }

            if action == "pause":
                result["effective"] = (
                    current_state.get("configured_status") == "AD_STATUS_SUSPEND"
                    if current_state else None
                )
            elif action in ("bid_up", "bid_down"):
                post_state = entry.get("post_state") or {}
                if post_state:
                    expected_bid = post_state.get("bid_amount")
                    actual_bid = current_state.get("bid_amount") if current_state else None
                    result["expected_bid"] = expected_bid
                    result["effective"] = (
                        (actual_bid == expected_bid) if actual_bid is not None else None
                    )
            results.append(result)

        # A result without an "effective" key counts as unknown.
        effective = sum(1 for r in results if r.get("effective") is True)
        ineffective = sum(1 for r in results if r.get("effective") is False)
        unknown = sum(1 for r in results if r.get("effective") is None)

        output_lines = [
            f"效果检查完成({len(results)} 个操作)",
            "",
            f" 有效: {effective} 个",
            f" 无效/被覆盖: {ineffective} 个",
            f" 未知: {unknown} 个",
        ]
        if ineffective > 0:
            output_lines.append("")
            output_lines.append("⚠️ 以下操作可能未生效或被覆盖:")
            for r in results:
                if r.get("effective") is False:
                    output_lines.append(
                        f" - 广告{r['ad_id']}: {r['action']} "
                        f"(执行于 {r['executed_at']})"
                    )

        return ToolResult(
            title=f"效果检查(有效{effective}/无效{ineffective})",
            output="\n".join(output_lines),
            metadata={
                "total": len(results),
                "effective": effective,
                "ineffective": ineffective,
                "unknown": unknown,
                "details": results,
            },
        )
    except Exception as e:
        logger.error("check_execution_feedback 失败: %s", e, exc_info=True)
        return ToolResult(title="check_execution_feedback 失败", output=str(e))
|