|
|
@@ -0,0 +1,696 @@
|
|
|
+"""
|
|
|
+执行引擎 — auto_put_ad_mini
|
|
|
+
|
|
|
+职责:
|
|
|
+ 1. 加载护栏验证后的决策
|
|
|
+ 2. 按自治级别分类(Tier 1 自动 / Tier 2 审批 / Tier 3 阻断)
|
|
|
+ 3. 调用腾讯广告 API 执行操作
|
|
|
+ 4. 记录审计日志(JSONL)
|
|
|
+ 5. 执行后效果检查
|
|
|
+
|
|
|
+依赖:
|
|
|
+ - tools/ad_api.py 的底层 HTTP 函数
|
|
|
+ - tools/guardrails.py 的 AdjustmentHistory
|
|
|
+"""
|
|
|
+
|
|
|
+import asyncio
|
|
|
+import json
|
|
|
+import logging
|
|
|
+import sys
|
|
|
+import time
|
|
|
+from datetime import datetime, timedelta
|
|
|
+from pathlib import Path
|
|
|
+from typing import Any, Dict, List, Optional
|
|
|
+
|
|
|
+import pandas as pd
|
|
|
+
|
|
|
+from agent.tools import tool
|
|
|
+from agent.tools.models import ToolContext, ToolResult
|
|
|
+
|
|
|
+_MINI_DIR = Path(__file__).resolve().parent.parent
|
|
|
+_TOOLS_DIR = Path(__file__).resolve().parent
|
|
|
+if str(_MINI_DIR) not in sys.path:
|
|
|
+ sys.path.insert(0, str(_MINI_DIR))
|
|
|
+if str(_TOOLS_DIR) not in sys.path:
|
|
|
+ sys.path.insert(0, str(_TOOLS_DIR))
|
|
|
+
|
|
|
+from config import (
|
|
|
+ EXECUTION_ENABLED,
|
|
|
+ EXECUTION_LOG_DIR,
|
|
|
+ API_QPS_LIMIT,
|
|
|
+ API_MAX_RETRIES,
|
|
|
+ TIER1_MAX_CHANGE_PCT,
|
|
|
+ TIER3_MIN_DAILY_SPEND,
|
|
|
+ FEEDBACK_CHECK_HOURS,
|
|
|
+ DRY_RUN_MODE,
|
|
|
+ IM_ENABLED,
|
|
|
+)
|
|
|
+
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
+
|
|
|
+
|
|
|
+# ═══════════════════════════════════════════
|
|
|
+# QPS 令牌桶限流
|
|
|
+# ═══════════════════════════════════════════
|
|
|
+
|
|
|
+
|
|
|
+class TokenBucket:
|
|
|
+ """简单令牌桶限流器。"""
|
|
|
+
|
|
|
+ def __init__(self, rate: float = API_QPS_LIMIT):
|
|
|
+ self._rate = rate
|
|
|
+ self._tokens = rate
|
|
|
+ self._last_refill = time.monotonic()
|
|
|
+
|
|
|
+ async def acquire(self):
|
|
|
+ """获取一个令牌,不够时等待。"""
|
|
|
+ while True:
|
|
|
+ now = time.monotonic()
|
|
|
+ elapsed = now - self._last_refill
|
|
|
+ self._tokens = min(self._rate, self._tokens + elapsed * self._rate)
|
|
|
+ self._last_refill = now
|
|
|
+
|
|
|
+ if self._tokens >= 1:
|
|
|
+ self._tokens -= 1
|
|
|
+ return
|
|
|
+ else:
|
|
|
+ wait = (1 - self._tokens) / self._rate
|
|
|
+ await asyncio.sleep(wait)
|
|
|
+
|
|
|
+
|
|
|
+# ═══════════════════════════════════════════
|
|
|
+# 腾讯广告执行器
|
|
|
+# ═══════════════════════════════════════════
|
|
|
+
|
|
|
+
|
|
|
+class TencentAdExecutor:
|
|
|
+ """
|
|
|
+ 腾讯广告 API 执行器。
|
|
|
+
|
|
|
+ 复用 ad_api.py 的底层 HTTP 函数,添加:
|
|
|
+ - QPS 令牌桶限流
|
|
|
+ - 指数退避重试
|
|
|
+ - 操作前快照
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ self._bucket = TokenBucket(rate=API_QPS_LIMIT)
|
|
|
+
|
|
|
+ async def get_ad_state(self, ad_id: int, account_id: int) -> Optional[Dict]:
|
|
|
+ """获取广告当前状态快照。"""
|
|
|
+ try:
|
|
|
+ from ad_api import _get, _check
|
|
|
+ await self._bucket.acquire()
|
|
|
+ resp = _get("/adgroups/get", {
|
|
|
+ "account_id": account_id,
|
|
|
+ "filtering": {"adgroup_id_list": [ad_id]},
|
|
|
+ "page": 1,
|
|
|
+ "page_size": 1,
|
|
|
+ })
|
|
|
+ data = _check(resp, "get_ad_state")
|
|
|
+ items = data.get("list", [])
|
|
|
+ return items[0] if items else None
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning("获取广告 %s 状态失败: %s", ad_id, e)
|
|
|
+ return None
|
|
|
+
|
|
|
+ async def update_bid(self, ad_id: int, account_id: int, bid_amount_fen: int) -> Dict:
|
|
|
+ """更新广告出价(分)。"""
|
|
|
+ from ad_api import _post, _check
|
|
|
+
|
|
|
+ for attempt in range(API_MAX_RETRIES):
|
|
|
+ try:
|
|
|
+ await self._bucket.acquire()
|
|
|
+ body = {
|
|
|
+ "account_id": account_id,
|
|
|
+ "adgroup_id": ad_id,
|
|
|
+ "bid_amount": bid_amount_fen,
|
|
|
+ }
|
|
|
+ resp = _post("/adgroups/update", body)
|
|
|
+ _check(resp, "update_bid")
|
|
|
+ return {"code": 0, "message": "success"}
|
|
|
+ except Exception as e:
|
|
|
+ if attempt < API_MAX_RETRIES - 1:
|
|
|
+ wait = 2 ** attempt
|
|
|
+ logger.warning("update_bid 重试 %d/%d (等待%ds): %s", attempt + 1, API_MAX_RETRIES, wait, e)
|
|
|
+ await asyncio.sleep(wait)
|
|
|
+ else:
|
|
|
+ return {"code": -1, "message": str(e)}
|
|
|
+
|
|
|
+ async def pause_ad(self, ad_id: int, account_id: int) -> Dict:
|
|
|
+ """暂停广告。"""
|
|
|
+ from ad_api import _post, _check
|
|
|
+
|
|
|
+ for attempt in range(API_MAX_RETRIES):
|
|
|
+ try:
|
|
|
+ await self._bucket.acquire()
|
|
|
+ body = {
|
|
|
+ "account_id": account_id,
|
|
|
+ "adgroup_id": ad_id,
|
|
|
+ "configured_status": "AD_STATUS_SUSPEND",
|
|
|
+ }
|
|
|
+ resp = _post("/adgroups/update", body)
|
|
|
+ _check(resp, "pause_ad")
|
|
|
+ return {"code": 0, "message": "success"}
|
|
|
+ except Exception as e:
|
|
|
+ if attempt < API_MAX_RETRIES - 1:
|
|
|
+ wait = 2 ** attempt
|
|
|
+ logger.warning("pause_ad 重试 %d/%d: %s", attempt + 1, API_MAX_RETRIES, e)
|
|
|
+ await asyncio.sleep(wait)
|
|
|
+ else:
|
|
|
+ return {"code": -1, "message": str(e)}
|
|
|
+
|
|
|
+
|
|
|
+# ═══════════════════════════════════════════
|
|
|
+# 审计日志
|
|
|
+# ═══════════════════════════════════════════
|
|
|
+
|
|
|
+
|
|
|
+class AuditLogger:
|
|
|
+ """JSONL 审计日志。"""
|
|
|
+
|
|
|
+ def __init__(self, log_dir: Path = EXECUTION_LOG_DIR):
|
|
|
+ self._log_dir = log_dir
|
|
|
+ self._log_dir.mkdir(parents=True, exist_ok=True)
|
|
|
+ today = datetime.now().strftime("%Y%m%d")
|
|
|
+ self._path = self._log_dir / f"exec_{today}.jsonl"
|
|
|
+
|
|
|
+ def log(self, entry: Dict):
|
|
|
+ entry["ts"] = datetime.now().isoformat()
|
|
|
+ with open(self._path, "a", encoding="utf-8") as f:
|
|
|
+ f.write(json.dumps(entry, ensure_ascii=False) + "\n")
|
|
|
+
|
|
|
+ @property
|
|
|
+ def path(self) -> Path:
|
|
|
+ return self._path
|
|
|
+
|
|
|
+
|
|
|
+# ═══════════════════════════════════════════
|
|
|
+# 自治级别分类
|
|
|
+# ═══════════════════════════════════════════
|
|
|
+
|
|
|
+
|
|
|
+def _classify_tier(row: pd.Series) -> int:
|
|
|
+ """
|
|
|
+ 自治级别分类。
|
|
|
+
|
|
|
+ Tier 1 (自动执行): hold 或 bid 调幅 ≤ 5%
|
|
|
+ Tier 2 (需审批): pause 或 bid 调幅 > 5%
|
|
|
+ Tier 3 (高价值阻断): 日消耗 > 1500 元的高价值广告
|
|
|
+ """
|
|
|
+ action = row.get("final_action", row.get("action", "hold"))
|
|
|
+ if action == "hold":
|
|
|
+ return 0 # 无操作
|
|
|
+
|
|
|
+ cost_7d_avg = float(row.get("cost_7d_avg", 0) or 0)
|
|
|
+ change_pct = row.get("recommended_change_pct", 0)
|
|
|
+ if isinstance(change_pct, str):
|
|
|
+ try:
|
|
|
+ change_pct = float(change_pct)
|
|
|
+ except ValueError:
|
|
|
+ change_pct = 0
|
|
|
+
|
|
|
+ # Tier 3: 高价值广告
|
|
|
+ if cost_7d_avg >= TIER3_MIN_DAILY_SPEND:
|
|
|
+ return 3
|
|
|
+
|
|
|
+ # Tier 2: 暂停 或 大幅调价
|
|
|
+ if action == "pause" or abs(change_pct) > TIER1_MAX_CHANGE_PCT:
|
|
|
+ return 2
|
|
|
+
|
|
|
+ # Tier 1: 小幅调价
|
|
|
+ return 1
|
|
|
+
|
|
|
+
|
|
|
+# ═══════════════════════════════════════════
|
|
|
+# 工具:执行已验证的决策
|
|
|
+# ═══════════════════════════════════════════
|
|
|
+
|
|
|
+
|
|
|
+@tool(description="执行已验证的决策:分级自治 → API调用 → 审计日志")
|
|
|
+async def execute_decisions(
|
|
|
+ ctx: ToolContext,
|
|
|
+ validated_csv: str = "",
|
|
|
+ approval_mode: str = "tiered",
|
|
|
+) -> ToolResult:
|
|
|
+ """
|
|
|
+ 执行已通过护栏验证的决策。
|
|
|
+
|
|
|
+ Pipeline:
|
|
|
+ 1. 加载 validated_decisions CSV (guardrail_status != blocked)
|
|
|
+ 2. 按自治级别分类
|
|
|
+ 3. Tier 1 (小调, ≤5%): 直接执行 + 通知
|
|
|
+ 4. Tier 2 (大调/暂停): 标记待审批
|
|
|
+ 5. Tier 3 (高价值): 标记需人工审批
|
|
|
+ 6. 记录审计日志
|
|
|
+
|
|
|
+ Args:
|
|
|
+ validated_csv: 护栏验证后的 CSV 路径
|
|
|
+ approval_mode: "auto"=全部自动执行, "tiered"=分级, "manual"=全部需审批
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ if not EXECUTION_ENABLED:
|
|
|
+ return ToolResult(
|
|
|
+ title="执行引擎未启用",
|
|
|
+ output="EXECUTION_ENABLED=False,跳过执行。修改 config.py 启用。",
|
|
|
+ )
|
|
|
+
|
|
|
+ if DRY_RUN_MODE:
|
|
|
+ return ToolResult(
|
|
|
+ title="干运行模式",
|
|
|
+ output="DRY_RUN_MODE=True,操作不会实际执行。修改 config.py 关闭干运行。",
|
|
|
+ )
|
|
|
+
|
|
|
+ # 查找验证 CSV
|
|
|
+ if not validated_csv:
|
|
|
+ reports_dir = _MINI_DIR / "outputs" / "reports"
|
|
|
+ candidates = sorted(reports_dir.glob("validated_decisions_*.csv"), reverse=True)
|
|
|
+ if not candidates:
|
|
|
+ return ToolResult(title="execute_decisions", output="未找到验证后的决策 CSV")
|
|
|
+ validated_csv = str(candidates[0])
|
|
|
+
|
|
|
+ df = pd.read_csv(validated_csv)
|
|
|
+ if df.empty:
|
|
|
+ return ToolResult(title="execute_decisions", output="决策数据为空")
|
|
|
+
|
|
|
+ # 只执行 approved / modified(非 blocked)
|
|
|
+ df_exec = df[df["guardrail_status"].isin(["approved", "modified"])].copy()
|
|
|
+ df_exec = df_exec[df_exec["final_action"] != "hold"]
|
|
|
+
|
|
|
+ if df_exec.empty:
|
|
|
+ return ToolResult(
|
|
|
+ title="无需执行的操作",
|
|
|
+ output="所有决策要么是 hold,要么被护栏拦截",
|
|
|
+ )
|
|
|
+
|
|
|
+ # 分级
|
|
|
+ df_exec["tier"] = df_exec.apply(_classify_tier, axis=1)
|
|
|
+
|
|
|
+ executor = TencentAdExecutor()
|
|
|
+ audit = AuditLogger()
|
|
|
+
|
|
|
+ # 从 guardrails 导入 AdjustmentHistory 记录操作
|
|
|
+ from guardrails import AdjustmentHistory
|
|
|
+ history = AdjustmentHistory()
|
|
|
+
|
|
|
+ executed = 0
|
|
|
+ failed = 0
|
|
|
+ pending_approval = 0
|
|
|
+ approved_executed = 0
|
|
|
+ rejected_count = 0
|
|
|
+ timeout_count = 0
|
|
|
+ tier_summary = {1: 0, 2: 0, 3: 0}
|
|
|
+
|
|
|
+ # ─── 分离 Tier 1 和 Tier 2/3 ───
|
|
|
+ df_tier1 = df_exec[df_exec["tier"] == 1]
|
|
|
+ df_tier2_3 = df_exec[df_exec["tier"] >= 2]
|
|
|
+
|
|
|
+ for t in [1, 2, 3]:
|
|
|
+ tier_summary[t] = int((df_exec["tier"] == t).sum())
|
|
|
+
|
|
|
+ # ═══ Phase 1: 自动执行 Tier 1 ═══
|
|
|
+ for _, row in df_tier1.iterrows():
|
|
|
+ action = row.get("final_action", row.get("action"))
|
|
|
+ ad_id = int(row["ad_id"])
|
|
|
+ account_id = int(row.get("account_id", 0) or 0)
|
|
|
+
|
|
|
+ pre_state = await executor.get_ad_state(ad_id, account_id)
|
|
|
+
|
|
|
+ if action == "pause":
|
|
|
+ result = await executor.pause_ad(ad_id, account_id)
|
|
|
+ elif action in ("bid_up", "bid_down"):
|
|
|
+ final_bid = row.get("final_bid", row.get("recommended_bid"))
|
|
|
+ if final_bid is None or final_bid == "":
|
|
|
+ audit.log({
|
|
|
+ "ad_id": ad_id,
|
|
|
+ "action": action,
|
|
|
+ "tier": 1,
|
|
|
+ "execution_status": "skipped",
|
|
|
+ "reason": "无出价数据",
|
|
|
+ })
|
|
|
+ continue
|
|
|
+ bid_fen = int(float(final_bid) * 100)
|
|
|
+ result = await executor.update_bid(ad_id, account_id, bid_fen)
|
|
|
+ else:
|
|
|
+ continue
|
|
|
+
|
|
|
+ api_code = result.get("code", -1)
|
|
|
+ exec_status = "success" if api_code == 0 else "failed"
|
|
|
+
|
|
|
+ if exec_status == "success":
|
|
|
+ executed += 1
|
|
|
+ change_pct = row.get("recommended_change_pct", 0)
|
|
|
+ if isinstance(change_pct, str):
|
|
|
+ try:
|
|
|
+ change_pct = float(change_pct)
|
|
|
+ except ValueError:
|
|
|
+ change_pct = 0
|
|
|
+ history.record_adjustment(str(ad_id), action, change_pct)
|
|
|
+ else:
|
|
|
+ failed += 1
|
|
|
+
|
|
|
+ post_state = await executor.get_ad_state(ad_id, account_id) if exec_status == "success" else None
|
|
|
+
|
|
|
+ audit.log({
|
|
|
+ "ad_id": ad_id,
|
|
|
+ "account_id": account_id,
|
|
|
+ "action": action,
|
|
|
+ "tier": 1,
|
|
|
+ "pre_state": {
|
|
|
+ "bid_amount": pre_state.get("bid_amount") if pre_state else None,
|
|
|
+ "status": pre_state.get("configured_status") if pre_state else None,
|
|
|
+ } if pre_state else None,
|
|
|
+ "post_state": {
|
|
|
+ "bid_amount": post_state.get("bid_amount") if post_state else None,
|
|
|
+ "status": post_state.get("configured_status") if post_state else None,
|
|
|
+ } if post_state else None,
|
|
|
+ "api_code": api_code,
|
|
|
+ "api_message": result.get("message", ""),
|
|
|
+ "execution_status": exec_status,
|
|
|
+ "source": row.get("source", ""),
|
|
|
+ })
|
|
|
+
|
|
|
+ # ═══ Phase 2: Tier 2/3 — 审批 + 执行 ═══
|
|
|
+ if not df_tier2_3.empty:
|
|
|
+ if IM_ENABLED:
|
|
|
+ # 阻塞式审批:调用 send_approval_request(wait_for_reply=True)
|
|
|
+ logger.info("Tier 2/3 共 %d 个操作,发送 IM 审批并等待...", len(df_tier2_3))
|
|
|
+
|
|
|
+ from im_approval import send_approval_request
|
|
|
+ approval_result = await send_approval_request(
|
|
|
+ ctx=ctx,
|
|
|
+ validated_csv=validated_csv,
|
|
|
+ wait_for_reply=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ approval_status = (
|
|
|
+ approval_result.metadata.get("status", "timeout")
|
|
|
+ if approval_result.metadata
|
|
|
+ else "timeout"
|
|
|
+ )
|
|
|
+ approved_ids = (
|
|
|
+ approval_result.metadata.get("approved_ids", [])
|
|
|
+ if approval_result.metadata
|
|
|
+ else []
|
|
|
+ )
|
|
|
+ rejected_ids = (
|
|
|
+ approval_result.metadata.get("rejected_ids", [])
|
|
|
+ if approval_result.metadata
|
|
|
+ else []
|
|
|
+ )
|
|
|
+
|
|
|
+ if approval_status == "timeout":
|
|
|
+ # 超时:所有 Tier 2/3 标记为 timeout
|
|
|
+ timeout_count = len(df_tier2_3)
|
|
|
+ for _, row in df_tier2_3.iterrows():
|
|
|
+ audit.log({
|
|
|
+ "ad_id": int(row["ad_id"]),
|
|
|
+ "account_id": int(row.get("account_id", 0) or 0),
|
|
|
+ "action": row.get("final_action", row.get("action")),
|
|
|
+ "tier": int(row.get("tier", 2)),
|
|
|
+ "execution_status": "timeout",
|
|
|
+ "source": row.get("source", ""),
|
|
|
+ })
|
|
|
+ else:
|
|
|
+ # 执行已批准的广告
|
|
|
+ approved_set = set(int(x) for x in approved_ids)
|
|
|
+ rejected_set = set(int(x) for x in rejected_ids)
|
|
|
+
|
|
|
+ for _, row in df_tier2_3.iterrows():
|
|
|
+ ad_id = int(row["ad_id"])
|
|
|
+ account_id = int(row.get("account_id", 0) or 0)
|
|
|
+ action = row.get("final_action", row.get("action"))
|
|
|
+ tier = int(row.get("tier", 2))
|
|
|
+
|
|
|
+ if ad_id in rejected_set:
|
|
|
+ rejected_count += 1
|
|
|
+ audit.log({
|
|
|
+ "ad_id": ad_id,
|
|
|
+ "account_id": account_id,
|
|
|
+ "action": action,
|
|
|
+ "tier": tier,
|
|
|
+ "execution_status": "rejected",
|
|
|
+ "source": row.get("source", ""),
|
|
|
+ })
|
|
|
+ continue
|
|
|
+
|
|
|
+ if ad_id not in approved_set:
|
|
|
+ # 既不在 approved 也不在 rejected(部分审批场景遗漏)
|
|
|
+ pending_approval += 1
|
|
|
+ audit.log({
|
|
|
+ "ad_id": ad_id,
|
|
|
+ "account_id": account_id,
|
|
|
+ "action": action,
|
|
|
+ "tier": tier,
|
|
|
+ "execution_status": "pending_approval",
|
|
|
+ "source": row.get("source", ""),
|
|
|
+ })
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 已批准 → 执行
|
|
|
+ pre_state = await executor.get_ad_state(ad_id, account_id)
|
|
|
+
|
|
|
+ if action == "pause":
|
|
|
+ result = await executor.pause_ad(ad_id, account_id)
|
|
|
+ elif action in ("bid_up", "bid_down"):
|
|
|
+ final_bid = row.get("final_bid", row.get("recommended_bid"))
|
|
|
+ if final_bid is None or final_bid == "":
|
|
|
+ audit.log({
|
|
|
+ "ad_id": ad_id,
|
|
|
+ "action": action,
|
|
|
+ "tier": tier,
|
|
|
+ "execution_status": "skipped",
|
|
|
+ "reason": "无出价数据",
|
|
|
+ })
|
|
|
+ continue
|
|
|
+ bid_fen = int(float(final_bid) * 100)
|
|
|
+ result = await executor.update_bid(ad_id, account_id, bid_fen)
|
|
|
+ else:
|
|
|
+ continue
|
|
|
+
|
|
|
+ api_code = result.get("code", -1)
|
|
|
+ exec_status = "success" if api_code == 0 else "failed"
|
|
|
+
|
|
|
+ if exec_status == "success":
|
|
|
+ approved_executed += 1
|
|
|
+ change_pct = row.get("recommended_change_pct", 0)
|
|
|
+ if isinstance(change_pct, str):
|
|
|
+ try:
|
|
|
+ change_pct = float(change_pct)
|
|
|
+ except ValueError:
|
|
|
+ change_pct = 0
|
|
|
+ history.record_adjustment(str(ad_id), action, change_pct)
|
|
|
+ else:
|
|
|
+ failed += 1
|
|
|
+
|
|
|
+ post_state = await executor.get_ad_state(ad_id, account_id) if exec_status == "success" else None
|
|
|
+
|
|
|
+ audit.log({
|
|
|
+ "ad_id": ad_id,
|
|
|
+ "account_id": account_id,
|
|
|
+ "action": action,
|
|
|
+ "tier": tier,
|
|
|
+ "pre_state": {
|
|
|
+ "bid_amount": pre_state.get("bid_amount") if pre_state else None,
|
|
|
+ "status": pre_state.get("configured_status") if pre_state else None,
|
|
|
+ } if pre_state else None,
|
|
|
+ "post_state": {
|
|
|
+ "bid_amount": post_state.get("bid_amount") if post_state else None,
|
|
|
+ "status": post_state.get("configured_status") if post_state else None,
|
|
|
+ } if post_state else None,
|
|
|
+ "api_code": api_code,
|
|
|
+ "api_message": result.get("message", ""),
|
|
|
+ "execution_status": f"approved_{exec_status}",
|
|
|
+ "source": row.get("source", ""),
|
|
|
+ })
|
|
|
+ else:
|
|
|
+ # IM 未启用:Tier 2/3 仅记录不执行
|
|
|
+ logger.info("IM 未启用,Tier 2/3 共 %d 个操作仅记录不执行", len(df_tier2_3))
|
|
|
+ pending_approval = len(df_tier2_3)
|
|
|
+ for _, row in df_tier2_3.iterrows():
|
|
|
+ audit.log({
|
|
|
+ "ad_id": int(row["ad_id"]),
|
|
|
+ "account_id": int(row.get("account_id", 0) or 0),
|
|
|
+ "action": row.get("final_action", row.get("action")),
|
|
|
+ "tier": int(row.get("tier", 2)),
|
|
|
+ "execution_status": "pending_approval",
|
|
|
+ "note": "IM未启用,操作仅记录",
|
|
|
+ "source": row.get("source", ""),
|
|
|
+ })
|
|
|
+
|
|
|
+ total_executed = executed + approved_executed
|
|
|
+
|
|
|
+ output_lines = [
|
|
|
+ f"执行完成,审计日志: {audit.path}",
|
|
|
+ "",
|
|
|
+ "执行结果:",
|
|
|
+ f" Tier 1 自动执行: {executed} 个成功 / {failed} 个失败",
|
|
|
+ ]
|
|
|
+
|
|
|
+ if IM_ENABLED and not df_tier2_3.empty:
|
|
|
+ output_lines.extend([
|
|
|
+ f" Tier 2/3 审批后执行: {approved_executed} 个成功",
|
|
|
+ f" 审批拒绝: {rejected_count} 个",
|
|
|
+ f" 审批超时: {timeout_count} 个",
|
|
|
+ ])
|
|
|
+ if pending_approval > 0:
|
|
|
+ output_lines.append(f" 待审批(未执行): {pending_approval} 个")
|
|
|
+
|
|
|
+ output_lines.extend([
|
|
|
+ "",
|
|
|
+ "自治级别分布:",
|
|
|
+ f" Tier 1 (自动): {tier_summary.get(1, 0)} 个",
|
|
|
+ f" Tier 2 (审批): {tier_summary.get(2, 0)} 个",
|
|
|
+ f" Tier 3 (高价值): {tier_summary.get(3, 0)} 个",
|
|
|
+ ])
|
|
|
+
|
|
|
+ return ToolResult(
|
|
|
+ title=f"执行完成(自动{executed}/审批通过{approved_executed}/拒绝{rejected_count}/超时{timeout_count})",
|
|
|
+ output="\n".join(output_lines),
|
|
|
+ metadata={
|
|
|
+ "audit_log": str(audit.path),
|
|
|
+ "tier1_executed": executed,
|
|
|
+ "tier1_failed": failed,
|
|
|
+ "approved_executed": approved_executed,
|
|
|
+ "rejected": rejected_count,
|
|
|
+ "timeout": timeout_count,
|
|
|
+ "pending_approval": pending_approval,
|
|
|
+ "tier_summary": tier_summary,
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error("execute_decisions 失败: %s", e, exc_info=True)
|
|
|
+ return ToolResult(title="execute_decisions 失败", output=str(e))
|
|
|
+
|
|
|
+
|
|
|
+# ═══════════════════════════════════════════
|
|
|
+# 工具:执行后效果检查
|
|
|
+# ═══════════════════════════════════════════
|
|
|
+
|
|
|
+
|
|
|
+@tool(description="执行后效果检查:对比操作前后广告表现")
|
|
|
+async def check_execution_feedback(
|
|
|
+ ctx: ToolContext,
|
|
|
+ execution_log_path: str = "",
|
|
|
+ hours_after: int = FEEDBACK_CHECK_HOURS,
|
|
|
+) -> ToolResult:
|
|
|
+ """
|
|
|
+ 读取执行日志,通过 API 获取当前状态,对比操作前后。
|
|
|
+
|
|
|
+ Args:
|
|
|
+ execution_log_path: 执行日志路径(JSONL),默认最新
|
|
|
+ hours_after: 操作后等待时间(小时),仅检查超过此时间的操作
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 查找最新执行日志
|
|
|
+ if not execution_log_path:
|
|
|
+ log_dir = EXECUTION_LOG_DIR
|
|
|
+ if not log_dir.exists():
|
|
|
+ return ToolResult(title="check_execution_feedback", output="无执行日志目录")
|
|
|
+ candidates = sorted(log_dir.glob("exec_*.jsonl"), reverse=True)
|
|
|
+ if not candidates:
|
|
|
+ return ToolResult(title="check_execution_feedback", output="无执行日志")
|
|
|
+ execution_log_path = str(candidates[0])
|
|
|
+
|
|
|
+ # 读取日志
|
|
|
+ entries = []
|
|
|
+ with open(execution_log_path, "r", encoding="utf-8") as f:
|
|
|
+ for line in f:
|
|
|
+ line = line.strip()
|
|
|
+ if line:
|
|
|
+ entries.append(json.loads(line))
|
|
|
+
|
|
|
+ if not entries:
|
|
|
+ return ToolResult(title="check_execution_feedback", output="执行日志为空")
|
|
|
+
|
|
|
+ # 过滤:只看成功执行的且超过等待时间的
|
|
|
+ cutoff = datetime.now() - timedelta(hours=hours_after)
|
|
|
+ check_entries = [
|
|
|
+ e for e in entries
|
|
|
+ if e.get("execution_status") == "success"
|
|
|
+ and datetime.fromisoformat(e["ts"]) < cutoff
|
|
|
+ ]
|
|
|
+
|
|
|
+ if not check_entries:
|
|
|
+ return ToolResult(
|
|
|
+ title="check_execution_feedback",
|
|
|
+ output=f"无需检查(没有超过{hours_after}小时前的成功操作)",
|
|
|
+ )
|
|
|
+
|
|
|
+ # 获取当前状态
|
|
|
+ executor = TencentAdExecutor()
|
|
|
+ results = []
|
|
|
+
|
|
|
+ for entry in check_entries:
|
|
|
+ ad_id = entry.get("ad_id")
|
|
|
+ account_id = entry.get("account_id", 0)
|
|
|
+ action = entry.get("action")
|
|
|
+ pre_state = entry.get("pre_state", {})
|
|
|
+
|
|
|
+ current_state = await executor.get_ad_state(ad_id, account_id)
|
|
|
+
|
|
|
+ result = {
|
|
|
+ "ad_id": ad_id,
|
|
|
+ "action": action,
|
|
|
+ "executed_at": entry.get("ts"),
|
|
|
+ "pre_bid": pre_state.get("bid_amount") if pre_state else None,
|
|
|
+ "pre_status": pre_state.get("status") if pre_state else None,
|
|
|
+ "current_bid": current_state.get("bid_amount") if current_state else None,
|
|
|
+ "current_status": current_state.get("configured_status") if current_state else None,
|
|
|
+ }
|
|
|
+
|
|
|
+ # 状态变化判断
|
|
|
+ if action == "pause":
|
|
|
+ result["effective"] = (
|
|
|
+ current_state.get("configured_status") == "AD_STATUS_SUSPEND"
|
|
|
+ if current_state else None
|
|
|
+ )
|
|
|
+ elif action in ("bid_up", "bid_down"):
|
|
|
+ post_bid = entry.get("post_state", {})
|
|
|
+ if post_bid:
|
|
|
+ expected_bid = post_bid.get("bid_amount")
|
|
|
+ actual_bid = current_state.get("bid_amount") if current_state else None
|
|
|
+ result["expected_bid"] = expected_bid
|
|
|
+ result["effective"] = (actual_bid == expected_bid) if actual_bid is not None else None
|
|
|
+
|
|
|
+ results.append(result)
|
|
|
+
|
|
|
+ # 统计
|
|
|
+ effective = sum(1 for r in results if r.get("effective") is True)
|
|
|
+ ineffective = sum(1 for r in results if r.get("effective") is False)
|
|
|
+ unknown = sum(1 for r in results if r.get("effective") is None)
|
|
|
+
|
|
|
+ output_lines = [
|
|
|
+ f"效果检查完成({len(results)} 个操作)",
|
|
|
+ "",
|
|
|
+ f" 有效: {effective} 个",
|
|
|
+ f" 无效/被覆盖: {ineffective} 个",
|
|
|
+ f" 未知: {unknown} 个",
|
|
|
+ ]
|
|
|
+
|
|
|
+ if ineffective > 0:
|
|
|
+ output_lines.append("")
|
|
|
+ output_lines.append("⚠️ 以下操作可能未生效或被覆盖:")
|
|
|
+ for r in results:
|
|
|
+ if r.get("effective") is False:
|
|
|
+ output_lines.append(
|
|
|
+ f" - 广告{r['ad_id']}: {r['action']} "
|
|
|
+ f"(执行于 {r['executed_at']})"
|
|
|
+ )
|
|
|
+
|
|
|
+ return ToolResult(
|
|
|
+ title=f"效果检查(有效{effective}/无效{ineffective})",
|
|
|
+ output="\n".join(output_lines),
|
|
|
+ metadata={
|
|
|
+ "total": len(results),
|
|
|
+ "effective": effective,
|
|
|
+ "ineffective": ineffective,
|
|
|
+ "unknown": unknown,
|
|
|
+ "details": results,
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error("check_execution_feedback 失败: %s", e, exc_info=True)
|
|
|
+ return ToolResult(title="check_execution_feedback 失败", output=str(e))
|