| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543 |
- """
- 账户级 + 人群包级 ROI 基线汇总 — auto_put_ad_mini (Step 4.2)
- 职责:
- - 读取 metrics CSV(calculate_roi_metrics 输出)
- - 按 account_id / audience_tier / global 三层聚合
- - 输出 p25/p50/p75 分位数作为决策基线
- 使用定位:
- - LLM 在判断广告 ROI "偏低" 时,应先和【同人群包 p25】比,而非全体均值 × 0.5
- - 跑量放大候选:同人群包 p75 是最低门槛
- - 大盘下滑信号:账户 roi_mean 比历史显著低 → 行情差,不应过度降价触发死亡螺旋
- """
- import json
- import logging
- import math
- import sys
- from datetime import datetime, timedelta
- from pathlib import Path
- from typing import Any, Dict, List, Optional, Tuple
- import numpy as np
- import pandas as pd
- from agent.tools import tool
- from agent.tools.models import ToolContext, ToolResult
logger = logging.getLogger(__name__)

# Root of the auto_put_ad_mini package: two directory levels above this file.
_MINI_DIR = Path(__file__).resolve().parent.parent
# Daily metrics CSVs are saved as outputs/metrics_{date}.csv (written by roi_calculator.py).
_OUTPUTS_DIR = _MINI_DIR / "outputs"
# Destination of the JSON summaries written by calculate_portfolio_summary.
_PORTFOLIO_DIR = _OUTPUTS_DIR / "portfolio_summary"

# Make the package root importable so the shared `config` module (market-signal
# thresholds) can be found; use built-in defaults when it cannot be imported.
sys.path.insert(0, str(_MINI_DIR))
try:
    from config import MARKET_TRENDING_DOWN_PCT, MARKET_VOLATILE_PCT  # noqa: E402
except ImportError:
    MARKET_VOLATILE_PCT, MARKET_TRENDING_DOWN_PCT = 0.15, -0.10
def _safe_float(v) -> Optional[float]:
    """Convert *v* to a finite float, or return None.

    Returns None when *v* cannot be converted (``TypeError`` / ``ValueError``),
    is too large to represent as a float (``OverflowError``, e.g. a huge ``int``),
    or converts to NaN / +-inf.
    """
    try:
        f = float(v)
    except (TypeError, ValueError, OverflowError):
        return None
    return f if math.isfinite(f) else None
def _round_or_none(v: Optional[float], digits: int = 4) -> Optional[float]:
    """Round *v* to *digits* decimal places; None passes through unchanged."""
    if v is None:
        return None
    return round(v, digits)
def _first_present_column(df: pd.DataFrame, candidates: Tuple[str, ...]) -> Optional[str]:
    """Return the first name in *candidates* that is a column of *df*, or None."""
    return next((c for c in candidates if c in df.columns), None)


def _positive_finite(series: pd.Series) -> pd.Series:
    """Coerce *series* to numeric and keep only finite values strictly greater than 0.

    Unparseable / missing entries become NaN and are dropped; +-inf is dropped too so
    that downstream means and quantiles stay finite (and JSON-serialisable).
    """
    numeric = pd.to_numeric(series, errors="coerce").replace([np.inf, -np.inf], np.nan).dropna()
    return numeric[numeric > 0]


def _positive_stats(
    series: Optional[pd.Series],
    prefix: str,
    percentiles: Tuple[int, ...],
    digits: int,
) -> Dict[str, Optional[float]]:
    """Build ``{prefix}_mean`` and ``{prefix}_p{N}`` over the positive finite values of *series*.

    All values are None when *series* is None or holds no positive finite value, so the
    key set is the same regardless of input.
    """
    keys = [f"{prefix}_mean"] + [f"{prefix}_p{p}" for p in percentiles]
    values = _positive_finite(series) if series is not None else None
    if values is None or values.empty:
        return dict.fromkeys(keys)
    stats: Dict[str, Optional[float]] = {f"{prefix}_mean": round(float(values.mean()), digits)}
    for p in percentiles:
        stats[f"{prefix}_p{p}"] = round(float(values.quantile(p / 100)), digits)
    return stats


