monitor_tools.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. """
  2. 监控告警工具 — 实时异常检测与熔断
  3. """
  4. import logging
  5. from typing import Any, Dict, List, Optional
  6. from agent.tools import tool
  7. from agent.tools.models import ToolResult
  8. logger = logging.getLogger(__name__)
  9. @tool(description="检查投放指标是否异常(成本突增/转化骤降/预算超支/CTR异常)")
  10. async def monitor_check_metrics(
  11. account_id: int,
  12. check_items: List[str],
  13. threshold: Optional[Dict[str, float]] = None,
  14. time_window: str = "1h",
  15. ) -> ToolResult:
  16. """检查各项投放指标是否触发异常阈值。
  17. Args:
  18. account_id: 广告主账号ID
  19. check_items: 检查项列表,可选:
  20. - cost_spike: 成本突增(小时CPA > 目标CPA × 阈值)
  21. - cvr_drop: 转化率骤降(小时CVR < 昨日同时段 × 阈值)
  22. - budget_overrun: 预算超支(日消耗 > 日预算 × 阈值)
  23. - ctr_anomaly: CTR异常低(CTR < 历史均值 × 阈值)
  24. - balance_low: 账户余额不足(余额 < N天预估消耗)
  25. threshold: 阈值配置,如 {"cost_spike_ratio": 2.0, "cvr_drop_ratio": 0.5}
  26. time_window: 检查时间窗口 "1h"(最近1小时)/ "3h" / "today"(今日)
  27. Returns:
  28. 异常检测结果,包含触发的异常项和详细信息
  29. """
  30. from examples.auto_put_ad.tools.data_query import data_query
  31. from examples.auto_put_ad.tools.ad_api import account_get_info
  32. default_threshold = {
  33. "cost_spike_ratio": 2.0, # CPA 超过目标 2 倍
  34. "cvr_drop_ratio": 0.5, # CVR 低于昨日 50%
  35. "budget_overrun_ratio": 0.95, # 消耗达到预算 95%
  36. "ctr_low_ratio": 0.5, # CTR 低于均值 50%
  37. "balance_days": 3, # 余额不足 3 天消耗
  38. }
  39. thresholds = {**default_threshold, **(threshold or {})}
  40. anomalies = []
  41. # 1. 成本突增检查
  42. if "cost_spike" in check_items:
  43. # 查询最近1小时的CPA
  44. result = await data_query(
  45. query_type="cost_trend",
  46. date_range={"start_date": "today", "end_date": "today"},
  47. dimensions=["hour"],
  48. metrics=["cpa"],
  49. filters={"account_id": account_id},
  50. )
  51. # 实际实现需要对比目标CPA
  52. # 这里简化为占位逻辑
  53. anomalies.append({
  54. "type": "cost_spike",
  55. "severity": "warning",
  56. "message": "(占位)成本突增检测需要实现目标CPA对比逻辑"
  57. })
  58. # 2. 转化率骤降检查
  59. if "cvr_drop" in check_items:
  60. anomalies.append({
  61. "type": "cvr_drop",
  62. "severity": "info",
  63. "message": "(占位)CVR骤降检测需要实现同比逻辑"
  64. })
  65. # 3. 预算超支检查
  66. if "budget_overrun" in check_items:
  67. result = await data_query(
  68. query_type="account_summary",
  69. date_range={"start_date": "today", "end_date": "today"},
  70. metrics=["cost"],
  71. filters={"account_id": account_id},
  72. )
  73. # 需要对比日预算
  74. anomalies.append({
  75. "type": "budget_overrun",
  76. "severity": "info",
  77. "message": "(占位)预算超支检测需要获取日预算配置"
  78. })
  79. # 4. CTR异常检查
  80. if "ctr_anomaly" in check_items:
  81. anomalies.append({
  82. "type": "ctr_anomaly",
  83. "severity": "info",
  84. "message": "(占位)CTR异常检测需要历史均值数据"
  85. })
  86. # 5. 余额不足检查
  87. if "balance_low" in check_items:
  88. result = await account_get_info(account_id=account_id)
  89. if "失败" not in result.title:
  90. balance = (result.metadata or {}).get("balance", 0)
  91. # 需要预估日消耗
  92. anomalies.append({
  93. "type": "balance_low",
  94. "severity": "info",
  95. "message": f"账户余额: {balance/100:.2f}元(需实现日消耗预估)"
  96. })
  97. # 汇总结果
  98. if not anomalies:
  99. return ToolResult(title="监控检查通过", output=f"所有检查项正常({', '.join(check_items)})")
  100. critical = [a for a in anomalies if a.get("severity") == "critical"]
  101. warnings = [a for a in anomalies if a.get("severity") == "warning"]
  102. lines = [f"检测到 {len(anomalies)} 项异常:"]
  103. for a in anomalies:
  104. icon = "🔴" if a["severity"] == "critical" else "⚠️" if a["severity"] == "warning" else "ℹ️"
  105. lines.append(f"{icon} [{a['type']}] {a['message']}")
  106. return ToolResult(
  107. title=f"监控检查完成({len(critical)}个严重,{len(warnings)}个警告)",
  108. output="\n".join(lines),
  109. metadata={"anomalies": anomalies, "thresholds": thresholds},
  110. )
  111. @tool(description="执行熔断操作:批量暂停异常广告")
  112. async def monitor_circuit_break(
  113. account_id: int,
  114. target_ids: List[int],
  115. reason: str,
  116. ) -> ToolResult:
  117. """批量暂停异常广告,记录熔断原因(3.0 只有广告层级)。
  118. Args:
  119. account_id: 广告主账号ID
  120. target_ids: 需要熔断的广告ID列表(adgroup_id)
  121. reason: 熔断原因(记录到日志)
  122. """
  123. from examples.auto_put_ad.tools.ad_api import ad_batch_update_status
  124. logger.warning(
  125. "[熔断] 账户 %s 触发熔断,暂停 %d 个广告,原因: %s",
  126. account_id, len(target_ids), reason
  127. )
  128. result = await ad_batch_update_status(
  129. adgroup_ids=target_ids,
  130. configured_status="AD_STATUS_SUSPEND",
  131. account_id=account_id,
  132. )
  133. if "失败" in result.title:
  134. return ToolResult(
  135. title="熔断执行失败",
  136. output=f"暂停广告失败: {result.output}\n原因: {reason}"
  137. )
  138. return ToolResult(
  139. title=f"熔断执行完成(暂停{len(target_ids)}个广告)",
  140. output=f"已暂停广告: {target_ids}\n熔断原因: {reason}\n\n{result.output}",
  141. metadata={"target_ids": target_ids, "reason": reason},
  142. )