# execution_engine.py
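"""
Execution engine — auto_put_ad_mini

Responsibilities:
1. Load decisions that have passed guardrail validation
2. Classify them by autonomy tier (Tier 1 automatic / Tier 2 approval / Tier 3 blocked)
3. Call the Tencent Ads API to carry out the operations
4. Write an audit log (JSONL)
5. Check the effect after execution

Dependencies:
- the low-level HTTP functions in tools/ad_api.py
- AdjustmentHistory from tools/guardrails.py
"""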
  13. import asyncio
  14. import json
  15. import logging
  16. import sys
  17. import time
  18. from datetime import datetime, timedelta
  19. from pathlib import Path
  20. from typing import Any, Dict, List, Optional
  21. import pandas as pd
  22. from agent.tools import tool
  23. from agent.tools.models import ToolContext, ToolResult
  24. _MINI_DIR = Path(__file__).resolve().parent.parent
  25. _TOOLS_DIR = Path(__file__).resolve().parent
  26. if str(_MINI_DIR) not in sys.path:
  27. sys.path.insert(0, str(_MINI_DIR))
  28. if str(_TOOLS_DIR) not in sys.path:
  29. sys.path.insert(0, str(_TOOLS_DIR))
  30. from config import (
  31. EXECUTION_ENABLED,
  32. EXECUTION_LOG_DIR,
  33. API_QPS_LIMIT,
  34. API_MAX_RETRIES,
  35. TIER1_MAX_CHANGE_PCT,
  36. TIER3_MIN_DAILY_SPEND,
  37. FEEDBACK_CHECK_HOURS,
  38. DRY_RUN_MODE,
  39. IM_ENABLED,
  40. )
  41. logger = logging.getLogger(__name__)
# ═══════════════════════════════════════════
# QPS token-bucket rate limiting
# ═══════════════════════════════════════════
  45. class TokenBucket:
  46. """简单令牌桶限流器。"""
  47. def __init__(self, rate: float = API_QPS_LIMIT):
  48. self._rate = rate
  49. self._tokens = rate
  50. self._last_refill = time.monotonic()
  51. async def acquire(self):
  52. """获取一个令牌,不够时等待。"""
  53. while True:
  54. now = time.monotonic()
  55. elapsed = now - self._last_refill
  56. self._tokens = min(self._rate, self._tokens + elapsed * self._rate)
  57. self._last_refill = now
  58. if self._tokens >= 1:
  59. self._tokens -= 1
  60. return
  61. else:
  62. wait = (1 - self._tokens) / self._rate
  63. await asyncio.sleep(wait)
# ═══════════════════════════════════════════
# Tencent Ads executor
# ═══════════════════════════════════════════
  67. class TencentAdExecutor:
  68. """
  69. 腾讯广告 API 执行器。
  70. 复用 ad_api.py 的底层 HTTP 函数,添加:
  71. - QPS 令牌桶限流
  72. - 指数退避重试
  73. - 操作前快照
  74. """
  75. def __init__(self):
  76. self._bucket = TokenBucket(rate=API_QPS_LIMIT)
  77. async def get_ad_state(self, ad_id: int, account_id: int) -> Optional[Dict]:
  78. """获取广告当前状态快照。"""
  79. try:
  80. from ad_api import _get, _check
  81. await self._bucket.acquire()
  82. resp = _get("/adgroups/get", {
  83. "account_id": account_id,
  84. "filtering": {"adgroup_id_list": [ad_id]},
  85. "page": 1,
  86. "page_size": 1,
  87. })
  88. data = _check(resp, "get_ad_state")
  89. items = data.get("list", [])
  90. return items[0] if items else None
  91. except Exception as e:
  92. logger.warning("获取广告 %s 状态失败: %s", ad_id, e)
  93. return None
  94. async def update_bid(self, ad_id: int, account_id: int, bid_amount_fen: int) -> Dict:
  95. """更新广告出价(分)。"""
  96. from ad_api import _post, _check
  97. for attempt in range(API_MAX_RETRIES):
  98. try:
  99. await self._bucket.acquire()
  100. body = {
  101. "account_id": account_id,
  102. "adgroup_id": ad_id,
  103. "bid_amount": bid_amount_fen,
  104. }
  105. resp = _post("/adgroups/update", body)
  106. _check(resp, "update_bid")
  107. return {"code": 0, "message": "success"}
  108. except Exception as e:
  109. if attempt < API_MAX_RETRIES - 1:
  110. wait = 2 ** attempt
  111. logger.warning("update_bid 重试 %d/%d (等待%ds): %s", attempt + 1, API_MAX_RETRIES, wait, e)
  112. await asyncio.sleep(wait)
  113. else:
  114. return {"code": -1, "message": str(e)}
  115. async def pause_ad(self, ad_id: int, account_id: int) -> Dict:
  116. """暂停广告。"""
  117. from ad_api import _post, _check
  118. for attempt in range(API_MAX_RETRIES):
  119. try:
  120. await self._bucket.acquire()
  121. body = {
  122. "account_id": account_id,
  123. "adgroup_id": ad_id,
  124. "configured_status": "AD_STATUS_SUSPEND",
  125. }
  126. resp = _post("/adgroups/update", body)
  127. _check(resp, "pause_ad")
  128. return {"code": 0, "message": "success"}
  129. except Exception as e:
  130. if attempt < API_MAX_RETRIES - 1:
  131. wait = 2 ** attempt
  132. logger.warning("pause_ad 重试 %d/%d: %s", attempt + 1, API_MAX_RETRIES, e)
  133. await asyncio.sleep(wait)
  134. else:
  135. return {"code": -1, "message": str(e)}
# ═══════════════════════════════════════════
# Audit log
# ═══════════════════════════════════════════
  139. class AuditLogger:
  140. """JSONL 审计日志。"""
  141. def __init__(self, log_dir: Path = EXECUTION_LOG_DIR):
  142. self._log_dir = log_dir
  143. self._log_dir.mkdir(parents=True, exist_ok=True)
  144. today = datetime.now().strftime("%Y%m%d")
  145. self._path = self._log_dir / f"exec_{today}.jsonl"
  146. def log(self, entry: Dict):
  147. entry["ts"] = datetime.now().isoformat()
  148. with open(self._path, "a", encoding="utf-8") as f:
  149. f.write(json.dumps(entry, ensure_ascii=False) + "\n")
  150. @property
  151. def path(self) -> Path:
  152. return self._path
# ═══════════════════════════════════════════
# Autonomy tier classification
# ═══════════════════════════════════════════
  156. def _classify_tier(row: pd.Series) -> int:
  157. """
  158. 自治级别分类。
  159. Tier 0 (无操作): hold, observe, creative_adjust, scale_up(不调用API,仅建议)
  160. Tier 1 (自动执行): bid 调幅 ≤ 5%
  161. Tier 2 (需审批): pause 或 bid 调幅 > 5%
  162. Tier 3 (高价值阻断): 日消耗 > 1500 元的高价值广告
  163. """
  164. action = row.get("final_action", row.get("action", "hold"))
  165. # Tier 0: 无需操作的动作(仅建议,不执行)
  166. if action in ("hold", "observe", "creative_adjust", "scale_up"):
  167. return 0 # 无操作(observe=观察,creative_adjust/scale_up=需人工执行)
  168. cost_7d_avg = float(row.get("cost_7d_avg", 0) or 0)
  169. change_pct = row.get("recommended_change_pct", 0)
  170. if isinstance(change_pct, str):
  171. try:
  172. change_pct = float(change_pct)
  173. except ValueError:
  174. change_pct = 0
  175. # Tier 3: 高价值广告
  176. if cost_7d_avg >= TIER3_MIN_DAILY_SPEND:
  177. return 3
  178. # Tier 2: 暂停 或 大幅调价
  179. if action == "pause" or abs(change_pct) > TIER1_MAX_CHANGE_PCT:
  180. return 2
  181. # Tier 1: 小幅调价
  182. return 1
# ═══════════════════════════════════════════
# Tool: execute validated decisions
# ═══════════════════════════════════════════
  186. @tool(description="执行已验证的决策:分级自治 → API调用 → 审计日志")
  187. async def execute_decisions(
  188. ctx: ToolContext,
  189. validated_csv: str = "",
  190. approval_mode: str = "tiered",
  191. ) -> ToolResult:
  192. """
  193. 执行已通过护栏验证的决策。
  194. Pipeline:
  195. 1. 加载 validated_decisions CSV (guardrail_status != blocked)
  196. 2. 按自治级别分类
  197. 3. Tier 1 (小调, ≤5%): 直接执行 + 通知
  198. 4. Tier 2 (大调/暂停): 标记待审批
  199. 5. Tier 3 (高价值): 标记需人工审批
  200. 6. 记录审计日志
  201. Args:
  202. validated_csv: 护栏验证后的 CSV 路径
  203. approval_mode: "auto"=全部自动执行, "tiered"=分级, "manual"=全部需审批
  204. """
  205. try:
  206. if not EXECUTION_ENABLED:
  207. return ToolResult(
  208. title="执行引擎未启用",
  209. output="EXECUTION_ENABLED=False,跳过执行。修改 config.py 启用。",
  210. )
  211. if DRY_RUN_MODE:
  212. return ToolResult(
  213. title="干运行模式",
  214. output="DRY_RUN_MODE=True,操作不会实际执行。修改 config.py 关闭干运行。",
  215. )
  216. # 查找验证 CSV
  217. if not validated_csv:
  218. reports_dir = _MINI_DIR / "outputs" / "reports"
  219. candidates = sorted(reports_dir.glob("validated_decisions_*.csv"), reverse=True)
  220. if not candidates:
  221. return ToolResult(title="execute_decisions", output="未找到验证后的决策 CSV")
  222. validated_csv = str(candidates[0])
  223. df = pd.read_csv(validated_csv)
  224. if df.empty:
  225. return ToolResult(title="execute_decisions", output="决策数据为空")
  226. # 只执行 approved / modified(非 blocked)
  227. df_exec = df[df["guardrail_status"].isin(["approved", "modified"])].copy()
  228. df_exec = df_exec[df_exec["final_action"] != "hold"]
  229. if df_exec.empty:
  230. return ToolResult(
  231. title="无需执行的操作",
  232. output="所有决策要么是 hold,要么被护栏拦截",
  233. )
  234. # 分级
  235. df_exec["tier"] = df_exec.apply(_classify_tier, axis=1)
  236. executor = TencentAdExecutor()
  237. audit = AuditLogger()
  238. # 从 guardrails 导入 AdjustmentHistory 记录操作
  239. from guardrails import AdjustmentHistory
  240. history = AdjustmentHistory()
  241. executed = 0
  242. failed = 0
  243. pending_approval = 0
  244. approved_executed = 0
  245. rejected_count = 0
  246. timeout_count = 0
  247. tier_summary = {1: 0, 2: 0, 3: 0}
  248. # ─── 分离 Tier 1 和 Tier 2/3 ───
  249. df_tier1 = df_exec[df_exec["tier"] == 1]
  250. df_tier2_3 = df_exec[df_exec["tier"] >= 2]
  251. for t in [1, 2, 3]:
  252. tier_summary[t] = int((df_exec["tier"] == t).sum())
  253. # ═══ Phase 1: 自动执行 Tier 1 ═══
  254. for _, row in df_tier1.iterrows():
  255. action = row.get("final_action", row.get("action"))
  256. ad_id = int(row["ad_id"])
  257. account_id = int(row.get("account_id", 0) or 0)
  258. pre_state = await executor.get_ad_state(ad_id, account_id)
  259. if action == "pause":
  260. result = await executor.pause_ad(ad_id, account_id)
  261. elif action in ("bid_up", "bid_down"):
  262. final_bid = row.get("final_bid", row.get("recommended_bid"))
  263. if final_bid is None or final_bid == "":
  264. audit.log({
  265. "ad_id": ad_id,
  266. "action": action,
  267. "tier": 1,
  268. "execution_status": "skipped",
  269. "reason": "无出价数据",
  270. })
  271. continue
  272. bid_fen = int(float(final_bid) * 100)
  273. result = await executor.update_bid(ad_id, account_id, bid_fen)
  274. else:
  275. continue
  276. api_code = result.get("code", -1)
  277. exec_status = "success" if api_code == 0 else "failed"
  278. if exec_status == "success":
  279. executed += 1
  280. change_pct = row.get("recommended_change_pct", 0)
  281. if isinstance(change_pct, str):
  282. try:
  283. change_pct = float(change_pct)
  284. except ValueError:
  285. change_pct = 0
  286. history.record_adjustment(str(ad_id), action, change_pct)
  287. else:
  288. failed += 1
  289. post_state = await executor.get_ad_state(ad_id, account_id) if exec_status == "success" else None
  290. audit.log({
  291. "ad_id": ad_id,
  292. "account_id": account_id,
  293. "action": action,
  294. "tier": 1,
  295. "pre_state": {
  296. "bid_amount": pre_state.get("bid_amount") if pre_state else None,
  297. "status": pre_state.get("configured_status") if pre_state else None,
  298. } if pre_state else None,
  299. "post_state": {
  300. "bid_amount": post_state.get("bid_amount") if post_state else None,
  301. "status": post_state.get("configured_status") if post_state else None,
  302. } if post_state else None,
  303. "api_code": api_code,
  304. "api_message": result.get("message", ""),
  305. "execution_status": exec_status,
  306. "source": row.get("source", ""),
  307. })
  308. # ═══ Phase 2: Tier 2/3 — 审批 + 执行 ═══
  309. if not df_tier2_3.empty:
  310. if IM_ENABLED:
  311. # 阻塞式审批:调用 send_approval_request(wait_for_reply=True)
  312. logger.info("Tier 2/3 共 %d 个操作,发送 IM 审批并等待...", len(df_tier2_3))
  313. from im_approval import send_approval_request
  314. approval_result = await send_approval_request(
  315. ctx=ctx,
  316. validated_csv=validated_csv,
  317. wait_for_reply=True,
  318. )
  319. approval_status = (
  320. approval_result.metadata.get("status", "timeout")
  321. if approval_result.metadata
  322. else "timeout"
  323. )
  324. approved_ids = (
  325. approval_result.metadata.get("approved_ids", [])
  326. if approval_result.metadata
  327. else []
  328. )
  329. rejected_ids = (
  330. approval_result.metadata.get("rejected_ids", [])
  331. if approval_result.metadata
  332. else []
  333. )
  334. if approval_status == "timeout":
  335. # 超时:所有 Tier 2/3 标记为 timeout
  336. timeout_count = len(df_tier2_3)
  337. for _, row in df_tier2_3.iterrows():
  338. audit.log({
  339. "ad_id": int(row["ad_id"]),
  340. "account_id": int(row.get("account_id", 0) or 0),
  341. "action": row.get("final_action", row.get("action")),
  342. "tier": int(row.get("tier", 2)),
  343. "execution_status": "timeout",
  344. "source": row.get("source", ""),
  345. })
  346. else:
  347. # 执行已批准的广告
  348. approved_set = set(int(x) for x in approved_ids)
  349. rejected_set = set(int(x) for x in rejected_ids)
  350. for _, row in df_tier2_3.iterrows():
  351. ad_id = int(row["ad_id"])
  352. account_id = int(row.get("account_id", 0) or 0)
  353. action = row.get("final_action", row.get("action"))
  354. tier = int(row.get("tier", 2))
  355. if ad_id in rejected_set:
  356. rejected_count += 1
  357. audit.log({
  358. "ad_id": ad_id,
  359. "account_id": account_id,
  360. "action": action,
  361. "tier": tier,
  362. "execution_status": "rejected",
  363. "source": row.get("source", ""),
  364. })
  365. continue
  366. if ad_id not in approved_set:
  367. # 既不在 approved 也不在 rejected(部分审批场景遗漏)
  368. pending_approval += 1
  369. audit.log({
  370. "ad_id": ad_id,
  371. "account_id": account_id,
  372. "action": action,
  373. "tier": tier,
  374. "execution_status": "pending_approval",
  375. "source": row.get("source", ""),
  376. })
  377. continue
  378. # 已批准 → 执行
  379. pre_state = await executor.get_ad_state(ad_id, account_id)
  380. if action == "pause":
  381. result = await executor.pause_ad(ad_id, account_id)
  382. elif action in ("bid_up", "bid_down"):
  383. final_bid = row.get("final_bid", row.get("recommended_bid"))
  384. if final_bid is None or final_bid == "":
  385. audit.log({
  386. "ad_id": ad_id,
  387. "action": action,
  388. "tier": tier,
  389. "execution_status": "skipped",
  390. "reason": "无出价数据",
  391. })
  392. continue
  393. bid_fen = int(float(final_bid) * 100)
  394. result = await executor.update_bid(ad_id, account_id, bid_fen)
  395. else:
  396. continue
  397. api_code = result.get("code", -1)
  398. exec_status = "success" if api_code == 0 else "failed"
  399. if exec_status == "success":
  400. approved_executed += 1
  401. change_pct = row.get("recommended_change_pct", 0)
  402. if isinstance(change_pct, str):
  403. try:
  404. change_pct = float(change_pct)
  405. except ValueError:
  406. change_pct = 0
  407. history.record_adjustment(str(ad_id), action, change_pct)
  408. else:
  409. failed += 1
  410. post_state = await executor.get_ad_state(ad_id, account_id) if exec_status == "success" else None
  411. audit.log({
  412. "ad_id": ad_id,
  413. "account_id": account_id,
  414. "action": action,
  415. "tier": tier,
  416. "pre_state": {
  417. "bid_amount": pre_state.get("bid_amount") if pre_state else None,
  418. "status": pre_state.get("configured_status") if pre_state else None,
  419. } if pre_state else None,
  420. "post_state": {
  421. "bid_amount": post_state.get("bid_amount") if post_state else None,
  422. "status": post_state.get("configured_status") if post_state else None,
  423. } if post_state else None,
  424. "api_code": api_code,
  425. "api_message": result.get("message", ""),
  426. "execution_status": f"approved_{exec_status}",
  427. "source": row.get("source", ""),
  428. })
  429. else:
  430. # IM 未启用:Tier 2/3 仅记录不执行
  431. logger.info("IM 未启用,Tier 2/3 共 %d 个操作仅记录不执行", len(df_tier2_3))
  432. pending_approval = len(df_tier2_3)
  433. for _, row in df_tier2_3.iterrows():
  434. audit.log({
  435. "ad_id": int(row["ad_id"]),
  436. "account_id": int(row.get("account_id", 0) or 0),
  437. "action": row.get("final_action", row.get("action")),
  438. "tier": int(row.get("tier", 2)),
  439. "execution_status": "pending_approval",
  440. "note": "IM未启用,操作仅记录",
  441. "source": row.get("source", ""),
  442. })
  443. total_executed = executed + approved_executed
  444. output_lines = [
  445. f"执行完成,审计日志: {audit.path}",
  446. "",
  447. "执行结果:",
  448. f" Tier 1 自动执行: {executed} 个成功 / {failed} 个失败",
  449. ]
  450. if IM_ENABLED and not df_tier2_3.empty:
  451. output_lines.extend([
  452. f" Tier 2/3 审批后执行: {approved_executed} 个成功",
  453. f" 审批拒绝: {rejected_count} 个",
  454. f" 审批超时: {timeout_count} 个",
  455. ])
  456. if pending_approval > 0:
  457. output_lines.append(f" 待审批(未执行): {pending_approval} 个")
  458. output_lines.extend([
  459. "",
  460. "自治级别分布:",
  461. f" Tier 1 (自动): {tier_summary.get(1, 0)} 个",
  462. f" Tier 2 (审批): {tier_summary.get(2, 0)} 个",
  463. f" Tier 3 (高价值): {tier_summary.get(3, 0)} 个",
  464. ])
  465. return ToolResult(
  466. title=f"执行完成(自动{executed}/审批通过{approved_executed}/拒绝{rejected_count}/超时{timeout_count})",
  467. output="\n".join(output_lines),
  468. metadata={
  469. "audit_log": str(audit.path),
  470. "tier1_executed": executed,
  471. "tier1_failed": failed,
  472. "approved_executed": approved_executed,
  473. "rejected": rejected_count,
  474. "timeout": timeout_count,
  475. "pending_approval": pending_approval,
  476. "tier_summary": tier_summary,
  477. },
  478. )
  479. except Exception as e:
  480. logger.error("execute_decisions 失败: %s", e, exc_info=True)
  481. return ToolResult(title="execute_decisions 失败", output=str(e))
# ═══════════════════════════════════════════
# Tool: post-execution effect check
# ═══════════════════════════════════════════
  485. @tool(description="执行后效果检查:对比操作前后广告表现")
  486. async def check_execution_feedback(
  487. ctx: ToolContext,
  488. execution_log_path: str = "",
  489. hours_after: int = FEEDBACK_CHECK_HOURS,
  490. ) -> ToolResult:
  491. """
  492. 读取执行日志,通过 API 获取当前状态,对比操作前后。
  493. Args:
  494. execution_log_path: 执行日志路径(JSONL),默认最新
  495. hours_after: 操作后等待时间(小时),仅检查超过此时间的操作
  496. """
  497. try:
  498. # 查找最新执行日志
  499. if not execution_log_path:
  500. log_dir = EXECUTION_LOG_DIR
  501. if not log_dir.exists():
  502. return ToolResult(title="check_execution_feedback", output="无执行日志目录")
  503. candidates = sorted(log_dir.glob("exec_*.jsonl"), reverse=True)
  504. if not candidates:
  505. return ToolResult(title="check_execution_feedback", output="无执行日志")
  506. execution_log_path = str(candidates[0])
  507. # 读取日志
  508. entries = []
  509. with open(execution_log_path, "r", encoding="utf-8") as f:
  510. for line in f:
  511. line = line.strip()
  512. if line:
  513. entries.append(json.loads(line))
  514. if not entries:
  515. return ToolResult(title="check_execution_feedback", output="执行日志为空")
  516. # 过滤:只看成功执行的且超过等待时间的
  517. cutoff = datetime.now() - timedelta(hours=hours_after)
  518. check_entries = [
  519. e for e in entries
  520. if e.get("execution_status") == "success"
  521. and datetime.fromisoformat(e["ts"]) < cutoff
  522. ]
  523. if not check_entries:
  524. return ToolResult(
  525. title="check_execution_feedback",
  526. output=f"无需检查(没有超过{hours_after}小时前的成功操作)",
  527. )
  528. # 获取当前状态
  529. executor = TencentAdExecutor()
  530. results = []
  531. for entry in check_entries:
  532. ad_id = entry.get("ad_id")
  533. account_id = entry.get("account_id", 0)
  534. action = entry.get("action")
  535. pre_state = entry.get("pre_state", {})
  536. current_state = await executor.get_ad_state(ad_id, account_id)
  537. result = {
  538. "ad_id": ad_id,
  539. "action": action,
  540. "executed_at": entry.get("ts"),
  541. "pre_bid": pre_state.get("bid_amount") if pre_state else None,
  542. "pre_status": pre_state.get("status") if pre_state else None,
  543. "current_bid": current_state.get("bid_amount") if current_state else None,
  544. "current_status": current_state.get("configured_status") if current_state else None,
  545. }
  546. # 状态变化判断
  547. if action == "pause":
  548. result["effective"] = (
  549. current_state.get("configured_status") == "AD_STATUS_SUSPEND"
  550. if current_state else None
  551. )
  552. elif action in ("bid_up", "bid_down"):
  553. post_bid = entry.get("post_state", {})
  554. if post_bid:
  555. expected_bid = post_bid.get("bid_amount")
  556. actual_bid = current_state.get("bid_amount") if current_state else None
  557. result["expected_bid"] = expected_bid
  558. result["effective"] = (actual_bid == expected_bid) if actual_bid is not None else None
  559. results.append(result)
  560. # 统计
  561. effective = sum(1 for r in results if r.get("effective") is True)
  562. ineffective = sum(1 for r in results if r.get("effective") is False)
  563. unknown = sum(1 for r in results if r.get("effective") is None)
  564. output_lines = [
  565. f"效果检查完成({len(results)} 个操作)",
  566. "",
  567. f" 有效: {effective} 个",
  568. f" 无效/被覆盖: {ineffective} 个",
  569. f" 未知: {unknown} 个",
  570. ]
  571. if ineffective > 0:
  572. output_lines.append("")
  573. output_lines.append("⚠️ 以下操作可能未生效或被覆盖:")
  574. for r in results:
  575. if r.get("effective") is False:
  576. output_lines.append(
  577. f" - 广告{r['ad_id']}: {r['action']} "
  578. f"(执行于 {r['executed_at']})"
  579. )
  580. return ToolResult(
  581. title=f"效果检查(有效{effective}/无效{ineffective})",
  582. output="\n".join(output_lines),
  583. metadata={
  584. "total": len(results),
  585. "effective": effective,
  586. "ineffective": ineffective,
  587. "unknown": unknown,
  588. "details": results,
  589. },
  590. )
  591. except Exception as e:
  592. logger.error("check_execution_feedback 失败: %s", e, exc_info=True)
  593. return ToolResult(title="check_execution_feedback 失败", output=str(e))