def _describe_group(df: pd.DataFrame) -> Dict[str, Any]:
    """Compute summary statistics for one group of ads (one row per ad).

    Args:
        df: Metrics rows for the group. Every column is optional; a missing column
            yields 0 / None for the statistics that depend on it.

    Returns:
        A dict with a fixed key set (identical for empty and non-empty input):

        - ``ad_count``: number of rows (0 for an empty frame).
        - ``active_ads``: rows with ``cost_7d_avg > 0``.
        - ``daily_cost_avg``: sum of ``cost_7d_avg`` across the group, i.e. the group's
          combined daily spend (assumes ``cost_7d_avg`` is each ad's 7-day average daily
          cost -- TODO confirm), rounded to 2 dp.
        - ``daily_revenue_avg``: sum of ``revenue_7d_total`` divided by 7, rounded to 2 dp.
        - ``roi_mean`` / ``roi_p25`` / ``roi_p50`` / ``roi_p75``: from ``动态ROI_7日均值``
          (fallback ``f_7日动态ROI``), 4 dp.
        - ``fission_mean`` / ``fission_p50``: from ``T0裂变系数_7日均值`` (fallbacks
          ``T0裂变系数``, ``fission_ratio``), 4 dp.
        - ``ctr_mean`` / ``ctr_p50``: per-row ``valid_click_count / view_count``, 6 dp.
        - ``bid_mean`` / ``bid_p50``: from ``bid_amount``, 4 dp.
        - ``stable_ads_pct``: share of rows with ``stable_spend_days_30d >= 7``, 4 dp.

        Mean / quantile stats only use finite values > 0 (0 usually means "no data")
        and are None when no such value exists.
    """
    # df.empty is also true for a frame with an index but no columns; count that as 0 ads.
    ad_count = 0 if df.empty else len(df)

    if "cost_7d_avg" in df.columns:
        cost = pd.to_numeric(df["cost_7d_avg"], errors="coerce").fillna(0)
        active = int((cost > 0).sum())
        daily_cost_avg = float(cost.sum())
    else:
        active = 0
        daily_cost_avg = 0.0

    daily_revenue_avg = 0.0
    if "revenue_7d_total" in df.columns:
        total_rev = pd.to_numeric(df["revenue_7d_total"], errors="coerce").fillna(0).sum()
        daily_revenue_avg = float(total_rev) / 7.0

    roi_col = _first_present_column(df, ("动态ROI_7日均值", "f_7日动态ROI"))
    roi_stats = _positive_stats(df[roi_col] if roi_col else None, "roi", (25, 50, 75), 4)

    fission_col = _first_present_column(df, ("T0裂变系数_7日均值", "T0裂变系数", "fission_ratio"))
    fission_stats = _positive_stats(df[fission_col] if fission_col else None, "fission", (50,), 4)

    ctr_ratio = None
    if "valid_click_count" in df.columns and "view_count" in df.columns:
        click = pd.to_numeric(df["valid_click_count"], errors="coerce").fillna(0)
        view = pd.to_numeric(df["view_count"], errors="coerce").fillna(0)
        # 0/0 -> NaN and x/0 -> inf; both are discarded by _positive_stats.
        ctr_ratio = click / view
    ctr_stats = _positive_stats(ctr_ratio, "ctr", (50,), 6)

    bid_series = df["bid_amount"] if "bid_amount" in df.columns else None
    bid_stats = _positive_stats(bid_series, "bid", (50,), 4)

    # An ad counts as "stable" when it spent on at least 7 of the last 30 days.
    stable_pct = 0.0
    if "stable_spend_days_30d" in df.columns and ad_count > 0:
        stable_days = pd.to_numeric(df["stable_spend_days_30d"], errors="coerce").fillna(0)
        stable_pct = float((stable_days >= 7).sum()) / ad_count

    return {
        "ad_count": ad_count,
        "active_ads": active,
        "daily_cost_avg": round(daily_cost_avg, 2),
        "daily_revenue_avg": round(daily_revenue_avg, 2),
        **roi_stats,
        **fission_stats,
        **ctr_stats,
        **bid_stats,
        "stable_ads_pct": round(stable_pct, 4),
    }
