| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380 |
- """
- 执行 Agent — 读取确认后的调整方案并执行 API 操作
- 支持两种输入方式:
- 1. 直接传入 adjustment_plan 列表(从分析 Agent 输出获取)
- 2. 传入 Excel 文件路径(运营在 Excel 中确认/修改后)
- 主要功能:
- - 出价调整(increase / decrease)
- - 广告关停(close → AD_STATUS_SUSPEND)
- - 冷启动保护(跳过 is_cold_start=True 的广告)
- - 赔付门槛保护(跳过 conversions_count 3-5 的关停)
- - Dry-run 模式(只验证不执行)
- - 执行后自动触发监控检查
- """
- import logging
- from datetime import datetime
- from pathlib import Path
- from typing import Any, Dict, List, Optional
- from agent.tools import tool
- from agent.tools.models import ToolContext, ToolResult
- from examples.auto_put_ad.tools.ad_api import ad_batch_update_status, ad_update
- from examples.auto_put_ad.tools.budget_calc import MIN_BID, MAX_BID
- from examples.auto_put_ad.tools.monitor_tools import monitor_check_metrics
- logger = logging.getLogger(__name__)
- def _load_plan_from_excel(excel_path: str) -> List[Dict]:
- """从 Excel 文件加载调整方案"""
- import pandas as pd
- df = pd.read_excel(excel_path)
- # 映射中文列名到英文字段(兼容两种格式)
- column_map = {
- "广告ID": "ad_id",
- "广告名称": "ad_name",
- "账户ID": "account_id",
- "动作": "action",
- "当前出价(分)": "current_bid",
- "新出价(分)": "new_bid",
- "调整幅度": "adjustment_ratio",
- "ROI等级": "roi_level",
- "跑量等级": "volume_level",
- "效率分": "efficiency",
- "昨日消耗(元)": "cost",
- "打开数": "open_count",
- "转化数": "conversions_count",
- "冷启动": "is_cold_start",
- "冷启动原因": "cold_start_reason",
- "广告状态": "ad_status",
- }
- # 重命名已有的中文列
- rename_cols = {k: v for k, v in column_map.items() if k in df.columns}
- if rename_cols:
- df = df.rename(columns=rename_cols)
- # 转换数据类型
- for col in ["ad_id", "account_id", "current_bid", "new_bid", "open_count", "conversions_count"]:
- if col in df.columns:
- df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0).astype(int)
- if "is_cold_start" in df.columns:
- df["is_cold_start"] = df["is_cold_start"].astype(bool)
- if "cost" in df.columns:
- df["cost"] = pd.to_numeric(df["cost"], errors="coerce").fillna(0.0)
- return df.to_dict("records")
- def _validate_plan(plan: List[Dict]) -> tuple:
- """验证调整方案,返回 (validated_plan, warnings, errors)
- 验证规则:
- 1. 出价范围 [MIN_BID, MAX_BID]
- 2. 单次调整幅度 <= 15%
- 3. 冷启动广告跳过
- 4. 赔付门槛保护
- """
- validated = []
- warnings = []
- errors = []
- skip_cold_start = 0
- skip_payback = 0
- for item in plan:
- ad_id = item.get("ad_id", "unknown")
- action = item.get("action", "")
- # 跳过非操作项
- if action not in ("increase", "decrease", "close"):
- continue
- # 冷启动保护
- if item.get("is_cold_start", False):
- skip_cold_start += 1
- warnings.append(f" 跳过 ad_id={ad_id}: {item.get('cold_start_reason', '冷启动保护')}")
- continue
- # 赔付门槛保护(仅针对 close)
- conversions = item.get("conversions_count", 0)
- if action == "close" and 3 <= conversions < 6:
- skip_payback += 1
- warnings.append(f" 跳过关停 ad_id={ad_id}: 转化数={conversions},接近赔付门槛")
- continue
- # 出价范围检查
- new_bid = item.get("new_bid")
- if new_bid is not None and action in ("increase", "decrease"):
- if new_bid < MIN_BID:
- warnings.append(f" ad_id={ad_id}: 出价 {new_bid} < {MIN_BID},自动调整为 {MIN_BID}")
- item["new_bid"] = MIN_BID
- elif new_bid > MAX_BID:
- warnings.append(f" ad_id={ad_id}: 出价 {new_bid} > {MAX_BID},自动调整为 {MAX_BID}")
- item["new_bid"] = MAX_BID
- # 调整幅度检查:> 15% 自动 clamp,不放行
- current_bid = item.get("current_bid")
- if current_bid and new_bid and action in ("increase", "decrease"):
- ratio = abs(new_bid - current_bid) / current_bid
- if ratio > 0.15:
- # 自动 clamp 到 15% 幅度
- if action == "increase":
- clamped_bid = int(current_bid * 1.15)
- else:
- clamped_bid = int(current_bid * 0.85)
- clamped_bid = max(MIN_BID, min(MAX_BID, clamped_bid))
- warnings.append(
- f" ad_id={ad_id}: 调整幅度 {ratio:.0%} > 15%,"
- f"已 clamp: {new_bid} -> {clamped_bid} (限制在 15% 内)"
- )
- item["new_bid"] = clamped_bid
- validated.append(item)
- if skip_cold_start:
- warnings.insert(0, f"冷启动保护跳过: {skip_cold_start} 个广告")
- if skip_payback:
- warnings.insert(0 if not skip_cold_start else 1, f"赔付门槛保护跳过: {skip_payback} 个广告")
- return validated, warnings, errors
- @tool(description="加载并执行运营确认后的出价调整方案(支持 Excel 文件或直接传入方案列表)")
- async def execute_adjustment_plan(
- account_id: int,
- excel_path: str = "",
- adjustment_plan: Optional[List[Dict]] = None,
- dry_run: bool = False,
- execute_close: bool = False,
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """加载确认后的方案并执行出价调整和广告关停。
- Args:
- account_id: 账户ID(传 0 则使用方案中每条记录的 account_id)
- excel_path: Excel 文件路径(与 adjustment_plan 二选一)
- adjustment_plan: 调整方案列表(与 excel_path 二选一)
- dry_run: True=只验证不执行,输出模拟报告
- execute_close: True=同时执行关停操作(默认不执行,需运营单独确认)
- """
- try:
- # === 阶段一:加载方案 ===
- source = ""
- if excel_path:
- path = Path(excel_path)
- if not path.exists():
- return ToolResult(title="执行失败", output=f"Excel 文件不存在: {excel_path}")
- plan = _load_plan_from_excel(excel_path)
- source = f"Excel: {path.name}"
- elif adjustment_plan:
- plan = adjustment_plan
- source = "上游传入"
- else:
- return ToolResult(title="执行失败", output="请提供 excel_path 或 adjustment_plan")
- if not plan:
- return ToolResult(title="执行失败", output="方案为空,无可执行项")
- # === 阶段二:验证 ===
- validated, warnings, errors = _validate_plan(plan)
- mode = "Dry-Run" if dry_run else "正式执行"
- report_lines = [
- "=" * 50,
- f"执行报告",
- "=" * 50,
- f"方案来源: {source}",
- f"执行模式: {mode}",
- f"方案总数: {len(plan)} 条",
- f"有效操作: {len(validated)} 条",
- "",
- ]
- if warnings:
- report_lines.append("验证警告:")
- report_lines.extend(warnings)
- report_lines.append("")
- if errors:
- report_lines.append("验证错误:")
- report_lines.extend(errors)
- report_lines.append("")
- # 分类统计
- increase_items = [v for v in validated if v["action"] == "increase"]
- decrease_items = [v for v in validated if v["action"] == "decrease"]
- close_items = [v for v in validated if v["action"] == "close"]
- bid_items = increase_items + decrease_items
- report_lines.append(f"出价调整: 提价 {len(increase_items)} 个 + 降价 {len(decrease_items)} 个 = {len(bid_items)} 个")
- report_lines.append(f"广告关停: {len(close_items)} 个 {'(本次执行)' if execute_close else '(待确认)'}")
- report_lines.append("")
- # === Dry-Run 模式:只输出不执行 ===
- if dry_run:
- if bid_items:
- report_lines.append("将要执行的出价调整:")
- for item in bid_items[:20]:
- report_lines.append(
- f" ad_id={item['ad_id']} | {item['action']} | "
- f"{item.get('current_bid', '?')} -> {item.get('new_bid', '?')} 分"
- )
- if len(bid_items) > 20:
- report_lines.append(f" ... 还有 {len(bid_items) - 20} 个")
- if close_items:
- report_lines.append("\n将要关停的广告:")
- for item in close_items[:10]:
- report_lines.append(
- f" ad_id={item['ad_id']} | 消耗:{item.get('cost', 0):.0f}元 | "
- f"转化:{item.get('conversions_count', 0)}"
- )
- report_lines.append("\n[Dry-Run] 以上操作未实际执行")
- return ToolResult(
- title=f"Dry-Run 报告({len(validated)} 条有效操作)",
- output="\n".join(report_lines),
- metadata={"mode": "dry_run", "validated_count": len(validated)},
- )
- # === 阶段三:执行出价调整 ===
- bid_success = 0
- bid_failed = 0
- bid_errors = []
- if bid_items:
- report_lines.append("--- 执行出价调整 ---")
- for item in bid_items:
- try:
- target_account = account_id if account_id else item.get("account_id", 0)
- await ad_update(
- account_id=target_account,
- adgroup_id=item["ad_id"],
- bid_amount=item["new_bid"],
- )
- bid_success += 1
- logger.info(
- "出价调整成功: ad_id=%s, %s -> %s (%s)",
- item["ad_id"], item.get("current_bid"), item["new_bid"], item["action"]
- )
- except Exception as e:
- bid_failed += 1
- err_msg = f"ad_id={item['ad_id']}: {str(e)}"
- bid_errors.append(err_msg)
- logger.error("出价调整失败: %s", err_msg)
- report_lines.append(f"提价(increase): {len(increase_items)} 个")
- report_lines.append(f"降价(decrease): {len(decrease_items)} 个")
- report_lines.append(f"成功: {bid_success} / 失败: {bid_failed}")
- report_lines.append("")
- # === 阶段四:执行广告关停 ===
- close_success = 0
- close_failed = 0
- close_errors = []
- if close_items and execute_close:
- report_lines.append("--- 执行广告关停 ---")
- # 按 account_id 分组,避免跨账户批量操作
- from collections import defaultdict
- close_by_account = defaultdict(list)
- for item in close_items:
- target_acct = account_id if account_id else item.get("account_id", 0)
- close_by_account[target_acct].append(item["ad_id"])
- for target_acct, ad_ids in close_by_account.items():
- try:
- result = await ad_batch_update_status(
- adgroup_ids=ad_ids,
- configured_status="AD_STATUS_SUSPEND",
- account_id=target_acct,
- )
- if "失败" not in result.title:
- close_success += len(ad_ids)
- logger.info("关停成功: 账户%s, %d 个广告", target_acct, len(ad_ids))
- else:
- close_failed += len(ad_ids)
- close_errors.append(f"账户{target_acct}: {result.output}")
- logger.error("关停失败: 账户%s, %s", target_acct, result.output)
- except Exception as e:
- close_failed += len(ad_ids)
- close_errors.append(f"账户{target_acct}: {str(e)}")
- logger.error("关停异常: 账户%s, %s", target_acct, e)
- report_lines.append(f"关停: 成功 {close_success} / 失败 {close_failed}")
- report_lines.append("")
- elif close_items and not execute_close:
- report_lines.append("--- 广告关停(待确认)---")
- report_lines.append(f"以下 {len(close_items)} 个广告建议关停,请确认后使用 execute_close=True 执行:")
- for item in close_items[:10]:
- report_lines.append(
- f" ad_id={item['ad_id']} | 消耗:{item.get('cost', 0):.0f}元 | "
- f"转化:{item.get('conversions_count', 0)} | 效率:{item.get('efficiency', 0)}"
- )
- if len(close_items) > 10:
- report_lines.append(f" ... 还有 {len(close_items) - 10} 个")
- report_lines.append("")
- # === 阶段五:执行后监控 ===
- monitor_result = None
- if bid_success > 0 or close_success > 0:
- report_lines.append("--- 执行后监控 ---")
- try:
- target_account = account_id if account_id else 0
- monitor_result = await monitor_check_metrics(
- account_id=target_account,
- check_items=["cost_spike", "budget_overrun"],
- time_window="1h",
- )
- report_lines.append(f"监控结果: {monitor_result.title}")
- if monitor_result.metadata and monitor_result.metadata.get("anomalies"):
- for a in monitor_result.metadata["anomalies"]:
- report_lines.append(f" {a.get('type', '')}: {a.get('message', '')}")
- else:
- report_lines.append(" 无异常")
- except Exception as e:
- report_lines.append(f"监控检查失败: {str(e)}")
- logger.warning("执行后监控失败: %s", e)
- report_lines.append("")
- # === 失败详情 ===
- all_errors = bid_errors + close_errors
- if all_errors:
- report_lines.append("--- 失败详情 ---")
- for err in all_errors[:20]:
- report_lines.append(f" {err}")
- if len(all_errors) > 20:
- report_lines.append(f" ... 还有 {len(all_errors) - 20} 个错误")
- report_lines.append("")
- # === 汇总 ===
- report_lines.append("=" * 50)
- total_success = bid_success + close_success
- total_failed = bid_failed + close_failed
- report_lines.append(f"总计: 成功 {total_success} / 失败 {total_failed}")
- report_lines.append(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
- return ToolResult(
- title=f"执行完成(成功{total_success}/失败{total_failed})",
- output="\n".join(report_lines),
- metadata={
- "mode": "execute",
- "bid_success": bid_success,
- "bid_failed": bid_failed,
- "close_success": close_success,
- "close_failed": close_failed,
- "close_pending": len(close_items) if not execute_close else 0,
- "errors": all_errors,
- },
- )
- except Exception as e:
- logger.error("execute_adjustment_plan 失败: %s", e, exc_info=True)
- return ToolResult(title="执行失败", output=str(e))
|