execute_agent.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. """
  2. 执行 Agent — 读取确认后的调整方案并执行 API 操作
  3. 支持两种输入方式:
  4. 1. 直接传入 adjustment_plan 列表(从分析 Agent 输出获取)
  5. 2. 传入 Excel 文件路径(运营在 Excel 中确认/修改后)
  6. 主要功能:
  7. - 出价调整(increase / decrease)
  8. - 广告关停(close → AD_STATUS_SUSPEND)
  9. - 冷启动保护(跳过 is_cold_start=True 的广告)
  10. - 赔付门槛保护(跳过 conversions_count 3-5 的关停)
  11. - Dry-run 模式(只验证不执行)
  12. - 执行后自动触发监控检查
  13. """
  14. import logging
  15. from datetime import datetime
  16. from pathlib import Path
  17. from typing import Any, Dict, List, Optional
  18. from agent.tools import tool
  19. from agent.tools.models import ToolContext, ToolResult
  20. from examples.auto_put_ad.tools.ad_api import ad_batch_update_status, ad_update
  21. from examples.auto_put_ad.tools.budget_calc import MIN_BID, MAX_BID
  22. from examples.auto_put_ad.tools.monitor_tools import monitor_check_metrics
  23. logger = logging.getLogger(__name__)
  24. def _load_plan_from_excel(excel_path: str) -> List[Dict]:
  25. """从 Excel 文件加载调整方案"""
  26. import pandas as pd
  27. df = pd.read_excel(excel_path)
  28. # 映射中文列名到英文字段(兼容两种格式)
  29. column_map = {
  30. "广告ID": "ad_id",
  31. "广告名称": "ad_name",
  32. "账户ID": "account_id",
  33. "动作": "action",
  34. "当前出价(分)": "current_bid",
  35. "新出价(分)": "new_bid",
  36. "调整幅度": "adjustment_ratio",
  37. "ROI等级": "roi_level",
  38. "跑量等级": "volume_level",
  39. "效率分": "efficiency",
  40. "昨日消耗(元)": "cost",
  41. "打开数": "open_count",
  42. "转化数": "conversions_count",
  43. "冷启动": "is_cold_start",
  44. "冷启动原因": "cold_start_reason",
  45. "广告状态": "ad_status",
  46. }
  47. # 重命名已有的中文列
  48. rename_cols = {k: v for k, v in column_map.items() if k in df.columns}
  49. if rename_cols:
  50. df = df.rename(columns=rename_cols)
  51. # 转换数据类型
  52. for col in ["ad_id", "account_id", "current_bid", "new_bid", "open_count", "conversions_count"]:
  53. if col in df.columns:
  54. df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0).astype(int)
  55. if "is_cold_start" in df.columns:
  56. df["is_cold_start"] = df["is_cold_start"].astype(bool)
  57. if "cost" in df.columns:
  58. df["cost"] = pd.to_numeric(df["cost"], errors="coerce").fillna(0.0)
  59. return df.to_dict("records")
  60. def _validate_plan(plan: List[Dict]) -> tuple:
  61. """验证调整方案,返回 (validated_plan, warnings, errors)
  62. 验证规则:
  63. 1. 出价范围 [MIN_BID, MAX_BID]
  64. 2. 单次调整幅度 <= 15%
  65. 3. 冷启动广告跳过
  66. 4. 赔付门槛保护
  67. """
  68. validated = []
  69. warnings = []
  70. errors = []
  71. skip_cold_start = 0
  72. skip_payback = 0
  73. for item in plan:
  74. ad_id = item.get("ad_id", "unknown")
  75. action = item.get("action", "")
  76. # 跳过非操作项
  77. if action not in ("increase", "decrease", "close"):
  78. continue
  79. # 冷启动保护
  80. if item.get("is_cold_start", False):
  81. skip_cold_start += 1
  82. warnings.append(f" 跳过 ad_id={ad_id}: {item.get('cold_start_reason', '冷启动保护')}")
  83. continue
  84. # 赔付门槛保护(仅针对 close)
  85. conversions = item.get("conversions_count", 0)
  86. if action == "close" and 3 <= conversions < 6:
  87. skip_payback += 1
  88. warnings.append(f" 跳过关停 ad_id={ad_id}: 转化数={conversions},接近赔付门槛")
  89. continue
  90. # 出价范围检查
  91. new_bid = item.get("new_bid")
  92. if new_bid is not None and action in ("increase", "decrease"):
  93. if new_bid < MIN_BID:
  94. warnings.append(f" ad_id={ad_id}: 出价 {new_bid} < {MIN_BID},自动调整为 {MIN_BID}")
  95. item["new_bid"] = MIN_BID
  96. elif new_bid > MAX_BID:
  97. warnings.append(f" ad_id={ad_id}: 出价 {new_bid} > {MAX_BID},自动调整为 {MAX_BID}")
  98. item["new_bid"] = MAX_BID
  99. # 调整幅度检查:> 15% 自动 clamp,不放行
  100. current_bid = item.get("current_bid")
  101. if current_bid and new_bid and action in ("increase", "decrease"):
  102. ratio = abs(new_bid - current_bid) / current_bid
  103. if ratio > 0.15:
  104. # 自动 clamp 到 15% 幅度
  105. if action == "increase":
  106. clamped_bid = int(current_bid * 1.15)
  107. else:
  108. clamped_bid = int(current_bid * 0.85)
  109. clamped_bid = max(MIN_BID, min(MAX_BID, clamped_bid))
  110. warnings.append(
  111. f" ad_id={ad_id}: 调整幅度 {ratio:.0%} > 15%,"
  112. f"已 clamp: {new_bid} -> {clamped_bid} (限制在 15% 内)"
  113. )
  114. item["new_bid"] = clamped_bid
  115. validated.append(item)
  116. if skip_cold_start:
  117. warnings.insert(0, f"冷启动保护跳过: {skip_cold_start} 个广告")
  118. if skip_payback:
  119. warnings.insert(0 if not skip_cold_start else 1, f"赔付门槛保护跳过: {skip_payback} 个广告")
  120. return validated, warnings, errors
  121. @tool(description="加载并执行运营确认后的出价调整方案(支持 Excel 文件或直接传入方案列表)")
  122. async def execute_adjustment_plan(
  123. account_id: int,
  124. excel_path: str = "",
  125. adjustment_plan: Optional[List[Dict]] = None,
  126. dry_run: bool = False,
  127. execute_close: bool = False,
  128. context: Optional[ToolContext] = None,
  129. ) -> ToolResult:
  130. """加载确认后的方案并执行出价调整和广告关停。
  131. Args:
  132. account_id: 账户ID(传 0 则使用方案中每条记录的 account_id)
  133. excel_path: Excel 文件路径(与 adjustment_plan 二选一)
  134. adjustment_plan: 调整方案列表(与 excel_path 二选一)
  135. dry_run: True=只验证不执行,输出模拟报告
  136. execute_close: True=同时执行关停操作(默认不执行,需运营单独确认)
  137. """
  138. try:
  139. # === 阶段一:加载方案 ===
  140. source = ""
  141. if excel_path:
  142. path = Path(excel_path)
  143. if not path.exists():
  144. return ToolResult(title="执行失败", output=f"Excel 文件不存在: {excel_path}")
  145. plan = _load_plan_from_excel(excel_path)
  146. source = f"Excel: {path.name}"
  147. elif adjustment_plan:
  148. plan = adjustment_plan
  149. source = "上游传入"
  150. else:
  151. return ToolResult(title="执行失败", output="请提供 excel_path 或 adjustment_plan")
  152. if not plan:
  153. return ToolResult(title="执行失败", output="方案为空,无可执行项")
  154. # === 阶段二:验证 ===
  155. validated, warnings, errors = _validate_plan(plan)
  156. mode = "Dry-Run" if dry_run else "正式执行"
  157. report_lines = [
  158. "=" * 50,
  159. f"执行报告",
  160. "=" * 50,
  161. f"方案来源: {source}",
  162. f"执行模式: {mode}",
  163. f"方案总数: {len(plan)} 条",
  164. f"有效操作: {len(validated)} 条",
  165. "",
  166. ]
  167. if warnings:
  168. report_lines.append("验证警告:")
  169. report_lines.extend(warnings)
  170. report_lines.append("")
  171. if errors:
  172. report_lines.append("验证错误:")
  173. report_lines.extend(errors)
  174. report_lines.append("")
  175. # 分类统计
  176. increase_items = [v for v in validated if v["action"] == "increase"]
  177. decrease_items = [v for v in validated if v["action"] == "decrease"]
  178. close_items = [v for v in validated if v["action"] == "close"]
  179. bid_items = increase_items + decrease_items
  180. report_lines.append(f"出价调整: 提价 {len(increase_items)} 个 + 降价 {len(decrease_items)} 个 = {len(bid_items)} 个")
  181. report_lines.append(f"广告关停: {len(close_items)} 个 {'(本次执行)' if execute_close else '(待确认)'}")
  182. report_lines.append("")
  183. # === Dry-Run 模式:只输出不执行 ===
  184. if dry_run:
  185. if bid_items:
  186. report_lines.append("将要执行的出价调整:")
  187. for item in bid_items[:20]:
  188. report_lines.append(
  189. f" ad_id={item['ad_id']} | {item['action']} | "
  190. f"{item.get('current_bid', '?')} -> {item.get('new_bid', '?')} 分"
  191. )
  192. if len(bid_items) > 20:
  193. report_lines.append(f" ... 还有 {len(bid_items) - 20} 个")
  194. if close_items:
  195. report_lines.append("\n将要关停的广告:")
  196. for item in close_items[:10]:
  197. report_lines.append(
  198. f" ad_id={item['ad_id']} | 消耗:{item.get('cost', 0):.0f}元 | "
  199. f"转化:{item.get('conversions_count', 0)}"
  200. )
  201. report_lines.append("\n[Dry-Run] 以上操作未实际执行")
  202. return ToolResult(
  203. title=f"Dry-Run 报告({len(validated)} 条有效操作)",
  204. output="\n".join(report_lines),
  205. metadata={"mode": "dry_run", "validated_count": len(validated)},
  206. )
  207. # === 阶段三:执行出价调整 ===
  208. bid_success = 0
  209. bid_failed = 0
  210. bid_errors = []
  211. if bid_items:
  212. report_lines.append("--- 执行出价调整 ---")
  213. for item in bid_items:
  214. try:
  215. target_account = account_id if account_id else item.get("account_id", 0)
  216. await ad_update(
  217. account_id=target_account,
  218. adgroup_id=item["ad_id"],
  219. bid_amount=item["new_bid"],
  220. )
  221. bid_success += 1
  222. logger.info(
  223. "出价调整成功: ad_id=%s, %s -> %s (%s)",
  224. item["ad_id"], item.get("current_bid"), item["new_bid"], item["action"]
  225. )
  226. except Exception as e:
  227. bid_failed += 1
  228. err_msg = f"ad_id={item['ad_id']}: {str(e)}"
  229. bid_errors.append(err_msg)
  230. logger.error("出价调整失败: %s", err_msg)
  231. report_lines.append(f"提价(increase): {len(increase_items)} 个")
  232. report_lines.append(f"降价(decrease): {len(decrease_items)} 个")
  233. report_lines.append(f"成功: {bid_success} / 失败: {bid_failed}")
  234. report_lines.append("")
  235. # === 阶段四:执行广告关停 ===
  236. close_success = 0
  237. close_failed = 0
  238. close_errors = []
  239. if close_items and execute_close:
  240. report_lines.append("--- 执行广告关停 ---")
  241. # 按 account_id 分组,避免跨账户批量操作
  242. from collections import defaultdict
  243. close_by_account = defaultdict(list)
  244. for item in close_items:
  245. target_acct = account_id if account_id else item.get("account_id", 0)
  246. close_by_account[target_acct].append(item["ad_id"])
  247. for target_acct, ad_ids in close_by_account.items():
  248. try:
  249. result = await ad_batch_update_status(
  250. adgroup_ids=ad_ids,
  251. configured_status="AD_STATUS_SUSPEND",
  252. account_id=target_acct,
  253. )
  254. if "失败" not in result.title:
  255. close_success += len(ad_ids)
  256. logger.info("关停成功: 账户%s, %d 个广告", target_acct, len(ad_ids))
  257. else:
  258. close_failed += len(ad_ids)
  259. close_errors.append(f"账户{target_acct}: {result.output}")
  260. logger.error("关停失败: 账户%s, %s", target_acct, result.output)
  261. except Exception as e:
  262. close_failed += len(ad_ids)
  263. close_errors.append(f"账户{target_acct}: {str(e)}")
  264. logger.error("关停异常: 账户%s, %s", target_acct, e)
  265. report_lines.append(f"关停: 成功 {close_success} / 失败 {close_failed}")
  266. report_lines.append("")
  267. elif close_items and not execute_close:
  268. report_lines.append("--- 广告关停(待确认)---")
  269. report_lines.append(f"以下 {len(close_items)} 个广告建议关停,请确认后使用 execute_close=True 执行:")
  270. for item in close_items[:10]:
  271. report_lines.append(
  272. f" ad_id={item['ad_id']} | 消耗:{item.get('cost', 0):.0f}元 | "
  273. f"转化:{item.get('conversions_count', 0)} | 效率:{item.get('efficiency', 0)}"
  274. )
  275. if len(close_items) > 10:
  276. report_lines.append(f" ... 还有 {len(close_items) - 10} 个")
  277. report_lines.append("")
  278. # === 阶段五:执行后监控 ===
  279. monitor_result = None
  280. if bid_success > 0 or close_success > 0:
  281. report_lines.append("--- 执行后监控 ---")
  282. try:
  283. target_account = account_id if account_id else 0
  284. monitor_result = await monitor_check_metrics(
  285. account_id=target_account,
  286. check_items=["cost_spike", "budget_overrun"],
  287. time_window="1h",
  288. )
  289. report_lines.append(f"监控结果: {monitor_result.title}")
  290. if monitor_result.metadata and monitor_result.metadata.get("anomalies"):
  291. for a in monitor_result.metadata["anomalies"]:
  292. report_lines.append(f" {a.get('type', '')}: {a.get('message', '')}")
  293. else:
  294. report_lines.append(" 无异常")
  295. except Exception as e:
  296. report_lines.append(f"监控检查失败: {str(e)}")
  297. logger.warning("执行后监控失败: %s", e)
  298. report_lines.append("")
  299. # === 失败详情 ===
  300. all_errors = bid_errors + close_errors
  301. if all_errors:
  302. report_lines.append("--- 失败详情 ---")
  303. for err in all_errors[:20]:
  304. report_lines.append(f" {err}")
  305. if len(all_errors) > 20:
  306. report_lines.append(f" ... 还有 {len(all_errors) - 20} 个错误")
  307. report_lines.append("")
  308. # === 汇总 ===
  309. report_lines.append("=" * 50)
  310. total_success = bid_success + close_success
  311. total_failed = bid_failed + close_failed
  312. report_lines.append(f"总计: 成功 {total_success} / 失败 {total_failed}")
  313. report_lines.append(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  314. return ToolResult(
  315. title=f"执行完成(成功{total_success}/失败{total_failed})",
  316. output="\n".join(report_lines),
  317. metadata={
  318. "mode": "execute",
  319. "bid_success": bid_success,
  320. "bid_failed": bid_failed,
  321. "close_success": close_success,
  322. "close_failed": close_failed,
  323. "close_pending": len(close_items) if not execute_close else 0,
  324. "errors": all_errors,
  325. },
  326. )
  327. except Exception as e:
  328. logger.error("execute_adjustment_plan 失败: %s", e, exc_info=True)
  329. return ToolResult(title="执行失败", output=str(e))