#!/usr/bin/env python3 """ 广告调控策略升级 - 端到端验证测试 验证内容: 1. ROI计算是否添加了 audience_tier 和 roi_valid_days 2. portfolio_metrics 是否包含裂变率统计 3. 决策引擎是否包含同类对比字段 4. 决策引擎是否正确标记年龄分段 """ import sys import json import pandas as pd from pathlib import Path from datetime import datetime, timedelta # 添加项目根目录到路径 PROJECT_ROOT = Path(__file__).parent.parent sys.path.insert(0, str(PROJECT_ROOT)) MINI_DIR = Path(__file__).parent OUTPUTS_DIR = MINI_DIR / "outputs" def print_section(title): """打印测试章节标题""" print("\n" + "=" * 70) print(f" {title}") print("=" * 70) def test_roi_calculator(): """测试1: ROI计算器是否添加了新字段""" print_section("测试 1/4: ROI 计算器 - 验证 audience_tier 和 roi_valid_days") # 查找最新的 metrics CSV metrics_files = list(OUTPUTS_DIR.glob("metrics_*.csv")) if not metrics_files: print("❌ 未找到 metrics CSV 文件,跳过测试") return False latest_metrics = max(metrics_files, key=lambda p: p.stat().st_mtime) print(f"📂 读取文件: {latest_metrics.name}") df = pd.read_csv(latest_metrics) print(f" 广告总数: {len(df)}") # 验证新列 if 'audience_tier' not in df.columns: print("❌ 缺少 audience_tier 列") return False if 'roi_valid_days' not in df.columns: print("❌ 缺少 roi_valid_days 列") return False print("✅ 新列验证通过: audience_tier, roi_valid_days") # 人群包分布 tier_dist = df['audience_tier'].value_counts() print(f"\n📊 人群包分布 (前10):") for tier, count in tier_dist.head(10).items(): pct = count / len(df) * 100 print(f" {tier:12s}: {count:4d} 个广告 ({pct:5.1f}%)") # roi_valid_days 统计 print(f"\n📊 ROI有效数据天数统计:") print(f" 最小值: {df['roi_valid_days'].min():.0f} 天") print(f" 平均值: {df['roi_valid_days'].mean():.1f} 天") print(f" 最大值: {df['roi_valid_days'].max():.0f} 天") # 按天数分布 days_dist = df['roi_valid_days'].value_counts().sort_index() print(f"\n📊 按置信度分布:") high_conf = (df['roi_valid_days'] >= 7).sum() mid_conf = ((df['roi_valid_days'] >= 4) & (df['roi_valid_days'] < 7)).sum() low_conf = (df['roi_valid_days'] == 3).sum() none_conf = (df['roi_valid_days'] < 3).sum() print(f" 高置信度 (≥7天): {high_conf:4d} 个广告 ({high_conf/len(df)*100:5.1f}%)") print(f" 中置信度 (4-6天): {mid_conf:4d} 个广告 ({mid_conf/len(df)*100:5.1f}%)") print(f" 低置信度 (3天): {low_conf:4d} 个广告 ({low_conf/len(df)*100:5.1f}%)") print(f" 无数据 (<3天): {none_conf:4d} 个广告 ({none_conf/len(df)*100:5.1f}%)") return True def test_portfolio_metrics(): """测试2: portfolio_metrics 是否包含裂变率统计""" print_section("测试 2/4: Portfolio Metrics - 验证裂变率统计") # 查找最新的 portfolio_summary JSON portfolio_dir = OUTPUTS_DIR / "portfolio_summary" if not portfolio_dir.exists(): print("❌ portfolio_summary 目录不存在,跳过测试") return False json_files = list(portfolio_dir.glob("portfolio_summary_*.json")) if not json_files: print("❌ 未找到 portfolio_summary JSON 文件,跳过测试") return False latest_json = max(json_files, key=lambda p: p.stat().st_mtime) print(f"📂 读取文件: {latest_json.name}") with open(latest_json, 'r', encoding='utf-8') as f: data = json.load(f) by_tier = data.get('by_audience_tier', {}) if not by_tier: print("❌ 未找到 by_audience_tier 数据") return False print(f" 人群包数量: {len(by_tier)}") # 检查第一个人群包是否包含裂变率字段 sample_tier = list(by_tier.keys())[0] sample_stats = by_tier[sample_tier] if 'fission_mean' not in sample_stats and 'fission_p50' not in sample_stats: print(f"❌ 人群包 {sample_tier} 缺少裂变率字段") return False print("✅ 裂变率字段验证通过: fission_mean, fission_p50") # 展示各人群包的裂变率 print(f"\n📊 各人群包裂变率统计:") tier_order = ["R500", "R330+", "R330", "R180", "R100", "R50", "R10", "R2", "default"] for tier in tier_order: if tier in by_tier: stats = by_tier[tier] fission_mean = stats.get('fission_mean', 'N/A') roi_p50 = stats.get('roi_p50', 'N/A') ad_count = stats.get('ad_count', 0) if fission_mean != 'N/A' and fission_mean is not None: print(f" {tier:12s}: fission_mean={fission_mean:.4f}, roi_p50={roi_p50}, ads={ad_count}") else: print(f" {tier:12s}: 无裂变数据, roi_p50={roi_p50}, ads={ad_count}") return True def test_ad_decision_fields(): """测试3: 决策引擎是否包含同类对比字段""" print_section("测试 3/4: 决策引擎 - 验证同类对比字段和年龄分段") # 查找最新的 metrics CSV(用于模拟决策引擎输入) metrics_files = list(OUTPUTS_DIR.glob("metrics_*.csv")) if not metrics_files: print("❌ 未找到 metrics CSV 文件,跳过测试") return False latest_metrics = max(metrics_files, key=lambda p: p.stat().st_mtime) df = pd.read_csv(latest_metrics) # 模拟一个需要评估的广告 sample_ads = df[df['cost_7d_avg'] > 100].head(3) if len(sample_ads) == 0: print("⚠️ 没有找到消耗>100的广告样本") return True print(f" 选取 {len(sample_ads)} 个样本广告进行验证\n") # 检查必要字段 required_fields = ['audience_tier', 'roi_valid_days', 'ad_age_days'] missing_fields = [f for f in required_fields if f not in df.columns] if missing_fields: print(f"❌ 缺少必要字段: {', '.join(missing_fields)}") return False print("✅ 基础字段验证通过") # 展示样本广告的关键字段 print(f"\n📊 样本广告分析:") for idx, row in sample_ads.iterrows(): ad_id = row['ad_id'] ad_name = row.get('ad_name', 'N/A') tier = row.get('audience_tier', 'default') roi = row.get('动态ROI_7日均值', None) roi_valid_days = row.get('roi_valid_days', 0) ad_age = row.get('ad_age_days', None) cost_7d = row.get('cost_7d_avg', 0) # 判断年龄分段 if ad_age is not None: if ad_age <= 3: age_segment = "newborn (极度保护)" elif ad_age <= 7: age_segment = "cold_start (仅允许提价)" else: age_segment = "mature (正常调控)" else: age_segment = "unknown" # 判断置信度 if roi_valid_days >= 7: confidence = "高" elif roi_valid_days >= 4: confidence = "中" elif roi_valid_days == 3: confidence = "低" else: confidence = "无" print(f"\n 广告 {ad_id}:") print(f" 人群包: {tier}") # 格式化ROI显示 roi_str = f"{roi:.4f}" if (roi is not None and roi == roi) else "N/A" # roi == roi 用于检查NaN print(f" ROI: {roi_str} (基于{int(roi_valid_days)}天数据, 置信度:{confidence})") print(f" 年龄: {int(ad_age) if ad_age else 'N/A'}天 → {age_segment}") print(f" 7日均消耗: {cost_7d:.2f}元") return True def test_decision_output(): """测试4: 检查决策输出是否符合预期""" print_section("测试 4/4: 决策输出 - 验证理由表达和动作类型") # 查找最新的 llm_decisions CSV reports_dir = OUTPUTS_DIR / "reports" if not reports_dir.exists(): print("⚠️ reports 目录不存在,可能还未运行过决策引擎") print(" 提示:运行 `python3 execute_once.py --date YYYYMMDD` 生成决策") return True decision_files = list(reports_dir.glob("llm_decisions_*.csv")) if not decision_files: print("⚠️ 未找到 llm_decisions 文件,可能还未运行过决策引擎") print(" 提示:运行 `python3 execute_once.py --date YYYYMMDD` 生成决策") return True latest_decision = max(decision_files, key=lambda p: p.stat().st_mtime) print(f"📂 读取文件: {latest_decision.name}") df = pd.read_csv(latest_decision) print(f" 决策总数: {len(df)}") # 统计动作分布 action_dist = df['action'].value_counts() print(f"\n📊 决策动作分布:") for action, count in action_dist.items(): pct = count / len(df) * 100 print(f" {action:18s}: {count:4d} ({pct:5.1f}%)") # 检查是否有新增动作 new_actions = ['creative_adjust', 'observe'] found_new_actions = [a for a in new_actions if a in action_dist.index] if found_new_actions: print(f"\n✅ 发现新增动作: {', '.join(found_new_actions)}") for action in found_new_actions: sample = df[df['action'] == action].head(1) if not sample.empty: reason = sample.iloc[0].get('reason', 'N/A') print(f"\n {action} 示例理由:") print(f" 「{reason[:200]}...」" if len(reason) > 200 else f" 「{reason}」") else: print(f"\n⚠️ 未发现新增动作 (creative_adjust/observe)") print(" 这可能是正常的,取决于数据中是否有符合条件的广告") # 检查理由中是否包含"人群包同类对比"相关表述 if 'reason' in df.columns: tier_compare_keywords = ['组中位数', '同类', 'R500组', 'R330组', 'R180组'] has_tier_compare = df['reason'].str.contains('|'.join(tier_compare_keywords), na=False).sum() print(f"\n📊 理由质量分析:") print(f" 包含人群包同类对比: {has_tier_compare}/{len(df)} ({has_tier_compare/len(df)*100:.1f}%)") if has_tier_compare > 0: print("\n✅ 发现使用人群包同类对比的理由示例:") sample_reason = df[df['reason'].str.contains('|'.join(tier_compare_keywords), na=False)].iloc[0]['reason'] print(f" 「{sample_reason[:250]}...」" if len(sample_reason) > 250 else f" 「{sample_reason}」") return True def main(): """主测试流程""" print("\n" + "🧪" * 35) print(" " * 15 + "广告调控策略升级 - 端到端验证") print("🧪" * 35) results = { "ROI计算器": test_roi_calculator(), "Portfolio统计": test_portfolio_metrics(), "决策引擎字段": test_ad_decision_fields(), "决策输出": test_decision_output(), } # 总结 print_section("测试总结") passed = sum(1 for v in results.values() if v) total = len(results) for test_name, result in results.items(): status = "✅ 通过" if result else "❌ 失败" print(f" {test_name:20s}: {status}") print(f"\n 总计: {passed}/{total} 项测试通过") if passed == total: print("\n🎉 所有测试通过!策略升级成功部署。") return 0 else: print(f"\n⚠️ 有 {total - passed} 项测试未通过,请检查相关功能。") return 1 if __name__ == "__main__": sys.exit(main())