execution_engine.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769
  1. """
  2. 执行引擎 — auto_put_ad_mini
  3. 职责:
  4. 1. 加载护栏验证后的决策
  5. 2. 按自治级别分类(Tier 1 自动 / Tier 2 审批 / Tier 3 阻断)
  6. 3. 调用腾讯广告 API 执行操作
  7. 4. 记录审计日志(JSONL)
  8. 5. 执行后效果检查
  9. 依赖:
  10. - tools/ad_api.py 的底层 HTTP 函数
  11. - tools/guardrails.py 的 AdjustmentHistory
  12. """
  13. import asyncio
  14. import json
  15. import logging
  16. import sys
  17. import time
  18. from datetime import datetime, timedelta
  19. from pathlib import Path
  20. from typing import Any, Dict, List, Optional
  21. import pandas as pd
  22. from agent.tools import tool
  23. from agent.tools.models import ToolContext, ToolResult
  24. _MINI_DIR = Path(__file__).resolve().parent.parent
  25. _TOOLS_DIR = Path(__file__).resolve().parent
  26. if str(_MINI_DIR) not in sys.path:
  27. sys.path.insert(0, str(_MINI_DIR))
  28. if str(_TOOLS_DIR) not in sys.path:
  29. sys.path.insert(0, str(_TOOLS_DIR))
  30. from config import (
  31. EXECUTION_ENABLED,
  32. EXECUTION_LOG_DIR,
  33. API_QPS_LIMIT,
  34. API_MAX_RETRIES,
  35. TIER1_MAX_CHANGE_PCT,
  36. TIER3_MIN_DAILY_SPEND,
  37. FEEDBACK_CHECK_HOURS,
  38. DRY_RUN_MODE,
  39. IM_ENABLED,
  40. )
  41. logger = logging.getLogger(__name__)
  42. # ═══════════════════════════════════════════
  43. # QPS 令牌桶限流
  44. # ═══════════════════════════════════════════
  45. class TokenBucket:
  46. """简单令牌桶限流器。"""
  47. def __init__(self, rate: float = API_QPS_LIMIT):
  48. self._rate = rate
  49. self._tokens = rate
  50. self._last_refill = time.monotonic()
  51. async def acquire(self):
  52. """获取一个令牌,不够时等待。"""
  53. while True:
  54. now = time.monotonic()
  55. elapsed = now - self._last_refill
  56. self._tokens = min(self._rate, self._tokens + elapsed * self._rate)
  57. self._last_refill = now
  58. if self._tokens >= 1:
  59. self._tokens -= 1
  60. return
  61. else:
  62. wait = (1 - self._tokens) / self._rate
  63. await asyncio.sleep(wait)
  64. # ═══════════════════════════════════════════
  65. # 腾讯广告执行器
  66. # ═══════════════════════════════════════════
  67. class TencentAdExecutor:
  68. """
  69. 腾讯广告 API 执行器。
  70. 复用 ad_api.py 的底层 HTTP 函数,添加:
  71. - QPS 令牌桶限流
  72. - 指数退避重试
  73. - 操作前快照
  74. """
  75. def __init__(self):
  76. self._bucket = TokenBucket(rate=API_QPS_LIMIT)
  77. async def get_ad_state(self, ad_id: int, account_id: int) -> Optional[Dict]:
  78. """获取广告当前状态快照。"""
  79. try:
  80. from ad_api import _get, _check
  81. await self._bucket.acquire()
  82. resp = _get("/adgroups/get", {
  83. "account_id": account_id,
  84. "filtering": {"adgroup_id_list": [ad_id]},
  85. "page": 1,
  86. "page_size": 1,
  87. })
  88. data = _check(resp, "get_ad_state")
  89. items = data.get("list", [])
  90. return items[0] if items else None
  91. except Exception as e:
  92. logger.warning("获取广告 %s 状态失败: %s", ad_id, e)
  93. return None
  94. async def update_bid(self, ad_id: int, account_id: int, bid_amount_fen: int) -> Dict:
  95. """更新广告出价(分)。"""
  96. # 白名单安全检查(最后防线)
  97. from config import WHITELIST_ENABLED, WHITELIST_ACCOUNTS
  98. if WHITELIST_ENABLED and account_id not in WHITELIST_ACCOUNTS:
  99. logger.error(
  100. f"⚠️ 白名单安全阻断:账户 {account_id} 不在白名单内,拒绝执行 update_bid。"
  101. f"白名单: {WHITELIST_ACCOUNTS}"
  102. )
  103. return {"code": -1, "message": f"账户 {account_id} 不在白名单内"}
  104. # 预先获取并验证 access_token
  105. from ad_api import _post, _check, _get_access_token
  106. try:
  107. token = _get_access_token(account_id)
  108. if not token or len(token) < 10:
  109. return {"code": -1, "message": f"获取 access_token 失败:token={token[:20]}"}
  110. logger.info(f"[update_bid] 已获取 access_token (账户={account_id}): {token[:10]}...")
  111. except Exception as e:
  112. logger.error(f"[update_bid] 获取 access_token 异常: {e}")
  113. return {"code": -1, "message": f"获取 access_token 异常: {e}"}
  114. for attempt in range(API_MAX_RETRIES):
  115. try:
  116. await self._bucket.acquire()
  117. body = {
  118. "account_id": account_id,
  119. "adgroup_id": ad_id,
  120. "bid_amount": bid_amount_fen,
  121. }
  122. resp = _post("/adgroups/update", body)
  123. _check(resp, "update_bid")
  124. return {"code": 0, "message": "success"}
  125. except Exception as e:
  126. if attempt < API_MAX_RETRIES - 1:
  127. wait = 2 ** attempt
  128. logger.warning("update_bid 重试 %d/%d (等待%ds): %s", attempt + 1, API_MAX_RETRIES, wait, e)
  129. await asyncio.sleep(wait)
  130. else:
  131. return {"code": -1, "message": str(e)}
  132. async def pause_ad(self, ad_id: int, account_id: int) -> Dict:
  133. """暂停广告。"""
  134. # 白名单安全检查(最后防线)
  135. from config import WHITELIST_ENABLED, WHITELIST_ACCOUNTS
  136. if WHITELIST_ENABLED and account_id not in WHITELIST_ACCOUNTS:
  137. logger.error(
  138. f"⚠️ 白名单安全阻断:账户 {account_id} 不在白名单内,拒绝执行 pause_ad。"
  139. f"白名单: {WHITELIST_ACCOUNTS}"
  140. )
  141. return {"code": -1, "message": f"账户 {account_id} 不在白名单内"}
  142. # 预先获取并验证 access_token
  143. from ad_api import _post, _check, _get_access_token
  144. try:
  145. token = _get_access_token(account_id)
  146. if not token or len(token) < 10:
  147. return {"code": -1, "message": f"获取 access_token 失败:token={token[:20]}"}
  148. logger.info(f"[pause_ad] 已获取 access_token (账户={account_id}): {token[:10]}...")
  149. except Exception as e:
  150. logger.error(f"[pause_ad] 获取 access_token 异常: {e}")
  151. return {"code": -1, "message": f"获取 access_token 异常: {e}"}
  152. for attempt in range(API_MAX_RETRIES):
  153. try:
  154. await self._bucket.acquire()
  155. body = {
  156. "account_id": account_id,
  157. "adgroup_id": ad_id,
  158. "configured_status": "AD_STATUS_SUSPEND",
  159. }
  160. resp = _post("/adgroups/update", body)
  161. _check(resp, "pause_ad")
  162. return {"code": 0, "message": "success"}
  163. except Exception as e:
  164. if attempt < API_MAX_RETRIES - 1:
  165. wait = 2 ** attempt
  166. logger.warning("pause_ad 重试 %d/%d: %s", attempt + 1, API_MAX_RETRIES, e)
  167. await asyncio.sleep(wait)
  168. else:
  169. return {"code": -1, "message": str(e)}
  170. # ═══════════════════════════════════════════
  171. # 审计日志
  172. # ═══════════════════════════════════════════
  173. class AuditLogger:
  174. """JSONL 审计日志。"""
  175. def __init__(self, log_dir: Path = EXECUTION_LOG_DIR):
  176. self._log_dir = log_dir
  177. self._log_dir.mkdir(parents=True, exist_ok=True)
  178. today = datetime.now().strftime("%Y%m%d")
  179. self._path = self._log_dir / f"exec_{today}.jsonl"
  180. def log(self, entry: Dict):
  181. entry["ts"] = datetime.now().isoformat()
  182. with open(self._path, "a", encoding="utf-8") as f:
  183. f.write(json.dumps(entry, ensure_ascii=False) + "\n")
  184. @property
  185. def path(self) -> Path:
  186. return self._path
  187. # ═══════════════════════════════════════════
  188. # 自治级别分类
  189. # ═══════════════════════════════════════════
  190. def _classify_tier(row: pd.Series) -> int:
  191. """
  192. 自治级别分类。
  193. Tier 0 (无操作): hold, observe, creative_adjust, scale_up(不调用API,仅建议)
  194. Tier 1 (自动执行): bid 调幅 ≤ 5%
  195. Tier 2 (需审批): pause 或 bid 调幅 > 5%
  196. Tier 3 (高价值阻断): 日消耗 > 1500 元的高价值广告
  197. """
  198. action = row.get("final_action", row.get("action", "hold"))
  199. # Tier 0: 无需操作的动作(仅建议,不执行)
  200. if action in ("hold", "observe", "creative_adjust", "scale_up"):
  201. return 0 # 无操作(observe=观察,creative_adjust/scale_up=需人工执行)
  202. cost_7d_avg = float(row.get("cost_7d_avg", 0) or 0)
  203. change_pct = row.get("recommended_change_pct", 0)
  204. if isinstance(change_pct, str):
  205. try:
  206. change_pct = float(change_pct)
  207. except ValueError:
  208. change_pct = 0
  209. # Tier 3: 高价值广告
  210. if cost_7d_avg >= TIER3_MIN_DAILY_SPEND:
  211. return 3
  212. # Tier 2: 暂停 或 大幅调价
  213. if action == "pause" or abs(change_pct) > TIER1_MAX_CHANGE_PCT:
  214. return 2
  215. # Tier 1: 小幅调价
  216. return 1
  217. # ═══════════════════════════════════════════
  218. # 工具:执行已验证的决策
  219. # ═══════════════════════════════════════════
  220. @tool(description="执行已验证的决策:分级自治 → API调用 → 审计日志")
  221. async def execute_decisions(
  222. ctx: ToolContext = None,
  223. validated_csv: str = "",
  224. approval_mode: str = "tiered",
  225. ) -> ToolResult:
  226. """
  227. 执行已通过护栏验证的决策。
  228. Pipeline:
  229. 1. 加载 validated_decisions CSV (guardrail_status != blocked)
  230. 2. 按自治级别分类
  231. 3. Tier 1 (小调, ≤5%): 直接执行 + 通知
  232. 4. Tier 2 (大调/暂停): 标记待审批
  233. 5. Tier 3 (高价值): 标记需人工审批
  234. 6. 记录审计日志
  235. Args:
  236. validated_csv: 护栏验证后的 CSV 路径
  237. approval_mode: "auto"=全部自动执行, "tiered"=分级, "manual"=全部需审批
  238. """
  239. try:
  240. if not EXECUTION_ENABLED:
  241. return ToolResult(
  242. title="执行引擎未启用",
  243. output="EXECUTION_ENABLED=False,跳过执行。修改 config.py 启用。",
  244. )
  245. if DRY_RUN_MODE:
  246. return ToolResult(
  247. title="干运行模式",
  248. output="DRY_RUN_MODE=True,操作不会实际执行。修改 config.py 关闭干运行。",
  249. )
  250. # 查找验证 CSV
  251. if not validated_csv:
  252. reports_dir = _MINI_DIR / "outputs" / "reports"
  253. candidates = sorted(reports_dir.glob("validated_decisions_*.csv"), reverse=True)
  254. if not candidates:
  255. return ToolResult(title="execute_decisions", output="未找到验证后的决策 CSV")
  256. validated_csv = str(candidates[0])
  257. df = pd.read_csv(validated_csv)
  258. if df.empty:
  259. return ToolResult(title="execute_decisions", output="决策数据为空")
  260. # 只执行 approved / modified(非 blocked)
  261. df_exec = df[df["guardrail_status"].isin(["approved", "modified"])].copy()
  262. df_exec = df_exec[df_exec["final_action"] != "hold"]
  263. if df_exec.empty:
  264. return ToolResult(
  265. title="无需执行的操作",
  266. output="所有决策要么是 hold,要么被护栏拦截",
  267. )
  268. # ===== 白名单过滤(仅在执行阶段生效)=====
  269. # 分析阶段覆盖所有账户,执行阶段仅操作白名单账户
  270. from config import WHITELIST_ENABLED, WHITELIST_ACCOUNTS
  271. if WHITELIST_ENABLED:
  272. if "account_id" in df_exec.columns:
  273. non_whitelist = df_exec[~df_exec["account_id"].isin(WHITELIST_ACCOUNTS)]
  274. if not non_whitelist.empty:
  275. logger.info(
  276. f"白名单过滤:跳过 {len(non_whitelist)} 个非白名单账户的决策,"
  277. f"仅执行白名单账户。跳过账户: {non_whitelist['account_id'].unique().tolist()[:10]}"
  278. )
  279. df_exec = df_exec[df_exec["account_id"].isin(WHITELIST_ACCOUNTS)].copy()
  280. if df_exec.empty:
  281. return ToolResult(
  282. title="白名单过滤后无可执行决策",
  283. output=f"所有决策均属于非白名单账户,无需执行API操作。"
  284. f"白名单账户: {WHITELIST_ACCOUNTS[:5]}"
  285. )
  286. else:
  287. logger.warning("白名单检查:数据中缺少 account_id 列,跳过检查")
  288. # 分级
  289. df_exec["tier"] = df_exec.apply(_classify_tier, axis=1)
  290. executor = TencentAdExecutor()
  291. audit = AuditLogger()
  292. # 从 guardrails 导入 AdjustmentHistory 记录操作
  293. from guardrails import AdjustmentHistory
  294. history = AdjustmentHistory()
  295. executed = 0
  296. failed = 0
  297. pending_approval = 0
  298. approved_executed = 0
  299. rejected_count = 0
  300. timeout_count = 0
  301. tier_summary = {1: 0, 2: 0, 3: 0}
  302. # ─── 分离 Tier 1 和 Tier 2/3 ───
  303. df_tier1 = df_exec[df_exec["tier"] == 1]
  304. df_tier2_3 = df_exec[df_exec["tier"] >= 2]
  305. # ⚠️ 硬开关:TIER1_MAX_CHANGE_PCT <= 0 → 完全禁用自动执行通道
  306. # 即使分类器误将某条归为 Tier 1,也强制转入审批通道,避免未审批就执行
  307. if TIER1_MAX_CHANGE_PCT <= 0 and not df_tier1.empty:
  308. logger.warning(
  309. "TIER1_MAX_CHANGE_PCT=%s ≤ 0,自动执行通道已禁用;强制将 %d 条 Tier 1 决策转入审批",
  310. TIER1_MAX_CHANGE_PCT, len(df_tier1),
  311. )
  312. df_tier1_forced = df_tier1.copy()
  313. df_tier1_forced["tier"] = 2 # 提升到 Tier 2 走审批
  314. df_tier2_3 = pd.concat([df_tier2_3, df_tier1_forced], ignore_index=True)
  315. df_tier1 = df_tier1.iloc[0:0] # 清空 Tier 1
  316. for t in [1, 2, 3]:
  317. tier_summary[t] = int((df_exec["tier"] == t).sum() if not df_exec.empty else 0)
  318. # ═══ Phase 1: 自动执行 Tier 1 ═══
  319. # 注意:当 TIER1_MAX_CHANGE_PCT <= 0 时 df_tier1 已被清空,此循环不会执行
  320. for _, row in df_tier1.iterrows():
  321. action = row.get("final_action", row.get("action"))
  322. ad_id = int(row["ad_id"])
  323. account_id = int(row.get("account_id", 0) or 0)
  324. pre_state = await executor.get_ad_state(ad_id, account_id)
  325. if action == "pause":
  326. result = await executor.pause_ad(ad_id, account_id)
  327. elif action in ("bid_up", "bid_down"):
  328. final_bid = row.get("final_bid", row.get("recommended_bid"))
  329. if final_bid is None or final_bid == "":
  330. audit.log({
  331. "ad_id": ad_id,
  332. "action": action,
  333. "tier": 1,
  334. "execution_status": "skipped",
  335. "reason": "无出价数据",
  336. })
  337. continue
  338. bid_fen = int(float(final_bid) * 100)
  339. result = await executor.update_bid(ad_id, account_id, bid_fen)
  340. else:
  341. continue
  342. api_code = result.get("code", -1)
  343. exec_status = "success" if api_code == 0 else "failed"
  344. if exec_status == "success":
  345. executed += 1
  346. change_pct = row.get("recommended_change_pct", 0)
  347. if isinstance(change_pct, str):
  348. try:
  349. change_pct = float(change_pct)
  350. except ValueError:
  351. change_pct = 0
  352. history.record_adjustment(str(ad_id), action, change_pct)
  353. else:
  354. failed += 1
  355. post_state = await executor.get_ad_state(ad_id, account_id) if exec_status == "success" else None
  356. audit.log({
  357. "ad_id": ad_id,
  358. "account_id": account_id,
  359. "action": action,
  360. "tier": 1,
  361. "pre_state": {
  362. "bid_amount": pre_state.get("bid_amount") if pre_state else None,
  363. "status": pre_state.get("configured_status") if pre_state else None,
  364. } if pre_state else None,
  365. "post_state": {
  366. "bid_amount": post_state.get("bid_amount") if post_state else None,
  367. "status": post_state.get("configured_status") if post_state else None,
  368. } if post_state else None,
  369. "api_code": api_code,
  370. "api_message": result.get("message", ""),
  371. "execution_status": exec_status,
  372. "source": row.get("source", ""),
  373. })
  374. # ═══ Phase 2: Tier 2/3 — 审批 + 执行 ═══
  375. if not df_tier2_3.empty:
  376. if IM_ENABLED:
  377. # 阻塞式审批:调用 send_approval_request(wait_for_reply=True)
  378. logger.info("Tier 2/3 共 %d 个操作,发送 IM 审批并等待...", len(df_tier2_3))
  379. from im_approval import send_approval_request
  380. approval_result = await send_approval_request(
  381. ctx=ctx,
  382. validated_csv=validated_csv,
  383. wait_for_reply=True,
  384. )
  385. approval_status = (
  386. approval_result.metadata.get("status", "timeout")
  387. if approval_result.metadata
  388. else "timeout"
  389. )
  390. approved_ids = (
  391. approval_result.metadata.get("approved_ids", [])
  392. if approval_result.metadata
  393. else []
  394. )
  395. rejected_ids = (
  396. approval_result.metadata.get("rejected_ids", [])
  397. if approval_result.metadata
  398. else []
  399. )
  400. if approval_status == "timeout":
  401. # 超时:所有 Tier 2/3 标记为 timeout
  402. timeout_count = len(df_tier2_3)
  403. for _, row in df_tier2_3.iterrows():
  404. audit.log({
  405. "ad_id": int(row["ad_id"]),
  406. "account_id": int(row.get("account_id", 0) or 0),
  407. "action": row.get("final_action", row.get("action")),
  408. "tier": int(row.get("tier", 2)),
  409. "execution_status": "timeout",
  410. "source": row.get("source", ""),
  411. })
  412. else:
  413. # 执行已批准的广告
  414. approved_set = set(int(x) for x in approved_ids)
  415. rejected_set = set(int(x) for x in rejected_ids)
  416. for _, row in df_tier2_3.iterrows():
  417. ad_id = int(row["ad_id"])
  418. account_id = int(row.get("account_id", 0) or 0)
  419. action = row.get("final_action", row.get("action"))
  420. tier = int(row.get("tier", 2))
  421. if ad_id in rejected_set:
  422. rejected_count += 1
  423. audit.log({
  424. "ad_id": ad_id,
  425. "account_id": account_id,
  426. "action": action,
  427. "tier": tier,
  428. "execution_status": "rejected",
  429. "source": row.get("source", ""),
  430. })
  431. continue
  432. if ad_id not in approved_set:
  433. # 既不在 approved 也不在 rejected(部分审批场景遗漏)
  434. pending_approval += 1
  435. audit.log({
  436. "ad_id": ad_id,
  437. "account_id": account_id,
  438. "action": action,
  439. "tier": tier,
  440. "execution_status": "pending_approval",
  441. "source": row.get("source", ""),
  442. })
  443. continue
  444. # 已批准 → 执行
  445. pre_state = await executor.get_ad_state(ad_id, account_id)
  446. if action == "pause":
  447. result = await executor.pause_ad(ad_id, account_id)
  448. elif action in ("bid_up", "bid_down"):
  449. final_bid = row.get("final_bid", row.get("recommended_bid"))
  450. if final_bid is None or final_bid == "":
  451. audit.log({
  452. "ad_id": ad_id,
  453. "action": action,
  454. "tier": tier,
  455. "execution_status": "skipped",
  456. "reason": "无出价数据",
  457. })
  458. continue
  459. bid_fen = int(float(final_bid) * 100)
  460. result = await executor.update_bid(ad_id, account_id, bid_fen)
  461. else:
  462. continue
  463. api_code = result.get("code", -1)
  464. exec_status = "success" if api_code == 0 else "failed"
  465. if exec_status == "success":
  466. approved_executed += 1
  467. change_pct = row.get("recommended_change_pct", 0)
  468. if isinstance(change_pct, str):
  469. try:
  470. change_pct = float(change_pct)
  471. except ValueError:
  472. change_pct = 0
  473. history.record_adjustment(str(ad_id), action, change_pct)
  474. else:
  475. failed += 1
  476. post_state = await executor.get_ad_state(ad_id, account_id) if exec_status == "success" else None
  477. audit.log({
  478. "ad_id": ad_id,
  479. "account_id": account_id,
  480. "action": action,
  481. "tier": tier,
  482. "pre_state": {
  483. "bid_amount": pre_state.get("bid_amount") if pre_state else None,
  484. "status": pre_state.get("configured_status") if pre_state else None,
  485. } if pre_state else None,
  486. "post_state": {
  487. "bid_amount": post_state.get("bid_amount") if post_state else None,
  488. "status": post_state.get("configured_status") if post_state else None,
  489. } if post_state else None,
  490. "api_code": api_code,
  491. "api_message": result.get("message", ""),
  492. "execution_status": f"approved_{exec_status}",
  493. "source": row.get("source", ""),
  494. })
  495. else:
  496. # IM 未启用:Tier 2/3 仅记录不执行
  497. logger.info("IM 未启用,Tier 2/3 共 %d 个操作仅记录不执行", len(df_tier2_3))
  498. pending_approval = len(df_tier2_3)
  499. for _, row in df_tier2_3.iterrows():
  500. audit.log({
  501. "ad_id": int(row["ad_id"]),
  502. "account_id": int(row.get("account_id", 0) or 0),
  503. "action": row.get("final_action", row.get("action")),
  504. "tier": int(row.get("tier", 2)),
  505. "execution_status": "pending_approval",
  506. "note": "IM未启用,操作仅记录",
  507. "source": row.get("source", ""),
  508. })
  509. total_executed = executed + approved_executed
  510. output_lines = [
  511. f"执行完成,审计日志: {audit.path}",
  512. "",
  513. "执行结果:",
  514. f" Tier 1 自动执行: {executed} 个成功 / {failed} 个失败",
  515. ]
  516. if IM_ENABLED and not df_tier2_3.empty:
  517. output_lines.extend([
  518. f" Tier 2/3 审批后执行: {approved_executed} 个成功",
  519. f" 审批拒绝: {rejected_count} 个",
  520. f" 审批超时: {timeout_count} 个",
  521. ])
  522. if pending_approval > 0:
  523. output_lines.append(f" 待审批(未执行): {pending_approval} 个")
  524. output_lines.extend([
  525. "",
  526. "自治级别分布:",
  527. f" Tier 1 (自动): {tier_summary.get(1, 0)} 个",
  528. f" Tier 2 (审批): {tier_summary.get(2, 0)} 个",
  529. f" Tier 3 (高价值): {tier_summary.get(3, 0)} 个",
  530. ])
  531. return ToolResult(
  532. title=f"执行完成(自动{executed}/审批通过{approved_executed}/拒绝{rejected_count}/超时{timeout_count})",
  533. output="\n".join(output_lines),
  534. metadata={
  535. "audit_log": str(audit.path),
  536. "tier1_executed": executed,
  537. "tier1_failed": failed,
  538. "approved_executed": approved_executed,
  539. "rejected": rejected_count,
  540. "timeout": timeout_count,
  541. "pending_approval": pending_approval,
  542. "tier_summary": tier_summary,
  543. },
  544. )
  545. except Exception as e:
  546. logger.error("execute_decisions 失败: %s", e, exc_info=True)
  547. return ToolResult(title="execute_decisions 失败", output=str(e))
  548. # ═══════════════════════════════════════════
  549. # 工具:执行后效果检查
  550. # ═══════════════════════════════════════════
  551. @tool(description="执行后效果检查:对比操作前后广告表现")
  552. async def check_execution_feedback(
  553. ctx: ToolContext = None,
  554. execution_log_path: str = "",
  555. hours_after: int = FEEDBACK_CHECK_HOURS,
  556. ) -> ToolResult:
  557. """
  558. 读取执行日志,通过 API 获取当前状态,对比操作前后。
  559. Args:
  560. execution_log_path: 执行日志路径(JSONL),默认最新
  561. hours_after: 操作后等待时间(小时),仅检查超过此时间的操作
  562. """
  563. try:
  564. # 查找最新执行日志
  565. if not execution_log_path:
  566. log_dir = EXECUTION_LOG_DIR
  567. if not log_dir.exists():
  568. return ToolResult(title="check_execution_feedback", output="无执行日志目录")
  569. candidates = sorted(log_dir.glob("exec_*.jsonl"), reverse=True)
  570. if not candidates:
  571. return ToolResult(title="check_execution_feedback", output="无执行日志")
  572. execution_log_path = str(candidates[0])
  573. # 读取日志
  574. entries = []
  575. with open(execution_log_path, "r", encoding="utf-8") as f:
  576. for line in f:
  577. line = line.strip()
  578. if line:
  579. entries.append(json.loads(line))
  580. if not entries:
  581. return ToolResult(title="check_execution_feedback", output="执行日志为空")
  582. # 过滤:只看成功执行的且超过等待时间的
  583. cutoff = datetime.now() - timedelta(hours=hours_after)
  584. check_entries = [
  585. e for e in entries
  586. if e.get("execution_status") == "success"
  587. and datetime.fromisoformat(e["ts"]) < cutoff
  588. ]
  589. if not check_entries:
  590. return ToolResult(
  591. title="check_execution_feedback",
  592. output=f"无需检查(没有超过{hours_after}小时前的成功操作)",
  593. )
  594. # 获取当前状态
  595. executor = TencentAdExecutor()
  596. results = []
  597. for entry in check_entries:
  598. ad_id = entry.get("ad_id")
  599. account_id = entry.get("account_id", 0)
  600. action = entry.get("action")
  601. pre_state = entry.get("pre_state", {})
  602. current_state = await executor.get_ad_state(ad_id, account_id)
  603. result = {
  604. "ad_id": ad_id,
  605. "action": action,
  606. "executed_at": entry.get("ts"),
  607. "pre_bid": pre_state.get("bid_amount") if pre_state else None,
  608. "pre_status": pre_state.get("status") if pre_state else None,
  609. "current_bid": current_state.get("bid_amount") if current_state else None,
  610. "current_status": current_state.get("configured_status") if current_state else None,
  611. }
  612. # 状态变化判断
  613. if action == "pause":
  614. result["effective"] = (
  615. current_state.get("configured_status") == "AD_STATUS_SUSPEND"
  616. if current_state else None
  617. )
  618. elif action in ("bid_up", "bid_down"):
  619. post_bid = entry.get("post_state", {})
  620. if post_bid:
  621. expected_bid = post_bid.get("bid_amount")
  622. actual_bid = current_state.get("bid_amount") if current_state else None
  623. result["expected_bid"] = expected_bid
  624. result["effective"] = (actual_bid == expected_bid) if actual_bid is not None else None
  625. results.append(result)
  626. # 统计
  627. effective = sum(1 for r in results if r.get("effective") is True)
  628. ineffective = sum(1 for r in results if r.get("effective") is False)
  629. unknown = sum(1 for r in results if r.get("effective") is None)
  630. output_lines = [
  631. f"效果检查完成({len(results)} 个操作)",
  632. "",
  633. f" 有效: {effective} 个",
  634. f" 无效/被覆盖: {ineffective} 个",
  635. f" 未知: {unknown} 个",
  636. ]
  637. if ineffective > 0:
  638. output_lines.append("")
  639. output_lines.append("⚠️ 以下操作可能未生效或被覆盖:")
  640. for r in results:
  641. if r.get("effective") is False:
  642. output_lines.append(
  643. f" - 广告{r['ad_id']}: {r['action']} "
  644. f"(执行于 {r['executed_at']})"
  645. )
  646. return ToolResult(
  647. title=f"效果检查(有效{effective}/无效{ineffective})",
  648. output="\n".join(output_lines),
  649. metadata={
  650. "total": len(results),
  651. "effective": effective,
  652. "ineffective": ineffective,
  653. "unknown": unknown,
  654. "details": results,
  655. },
  656. )
  657. except Exception as e:
  658. logger.error("check_execution_feedback 失败: %s", e, exc_info=True)
  659. return ToolResult(title="check_execution_feedback 失败", output=str(e))