|
|
@@ -0,0 +1,406 @@
|
|
|
+"""
|
|
|
+创意级动态 ROI 计算器 — auto_put_ad_mini
|
|
|
+
|
|
|
+把 roi_calculator.py 的"动态 ROI 7 日均值"公式按 (ad_id, creative_id) 维度下沉,
|
|
|
+为创意级 pause 决策提供数值基础。
|
|
|
+
|
|
|
+核心公式(与 roi_calculator.py 完全一致,仅聚合维度从 ad_id 改为 (ad_id, creative_id)):
|
|
|
+ T0裂变系数 = SUM(fission0_count) / SUM(open_count)
|
|
|
+ arpu = SUM(total_revenue) / SUM(total_return_count)
|
|
|
+ 当日裂变收益率 = SUM(fission0_count) * arpu / SUM(cost)
|
|
|
+ 当日回流倍数 = SUM(total_return_count) / SUM(open_count)
|
|
|
+ T0裂变系数_7日均值 = mean(T0裂变系数) over 7 天
|
|
|
+ 回流倍数_7日均值 = mean(当日回流倍数) over 7 天
|
|
|
+ 裂变效率稳定因子 = 回流倍数_7日均值 / T0裂变系数_7日均值
|
|
|
+ 创意动态ROI = 当日裂变收益率 × 裂变效率稳定因子
|
|
|
+ 创意动态ROI_7日均值 = mean(创意动态ROI) over 7 天 ← 决策参考值
|
|
|
+
|
|
|
+前置条件:
|
|
|
+ - 单日 (ad_id, creative_id) 消耗 < 100 元的天数不参与计算(NaN)
|
|
|
+ - min_periods=3:至少 3 天合格数据才计算 7 日滚动均值
|
|
|
+
|
|
|
+⚠️ 归因语义说明:
|
|
|
+ ODPS 表 loghubods.ad_put_tencent_creative_data_day 的 fission0_count / total_return_count /
|
|
|
+ total_revenue 等字段,本期假设按"创意级独立归因"处理(即同一用户被多创意触达时归到首次触达
|
|
|
+ 的 creative_id)。如果实际是按曝光数加权拆分到所有创意,需要另外修正聚合逻辑。
|
|
|
+ 本期先按现有口径实现,待阶段 6 端到端测试时通过对比"创意 ROI 加权平均"与"广告级 ROI"做交叉验证。
|
|
|
+"""
|
|
|
+
|
|
|
+import logging
|
|
|
+from datetime import datetime, timedelta
|
|
|
+from pathlib import Path
|
|
|
+from typing import Optional
|
|
|
+
|
|
|
+import numpy as np
|
|
|
+import pandas as pd
|
|
|
+
|
|
|
+from agent.tools import tool
|
|
|
+from agent.tools.models import ToolContext, ToolResult
|
|
|
+
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Parent of the directory that contains this file (resolved to an absolute path).
_MINI_DIR = Path(__file__).resolve().parent.parent
# Input directory: the tool entry point reads merged_<YYYYMMDD>.csv files from here.
_MERGED_DIR = _MINI_DIR / "outputs" / "merged"
# Output directory: the tool entry point writes creative_roi_<YYYYMMDD>.csv here.
_CREATIVE_ROI_DIR = _MINI_DIR / "outputs" / "creative_roi"
|
|
|
+
|
|
|
+
|
|
|
+# ===== 内部聚合 =====
|
|
|
+
|
|
|
def _aggregate_creative_to_creative_day(df: pd.DataFrame) -> pd.DataFrame:
    """Collapse raw merged rows into one row per (ad_id, creative_id, date).

    The same creative can appear several times for one day (multiple shards),
    so numeric metrics are summed and descriptive attributes take the first
    non-null value of the group.

    Args:
        df: Raw merged rows. Needs ``ad_id``, ``creative_id`` and either
            ``bizdate`` (preferred) or ``date``. Metric columns may use the
            Chinese headers listed in ``COLUMN_RENAME`` below.

    Returns:
        The aggregated frame with a string ``date`` column, or an empty
        ``DataFrame`` when the input is empty, a required column is missing,
        or no row carries a usable ``creative_id``.
    """
    if df.empty:
        return pd.DataFrame()

    # ``bizdate`` wins when both are present; ``date`` is only a fallback.
    if "bizdate" in df.columns:
        date_source = "bizdate"
    elif "date" in df.columns:
        date_source = "date"
    else:
        logger.warning("creative_roi: DataFrame 缺少 bizdate/date 列")
        return pd.DataFrame()

    group_keys = ["ad_id", "creative_id"]
    missing_keys = [key for key in group_keys if key not in df.columns]
    if missing_keys:
        # Without both keys there is nothing to group on; report it the same
        # way as a missing date column instead of raising a bare KeyError.
        logger.warning("creative_roi: DataFrame 缺少 ad_id/creative_id 列: %s", missing_keys)
        return pd.DataFrame()

    df = df.copy()

    # Always normalise to str: downstream code compares ``date`` against
    # "YYYYMMDD" strings, which fails on an integer column.
    df["date"] = df[date_source].astype(str)

    # Column-name normalisation (kept aligned with roi_calculator).
    COLUMN_RENAME = {
        "首层小程序打开数": "open_count",
        "裂变0层回流数": "fission0_count",
        "裂变层回流数": "fission_count",
        "裂变1层回流数": "fission1_count",
        "总回流人数": "total_return_count",
        "总收入": "total_revenue",
        "ad_status": "configured_status",
    }
    # Skip a rename whose target already exists, otherwise the frame would end
    # up with two columns of the same name and the aggregation would break.
    rename_map = {
        src: dst
        for src, dst in COLUMN_RENAME.items()
        if src in df.columns and dst not in df.columns
    }
    df = df.rename(columns=rename_map)

    # Drop rows without a creative_id (ad-status rows).
    creative_id = df["creative_id"]
    has_creative = creative_id.notna() & (creative_id.astype(str).str.strip() != "")
    df = df[has_creative]
    if df.empty:
        return pd.DataFrame()

    # Coerce metrics to numbers; unparsable values count as 0.
    numeric_cols = [
        "cost", "view_count", "valid_click_count",
        "open_count", "fission0_count", "fission_count", "fission1_count",
        "total_return_count", "total_revenue",
    ]
    present_numeric = [col for col in numeric_cols if col in df.columns]
    for col in present_numeric:
        df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)

    attribute_cols = [
        "account_id", "ad_name", "creative_name",
        "create_time", "configured_status", "package_name",
    ]
    agg_dict = {col: "first" for col in attribute_cols if col in df.columns}
    agg_dict.update({col: "sum" for col in present_numeric})

    return df.groupby(group_keys + ["date"], as_index=False).agg(agg_dict)
