test_strategy_upgrade.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. #!/usr/bin/env python3
  2. """
  3. 广告调控策略升级 - 端到端验证测试
  4. 验证内容:
  5. 1. ROI计算是否添加了 audience_tier 和 roi_valid_days
  6. 2. portfolio_metrics 是否包含裂变率统计
  7. 3. 决策引擎是否包含同类对比字段
  8. 4. 决策引擎是否正确标记年龄分段
  9. """
  10. import sys
  11. import json
  12. import pandas as pd
  13. from pathlib import Path
  14. from datetime import datetime, timedelta
  15. # 添加项目根目录到路径
  16. PROJECT_ROOT = Path(__file__).parent.parent
  17. sys.path.insert(0, str(PROJECT_ROOT))
  18. MINI_DIR = Path(__file__).parent
  19. OUTPUTS_DIR = MINI_DIR / "outputs"
  20. def print_section(title):
  21. """打印测试章节标题"""
  22. print("\n" + "=" * 70)
  23. print(f" {title}")
  24. print("=" * 70)
  25. def test_roi_calculator():
  26. """测试1: ROI计算器是否添加了新字段"""
  27. print_section("测试 1/4: ROI 计算器 - 验证 audience_tier 和 roi_valid_days")
  28. # 查找最新的 metrics CSV
  29. metrics_files = list(OUTPUTS_DIR.glob("metrics_*.csv"))
  30. if not metrics_files:
  31. print("❌ 未找到 metrics CSV 文件,跳过测试")
  32. return False
  33. latest_metrics = max(metrics_files, key=lambda p: p.stat().st_mtime)
  34. print(f"📂 读取文件: {latest_metrics.name}")
  35. df = pd.read_csv(latest_metrics)
  36. print(f" 广告总数: {len(df)}")
  37. # 验证新列
  38. if 'audience_tier' not in df.columns:
  39. print("❌ 缺少 audience_tier 列")
  40. return False
  41. if 'roi_valid_days' not in df.columns:
  42. print("❌ 缺少 roi_valid_days 列")
  43. return False
  44. print("✅ 新列验证通过: audience_tier, roi_valid_days")
  45. # 人群包分布
  46. tier_dist = df['audience_tier'].value_counts()
  47. print(f"\n📊 人群包分布 (前10):")
  48. for tier, count in tier_dist.head(10).items():
  49. pct = count / len(df) * 100
  50. print(f" {tier:12s}: {count:4d} 个广告 ({pct:5.1f}%)")
  51. # roi_valid_days 统计
  52. print(f"\n📊 ROI有效数据天数统计:")
  53. print(f" 最小值: {df['roi_valid_days'].min():.0f} 天")
  54. print(f" 平均值: {df['roi_valid_days'].mean():.1f} 天")
  55. print(f" 最大值: {df['roi_valid_days'].max():.0f} 天")
  56. # 按天数分布
  57. days_dist = df['roi_valid_days'].value_counts().sort_index()
  58. print(f"\n📊 按置信度分布:")
  59. high_conf = (df['roi_valid_days'] >= 7).sum()
  60. mid_conf = ((df['roi_valid_days'] >= 4) & (df['roi_valid_days'] < 7)).sum()
  61. low_conf = (df['roi_valid_days'] == 3).sum()
  62. none_conf = (df['roi_valid_days'] < 3).sum()
  63. print(f" 高置信度 (≥7天): {high_conf:4d} 个广告 ({high_conf/len(df)*100:5.1f}%)")
  64. print(f" 中置信度 (4-6天): {mid_conf:4d} 个广告 ({mid_conf/len(df)*100:5.1f}%)")
  65. print(f" 低置信度 (3天): {low_conf:4d} 个广告 ({low_conf/len(df)*100:5.1f}%)")
  66. print(f" 无数据 (<3天): {none_conf:4d} 个广告 ({none_conf/len(df)*100:5.1f}%)")
  67. return True
  68. def test_portfolio_metrics():
  69. """测试2: portfolio_metrics 是否包含裂变率统计"""
  70. print_section("测试 2/4: Portfolio Metrics - 验证裂变率统计")
  71. # 查找最新的 portfolio_summary JSON
  72. portfolio_dir = OUTPUTS_DIR / "portfolio_summary"
  73. if not portfolio_dir.exists():
  74. print("❌ portfolio_summary 目录不存在,跳过测试")
  75. return False
  76. json_files = list(portfolio_dir.glob("portfolio_summary_*.json"))
  77. if not json_files:
  78. print("❌ 未找到 portfolio_summary JSON 文件,跳过测试")
  79. return False
  80. latest_json = max(json_files, key=lambda p: p.stat().st_mtime)
  81. print(f"📂 读取文件: {latest_json.name}")
  82. with open(latest_json, 'r', encoding='utf-8') as f:
  83. data = json.load(f)
  84. by_tier = data.get('by_audience_tier', {})
  85. if not by_tier:
  86. print("❌ 未找到 by_audience_tier 数据")
  87. return False
  88. print(f" 人群包数量: {len(by_tier)}")
  89. # 检查第一个人群包是否包含裂变率字段
  90. sample_tier = list(by_tier.keys())[0]
  91. sample_stats = by_tier[sample_tier]
  92. if 'fission_mean' not in sample_stats and 'fission_p50' not in sample_stats:
  93. print(f"❌ 人群包 {sample_tier} 缺少裂变率字段")
  94. return False
  95. print("✅ 裂变率字段验证通过: fission_mean, fission_p50")
  96. # 展示各人群包的裂变率
  97. print(f"\n📊 各人群包裂变率统计:")
  98. tier_order = ["R500", "R330+", "R330", "R180", "R100", "R50", "R10", "R2", "default"]
  99. for tier in tier_order:
  100. if tier in by_tier:
  101. stats = by_tier[tier]
  102. fission_mean = stats.get('fission_mean', 'N/A')
  103. roi_p50 = stats.get('roi_p50', 'N/A')
  104. ad_count = stats.get('ad_count', 0)
  105. if fission_mean != 'N/A' and fission_mean is not None:
  106. print(f" {tier:12s}: fission_mean={fission_mean:.4f}, roi_p50={roi_p50}, ads={ad_count}")
  107. else:
  108. print(f" {tier:12s}: 无裂变数据, roi_p50={roi_p50}, ads={ad_count}")
  109. return True
  110. def test_ad_decision_fields():
  111. """测试3: 决策引擎是否包含同类对比字段"""
  112. print_section("测试 3/4: 决策引擎 - 验证同类对比字段和年龄分段")
  113. # 查找最新的 metrics CSV(用于模拟决策引擎输入)
  114. metrics_files = list(OUTPUTS_DIR.glob("metrics_*.csv"))
  115. if not metrics_files:
  116. print("❌ 未找到 metrics CSV 文件,跳过测试")
  117. return False
  118. latest_metrics = max(metrics_files, key=lambda p: p.stat().st_mtime)
  119. df = pd.read_csv(latest_metrics)
  120. # 模拟一个需要评估的广告
  121. sample_ads = df[df['cost_7d_avg'] > 100].head(3)
  122. if len(sample_ads) == 0:
  123. print("⚠️ 没有找到消耗>100的广告样本")
  124. return True
  125. print(f" 选取 {len(sample_ads)} 个样本广告进行验证\n")
  126. # 检查必要字段
  127. required_fields = ['audience_tier', 'roi_valid_days', 'ad_age_days']
  128. missing_fields = [f for f in required_fields if f not in df.columns]
  129. if missing_fields:
  130. print(f"❌ 缺少必要字段: {', '.join(missing_fields)}")
  131. return False
  132. print("✅ 基础字段验证通过")
  133. # 展示样本广告的关键字段
  134. print(f"\n📊 样本广告分析:")
  135. for idx, row in sample_ads.iterrows():
  136. ad_id = row['ad_id']
  137. ad_name = row.get('ad_name', 'N/A')
  138. tier = row.get('audience_tier', 'default')
  139. roi = row.get('动态ROI_7日均值', None)
  140. roi_valid_days = row.get('roi_valid_days', 0)
  141. ad_age = row.get('ad_age_days', None)
  142. cost_7d = row.get('cost_7d_avg', 0)
  143. # 判断年龄分段
  144. if ad_age is not None:
  145. if ad_age <= 3:
  146. age_segment = "newborn (极度保护)"
  147. elif ad_age <= 7:
  148. age_segment = "cold_start (仅允许提价)"
  149. else:
  150. age_segment = "mature (正常调控)"
  151. else:
  152. age_segment = "unknown"
  153. # 判断置信度
  154. if roi_valid_days >= 7:
  155. confidence = "高"
  156. elif roi_valid_days >= 4:
  157. confidence = "中"
  158. elif roi_valid_days == 3:
  159. confidence = "低"
  160. else:
  161. confidence = "无"
  162. print(f"\n 广告 {ad_id}:")
  163. print(f" 人群包: {tier}")
  164. # 格式化ROI显示
  165. roi_str = f"{roi:.4f}" if (roi is not None and roi == roi) else "N/A" # roi == roi 用于检查NaN
  166. print(f" ROI: {roi_str} (基于{int(roi_valid_days)}天数据, 置信度:{confidence})")
  167. print(f" 年龄: {int(ad_age) if ad_age else 'N/A'}天 → {age_segment}")
  168. print(f" 7日均消耗: {cost_7d:.2f}元")
  169. return True
  170. def test_decision_output():
  171. """测试4: 检查决策输出是否符合预期"""
  172. print_section("测试 4/4: 决策输出 - 验证理由表达和动作类型")
  173. # 查找最新的 llm_decisions CSV
  174. reports_dir = OUTPUTS_DIR / "reports"
  175. if not reports_dir.exists():
  176. print("⚠️ reports 目录不存在,可能还未运行过决策引擎")
  177. print(" 提示:运行 `python3 execute_once.py --date YYYYMMDD` 生成决策")
  178. return True
  179. decision_files = list(reports_dir.glob("llm_decisions_*.csv"))
  180. if not decision_files:
  181. print("⚠️ 未找到 llm_decisions 文件,可能还未运行过决策引擎")
  182. print(" 提示:运行 `python3 execute_once.py --date YYYYMMDD` 生成决策")
  183. return True
  184. latest_decision = max(decision_files, key=lambda p: p.stat().st_mtime)
  185. print(f"📂 读取文件: {latest_decision.name}")
  186. df = pd.read_csv(latest_decision)
  187. print(f" 决策总数: {len(df)}")
  188. # 统计动作分布
  189. action_dist = df['action'].value_counts()
  190. print(f"\n📊 决策动作分布:")
  191. for action, count in action_dist.items():
  192. pct = count / len(df) * 100
  193. print(f" {action:18s}: {count:4d} ({pct:5.1f}%)")
  194. # 检查是否有新增动作
  195. new_actions = ['creative_adjust', 'observe']
  196. found_new_actions = [a for a in new_actions if a in action_dist.index]
  197. if found_new_actions:
  198. print(f"\n✅ 发现新增动作: {', '.join(found_new_actions)}")
  199. for action in found_new_actions:
  200. sample = df[df['action'] == action].head(1)
  201. if not sample.empty:
  202. reason = sample.iloc[0].get('reason', 'N/A')
  203. print(f"\n {action} 示例理由:")
  204. print(f" 「{reason[:200]}...」" if len(reason) > 200 else f" 「{reason}」")
  205. else:
  206. print(f"\n⚠️ 未发现新增动作 (creative_adjust/observe)")
  207. print(" 这可能是正常的,取决于数据中是否有符合条件的广告")
  208. # 检查理由中是否包含"人群包同类对比"相关表述
  209. if 'reason' in df.columns:
  210. tier_compare_keywords = ['组中位数', '同类', 'R500组', 'R330组', 'R180组']
  211. has_tier_compare = df['reason'].str.contains('|'.join(tier_compare_keywords), na=False).sum()
  212. print(f"\n📊 理由质量分析:")
  213. print(f" 包含人群包同类对比: {has_tier_compare}/{len(df)} ({has_tier_compare/len(df)*100:.1f}%)")
  214. if has_tier_compare > 0:
  215. print("\n✅ 发现使用人群包同类对比的理由示例:")
  216. sample_reason = df[df['reason'].str.contains('|'.join(tier_compare_keywords), na=False)].iloc[0]['reason']
  217. print(f" 「{sample_reason[:250]}...」" if len(sample_reason) > 250 else f" 「{sample_reason}」")
  218. return True
  219. def main():
  220. """主测试流程"""
  221. print("\n" + "🧪" * 35)
  222. print(" " * 15 + "广告调控策略升级 - 端到端验证")
  223. print("🧪" * 35)
  224. results = {
  225. "ROI计算器": test_roi_calculator(),
  226. "Portfolio统计": test_portfolio_metrics(),
  227. "决策引擎字段": test_ad_decision_fields(),
  228. "决策输出": test_decision_output(),
  229. }
  230. # 总结
  231. print_section("测试总结")
  232. passed = sum(1 for v in results.values() if v)
  233. total = len(results)
  234. for test_name, result in results.items():
  235. status = "✅ 通过" if result else "❌ 失败"
  236. print(f" {test_name:20s}: {status}")
  237. print(f"\n 总计: {passed}/{total} 项测试通过")
  238. if passed == total:
  239. print("\n🎉 所有测试通过!策略升级成功部署。")
  240. return 0
  241. else:
  242. print(f"\n⚠️ 有 {total - passed} 项测试未通过,请检查相关功能。")
  243. return 1
  244. if __name__ == "__main__":
  245. sys.exit(main())