| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- """
- 监控告警工具 — 实时异常检测与熔断
- """
- import logging
- from typing import Any, Dict, List, Optional
- from agent.tools import tool
- from agent.tools.models import ToolResult
- logger = logging.getLogger(__name__)
- @tool(description="检查投放指标是否异常(成本突增/转化骤降/预算超支/CTR异常)")
- async def monitor_check_metrics(
- account_id: int,
- check_items: List[str],
- threshold: Optional[Dict[str, float]] = None,
- time_window: str = "1h",
- ) -> ToolResult:
- """检查各项投放指标是否触发异常阈值。
- Args:
- account_id: 广告主账号ID
- check_items: 检查项列表,可选:
- - cost_spike: 成本突增(小时CPA > 目标CPA × 阈值)
- - cvr_drop: 转化率骤降(小时CVR < 昨日同时段 × 阈值)
- - budget_overrun: 预算超支(日消耗 > 日预算 × 阈值)
- - ctr_anomaly: CTR异常低(CTR < 历史均值 × 阈值)
- - balance_low: 账户余额不足(余额 < N天预估消耗)
- threshold: 阈值配置,如 {"cost_spike_ratio": 2.0, "cvr_drop_ratio": 0.5}
- time_window: 检查时间窗口 "1h"(最近1小时)/ "3h" / "today"(今日)
- Returns:
- 异常检测结果,包含触发的异常项和详细信息
- """
- from examples.auto_put_ad.tools.data_query import data_query
- from examples.auto_put_ad.tools.ad_api import account_get_info
- default_threshold = {
- "cost_spike_ratio": 2.0, # CPA 超过目标 2 倍
- "cvr_drop_ratio": 0.5, # CVR 低于昨日 50%
- "budget_overrun_ratio": 0.95, # 消耗达到预算 95%
- "ctr_low_ratio": 0.5, # CTR 低于均值 50%
- "balance_days": 3, # 余额不足 3 天消耗
- }
- thresholds = {**default_threshold, **(threshold or {})}
- anomalies = []
- # 1. 成本突增检查
- if "cost_spike" in check_items:
- # 查询最近1小时的CPA
- result = await data_query(
- query_type="cost_trend",
- date_range={"start_date": "today", "end_date": "today"},
- dimensions=["hour"],
- metrics=["cpa"],
- filters={"account_id": account_id},
- )
- # 实际实现需要对比目标CPA
- # 这里简化为占位逻辑
- anomalies.append({
- "type": "cost_spike",
- "severity": "warning",
- "message": "(占位)成本突增检测需要实现目标CPA对比逻辑"
- })
- # 2. 转化率骤降检查
- if "cvr_drop" in check_items:
- anomalies.append({
- "type": "cvr_drop",
- "severity": "info",
- "message": "(占位)CVR骤降检测需要实现同比逻辑"
- })
- # 3. 预算超支检查
- if "budget_overrun" in check_items:
- result = await data_query(
- query_type="account_summary",
- date_range={"start_date": "today", "end_date": "today"},
- metrics=["cost"],
- filters={"account_id": account_id},
- )
- # 需要对比日预算
- anomalies.append({
- "type": "budget_overrun",
- "severity": "info",
- "message": "(占位)预算超支检测需要获取日预算配置"
- })
- # 4. CTR异常检查
- if "ctr_anomaly" in check_items:
- anomalies.append({
- "type": "ctr_anomaly",
- "severity": "info",
- "message": "(占位)CTR异常检测需要历史均值数据"
- })
- # 5. 余额不足检查
- if "balance_low" in check_items:
- result = await account_get_info(account_id=account_id)
- if "失败" not in result.title:
- balance = (result.metadata or {}).get("balance", 0)
- # 需要预估日消耗
- anomalies.append({
- "type": "balance_low",
- "severity": "info",
- "message": f"账户余额: {balance/100:.2f}元(需实现日消耗预估)"
- })
- # 汇总结果
- if not anomalies:
- return ToolResult(title="监控检查通过", output=f"所有检查项正常({', '.join(check_items)})")
- critical = [a for a in anomalies if a.get("severity") == "critical"]
- warnings = [a for a in anomalies if a.get("severity") == "warning"]
- lines = [f"检测到 {len(anomalies)} 项异常:"]
- for a in anomalies:
- icon = "🔴" if a["severity"] == "critical" else "⚠️" if a["severity"] == "warning" else "ℹ️"
- lines.append(f"{icon} [{a['type']}] {a['message']}")
- return ToolResult(
- title=f"监控检查完成({len(critical)}个严重,{len(warnings)}个警告)",
- output="\n".join(lines),
- metadata={"anomalies": anomalies, "thresholds": thresholds},
- )
- @tool(description="执行熔断操作:批量暂停异常广告")
- async def monitor_circuit_break(
- account_id: int,
- target_ids: List[int],
- reason: str,
- ) -> ToolResult:
- """批量暂停异常广告,记录熔断原因(3.0 只有广告层级)。
- Args:
- account_id: 广告主账号ID
- target_ids: 需要熔断的广告ID列表(adgroup_id)
- reason: 熔断原因(记录到日志)
- """
- from examples.auto_put_ad.tools.ad_api import ad_batch_update_status
- logger.warning(
- "[熔断] 账户 %s 触发熔断,暂停 %d 个广告,原因: %s",
- account_id, len(target_ids), reason
- )
- result = await ad_batch_update_status(
- adgroup_ids=target_ids,
- configured_status="AD_STATUS_SUSPEND",
- account_id=account_id,
- )
- if "失败" in result.title:
- return ToolResult(
- title="熔断执行失败",
- output=f"暂停广告失败: {result.output}\n原因: {reason}"
- )
- return ToolResult(
- title=f"熔断执行完成(暂停{len(target_ids)}个广告)",
- output=f"已暂停广告: {target_ids}\n熔断原因: {reason}\n\n{result.output}",
- metadata={"target_ids": target_ids, "reason": reason},
- )
|