|
|
|
+
|
|
|
+
|
|
|
+# ===== 创意级动态 ROI =====
|
|
|
+
|
|
|
def _calculate_creative_dynamic_roi(
    cdf: pd.DataFrame,
    min_daily_cost: float = 100.0,
    window: int = 7,
    min_periods: int = 3,
) -> pd.DataFrame:
    """Compute the dynamic ROI on the (ad_id, creative_id, date) grain.

    Per row (one creative, one day):
        T0 fission coefficient = fission0_count / open_count
        arpu                   = total_revenue / total_return_count
        daily fission yield    = fission0_count * arpu / cost
        daily return multiple  = total_return_count / open_count
    Per creative, over a rolling window:
        stability factor = mean(return multiple) / mean(T0 coefficient)
        dynamic ROI      = daily fission yield * stability factor
        dynamic ROI mean = rolling mean of dynamic ROI

    A day whose cost is below ``min_daily_cost`` gets NaN for every daily
    metric and therefore does not contribute to the rolling means.

    Args:
        cdf: Creative-day frame with ``ad_id``, ``creative_id``, ``date`` and
            the metric columns above. A missing metric column is treated as
            all-NaN, so the metrics depending on it come out as NaN.
        min_daily_cost: Minimum daily spend for a day to count.
        window: Rolling window size in rows. Output column names keep the
            ``7日均值`` suffix regardless of this value.
        min_periods: Minimum non-NaN observations inside the window.

    Returns:
        A sorted copy of ``cdf`` with the metric columns appended. An empty
        input is returned unchanged.
    """
    if cdf.empty:
        return cdf

    group_keys = ["ad_id", "creative_id"]
    cdf = cdf.sort_values(group_keys + ["date"]).reset_index(drop=True)

    def _col(name: str) -> pd.Series:
        # A missing input column means "no data": NaN fails every threshold
        # check below, so the derived metrics become NaN rather than raising.
        if name in cdf.columns:
            return cdf[name]
        return pd.Series(np.nan, index=cdf.index, dtype="float64")

    cost = _col("cost")
    opens = _col("open_count")
    fission0 = _col("fission0_count")
    returns = _col("total_return_count")
    revenue = _col("total_revenue")

    spend_ok = cost >= min_daily_cost
    has_opens = opens > 0
    has_returns = returns > 0

    # Daily base metrics. ``where`` blanks out every row failing its guard,
    # which also masks any division by zero.
    cdf["T0裂变系数"] = (fission0 / opens).where(spend_ok & has_opens)
    cdf["arpu"] = (revenue / returns).where(spend_ok & has_returns)
    cdf["当日裂变收益率"] = (fission0 * cdf["arpu"] / cost).where(spend_ok & (cost > 0))
    cdf["当日回流倍数"] = (returns / opens).where(spend_ok & has_opens)

    def _rolling_mean(column: str) -> pd.Series:
        # NOTE(review): the window counts rows, not calendar days. If a
        # creative has no row for some dates, the window spans more than
        # ``window`` days. Confirm this matches roi_calculator.
        return cdf.groupby(group_keys)[column].transform(
            lambda s: s.rolling(window=window, min_periods=min_periods).mean()
        )

    cdf["T0裂变系数_7日均值"] = _rolling_mean("T0裂变系数")
    cdf["回流倍数_7日均值"] = _rolling_mean("当日回流倍数")

    t0_mean = cdf["T0裂变系数_7日均值"]
    cdf["裂变效率稳定因子"] = (cdf["回流倍数_7日均值"] / t0_mean).where(t0_mean > 0)

    cdf["创意动态ROI"] = cdf["当日裂变收益率"] * cdf["裂变效率稳定因子"]
    cdf["创意动态ROI_7日均值"] = _rolling_mean("创意动态ROI")

    # Non-NaN ROI days per creative across the whole loaded history (not just
    # the last window), repeated on every row of that creative.
    cdf["roi_valid_days"] = cdf.groupby(group_keys)["创意动态ROI"].transform("count")

    return cdf
