""" 监控告警工具 — 实时异常检测与熔断 """ import logging from typing import Any, Dict, List, Optional from agent.tools import tool from agent.tools.models import ToolResult logger = logging.getLogger(__name__) @tool(description="检查投放指标是否异常(成本突增/转化骤降/预算超支/CTR异常)") async def monitor_check_metrics( account_id: int, check_items: List[str], threshold: Optional[Dict[str, float]] = None, time_window: str = "1h", ) -> ToolResult: """检查各项投放指标是否触发异常阈值。 Args: account_id: 广告主账号ID check_items: 检查项列表,可选: - cost_spike: 成本突增(小时CPA > 目标CPA × 阈值) - cvr_drop: 转化率骤降(小时CVR < 昨日同时段 × 阈值) - budget_overrun: 预算超支(日消耗 > 日预算 × 阈值) - ctr_anomaly: CTR异常低(CTR < 历史均值 × 阈值) - balance_low: 账户余额不足(余额 < N天预估消耗) threshold: 阈值配置,如 {"cost_spike_ratio": 2.0, "cvr_drop_ratio": 0.5} time_window: 检查时间窗口 "1h"(最近1小时)/ "3h" / "today"(今日) Returns: 异常检测结果,包含触发的异常项和详细信息 """ from examples.auto_put_ad.tools.data_query import data_query from examples.auto_put_ad.tools.ad_api import account_get_info default_threshold = { "cost_spike_ratio": 2.0, # CPA 超过目标 2 倍 "cvr_drop_ratio": 0.5, # CVR 低于昨日 50% "budget_overrun_ratio": 0.95, # 消耗达到预算 95% "ctr_low_ratio": 0.5, # CTR 低于均值 50% "balance_days": 3, # 余额不足 3 天消耗 } thresholds = {**default_threshold, **(threshold or {})} anomalies = [] # 1. 成本突增检查 if "cost_spike" in check_items: # 查询最近1小时的CPA result = await data_query( query_type="cost_trend", date_range={"start_date": "today", "end_date": "today"}, dimensions=["hour"], metrics=["cpa"], filters={"account_id": account_id}, ) # 实际实现需要对比目标CPA # 这里简化为占位逻辑 anomalies.append({ "type": "cost_spike", "severity": "warning", "message": "(占位)成本突增检测需要实现目标CPA对比逻辑" }) # 2. 转化率骤降检查 if "cvr_drop" in check_items: anomalies.append({ "type": "cvr_drop", "severity": "info", "message": "(占位)CVR骤降检测需要实现同比逻辑" }) # 3. 预算超支检查 if "budget_overrun" in check_items: result = await data_query( query_type="account_summary", date_range={"start_date": "today", "end_date": "today"}, metrics=["cost"], filters={"account_id": account_id}, ) # 需要对比日预算 anomalies.append({ "type": "budget_overrun", "severity": "info", "message": "(占位)预算超支检测需要获取日预算配置" }) # 4. CTR异常检查 if "ctr_anomaly" in check_items: anomalies.append({ "type": "ctr_anomaly", "severity": "info", "message": "(占位)CTR异常检测需要历史均值数据" }) # 5. 余额不足检查 if "balance_low" in check_items: result = await account_get_info(account_id=account_id) if "失败" not in result.title: balance = (result.metadata or {}).get("balance", 0) # 需要预估日消耗 anomalies.append({ "type": "balance_low", "severity": "info", "message": f"账户余额: {balance/100:.2f}元(需实现日消耗预估)" }) # 汇总结果 if not anomalies: return ToolResult(title="监控检查通过", output=f"所有检查项正常({', '.join(check_items)})") critical = [a for a in anomalies if a.get("severity") == "critical"] warnings = [a for a in anomalies if a.get("severity") == "warning"] lines = [f"检测到 {len(anomalies)} 项异常:"] for a in anomalies: icon = "🔴" if a["severity"] == "critical" else "⚠️" if a["severity"] == "warning" else "ℹ️" lines.append(f"{icon} [{a['type']}] {a['message']}") return ToolResult( title=f"监控检查完成({len(critical)}个严重,{len(warnings)}个警告)", output="\n".join(lines), metadata={"anomalies": anomalies, "thresholds": thresholds}, ) @tool(description="执行熔断操作:批量暂停异常广告") async def monitor_circuit_break( account_id: int, target_ids: List[int], reason: str, ) -> ToolResult: """批量暂停异常广告,记录熔断原因(3.0 只有广告层级)。 Args: account_id: 广告主账号ID target_ids: 需要熔断的广告ID列表(adgroup_id) reason: 熔断原因(记录到日志) """ from examples.auto_put_ad.tools.ad_api import ad_batch_update_status logger.warning( "[熔断] 账户 %s 触发熔断,暂停 %d 个广告,原因: %s", account_id, len(target_ids), reason ) result = await ad_batch_update_status( adgroup_ids=target_ids, configured_status="AD_STATUS_SUSPEND", account_id=account_id, ) if "失败" in result.title: return ToolResult( title="熔断执行失败", output=f"暂停广告失败: {result.output}\n原因: {reason}" ) return ToolResult( title=f"熔断执行完成(暂停{len(target_ids)}个广告)", output=f"已暂停广告: {target_ids}\n熔断原因: {reason}\n\n{result.output}", metadata={"target_ids": target_ids, "reason": reason}, )