portfolio_metrics.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. """
  2. 账户级 + 人群包级 ROI 基线汇总 — auto_put_ad_mini (Step 4.2)
  3. 职责:
  4. - 读取 metrics CSV(calculate_roi_metrics 输出)
  5. - 按 account_id / audience_tier / global 三层聚合
  6. - 输出 p25/p50/p75 分位数作为决策基线
  7. 使用定位:
  8. - LLM 在判断广告 ROI "偏低" 时,应先和【同人群包 p25】比,而非全体均值 × 0.5
  9. - 跑量放大候选:同人群包 p75 是最低门槛
  10. - 大盘下滑信号:账户 roi_mean 比历史显著低 → 行情差,不应过度降价触发死亡螺旋
  11. """
  12. import json
  13. import logging
  14. import math
  15. import sys
  16. from datetime import datetime, timedelta
  17. from pathlib import Path
  18. from typing import Any, Dict, List, Optional, Tuple
  19. import numpy as np
  20. import pandas as pd
  21. from agent.tools import tool
  22. from agent.tools.models import ToolContext, ToolResult
logger = logging.getLogger(__name__)

# This file sits one directory below the mini-app root.
_MINI_DIR = Path(__file__).resolve().parent.parent
# Metrics CSVs are saved to outputs/metrics_{date}.csv (roi_calculator.py line 769).
_OUTPUTS_DIR = _MINI_DIR / "outputs"
_PORTFOLIO_DIR = _MINI_DIR / "outputs" / "portfolio_summary"

# Import market-signal thresholds from config; fall back to hard-coded
# defaults when the config module cannot be imported (e.g. standalone runs).
sys.path.insert(0, str(_MINI_DIR))
try:
    from config import MARKET_VOLATILE_PCT, MARKET_TRENDING_DOWN_PCT  # noqa: E402