|
|
|
+
|
|
|
+
|
|
|
def _build_creative_summary(
    cdf: pd.DataFrame,
    end_date: str,
) -> pd.DataFrame:
    """Summarise each (ad_id, creative_id) as of ``end_date``.

    One output row per creative that has at least one row in the 7 days
    ending at ``end_date``. Columns:
        cost_7d / ad_cost_7d / cost_share_7d: 7-day spend of the creative,
            of its ad, and the ratio of the two (0.0 when the ad spent 0).
        creative_age_days: days between the creative's earliest ``date`` in
            ``cdf`` and ``end_date`` (never negative; NaN if unparsable).
        attributes and ROI columns: taken from the ``end_date`` row.
        roi_valid_days: integer, 0 when unknown.

    Creatives with no row on ``end_date`` are backfilled from their most
    recent row (attributes, 7-day ROI mean and ``roi_valid_days``; the daily
    ROI stays NaN). Creatives that do have an ``end_date`` row are never
    touched by the backfill, so a NaN 7-day ROI on ``end_date`` stays NaN
    instead of being replaced by an older value.

    Args:
        cdf: Creative-day frame with string ``date`` values in ``YYYYMMDD``
            form plus ``ad_id``, ``creative_id`` and ``cost``.
        end_date: Last day of the summary, ``YYYYMMDD``.

    Returns:
        The summary frame; an empty ``DataFrame`` when ``cdf`` is empty.

    Raises:
        ValueError: if ``end_date`` is not in ``YYYYMMDD`` form.
    """
    if cdf.empty:
        return pd.DataFrame()

    keys = ["ad_id", "creative_id"]
    end_dt = datetime.strptime(end_date, "%Y%m%d")
    start_date_7d = (end_dt - timedelta(days=6)).strftime("%Y%m%d")

    # 7-day spend per creative and per ad (the latter is the share denominator).
    df_7d = cdf[(cdf["date"] >= start_date_7d) & (cdf["date"] <= end_date)]
    cost_7d = (
        df_7d.groupby(keys, as_index=False)["cost"].sum()
        .rename(columns={"cost": "cost_7d"})
    )
    ad_cost_7d = (
        df_7d.groupby("ad_id", as_index=False)["cost"].sum()
        .rename(columns={"cost": "ad_cost_7d"})
    )
    summary = cost_7d.merge(ad_cost_7d, on="ad_id", how="left")
    summary["cost_share_7d"] = np.where(
        summary["ad_cost_7d"] > 0,
        (summary["cost_7d"] / summary["ad_cost_7d"]).round(4),
        0.0,
    )

    # Creative age, anchored on the first date seen in ``cdf``. It can never
    # exceed the span of history that was loaded into ``cdf``.
    first_seen = cdf.groupby(keys, as_index=False)["date"].min()
    first_dt = pd.to_datetime(
        first_seen["date"].astype(str), format="%Y%m%d", errors="coerce"
    )
    first_seen["creative_age_days"] = (
        (pd.Timestamp(end_dt) - first_dt).dt.days.clip(lower=0)
    )
    summary = summary.merge(
        first_seen[keys + ["creative_age_days"]], on=keys, how="left"
    )

    # Attributes and ROI values from the end_date row.
    latest_cols = [
        col for col in (
            "creative_name", "ad_name", "account_id", "configured_status",
            "创意动态ROI", "创意动态ROI_7日均值", "roi_valid_days",
        ) if col in cdf.columns
    ]
    latest = cdf.loc[cdf["date"] == end_date, keys + latest_cols]
    # The merge indicator tells exactly which creatives had no end_date row.
    summary = summary.merge(latest, on=keys, how="left", indicator="_has_latest")
    no_latest = summary.pop("_has_latest") == "left_only"

    # Backfill only those creatives, from their true most recent row.
    # ``drop_duplicates(keep="last")`` keeps a whole row; ``GroupBy.last()``
    # would pick the last non-null value per column and mix different days.
    fallback_cols = [col for col in latest_cols if col != "创意动态ROI"]
    if fallback_cols and no_latest.any():
        last_seen = (
            cdf.sort_values("date")
            .drop_duplicates(subset=keys, keep="last")[keys + fallback_cols]
            .rename(columns={col: f"_{col}_fb" for col in fallback_cols})
        )
        summary = summary.merge(last_seen, on=keys, how="left")
        for col in fallback_cols:
            fb_col = f"_{col}_fb"
            summary[col] = summary[col].where(~no_latest, summary[fb_col])
            summary = summary.drop(columns=[fb_col])

    if "roi_valid_days" in summary.columns:
        summary["roi_valid_days"] = summary["roi_valid_days"].fillna(0).astype(int)
    else:
        # ROI columns were never computed on ``cdf``: report zero valid days.
        summary["roi_valid_days"] = 0

    return summary
