""" 执行 Agent — 读取确认后的调整方案并执行 API 操作 支持两种输入方式: 1. 直接传入 adjustment_plan 列表(从分析 Agent 输出获取) 2. 传入 Excel 文件路径(运营在 Excel 中确认/修改后) 主要功能: - 出价调整(increase / decrease) - 广告关停(close → AD_STATUS_SUSPEND) - 冷启动保护(跳过 is_cold_start=True 的广告) - 赔付门槛保护(跳过 conversions_count 3-5 的关停) - Dry-run 模式(只验证不执行) - 执行后自动触发监控检查 """ import logging from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional from agent.tools import tool from agent.tools.models import ToolContext, ToolResult from examples.auto_put_ad.tools.ad_api import ad_batch_update_status, ad_update from examples.auto_put_ad.tools.budget_calc import MIN_BID, MAX_BID from examples.auto_put_ad.tools.monitor_tools import monitor_check_metrics logger = logging.getLogger(__name__) def _load_plan_from_excel(excel_path: str) -> List[Dict]: """从 Excel 文件加载调整方案""" import pandas as pd df = pd.read_excel(excel_path) # 映射中文列名到英文字段(兼容两种格式) column_map = { "广告ID": "ad_id", "广告名称": "ad_name", "账户ID": "account_id", "动作": "action", "当前出价(分)": "current_bid", "新出价(分)": "new_bid", "调整幅度": "adjustment_ratio", "ROI等级": "roi_level", "跑量等级": "volume_level", "效率分": "efficiency", "昨日消耗(元)": "cost", "打开数": "open_count", "转化数": "conversions_count", "冷启动": "is_cold_start", "冷启动原因": "cold_start_reason", "广告状态": "ad_status", } # 重命名已有的中文列 rename_cols = {k: v for k, v in column_map.items() if k in df.columns} if rename_cols: df = df.rename(columns=rename_cols) # 转换数据类型 for col in ["ad_id", "account_id", "current_bid", "new_bid", "open_count", "conversions_count"]: if col in df.columns: df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0).astype(int) if "is_cold_start" in df.columns: df["is_cold_start"] = df["is_cold_start"].astype(bool) if "cost" in df.columns: df["cost"] = pd.to_numeric(df["cost"], errors="coerce").fillna(0.0) return df.to_dict("records") def _validate_plan(plan: List[Dict]) -> tuple: """验证调整方案,返回 (validated_plan, warnings, errors) 验证规则: 1. 出价范围 [MIN_BID, MAX_BID] 2. 单次调整幅度 <= 15% 3. 冷启动广告跳过 4. 赔付门槛保护 """ validated = [] warnings = [] errors = [] skip_cold_start = 0 skip_payback = 0 for item in plan: ad_id = item.get("ad_id", "unknown") action = item.get("action", "") # 跳过非操作项 if action not in ("increase", "decrease", "close"): continue # 冷启动保护 if item.get("is_cold_start", False): skip_cold_start += 1 warnings.append(f" 跳过 ad_id={ad_id}: {item.get('cold_start_reason', '冷启动保护')}") continue # 赔付门槛保护(仅针对 close) conversions = item.get("conversions_count", 0) if action == "close" and 3 <= conversions < 6: skip_payback += 1 warnings.append(f" 跳过关停 ad_id={ad_id}: 转化数={conversions},接近赔付门槛") continue # 出价范围检查 new_bid = item.get("new_bid") if new_bid is not None and action in ("increase", "decrease"): if new_bid < MIN_BID: warnings.append(f" ad_id={ad_id}: 出价 {new_bid} < {MIN_BID},自动调整为 {MIN_BID}") item["new_bid"] = MIN_BID elif new_bid > MAX_BID: warnings.append(f" ad_id={ad_id}: 出价 {new_bid} > {MAX_BID},自动调整为 {MAX_BID}") item["new_bid"] = MAX_BID # 调整幅度检查:> 15% 自动 clamp,不放行 current_bid = item.get("current_bid") if current_bid and new_bid and action in ("increase", "decrease"): ratio = abs(new_bid - current_bid) / current_bid if ratio > 0.15: # 自动 clamp 到 15% 幅度 if action == "increase": clamped_bid = int(current_bid * 1.15) else: clamped_bid = int(current_bid * 0.85) clamped_bid = max(MIN_BID, min(MAX_BID, clamped_bid)) warnings.append( f" ad_id={ad_id}: 调整幅度 {ratio:.0%} > 15%," f"已 clamp: {new_bid} -> {clamped_bid} (限制在 15% 内)" ) item["new_bid"] = clamped_bid validated.append(item) if skip_cold_start: warnings.insert(0, f"冷启动保护跳过: {skip_cold_start} 个广告") if skip_payback: warnings.insert(0 if not skip_cold_start else 1, f"赔付门槛保护跳过: {skip_payback} 个广告") return validated, warnings, errors @tool(description="加载并执行运营确认后的出价调整方案(支持 Excel 文件或直接传入方案列表)") async def execute_adjustment_plan( account_id: int, excel_path: str = "", adjustment_plan: Optional[List[Dict]] = None, dry_run: bool = False, execute_close: bool = False, context: Optional[ToolContext] = None, ) -> ToolResult: """加载确认后的方案并执行出价调整和广告关停。 Args: account_id: 账户ID(传 0 则使用方案中每条记录的 account_id) excel_path: Excel 文件路径(与 adjustment_plan 二选一) adjustment_plan: 调整方案列表(与 excel_path 二选一) dry_run: True=只验证不执行,输出模拟报告 execute_close: True=同时执行关停操作(默认不执行,需运营单独确认) """ try: # === 阶段一:加载方案 === source = "" if excel_path: path = Path(excel_path) if not path.exists(): return ToolResult(title="执行失败", output=f"Excel 文件不存在: {excel_path}") plan = _load_plan_from_excel(excel_path) source = f"Excel: {path.name}" elif adjustment_plan: plan = adjustment_plan source = "上游传入" else: return ToolResult(title="执行失败", output="请提供 excel_path 或 adjustment_plan") if not plan: return ToolResult(title="执行失败", output="方案为空,无可执行项") # === 阶段二:验证 === validated, warnings, errors = _validate_plan(plan) mode = "Dry-Run" if dry_run else "正式执行" report_lines = [ "=" * 50, f"执行报告", "=" * 50, f"方案来源: {source}", f"执行模式: {mode}", f"方案总数: {len(plan)} 条", f"有效操作: {len(validated)} 条", "", ] if warnings: report_lines.append("验证警告:") report_lines.extend(warnings) report_lines.append("") if errors: report_lines.append("验证错误:") report_lines.extend(errors) report_lines.append("") # 分类统计 increase_items = [v for v in validated if v["action"] == "increase"] decrease_items = [v for v in validated if v["action"] == "decrease"] close_items = [v for v in validated if v["action"] == "close"] bid_items = increase_items + decrease_items report_lines.append(f"出价调整: 提价 {len(increase_items)} 个 + 降价 {len(decrease_items)} 个 = {len(bid_items)} 个") report_lines.append(f"广告关停: {len(close_items)} 个 {'(本次执行)' if execute_close else '(待确认)'}") report_lines.append("") # === Dry-Run 模式:只输出不执行 === if dry_run: if bid_items: report_lines.append("将要执行的出价调整:") for item in bid_items[:20]: report_lines.append( f" ad_id={item['ad_id']} | {item['action']} | " f"{item.get('current_bid', '?')} -> {item.get('new_bid', '?')} 分" ) if len(bid_items) > 20: report_lines.append(f" ... 还有 {len(bid_items) - 20} 个") if close_items: report_lines.append("\n将要关停的广告:") for item in close_items[:10]: report_lines.append( f" ad_id={item['ad_id']} | 消耗:{item.get('cost', 0):.0f}元 | " f"转化:{item.get('conversions_count', 0)}" ) report_lines.append("\n[Dry-Run] 以上操作未实际执行") return ToolResult( title=f"Dry-Run 报告({len(validated)} 条有效操作)", output="\n".join(report_lines), metadata={"mode": "dry_run", "validated_count": len(validated)}, ) # === 阶段三:执行出价调整 === bid_success = 0 bid_failed = 0 bid_errors = [] if bid_items: report_lines.append("--- 执行出价调整 ---") for item in bid_items: try: target_account = account_id if account_id else item.get("account_id", 0) await ad_update( account_id=target_account, adgroup_id=item["ad_id"], bid_amount=item["new_bid"], ) bid_success += 1 logger.info( "出价调整成功: ad_id=%s, %s -> %s (%s)", item["ad_id"], item.get("current_bid"), item["new_bid"], item["action"] ) except Exception as e: bid_failed += 1 err_msg = f"ad_id={item['ad_id']}: {str(e)}" bid_errors.append(err_msg) logger.error("出价调整失败: %s", err_msg) report_lines.append(f"提价(increase): {len(increase_items)} 个") report_lines.append(f"降价(decrease): {len(decrease_items)} 个") report_lines.append(f"成功: {bid_success} / 失败: {bid_failed}") report_lines.append("") # === 阶段四:执行广告关停 === close_success = 0 close_failed = 0 close_errors = [] if close_items and execute_close: report_lines.append("--- 执行广告关停 ---") # 按 account_id 分组,避免跨账户批量操作 from collections import defaultdict close_by_account = defaultdict(list) for item in close_items: target_acct = account_id if account_id else item.get("account_id", 0) close_by_account[target_acct].append(item["ad_id"]) for target_acct, ad_ids in close_by_account.items(): try: result = await ad_batch_update_status( adgroup_ids=ad_ids, configured_status="AD_STATUS_SUSPEND", account_id=target_acct, ) if "失败" not in result.title: close_success += len(ad_ids) logger.info("关停成功: 账户%s, %d 个广告", target_acct, len(ad_ids)) else: close_failed += len(ad_ids) close_errors.append(f"账户{target_acct}: {result.output}") logger.error("关停失败: 账户%s, %s", target_acct, result.output) except Exception as e: close_failed += len(ad_ids) close_errors.append(f"账户{target_acct}: {str(e)}") logger.error("关停异常: 账户%s, %s", target_acct, e) report_lines.append(f"关停: 成功 {close_success} / 失败 {close_failed}") report_lines.append("") elif close_items and not execute_close: report_lines.append("--- 广告关停(待确认)---") report_lines.append(f"以下 {len(close_items)} 个广告建议关停,请确认后使用 execute_close=True 执行:") for item in close_items[:10]: report_lines.append( f" ad_id={item['ad_id']} | 消耗:{item.get('cost', 0):.0f}元 | " f"转化:{item.get('conversions_count', 0)} | 效率:{item.get('efficiency', 0)}" ) if len(close_items) > 10: report_lines.append(f" ... 还有 {len(close_items) - 10} 个") report_lines.append("") # === 阶段五:执行后监控 === monitor_result = None if bid_success > 0 or close_success > 0: report_lines.append("--- 执行后监控 ---") try: target_account = account_id if account_id else 0 monitor_result = await monitor_check_metrics( account_id=target_account, check_items=["cost_spike", "budget_overrun"], time_window="1h", ) report_lines.append(f"监控结果: {monitor_result.title}") if monitor_result.metadata and monitor_result.metadata.get("anomalies"): for a in monitor_result.metadata["anomalies"]: report_lines.append(f" {a.get('type', '')}: {a.get('message', '')}") else: report_lines.append(" 无异常") except Exception as e: report_lines.append(f"监控检查失败: {str(e)}") logger.warning("执行后监控失败: %s", e) report_lines.append("") # === 失败详情 === all_errors = bid_errors + close_errors if all_errors: report_lines.append("--- 失败详情 ---") for err in all_errors[:20]: report_lines.append(f" {err}") if len(all_errors) > 20: report_lines.append(f" ... 还有 {len(all_errors) - 20} 个错误") report_lines.append("") # === 汇总 === report_lines.append("=" * 50) total_success = bid_success + close_success total_failed = bid_failed + close_failed report_lines.append(f"总计: 成功 {total_success} / 失败 {total_failed}") report_lines.append(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") return ToolResult( title=f"执行完成(成功{total_success}/失败{total_failed})", output="\n".join(report_lines), metadata={ "mode": "execute", "bid_success": bid_success, "bid_failed": bid_failed, "close_success": close_success, "close_failed": close_failed, "close_pending": len(close_items) if not execute_close else 0, "errors": all_errors, }, ) except Exception as e: logger.error("execute_adjustment_plan 失败: %s", e, exc_info=True) return ToolResult(title="执行失败", output=str(e))