| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418 |
- """
- 创意级指标工具 — auto_put_ad_mini (Step 2)
- 职责:
- - 按 creative_id 聚合最近 N 天 ROI(简单 ROI,非 f_7日动态ROI)
- - 为"负向决策前做创意归因检查"提供数据支撑
- 使用定位(重要):
- - 本阶段 Agent 决策范围是【广告级】:调出价 / 暂停广告
- - 创意变更【不在】本阶段范围内(由后续 创意策略 Agent 负责)
- - 创意级数据的真正目的:在 LLM 准备做 bid_down / pause 决策之前做归因检查
- * 防止广告级误判:ROI 低可能只是"某个创意拖后腿",此时应 hold 等创意层优化
- * 提供决策理由佐证:reason 明确说"整体问题 vs 某创意问题"
- 两个工具:
- - calculate_creative_metrics:批量聚合所有广告的创意级 ROI(Mode 1 工作流 Step 2.5)
- - get_creative_context:对单个广告做创意归因诊断(LLM 决策时按需调用)
- """
- import logging
- from datetime import datetime, timedelta
- from pathlib import Path
- from typing import List, Optional
- import pandas as pd
- from agent.tools import tool
- from agent.tools.models import ToolContext, ToolResult
- logger = logging.getLogger(__name__)
- _MINI_DIR = Path(__file__).resolve().parent.parent
- _MERGED_DIR = _MINI_DIR / "outputs" / "merged"
- _CREATIVE_METRICS_DIR = _MINI_DIR / "outputs" / "creative_metrics"
- # ===== 内部工具函数 =====
- def _load_merged_window(end_date_str: str, days: int) -> Optional[pd.DataFrame]:
- """加载最近 days 天的 merged CSV,concat 后返回。"""
- try:
- end_dt = datetime.strptime(end_date_str, "%Y%m%d")
- except ValueError:
- logger.error("非法 end_date: %s", end_date_str)
- return None
- dfs = []
- for i in range(days):
- date = (end_dt - timedelta(days=days - 1 - i)).strftime("%Y%m%d")
- csv_path = _MERGED_DIR / f"merged_{date}.csv"
- if not csv_path.exists():
- logger.warning("创意聚合:merged 数据缺失 %s", date)
- continue
- df = pd.read_csv(csv_path, dtype={"ad_id": str, "creative_id": str, "account_id": str})
- dfs.append(df)
- if not dfs:
- return None
- return pd.concat(dfs, ignore_index=True)
- def _aggregate_creative_metrics(df: pd.DataFrame, days: int) -> pd.DataFrame:
- """按 (ad_id, creative_id) 聚合。"""
- if df.empty:
- return pd.DataFrame()
- # 保证关键列存在
- required = ["ad_id", "creative_id", "cost", "bizdate"]
- for col in required:
- if col not in df.columns:
- logger.error("创意聚合:缺少必需列 %s", col)
- return pd.DataFrame()
- # 安全数值转换
- for col in ["cost", "总收入", "首层小程序打开数", "裂变0层回流数", "view_count", "valid_click_count"]:
- if col in df.columns:
- df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
- # 过滤掉空 creative_id(有些广告状态行可能 creative_id 为空)
- df = df[df["creative_id"].notna() & (df["creative_id"].astype(str).str.strip() != "")]
- # 取 creative_name(若有变更取最新)
- df_sorted = df.sort_values("bizdate")
- creative_name_latest = (
- df_sorted.groupby(["ad_id", "creative_id"])["creative_name"].last()
- if "creative_name" in df.columns
- else None
- )
- # 创意首次出现日期(用于 age)
- creative_first_date = (
- df_sorted.groupby(["ad_id", "creative_id"])["bizdate"].min()
- )
- agg_cols = {"cost": "sum"}
- if "总收入" in df.columns:
- agg_cols["总收入"] = "sum"
- if "首层小程序打开数" in df.columns:
- agg_cols["首层小程序打开数"] = "sum"
- if "裂变0层回流数" in df.columns:
- agg_cols["裂变0层回流数"] = "sum"
- if "view_count" in df.columns:
- agg_cols["view_count"] = "sum"
- if "valid_click_count" in df.columns:
- agg_cols["valid_click_count"] = "sum"
- active_days = df.groupby(["ad_id", "creative_id"]).apply(
- lambda g: g[g["cost"] > 0]["bizdate"].nunique()
- )
- result = df.groupby(["ad_id", "creative_id"]).agg(agg_cols).reset_index()
- result[f"active_days_{days}d"] = result.apply(
- lambda r: int(active_days.get((r["ad_id"], r["creative_id"]), 0)), axis=1
- )
- # 重命名聚合列
- rename_map = {
- "cost": f"cost_{days}d",
- "总收入": f"revenue_{days}d",
- "首层小程序打开数": f"open_count_{days}d",
- "裂变0层回流数": f"fission0_{days}d",
- "view_count": f"view_count_{days}d",
- "valid_click_count": f"click_count_{days}d",
- }
- result = result.rename(columns={k: v for k, v in rename_map.items() if k in result.columns})
- # 附加 creative_name 与 creative_age_days
- if creative_name_latest is not None:
- result["creative_name"] = result.apply(
- lambda r: creative_name_latest.get((r["ad_id"], r["creative_id"]), ""), axis=1
- )
- else:
- result["creative_name"] = ""
- def _age_days(row):
- first = creative_first_date.get((row["ad_id"], row["creative_id"]))
- if not first:
- return None
- try:
- first_dt = datetime.strptime(str(first), "%Y%m%d")
- today = datetime.now()
- return max((today - first_dt).days, 0)
- except ValueError:
- return None
- result["creative_age_days"] = result.apply(_age_days, axis=1)
- # 简单 ROI
- revenue_col = f"revenue_{days}d"
- cost_col = f"cost_{days}d"
- if revenue_col in result.columns:
- result[f"roi_{days}d"] = result.apply(
- lambda r: round(r[revenue_col] / r[cost_col], 4) if r[cost_col] > 0 else None,
- axis=1,
- )
- else:
- result[f"roi_{days}d"] = None
- # 该创意在所属广告中的消耗占比
- ad_total = result.groupby("ad_id")[cost_col].transform("sum")
- result["cost_share"] = result.apply(
- lambda r: round(r[cost_col] / ad_total.loc[r.name], 4) if ad_total.loc[r.name] > 0 else 0,
- axis=1,
- )
- return result
- def _classify_attribution(creatives_for_ad: pd.DataFrame, cost_col: str, roi_col: str) -> str:
- """
- 基于单个广告下的创意表现,判定归因类型。
- 返回值对应 skill 中定义的 4 类 attribution_hint:
- - "single_laggard" : 单创意占 >60% 消耗且 ROI 远低于其他 → 建议 hold
- - "balanced_bad" : 多创意均衡但整体 ROI 差 → 真广告级问题,保持原负向决策
- - "new_cold_start" : 主要消耗集中在 age < 7 天的新创意 → 建议 hold
- - "mixed" : 其他情况,LLM 自行综合判断
- """
- # 仅考虑近期有消耗的创意
- active = creatives_for_ad[creatives_for_ad[cost_col] > 0].copy()
- if active.empty:
- return "mixed"
- active_sorted = active.sort_values(cost_col, ascending=False)
- top = active_sorted.iloc[0]
- top_share = float(top["cost_share"] or 0)
- # 新创意冷启动:主导创意年龄 < 7 天
- top_age = top.get("creative_age_days")
- if top_share > 0.6 and top_age is not None and top_age < 7:
- return "new_cold_start"
- # 单创意拖后腿:主导创意占 >60% 且 ROI 明显低于其他活跃创意
- if top_share > 0.6 and len(active_sorted) >= 2:
- others = active_sorted.iloc[1:]
- top_roi = top.get(roi_col)
- other_roi = others[roi_col].dropna()
- if top_roi is not None and not other_roi.empty:
- other_mean = other_roi.mean()
- # 主导创意 ROI 显著低于其他(低于其他均值的 70%)
- if other_mean > 0 and top_roi < other_mean * 0.7:
- return "single_laggard"
- # 多创意均衡且整体差(≥3 个活跃创意,且占比相对均衡)
- if len(active_sorted) >= 3 and top_share < 0.5:
- return "balanced_bad"
- return "mixed"
- # ===== 工具 1:批量计算创意级指标 =====
- @tool(description="按 creative_id 聚合最近 N 天 ROI(供广告决策做创意归因检查)")
- async def calculate_creative_metrics(
- ctx: ToolContext,
- days: int = 7,
- end_date: str = "yesterday",
- ) -> ToolResult:
- """
- 聚合创意级 ROI 指标。
- 数据源:outputs/merged/merged_*.csv(最近 N 天),按 (ad_id, creative_id) 聚合:
- - cost_{N}d, revenue_{N}d, open_count_{N}d, fission0_{N}d
- - roi_{N}d = revenue_{N}d / cost_{N}d (简单 ROI,不是 f_7日动态ROI)
- - cost_share = 该创意在所属广告中的消耗占比
- - creative_age_days = 该创意首次出现至今天数
- - active_days_{N}d = 近 N 天有消耗的天数
- 输出:outputs/creative_metrics/creative_metrics_{end_date}.csv
- Args:
- days: 聚合窗口(默认 7 天)
- end_date: 结束日期(YYYYMMDD 或 "yesterday")
- Returns:
- ToolResult 含 CSV 路径 + 总览统计
- """
- try:
- if end_date == "yesterday":
- end_dt = datetime.now() - timedelta(days=1)
- else:
- end_dt = datetime.strptime(end_date.replace("-", ""), "%Y%m%d")
- end_date_str = end_dt.strftime("%Y%m%d")
- df = _load_merged_window(end_date_str, days)
- if df is None or df.empty:
- return ToolResult(
- title="创意指标聚合失败",
- output=f"未找到任何 merged 数据({_MERGED_DIR},最近 {days} 天)",
- )
- result = _aggregate_creative_metrics(df, days)
- if result.empty:
- return ToolResult(
- title="创意指标聚合失败",
- output="聚合结果为空(可能 creative_id 全为空)",
- )
- _CREATIVE_METRICS_DIR.mkdir(parents=True, exist_ok=True)
- out_path = _CREATIVE_METRICS_DIR / f"creative_metrics_{end_date_str}.csv"
- result.to_csv(out_path, index=False, encoding="utf-8-sig")
- # 总览统计
- total = len(result)
- active = int((result[f"cost_{days}d"] > 0).sum())
- ad_count = result["ad_id"].nunique()
- summary_lines = [
- f"✅ 创意级指标聚合完成({days} 天窗口)",
- f" 输出文件:{out_path}",
- f" 创意总数:{total}(其中 {active} 个近 {days} 天有消耗)",
- f" 覆盖广告:{ad_count} 条",
- ]
- return ToolResult(
- title=f"创意级指标聚合({days}天)",
- output="\n".join(summary_lines),
- metadata={
- "csv_path": str(out_path),
- "total_creatives": total,
- "active_creatives": active,
- "ad_count": ad_count,
- "days": days,
- },
- )
- except Exception as e:
- logger.exception("calculate_creative_metrics 失败")
- return ToolResult(title="创意指标聚合异常", output=f"错误:{e}")
- # ===== 工具 2:单广告归因检查 =====
- @tool(description="对准备做负向决策(降价/暂停)的广告,做创意归因检查")
- async def get_creative_context(
- ctx: ToolContext,
- ad_id: str,
- days: int = 7,
- end_date: str = "yesterday",
- ) -> ToolResult:
- """
- 对指定广告做创意归因诊断。
- 使用时机(严格):
- - LLM 初步打算判 bid_down / pause 的广告才调用
- - 不对 hold / bid_up 的广告调用(避免无意义开销)
- Args:
- ad_id: 广告 ID
- days: 归因窗口(默认 7 天,需要已先跑过 calculate_creative_metrics)
- end_date: 结束日期(YYYYMMDD 或 "yesterday")
- Returns:
- ToolResult.data:
- {
- "ad_id": ...,
- "total_creatives": int,
- "active_creatives_7d": int,
- "dominant_creative_pct": float,
- "attribution_hint": "single_laggard" | "balanced_bad" | "new_cold_start" | "mixed",
- "creatives": [...按 cost_share 降序...]
- }
- """
- try:
- if end_date == "yesterday":
- end_dt = datetime.now() - timedelta(days=1)
- else:
- end_dt = datetime.strptime(end_date.replace("-", ""), "%Y%m%d")
- end_date_str = end_dt.strftime("%Y%m%d")
- metrics_path = _CREATIVE_METRICS_DIR / f"creative_metrics_{end_date_str}.csv"
- if not metrics_path.exists():
- return ToolResult(
- title="创意归因失败",
- output=f"创意指标 CSV 不存在:{metrics_path}\n请先运行 calculate_creative_metrics。",
- )
- df = pd.read_csv(metrics_path, dtype={"ad_id": str, "creative_id": str})
- ad_str = str(ad_id)
- ad_df = df[df["ad_id"] == ad_str].copy()
- if ad_df.empty:
- return ToolResult(
- title="创意归因:广告无创意数据",
- output=f"广告 {ad_id} 在创意指标表中无记录(可能该广告未产生创意级消耗)",
- metadata={
- "ad_id": ad_str,
- "total_creatives": 0,
- "active_creatives_7d": 0,
- "attribution_hint": "mixed",
- "creatives": [],
- },
- )
- cost_col = f"cost_{days}d"
- roi_col = f"roi_{days}d"
- # 基础统计
- total = len(ad_df)
- active = int((ad_df[cost_col] > 0).sum())
- ad_df_sorted = ad_df.sort_values(cost_col, ascending=False)
- dominant_pct = (
- float(ad_df_sorted.iloc[0]["cost_share"]) if not ad_df_sorted.empty else 0.0
- )
- attribution_hint = _classify_attribution(ad_df, cost_col, roi_col)
- # 构建返回创意列表(按 cost_share 降序)
- creatives_list = []
- for _, row in ad_df_sorted.iterrows():
- creatives_list.append({
- "creative_id": str(row["creative_id"]),
- "creative_name": str(row.get("creative_name", "") or ""),
- "cost": round(float(row[cost_col]), 2) if cost_col in row else 0.0,
- "revenue": round(float(row.get(f"revenue_{days}d", 0)), 2),
- "roi": (
- round(float(row[roi_col]), 4)
- if roi_col in row and pd.notna(row[roi_col])
- else None
- ),
- "cost_share": round(float(row.get("cost_share", 0)), 4),
- "creative_age_days": (
- int(row["creative_age_days"])
- if pd.notna(row.get("creative_age_days"))
- else None
- ),
- "active_days": int(row.get(f"active_days_{days}d", 0) or 0),
- })
- # 生成人类可读摘要
- summary_lines = [
- f"广告 {ad_id} 创意归因(近 {days} 天)",
- f" 总创意数:{total}(活跃 {active})",
- f" 主导创意占比:{dominant_pct*100:.1f}%",
- f" 归因判定:{attribution_hint}",
- ]
- for c in creatives_list[:5]:
- roi_str = f"{c['roi']:.2f}" if c["roi"] is not None else "N/A"
- summary_lines.append(
- f" - {c['creative_id']}: cost={c['cost']:.0f}元, ROI={roi_str}, 占比={c['cost_share']*100:.1f}%, age={c['creative_age_days']}天"
- )
- return ToolResult(
- title=f"创意归因 ad_id={ad_id}",
- output="\n".join(summary_lines),
- metadata={
- "ad_id": ad_str,
- "total_creatives": total,
- "active_creatives_7d": active,
- "dominant_creative_pct": round(dominant_pct, 4),
- "attribution_hint": attribution_hint,
- "creatives": creatives_list,
- },
- )
- except Exception as e:
- logger.exception("get_creative_context 失败")
- return ToolResult(title="创意归因异常", output=f"错误:{e}")
|