|
|
|
+
|
|
|
+
|
|
|
+# ===== 工具入口 =====
|
|
|
+
|
|
|
def _load_merged_frames(end_dt: datetime, window_days: int) -> list:
    """Read every existing merged_<YYYYMMDD>.csv in the window ending at end_dt."""
    first_day = end_dt - timedelta(days=window_days - 1)
    frames = []
    for offset in range(window_days):
        day = (first_day + timedelta(days=offset)).strftime("%Y%m%d")
        csv_path = _MERGED_DIR / f"merged_{day}.csv"
        if not csv_path.exists():
            continue
        try:
            frame = pd.read_csv(
                csv_path,
                dtype={"ad_id": str, "creative_id": str, "account_id": str},
            )
        except pd.errors.EmptyDataError:
            # A zero-byte file carries no rows; treat it like a missing day
            # rather than aborting the whole window.
            logger.warning("创意级 ROI: merged 文件为空,已跳过: %s", csv_path)
            continue
        frames.append(frame)
    return frames


def _drop_zero_cost_ads(
    creative_df: pd.DataFrame,
    end_dt: datetime,
    end_date_str: str,
) -> pd.DataFrame:
    """Remove all rows of ads whose spend over the last 7 days sums to <= 0."""
    date_col = next(
        (col for col in ("bizdate", "date") if col in creative_df.columns), None
    )
    if date_col is None or not {"ad_id", "cost"}.issubset(creative_df.columns):
        # Nothing to filter on; the aggregation step reports missing columns.
        return creative_df

    last_7_start = (end_dt - timedelta(days=6)).strftime("%Y%m%d")
    day = creative_df[date_col].astype(str)
    recent7 = creative_df[(day >= last_7_start) & (day <= end_date_str)]
    # Coerce before summing so a text-typed cost column cannot break the sum.
    recent_cost = pd.to_numeric(recent7["cost"], errors="coerce").fillna(0)
    cost_by_ad = recent_cost.groupby(recent7["ad_id"]).sum()
    zero_ads = cost_by_ad[cost_by_ad <= 0].index.tolist()
    if not zero_ads:
        return creative_df

    before = len(creative_df)
    creative_df = creative_df[~creative_df["ad_id"].isin(zero_ads)].reset_index(drop=True)
    logger.info(
        "创意级 ROI: 前置过滤 %d 条近 7 天 0 消耗广告,creative_df %d → %d 行",
        len(zero_ads), before, len(creative_df),
    )
    return creative_df