def _resolve_metrics_csv(metrics_csv: str, end_date: str) -> Optional[Path]:
    """Locate the metrics CSV to summarise.

    Resolution order:

    1. ``metrics_csv`` when given and it exists (relative paths resolve against ``_MINI_DIR``).
    2. ``outputs/metrics_{end_date}.csv``.
    3. ``outputs/metrics_temp.csv``.
    4. The most recently modified ``outputs/metrics_*.csv``.

    Steps 3-4 may hold data for a different day than requested, and a missing explicit
    path silently falls through to the date-based lookup, so those cases log a warning.

    Args:
        metrics_csv: Explicit CSV path; an empty string means "resolve from end_date".
        end_date: ``"yesterday"`` or a date in ``YYYYMMDD`` / ``YYYY-MM-DD`` form.

    Returns:
        The resolved path, or None when nothing matches or ``end_date`` cannot be parsed.
    """
    if metrics_csv:
        p = Path(metrics_csv)
        if not p.is_absolute():
            p = _MINI_DIR / p
        if p.exists():
            return p
        logger.warning("指定的 metrics CSV 不存在:%s,改为按 end_date 自动查找", p)

    if end_date == "yesterday":
        end_dt = datetime.now() - timedelta(days=1)
    else:
        try:
            end_dt = datetime.strptime(end_date.replace("-", ""), "%Y%m%d")
        except ValueError:
            return None
    end_date_str = end_dt.strftime("%Y%m%d")

    exact = _OUTPUTS_DIR / f"metrics_{end_date_str}.csv"
    if exact.exists():
        return exact

    temp = _OUTPUTS_DIR / "metrics_temp.csv"
    if temp.exists():
        logger.warning("未找到 %s,回退使用 %s", exact.name, temp.name)
        return temp

    # Last resort: newest metrics_*.csv by modification time (may be another day's data).
    if _OUTPUTS_DIR.exists():
        newest = max(
            _OUTPUTS_DIR.glob("metrics_*.csv"),
            key=lambda c: c.stat().st_mtime,
            default=None,
        )
        if newest is not None:
            logger.warning(
                "未找到 %s,回退使用最新的 %s(日期可能与 end_date 不一致)", exact.name, newest.name
            )
            return newest
    return None
def _compute_daily_tier_snapshot(
    end_dt: datetime, days: int = 7
) -> Dict[str, Dict[str, Any]]:
    """Build per-day ROI baselines from the last *days* daily metrics CSVs.

    For each day ``end_dt - i`` (i = 0 .. days-1) whose ``outputs/metrics_{YYYYMMDD}.csv``
    exists, is readable and is non-empty, records::

        {
            "global": <_describe_group stats for the whole file>,
            "by_audience_tier": {"<tier>": <_describe_group stats>, ...},
        }

    ``by_audience_tier`` is ``{}`` when that CSV has no ``audience_tier`` column (older
    files). Missing days are skipped; unreadable files are logged and skipped.

    Purpose: lets the LLM see day-to-day movement of same-tier baselines to judge the
    overall market, and serves as an audit trail for relative_trend_pct (which
    roi_calculator computes on its own).

    Returns:
        ``{"YYYYMMDD": day_entry, ...}`` inserted newest day first.
    """
    snapshots: Dict[str, Dict[str, Any]] = {}
    for offset in range(days):
        day = (end_dt - timedelta(days=offset)).strftime("%Y%m%d")
        path = _OUTPUTS_DIR / f"metrics_{day}.csv"
        if not path.exists():
            continue
        try:
            frame = pd.read_csv(path, dtype={"ad_id": str, "account_id": str})
        except Exception as exc:  # best-effort: one bad file must not abort the snapshot
            logger.warning("读取 %s 失败:%s", path.name, exc)
            continue
        if frame.empty:
            continue
        overall = _describe_group(frame)
        tiers: Dict[str, Any] = (
            {str(tier): _describe_group(grp) for tier, grp in frame.groupby("audience_tier")}
            if "audience_tier" in frame.columns
            else {}
        )
        snapshots[day] = {"global": overall, "by_audience_tier": tiers}
    return snapshots