except ImportError:
    MARKET_VOLATILE_PCT = 0.15
    MARKET_TRENDING_DOWN_PCT = -0.10
  35. def _safe_float(v) -> Optional[float]:
  36. try:
  37. f = float(v)
  38. if math.isnan(f) or math.isinf(f):
  39. return None
  40. return f
  41. except (TypeError, ValueError):
  42. return None
  43. def _round_or_none(v: Optional[float], digits: int = 4) -> Optional[float]:
  44. return round(v, digits) if v is not None else None
  45. def _describe_group(df: pd.DataFrame) -> Dict[str, Any]:
  46. """对一个分组 DataFrame 计算统计指标。"""
  47. if df.empty:
  48. return {
  49. "ad_count": 0,
  50. "active_ads": 0,
  51. "daily_cost_avg": 0.0,
  52. "daily_revenue_avg": 0.0,
  53. "roi_mean": None,
  54. "roi_p25": None,
  55. "roi_p50": None,
  56. "roi_p75": None,
  57. "stable_ads_pct": 0.0,
  58. }
  59. ad_count = len(df)
  60. # active_ads: 7 日有消耗
  61. if "cost_7d_avg" in df.columns:
  62. active = int((pd.to_numeric(df["cost_7d_avg"], errors="coerce").fillna(0) > 0).sum())
  63. else:
  64. active = 0
  65. # 日消耗 / 日收入均值(跨广告的日均)
  66. if "cost_7d_avg" in df.columns:
  67. daily_cost_avg = float(pd.to_numeric(df["cost_7d_avg"], errors="coerce").fillna(0).sum())
  68. else:
  69. daily_cost_avg = 0.0
  70. # 日收入 = 7日总收入 / 7(若有 revenue_7d_total),否则 fallback 为空
  71. daily_revenue_avg = 0.0
  72. if "revenue_7d_total" in df.columns:
  73. total_rev = pd.to_numeric(df["revenue_7d_total"], errors="coerce").fillna(0).sum()
  74. daily_revenue_avg = float(total_rev) / 7.0
  75. # ROI 分位数(使用 动态ROI_7日均值,若缺则尝试 f_7日动态ROI)
  76. roi_col = None
  77. for candidate in ("动态ROI_7日均值", "f_7日动态ROI"):
  78. if candidate in df.columns:
  79. roi_col = candidate
  80. break
  81. roi_stats = {"roi_mean": None, "roi_p25": None, "roi_p50": None, "roi_p75": None}
  82. if roi_col:
  83. roi_series = pd.to_numeric(df[roi_col], errors="coerce").dropna()
  84. # 过滤掉 ROI = 0 的记录(通常代表无数据)
  85. roi_series = roi_series[roi_series > 0]
  86. if not roi_series.empty:
  87. roi_stats = {
  88. "roi_mean": _round_or_none(float(roi_series.mean())),
  89. "roi_p25": _round_or_none(float(roi_series.quantile(0.25))),
  90. "roi_p50": _round_or_none(float(roi_series.quantile(0.50))),
  91. "roi_p75": _round_or_none(float(roi_series.quantile(0.75))),
  92. }
  93. # ===== 新增:裂变率统计 =====
  94. fission_stats = {"fission_mean": None, "fission_p50": None}
  95. # 假设metrics CSV中有 "T0裂变系数_7日均值" 字段
  96. fission_col = None
  97. for candidate in ("T0裂变系数_7日均值", "T0裂变系数", "fission_ratio"):
  98. if candidate in df.columns:
  99. fission_col = candidate
  100. break
  101. if fission_col:
  102. fission_series = pd.to_numeric(df[fission_col], errors="coerce").dropna()
  103. fission_series = fission_series[fission_series > 0] # 过滤无效值
  104. if not fission_series.empty:
  105. fission_stats = {
  106. "fission_mean": _round_or_none(float(fission_series.mean())),
  107. "fission_p50": _round_or_none(float(fission_series.quantile(0.50))),
  108. }
  109. # ===== 新增:CTR 统计 =====
  110. ctr_stats = {"ctr_mean": None, "ctr_p50": None}
  111. if "valid_click_count" in df.columns and "view_count" in df.columns:
  112. click = pd.to_numeric(df["valid_click_count"], errors="coerce").fillna(0)
  113. view = pd.to_numeric(df["view_count"], errors="coerce").fillna(0)
  114. ctr_series = (click / view).replace([np.inf, -np.inf], np.nan).dropna()
  115. ctr_series = ctr_series[ctr_series > 0]
  116. if not ctr_series.empty:
  117. ctr_stats = {
  118. "ctr_mean": round(float(ctr_series.mean()), 6),
  119. "ctr_p50": round(float(ctr_series.quantile(0.50)), 6),
  120. }
  121. # ===== 新增:出价统计 =====
  122. bid_stats = {"bid_mean": None, "bid_p50": None}
  123. if "bid_amount" in df.columns:
  124. bid_series = pd.to_numeric(df["bid_amount"], errors="coerce").dropna()
  125. bid_series = bid_series[bid_series > 0]
  126. if not bid_series.empty:
  127. bid_stats = {
  128. "bid_mean": round(float(bid_series.mean()), 4),
  129. "bid_p50": round(float(bid_series.quantile(0.50)), 4),
  130. }
  131. # 稳定消耗比例:stable_spend_days_30d >= 7 视为稳定
  132. stable_pct = 0.0
  133. if "stable_spend_days_30d" in df.columns and ad_count > 0:
  134. stable_mask = pd.to_numeric(df["stable_spend_days_30d"], errors="coerce").fillna(0) >= 7
  135. stable_pct = float(stable_mask.sum()) / ad_count
  136. return {
  137. "ad_count": ad_count,
  138. "active_ads": active,
  139. "daily_cost_avg": round(daily_cost_avg, 2),
  140. "daily_revenue_avg": round(daily_revenue_avg, 2),
  141. **roi_stats,
  142. **fission_stats,
  143. **ctr_stats,
  144. **bid_stats,
  145. "stable_ads_pct": round(stable_pct, 4),
  146. }
  147. def _resolve_metrics_csv(metrics_csv: str, end_date: str) -> Optional[Path]:
  148. """解析 metrics CSV 路径(优先用传入的路径,否则根据 end_date 找)。"""
  149. if metrics_csv:
  150. p = Path(metrics_csv)
  151. if not p.is_absolute():
  152. p = _MINI_DIR / p
  153. if p.exists():
  154. return p
  155. # Fallback:根据 end_date
  156. if end_date == "yesterday":
  157. end_dt = datetime.now() - timedelta(days=1)
  158. else:
  159. try:
  160. end_dt = datetime.strptime(end_date.replace("-", ""), "%Y%m%d")
  161. except ValueError:
  162. return None
  163. end_date_str = end_dt.strftime("%Y%m%d")
  164. # 按常见命名找
  165. candidates = [
  166. _OUTPUTS_DIR / f"metrics_{end_date_str}.csv",
  167. _OUTPUTS_DIR / "metrics_temp.csv",
  168. ]
  169. for c in candidates:
  170. if c.exists():
  171. return c
  172. # 最后尝试目录下最新的 metrics_*.csv
  173. if _OUTPUTS_DIR.exists():
  174. csvs = sorted(_OUTPUTS_DIR.glob("metrics_*.csv"), key=lambda p: p.stat().st_mtime, reverse=True)
  175. if csvs:
  176. return csvs[0]
  177. return None
  178. def _compute_daily_tier_snapshot(
  179. end_dt: datetime, days: int = 7
  180. ) -> Dict[str, Dict[str, Any]]:
  181. """读取最近 N 天的 metrics_{date}.csv,按 (date, audience_tier) 计算 p25/p50/p75。
  182. 结构:
  183. {
  184. "YYYYMMDD": {
  185. "global": {roi_p25, roi_p50, roi_p75, ad_count},
  186. "by_audience_tier": {
  187. "R500": {roi_p25, roi_p50, roi_p75, ad_count},
  188. ...
  189. }
  190. },
  191. ...
  192. }
  193. 用途:给 LLM 看"同 tier 基线的日级波动",辅助判断大盘行情;
  194. 也是 relative_trend_pct 的审计来源(虽然 roi_calculator 内部已自算)。
  195. 对历史 metrics CSV 中无 audience_tier 列的情况 → 该天只输出 global。
  196. """
  197. by_date: Dict[str, Dict[str, Any]] = {}
  198. for i in range(days):
  199. date_str = (end_dt - timedelta(days=i)).strftime("%Y%m%d")
  200. csv_path = _OUTPUTS_DIR / f"metrics_{date_str}.csv"
  201. if not csv_path.exists():
  202. continue
  203. try:
  204. dfi = pd.read_csv(csv_path, dtype={"ad_id": str, "account_id": str})
  205. except Exception as e:
  206. logger.warning("读取 %s 失败:%s", csv_path.name, e)
  207. continue
  208. if dfi.empty:
  209. continue
  210. day_entry: Dict[str, Any] = {
  211. "global": _describe_group(dfi),
  212. }
  213. if "audience_tier" in dfi.columns:
  214. tier_map: Dict[str, Any] = {}
  215. for tier, group in dfi.groupby("audience_tier"):
  216. tier_map[str(tier)] = _describe_group(group)
  217. day_entry["by_audience_tier"] = tier_map
  218. else:
  219. day_entry["by_audience_tier"] = {}
  220. by_date[date_str] = day_entry
  221. return by_date
  222. def _compute_market_signal(by_date: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
  223. """
  224. 根据最近 N 天的全局 p50 序列,自动判定大盘行情(不让 LLM 看 by_date 自己猜)。
  225. 判定规则(优先级从高到低):
  226. 1. 数据 < 4 天 → unknown(无法判断)
  227. 2. 最近 3 日 p50 均值 / 之前 N-3 日均值 - 1 ≤ MARKET_TRENDING_DOWN_PCT (-10%) → trending_down
  228. 3. (max - min) / min > MARKET_VOLATILE_PCT (15%) → volatile
  229. 4. 否则 → stable
  230. 输出:
  231. {
  232. "label": "stable" / "volatile" / "trending_down" / "unknown",
  233. "p50_volatility_pct": 0.12,
  234. "recent_vs_baseline_pct": -0.08,
  235. "p50_series": [{"date": "20260410", "p50": 1.78}, ...],
  236. "ad_count_today": 1234,
  237. "reason": "..."
  238. }
  239. """
  240. p50_pairs: List[Tuple[str, float]] = sorted(
  241. [
  242. (d, entry["global"].get("roi_p50"))
  243. for d, entry in by_date.items()
  244. if entry.get("global", {}).get("roi_p50") is not None
  245. ]
  246. )
  247. if len(p50_pairs) < 4:
  248. return {
  249. "label": "unknown",
  250. "p50_volatility_pct": None,
  251. "recent_vs_baseline_pct": None,
  252. "p50_series": [{"date": d, "p50": p} for d, p in p50_pairs],
  253. "ad_count_today": None,
  254. "reason": f"数据不足(只有 {len(p50_pairs)} 天 p50),无法判断行情",
  255. }
  256. p50_values = [p for _, p in p50_pairs]
  257. p50_min = min(p50_values)
  258. p50_max = max(p50_values)
  259. volatility_pct = (p50_max - p50_min) / p50_min if p50_min > 0 else 0.0
  260. # 最近 3 天 vs 之前若干天
  261. recent = p50_values[-3:]
  262. baseline = p50_values[:-3]
  263. recent_mean = sum(recent) / len(recent)
  264. baseline_mean = sum(baseline) / len(baseline) if baseline else recent_mean
  265. recent_vs_baseline_pct = (
  266. (recent_mean - baseline_mean) / baseline_mean if baseline_mean > 0 else 0.0
  267. )
  268. # 判定
  269. if recent_vs_baseline_pct <= MARKET_TRENDING_DOWN_PCT:
  270. label = "trending_down"
  271. reason = (
  272. f"最近 3 日 p50 均值 {recent_mean:.3f} 比之前 {len(baseline)} 日均值 "
  273. f"{baseline_mean:.3f} 低 {-recent_vs_baseline_pct * 100:.1f}%(阈值 "
  274. f"{-MARKET_TRENDING_DOWN_PCT * 100:.0f}%),大盘下行"
  275. )
  276. elif volatility_pct > MARKET_VOLATILE_PCT:
  277. label = "volatile"
  278. reason = (
  279. f"全局 p50 跨日波幅 {volatility_pct * 100:.1f}%(阈值 "
  280. f"{MARKET_VOLATILE_PCT * 100:.0f}%),ARPU 共模噪声大,单广告决策应保守"
  281. )
  282. else:
  283. label = "stable"
  284. reason = (
  285. f"全局 p50 跨日波幅 {volatility_pct * 100:.1f}% < "
  286. f"{MARKET_VOLATILE_PCT * 100:.0f}% 且无显著趋势,大盘平稳"
  287. )
  288. # 当日广告数量(取最后一天)
  289. ad_count_today = None
  290. if p50_pairs:
  291. last_date = p50_pairs[-1][0]
  292. ad_count_today = by_date.get(last_date, {}).get("global", {}).get("ad_count")
  293. return {
  294. "label": label,
  295. "p50_volatility_pct": round(volatility_pct, 4),
  296. "recent_vs_baseline_pct": round(recent_vs_baseline_pct, 4),
  297. "p50_series": [{"date": d, "p50": round(p, 4)} for d, p in p50_pairs],
  298. "ad_count_today": ad_count_today,
  299. "reason": reason,
  300. }
  301. @tool(description="账户级 + 人群包级 ROI 基线汇总(p25/p50/p75 + 最近 7 天日级基线快照 + 大盘行情判定),供 LLM 对标用")
  302. async def calculate_portfolio_summary(
  303. ctx: ToolContext = None,
  304. metrics_csv: str = "",
  305. end_date: str = "yesterday",
  306. ) -> ToolResult:
  307. """
  308. 读取 metrics CSV,按账户 / 人群包 / 全局三层汇总;
  309. 同时读取最近 7 天 metrics CSV 生成日级基线快照(供 LLM 审计大盘波动)。
  310. 输出:outputs/portfolio_summary/portfolio_summary_{end_date}.json
  311. 结构:
  312. {
  313. "end_date": "YYYYMMDD",
  314. "by_account": { "<account_id>": { ... } },
  315. "by_audience_tier": { "<tier>": { ... } }, # 基于 end_date CSV
  316. "global": { ... },
  317. "by_date": {
  318. "YYYYMMDD": {
  319. "global": { roi_p25, roi_p50, roi_p75, ad_count, ... },
  320. "by_audience_tier": { "R500": {...}, "R50": {...} }
  321. },
  322. ...(最近 7 天)
  323. }
  324. }
  325. Args:
  326. metrics_csv: metrics CSV 路径(可选,默认根据 end_date 自动解析)
  327. end_date: 结束日期(YYYYMMDD 或 "yesterday")
  328. Returns:
  329. ToolResult 含 JSON 路径 + 关键摘要
  330. """
  331. try:
  332. if end_date == "yesterday":
  333. end_dt = datetime.now() - timedelta(days=1)
  334. else:
  335. end_dt = datetime.strptime(end_date.replace("-", ""), "%Y%m%d")
  336. end_date_str = end_dt.strftime("%Y%m%d")
  337. csv_path = _resolve_metrics_csv(metrics_csv, end_date)
  338. if csv_path is None or not csv_path.exists():
  339. return ToolResult(
  340. title="账户基线计算失败",
  341. output=f"未找到 metrics CSV(指定:{metrics_csv or '自动'},日期:{end_date})",
  342. )
  343. df = pd.read_csv(csv_path, dtype={"ad_id": str, "account_id": str})
  344. if df.empty:
  345. return ToolResult(
  346. title="账户基线计算失败",
  347. output=f"metrics CSV 为空:{csv_path}",
  348. )
  349. # 按 account_id 分组
  350. by_account: Dict[str, Any] = {}
  351. if "account_id" in df.columns:
  352. for acc_id, group in df.groupby("account_id"):
  353. by_account[str(acc_id)] = _describe_group(group)
  354. # 按 audience_tier 分组
  355. by_tier: Dict[str, Any] = {}
  356. if "audience_tier" in df.columns:
  357. for tier, group in df.groupby("audience_tier"):
  358. by_tier[str(tier)] = _describe_group(group)
  359. # 按 (audience_tier, 广告优化目标) 双键分组 — 用于同类+同转化目标的均值出价
  360. by_tier_goal: Dict[str, Any] = {}
  361. goal_col = None
  362. for candidate in ("广告优化目标", "optimization_goal"):
  363. if candidate in df.columns:
  364. goal_col = candidate
  365. break
  366. if "audience_tier" in df.columns and goal_col:
  367. for (tier, goal), group in df.groupby(["audience_tier", goal_col]):
  368. key = f"{tier}_{goal}"
  369. by_tier_goal[key] = _describe_group(group)
  370. # 全局
  371. global_stats = _describe_group(df)
  372. # 日级基线快照(最近 7 天)
  373. by_date = _compute_daily_tier_snapshot(end_dt, days=7)
  374. # 大盘行情判定(V4:由代码出,不让 LLM 看 by_date 自己猜)
  375. market_signal = _compute_market_signal(by_date)
  376. summary = {
  377. "end_date": end_date_str,
  378. "source_csv": str(csv_path),
  379. "by_account": by_account,
  380. "by_audience_tier": by_tier,
  381. "by_tier_goal": by_tier_goal,
  382. "global": global_stats,
  383. "by_date": by_date,
  384. "market_signal": market_signal,
  385. }
  386. _PORTFOLIO_DIR.mkdir(parents=True, exist_ok=True)
  387. out_path = _PORTFOLIO_DIR / f"portfolio_summary_{end_date_str}.json"
  388. out_path.write_text(
  389. json.dumps(summary, ensure_ascii=False, indent=2),
  390. encoding="utf-8",
  391. )
  392. # 生成可读摘要(只显示 top 层)
  393. lines = [
  394. f"✅ 账户 / 人群包基线汇总完成",
  395. f" 数据源:{csv_path.name}",
  396. f" 输出文件:{out_path}",
  397. "",
  398. f"【全局】ad_count={global_stats['ad_count']}, "
  399. f"active={global_stats['active_ads']}, "
  400. f"roi_mean={global_stats['roi_mean']}, "
  401. f"p25/p50/p75={global_stats['roi_p25']}/{global_stats['roi_p50']}/{global_stats['roi_p75']}",
  402. ]
  403. if by_tier:
  404. lines.append("")
  405. lines.append("【人群包(end_date 截面)】")
  406. # 按 R 值典型顺序展示
  407. tier_order = ["R500", "R330+", "R330", "R180", "R100", "R50", "R10", "R2", "default"]
  408. for tier in tier_order:
  409. if tier in by_tier:
  410. s = by_tier[tier]
  411. lines.append(
  412. f" {tier}: n={s['ad_count']}, "
  413. f"roi_p25/p50/p75={s['roi_p25']}/{s['roi_p50']}/{s['roi_p75']}, "
  414. f"stable_pct={s['stable_ads_pct']*100:.0f}%"
  415. )
  416. # 其他未列出的
  417. for tier, s in by_tier.items():
  418. if tier not in tier_order:
  419. lines.append(
  420. f" {tier}: n={s['ad_count']}, "
  421. f"roi_p25/p50/p75={s['roi_p25']}/{s['roi_p50']}/{s['roi_p75']}"
  422. )
  423. if by_date:
  424. lines.append("")
  425. lines.append(f"【日级基线快照(最近 {len(by_date)} 天)】")
  426. # 全局 p50 波动范围
  427. p50_series = [
  428. (d, entry["global"].get("roi_p50"))
  429. for d, entry in sorted(by_date.items())
  430. if entry.get("global", {}).get("roi_p50") is not None
  431. ]
  432. if p50_series:
  433. p50_values = [v for _, v in p50_series]
  434. lines.append(
  435. f" 全局 p50 波动:min={min(p50_values):.3f}, "
  436. f"max={max(p50_values):.3f}, "
  437. f"波幅={(max(p50_values) - min(p50_values)) / min(p50_values) * 100:.1f}% "
  438. f"(大盘行情参考,波动大说明 ARPU 共模噪声大)"
  439. )
  440. # 显示每天每 tier 的 p50
  441. lines.append(" 每日 tier p50 快照:")
  442. for d, entry in sorted(by_date.items()):
  443. tier_map = entry.get("by_audience_tier", {})
  444. tier_p50s = []
  445. for t in tier_order:
  446. if t in tier_map and tier_map[t].get("roi_p50") is not None:
  447. tier_p50s.append(f"{t}={tier_map[t]['roi_p50']:.2f}")
  448. if tier_p50s:
  449. lines.append(f" {d}: {' | '.join(tier_p50s)}")
  450. else:
  451. g = entry.get("global", {})
  452. lines.append(f" {d}: global_p50={g.get('roi_p50')}(无 tier 数据)")
  453. # 大盘行情可读输出
  454. lines.append("")
  455. lines.append(f"【大盘行情】label={market_signal['label']}")
  456. if market_signal.get("p50_volatility_pct") is not None:
  457. lines.append(
  458. f" 波幅={market_signal['p50_volatility_pct'] * 100:.1f}%, "
  459. f"近 3 日 vs 之前={market_signal['recent_vs_baseline_pct'] * 100:+.1f}%"
  460. )
  461. lines.append(f" 原因:{market_signal['reason']}")
  462. return ToolResult(
  463. title="账户 / 人群包基线汇总",
  464. output="\n".join(lines),
  465. metadata={
  466. "json_path": str(out_path),
  467. "by_account_count": len(by_account),
  468. "by_audience_tier_count": len(by_tier),
  469. "by_date_count": len(by_date),
  470. "global_ad_count": global_stats["ad_count"],
  471. "market_signal": market_signal["label"],
  472. },
  473. )
  474. except Exception as e:
  475. logger.exception("calculate_portfolio_summary 失败")
  476. return ToolResult(title="账户基线计算异常", output=f"错误:{e}")