# Tool entry point: computes the creative-level dynamic ROI (7-day mean) and
# writes it to outputs/creative_roi/creative_roi_<end_date>.csv.
#
# Steps: load up to ``window_days`` daily merged CSVs ending at ``end_date``;
# drop ads with no spend in the last 7 days; aggregate to
# (ad_id, creative_id, date); compute the ROI metrics; summarise per creative;
# save the CSV; return counts and the overall mean in a ToolResult.
#
# Any exception is logged and turned into an error ToolResult, never raised.
#
# NOTE(review): the decorator description and the docstring below are kept
# byte-identical because the @tool decorator may expose them at runtime —
# confirm before translating them.
@tool(description="计算创意级动态 ROI(7 日均值),用于创意级 pause 决策")
async def calculate_creative_roi(
    ctx: ToolContext = None,
    end_date: str = "yesterday",
    min_daily_cost: float = 100.0,
    window_days: int = 30,
) -> ToolResult:
    """
    创意级动态 ROI 计算工具。

    工作流:
      1. 加载最近 window_days 天的 merged_*.csv
      2. 按 (ad_id, creative_id, date) 聚合
      3. 计算每天的 T0 裂变系数 / arpu / 当日裂变收益率 / 当日回流倍数
      4. 计算 7 日滚动均值 + 裂变效率稳定因子
      5. 计算创意动态 ROI / 创意动态 ROI 7 日均值
      6. 输出 outputs/creative_roi/creative_roi_{end_date}.csv

    Args:
        end_date: 结束日期(YYYYMMDD 或 "yesterday")
        min_daily_cost: 单日消耗门槛(默认 100 元),低于此值的天数不参与
        window_days: 加载历史窗口天数(默认 30 天,与 roi_calculator 一致)

    Returns:
        ToolResult,包含 csv_path / 创意总数 / eligible 创意数 / 全体均值
    """
    try:
        # Resolve the end date; dashes are accepted ("2024-01-07").
        if end_date == "yesterday":
            end_dt = datetime.now() - timedelta(days=1)
        else:
            end_dt = datetime.strptime(end_date.replace("-", ""), "%Y%m%d")
        end_date_str = end_dt.strftime("%Y%m%d")

        merged_dfs = _load_merged_frames(end_dt, window_days)
        if not merged_dfs:
            return ToolResult(
                title="创意级 ROI 计算失败",
                output=f"未找到任何 merged 数据({_MERGED_DIR})",
            )

        creative_df = pd.concat(merged_dfs, ignore_index=True)
        logger.info("创意级 ROI: 加载 merged 数据 %d 行(%d 天)", len(creative_df), len(merged_dfs))

        # Same pre-filter as roi_calculator: an ad with zero spend over the
        # last 7 days is treated as closed.
        creative_df = _drop_zero_cost_ads(creative_df, end_dt, end_date_str)

        # Aggregate to (ad_id, creative_id, date).
        cdf = _aggregate_creative_to_creative_day(creative_df)
        if cdf.empty:
            return ToolResult(
                title="创意级 ROI 计算失败",
                output="creative-day 聚合结果为空(可能 creative_id 全为空)",
            )
        logger.info("创意级 ROI: 聚合到 (ad_id, creative_id, date) %d 行", len(cdf))

        cdf = _calculate_creative_dynamic_roi(cdf, min_daily_cost)

        # Roll up to one row per (ad_id, creative_id).
        summary = _build_creative_summary(cdf, end_date_str)
        if summary.empty:
            return ToolResult(
                title="创意级 ROI 计算失败",
                output="creative summary 为空",
            )

        # Fixed output column order; absent columns are skipped.
        preferred_cols = [
            "ad_id", "account_id", "ad_name", "creative_id", "creative_name",
            "configured_status", "creative_age_days",
            "cost_7d", "ad_cost_7d", "cost_share_7d",
            "创意动态ROI", "创意动态ROI_7日均值", "roi_valid_days",
        ]
        out_cols = [col for col in preferred_cols if col in summary.columns]
        summary = summary[out_cols].copy()

        _CREATIVE_ROI_DIR.mkdir(parents=True, exist_ok=True)
        out_path = _CREATIVE_ROI_DIR / f"creative_roi_{end_date_str}.csv"
        # utf-8-sig writes a BOM ahead of the Chinese column headers.
        summary.to_csv(out_path, index=False, encoding="utf-8-sig")
        logger.info("创意级 ROI CSV 已保存: %s", out_path)

        # Headline statistics; roi_mean is NaN when no creative has a value.
        total = len(summary)
        roi_col = "创意动态ROI_7日均值"
        valid = int(summary[roi_col].notna().sum()) if roi_col in summary.columns else 0
        roi_mean = float(summary[roi_col].mean()) if valid > 0 else float("nan")
        ad_count = int(summary["ad_id"].nunique())

        lines = [
            f"✅ 创意级动态 ROI 计算完成(截至 {end_date_str})",
            f"  输出文件:{out_path}",
            f"  创意总数:{total}(覆盖 {ad_count} 条广告)",
            f"  有 ROI 值的创意数(7 日均值非 NaN):{valid}",
            f"  创意动态 ROI_7 日均值 全体均值:{roi_mean:.4f}",
        ]
        return ToolResult(
            title=f"创意级 ROI 计算完成({total} 个创意)",
            output="\n".join(lines),
            metadata={
                "csv_path": str(out_path),
                "total_creatives": total,
                "valid_creatives": valid,
                "ad_count": ad_count,
                "roi_mean": roi_mean,
                "end_date": end_date_str,
            },
        )

    except Exception as e:
        # Top-level tool boundary: log the traceback and report the failure
        # to the caller instead of propagating.
        logger.exception("calculate_creative_roi 失败")
        return ToolResult(title="创意级 ROI 计算异常", output=f"错误:{e}")
|