def _compute_market_signal(by_date: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Classify the overall market from the daily global ROI p50 series.

    Done in code so the LLM does not have to eyeball ``by_date`` itself.
    Rules, highest priority first:

    1. Fewer than 4 days with a global p50 -> ``"unknown"``.
    2. mean(last 3 days' p50) / mean(earlier days' p50) - 1 <= MARKET_TRENDING_DOWN_PCT
       -> ``"trending_down"``.
    3. (max p50 - min p50) / min p50 > MARKET_VOLATILE_PCT -> ``"volatile"``.
    4. Otherwise -> ``"stable"``.

    Args:
        by_date: ``{"YYYYMMDD": {"global": {...}, ...}}`` as produced by
            ``_compute_daily_tier_snapshot``. Days whose ``global.roi_p50`` is
            missing or None are ignored.

    Returns:
        Dict with ``label``, ``p50_volatility_pct``, ``recent_vs_baseline_pct``,
        ``p50_series`` (date-ascending ``[{"date", "p50"}]``), ``ad_count_today``
        (global ``ad_count`` of the latest day in the series) and ``reason``.
        The numeric fields are None when the label is ``"unknown"``.
    """
    # (date, global p50) pairs in ascending date order.
    series: List[Tuple[str, float]] = []
    for day in sorted(by_date):
        p50 = by_date[day].get("global", {}).get("roi_p50")
        if p50 is not None:
            series.append((day, p50))

    if len(series) < 4:
        return {
            "label": "unknown",
            "p50_volatility_pct": None,
            "recent_vs_baseline_pct": None,
            "p50_series": [{"date": day, "p50": p50} for day, p50 in series],
            "ad_count_today": None,
            "reason": f"数据不足(只有 {len(series)} 天 p50),无法判断行情",
        }

    values = [p50 for _, p50 in series]
    lowest, highest = min(values), max(values)
    volatility_pct = (highest - lowest) / lowest if lowest > 0 else 0.0

    # Last 3 days vs. all earlier days (len(values) >= 4, so `earlier` is never empty).
    recent, earlier = values[-3:], values[:-3]
    recent_mean = sum(recent) / len(recent)
    baseline_mean = sum(earlier) / len(earlier)
    if baseline_mean > 0:
        recent_vs_baseline_pct = (recent_mean - baseline_mean) / baseline_mean
    else:
        recent_vs_baseline_pct = 0.0

    if recent_vs_baseline_pct <= MARKET_TRENDING_DOWN_PCT:
        label = "trending_down"
        reason = (
            f"最近 3 日 p50 均值 {recent_mean:.3f} 比之前 {len(earlier)} 日均值 "
            f"{baseline_mean:.3f} 低 {-recent_vs_baseline_pct * 100:.1f}%(阈值 "
            f"{-MARKET_TRENDING_DOWN_PCT * 100:.0f}%),大盘下行"
        )
    elif volatility_pct > MARKET_VOLATILE_PCT:
        label = "volatile"
        reason = (
            f"全局 p50 跨日波幅 {volatility_pct * 100:.1f}%(阈值 "
            f"{MARKET_VOLATILE_PCT * 100:.0f}%),ARPU 共模噪声大,单广告决策应保守"
        )
    else:
        label = "stable"
        reason = (
            f"全局 p50 跨日波幅 {volatility_pct * 100:.1f}% < "
            f"{MARKET_VOLATILE_PCT * 100:.0f}% 且无显著趋势,大盘平稳"
        )

    latest_day = series[-1][0]
    return {
        "label": label,
        "p50_volatility_pct": round(volatility_pct, 4),
        "recent_vs_baseline_pct": round(recent_vs_baseline_pct, 4),
        "p50_series": [{"date": day, "p50": round(p50, 4)} for day, p50 in series],
        "ad_count_today": by_date.get(latest_day, {}).get("global", {}).get("ad_count"),
        "reason": reason,
    }
# Display order of audience tiers in the readable summary (typical R-value ladder).
# Module-level so every summary section can use it, whether or not by_tier is empty.
_TIER_DISPLAY_ORDER = ["R500", "R330+", "R330", "R180", "R100", "R50", "R10", "R2", "default"]


def _describe_by(df: pd.DataFrame, column: str) -> Dict[str, Any]:
    """Run ``_describe_group`` per distinct value of *column*; ``{}`` when the column is absent."""
    if column not in df.columns:
        return {}
    return {str(key): _describe_group(group) for key, group in df.groupby(column)}


def _describe_by_tier_goal(df: pd.DataFrame) -> Dict[str, Any]:
    """Stats per (audience_tier, optimisation goal), keyed ``"{tier}_{goal}"``.

    Used for mean bids of same-tier, same-conversion-goal ads. Returns ``{}`` when either
    ``audience_tier`` or a goal column (``广告优化目标`` / ``optimization_goal``) is missing.
    """
    goal_col = next((c for c in ("广告优化目标", "optimization_goal") if c in df.columns), None)
    if "audience_tier" not in df.columns or goal_col is None:
        return {}
    return {
        f"{tier}_{goal}": _describe_group(group)
        for (tier, goal), group in df.groupby(["audience_tier", goal_col])
    }


def _ordered_tiers(tier_names) -> List[str]:
    """Tiers in ``_TIER_DISPLAY_ORDER`` first, then any others in their original order."""
    present = list(tier_names)
    known = [t for t in _TIER_DISPLAY_ORDER if t in present]
    return known + [t for t in present if t not in _TIER_DISPLAY_ORDER]


def _format_summary_lines(
    csv_path: Path,
    out_path: Path,
    global_stats: Dict[str, Any],
    by_tier: Dict[str, Any],
    by_date: Dict[str, Dict[str, Any]],
    market_signal: Dict[str, Any],
) -> List[str]:
    """Render the human-readable summary returned to the LLM (top-level figures only)."""
    lines = [
        "✅ 账户 / 人群包基线汇总完成",
        f" 数据源:{csv_path.name}",
        f" 输出文件:{out_path}",
        "",
        f"【全局】ad_count={global_stats['ad_count']}, "
        f"active={global_stats['active_ads']}, "
        f"roi_mean={global_stats['roi_mean']}, "
        f"p25/p50/p75={global_stats['roi_p25']}/{global_stats['roi_p50']}/{global_stats['roi_p75']}",
    ]

    if by_tier:
        lines.append("")
        lines.append("【人群包(end_date 截面)】")
        for tier in _TIER_DISPLAY_ORDER:
            if tier in by_tier:
                s = by_tier[tier]
                lines.append(
                    f" {tier}: n={s['ad_count']}, "
                    f"roi_p25/p50/p75={s['roi_p25']}/{s['roi_p50']}/{s['roi_p75']}, "
                    f"stable_pct={s['stable_ads_pct']*100:.0f}%"
                )
        # Tiers not in the standard ladder are listed afterwards.
        for tier, s in by_tier.items():
            if tier not in _TIER_DISPLAY_ORDER:
                lines.append(
                    f" {tier}: n={s['ad_count']}, "
                    f"roi_p25/p50/p75={s['roi_p25']}/{s['roi_p50']}/{s['roi_p75']}"
                )

    if by_date:
        lines.append("")
        lines.append(f"【日级基线快照(最近 {len(by_date)} 天)】")
        p50_values = [
            entry["global"].get("roi_p50")
            for entry in by_date.values()
            if entry.get("global", {}).get("roi_p50") is not None
        ]
        if p50_values:
            p50_min, p50_max = min(p50_values), max(p50_values)
            # A 4-dp-rounded p50 can be 0.0; avoid ZeroDivisionError (same rule as
            # _compute_market_signal's volatility).
            spread_pct = (p50_max - p50_min) / p50_min * 100 if p50_min > 0 else 0.0
            lines.append(
                f" 全局 p50 波动:min={p50_min:.3f}, "
                f"max={p50_max:.3f}, "
                f"波幅={spread_pct:.1f}% "
                f"(大盘行情参考,波动大说明 ARPU 共模噪声大)"
            )
        lines.append(" 每日 tier p50 快照:")
        for d in sorted(by_date):
            entry = by_date[d]
            tier_map = entry.get("by_audience_tier", {})
            tier_p50s = [
                f"{t}={tier_map[t]['roi_p50']:.2f}"
                for t in _ordered_tiers(tier_map)
                if tier_map[t].get("roi_p50") is not None
            ]
            if tier_p50s:
                lines.append(f" {d}: {' | '.join(tier_p50s)}")
            else:
                g = entry.get("global", {})
                lines.append(f" {d}: global_p50={g.get('roi_p50')}(无 tier 数据)")

    lines.append("")
    lines.append(f"【大盘行情】label={market_signal['label']}")
    if market_signal.get("p50_volatility_pct") is not None:
        lines.append(
            f" 波幅={market_signal['p50_volatility_pct'] * 100:.1f}%, "
            f"近 3 日 vs 之前={market_signal['recent_vs_baseline_pct'] * 100:+.1f}%"
        )
    lines.append(f" 原因:{market_signal['reason']}")
    return lines


@tool(description="账户级 + 人群包级 ROI 基线汇总(p25/p50/p75 + 最近 7 天日级基线快照 + 大盘行情判定),供 LLM 对标用")
async def calculate_portfolio_summary(
    ctx: ToolContext = None,
    metrics_csv: str = "",
    end_date: str = "yesterday",
) -> ToolResult:
    """Summarise ROI baselines by account, audience tier and globally.

    Reads the metrics CSV for ``end_date`` and aggregates it by account, by audience
    tier, by (tier, optimisation goal) and globally; reads the last 7 daily metrics
    CSVs for a day-level baseline snapshot; derives a market signal from that snapshot;
    and writes everything to
    ``outputs/portfolio_summary/portfolio_summary_{end_date}.json``::

        {
            "end_date": "YYYYMMDD",
            "source_csv": "<path>",
            "by_account": {"<account_id>": {...}},
            "by_audience_tier": {"<tier>": {...}},        # end_date CSV only
            "by_tier_goal": {"<tier>_<goal>": {...}},
            "global": {...},
            "by_date": {"YYYYMMDD": {"global": {...}, "by_audience_tier": {...}}, ...},
            "market_signal": {"label": ..., "reason": ..., ...},
        }

    Args:
        ctx: Tool context supplied by the agent framework (not used here).
        metrics_csv: Optional explicit metrics CSV path; resolved from ``end_date`` when empty.
        end_date: ``"yesterday"`` or a date in ``YYYYMMDD`` / ``YYYY-MM-DD`` form.

    Returns:
        ToolResult with a readable summary; on success ``metadata`` carries the JSON
        path and headline counts. Errors are reported as a ToolResult, never raised.
    """
    try:
        if end_date == "yesterday":
            end_dt = datetime.now() - timedelta(days=1)
        else:
            end_dt = datetime.strptime(end_date.replace("-", ""), "%Y%m%d")
        end_date_str = end_dt.strftime("%Y%m%d")

        # Pass the already-resolved date so "yesterday" is not re-evaluated (midnight race).
        csv_path = _resolve_metrics_csv(metrics_csv, end_date_str)
        if csv_path is None or not csv_path.exists():
            return ToolResult(
                title="账户基线计算失败",
                output=f"未找到 metrics CSV(指定:{metrics_csv or '自动'},日期:{end_date})",
            )
        df = pd.read_csv(csv_path, dtype={"ad_id": str, "account_id": str})
        if df.empty:
            return ToolResult(
                title="账户基线计算失败",
                output=f"metrics CSV 为空:{csv_path}",
            )

        by_account = _describe_by(df, "account_id")
        by_tier = _describe_by(df, "audience_tier")
        by_tier_goal = _describe_by_tier_goal(df)
        global_stats = _describe_group(df)
        # Day-level baseline snapshot (last 7 days) and the market label derived from it,
        # so the LLM gets a verdict instead of guessing from by_date.
        by_date = _compute_daily_tier_snapshot(end_dt, days=7)
        market_signal = _compute_market_signal(by_date)

        summary = {
            "end_date": end_date_str,
            "source_csv": str(csv_path),
            "by_account": by_account,
            "by_audience_tier": by_tier,
            "by_tier_goal": by_tier_goal,
            "global": global_stats,
            "by_date": by_date,
            "market_signal": market_signal,
        }
        _PORTFOLIO_DIR.mkdir(parents=True, exist_ok=True)
        out_path = _PORTFOLIO_DIR / f"portfolio_summary_{end_date_str}.json"
        out_path.write_text(
            json.dumps(summary, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

        lines = _format_summary_lines(
            csv_path, out_path, global_stats, by_tier, by_date, market_signal
        )
        return ToolResult(
            title="账户 / 人群包基线汇总",
            output="\n".join(lines),
            metadata={
                "json_path": str(out_path),
                "by_account_count": len(by_account),
                "by_audience_tier_count": len(by_tier),
                "by_date_count": len(by_date),
                "global_ad_count": global_stats["ad_count"],
                "market_signal": market_signal["label"],
            },
        )
    except Exception as e:
        logger.exception("calculate_portfolio_summary 失败")
        return ToolResult(title="账户基线计算异常", output=f"错误:{e}")
|