| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- """
- 快速分析脚本:直接使用已有数据,跳过数据拉取
- """
- import os
- import sys
- from pathlib import Path
- import json
- import asyncio
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from dotenv import load_dotenv
- load_dotenv()
- from agent.utils import setup_logging
- setup_logging(level="INFO")
- # 设置环境
- sys.path.insert(0, str(Path(__file__).parent))
- from config import OUTPUTS_DIR
- async def main():
- print("=" * 70)
- print(" 快速分析 — 使用20260415已有数据")
- print("=" * 70)
- print()
- # 步骤1: 检查数据完整性
- print("📌 步骤 1: 检查数据完整性")
- print("-" * 70)
- metrics_file = OUTPUTS_DIR / "metrics_20260415.csv"
- if metrics_file.exists():
- import pandas as pd
- df = pd.read_csv(metrics_file)
- print(f"✅ metrics_20260415.csv 存在: {len(df)} 个广告")
- else:
- print("❌ metrics_20260415.csv 不存在,需要先运行 ROI 计算")
- sys.exit(1)
- print()
- # 步骤2: 分析广告并生成决策
- print("📌 步骤 2: 调用决策引擎")
- print("-" * 70)
- from tools.ad_decision import get_ads_for_review
- # ctx参数实际未使用,传入None即可
- result = await get_ads_for_review(None, metrics_csv=str(metrics_file))
- if result.error:
- print(f"❌ 决策引擎失败: {result.error}")
- sys.exit(1)
- print(result.output)
- print()
- # 解析结果
- import re
- json_match = re.search(r'```json\n(.*?)\n```', result.output, re.DOTALL)
- if json_match:
- review_data = json.loads(json_match.group(1))
- print(f"✅ 分类完成:")
- print(f" A类(极端差): {review_data.get('a_count', 0)} 个")
- print(f" B类(边缘): {review_data.get('b_count', 0)} 个")
- print(f" C类(正常): {review_data.get('c_count', 0)} 个")
- print(f" 📊 全局统计: ROI均值={review_data.get('roi_mean_all', 0):.2f}")
- # 展示B类广告样例
- if 'b_ads' in review_data and len(review_data['b_ads']) > 0:
- print(f"\n📋 B类广告样例(需AI推理):")
- for i, ad in enumerate(review_data['b_ads'][:3], 1):
- print(f"\n {i}. 广告 {ad.get('ad_id')}")
- print(f" ROI: {ad.get('动态ROI_7日均值', 0):.2f} 消耗: {ad.get('cost_7d_avg', 0):.0f}元/天")
- print(f" 建议: {ad.get('bid_candidate', 'N/A')}")
- else:
- print("⚠️ 无法解析决策结果")
- print()
- print("=" * 70)
- print("✅ 分析完成 — 现在可以让Agent基于B类广告进行AI推理决策")
- print("=" * 70)
- if __name__ == "__main__":
- asyncio.run(main())
|