"""End-to-end validation script: pipeline test for the no-operator scenario.

Differences from execute_once.py:
- The user message explicitly tells the Agent that the operator is currently
  away and that it must not block waiting after sending the approval request.
- send_approval_request is to be used with wait_for_reply=False.
- execute_decisions is skipped (nothing is executed without an approval result).
- The run is wrapped up by generating a report.

When run as a script, the process exit status is 0 on success and 1 when the
run raised an exception or the runner reported a failed trace.
"""

import asyncio
import os
import re
import sys
import traceback
from pathlib import Path
from typing import Optional

# Proxy settings; setdefault keeps any value already present in the environment.
os.environ.setdefault("HTTP_PROXY", "http://127.0.0.1:29758")
os.environ.setdefault("HTTPS_PROXY", "http://127.0.0.1:29758")

# Put the directory three levels above this file on the Python path so the
# `agent` and `examples` packages below can be imported.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from dotenv import load_dotenv

load_dotenv()

from agent.core.runner import AgentRunner
from agent.trace import FileSystemTraceStore, Trace, Message
from agent.llm import create_openrouter_llm_call
from agent.utils import setup_logging

from examples.auto_put_ad_mini.config import (
    MAIN_CONFIG,
    SKILLS_DIR,
    TRACE_STORE_PATH,
    LOG_LEVEL,
    LOG_FILE,
)

# Import the custom tools. None of these names is called directly in this
# script; presumably importing them registers the tools with the agent
# framework -- TODO confirm before removing any of them.
from examples.auto_put_ad_mini.tools.data_query import fetch_creative_data, merge_creative_data
from examples.auto_put_ad_mini.tools.roi_calculator import calculate_roi_metrics
from examples.auto_put_ad_mini.tools.portfolio_metrics import calculate_portfolio_summary
from examples.auto_put_ad_mini.tools.ad_decision import get_ads_for_review, apply_decisions, query_ad_detail, modify_decisions
from examples.auto_put_ad_mini.tools.report_generator import generate_report
from examples.auto_put_ad_mini.tools.guardrails import validate_decisions
from examples.auto_put_ad_mini.tools.execution_engine import execute_decisions, check_execution_feedback
from examples.auto_put_ad_mini.tools.im_approval import send_approval_request, check_approval_status, send_feishu_text_message

try:
    from examples.auto_put_ad_mini.tools.feishu_doc import import_to_feishu
except ImportError:
    # Best effort: the script runs without this tool when it cannot be imported.
    pass

# Compiled once instead of on every tool message.
_APPROVAL_ID_RE = re.compile(r"req_\d+_\d+_[a-f0-9]+")
_BANNER = "=" * 70
_ASSISTANT_PREVIEW_CHARS = 300
_TOOL_PREVIEW_CHARS = 400


def _preview(text: str, limit: int) -> str:
    """Return *text* cut to *limit* characters, adding "..." when it was cut."""
    return text[:limit] + ("..." if len(text) > limit else "")


def _approval_path_from_result(result: object) -> Optional[str]:
    """Return the approval message file path named by a tool result, if any.

    Only string results are inspected. The path is built from the first
    ``req_<digits>_<digits>_<hex>`` identifier found in the text; ``None`` is
    returned when the result is not a string or contains no such identifier.
    """
    if not isinstance(result, str):
        return None
    match = _APPROVAL_ID_RE.search(result)
    if match is None:
        return None
    return f"outputs/approvals/{match.group(0)}.txt"


async def main() -> int:
    """Run the agent once in no-operator mode and echo its progress to stdout.

    Loads the optional system prompt and presets that sit next to this file,
    builds an ``AgentRunner``, sends a single user message and prints a
    truncated view of every assistant and tool message the runner yields.

    Returns:
        0 when the run finished without a failed trace, 1 when the runner
        yielded a trace with status ``"failed"`` or an exception was raised.
    """
    base_dir = Path(__file__).parent
    setup_logging(level=LOG_LEVEL, file=LOG_FILE)

    # A missing prompt file is tolerated; the config then keeps its own prompt.
    prompt_path = base_dir / "prompts" / "system.prompt"
    system_prompt = prompt_path.read_text(encoding="utf-8") if prompt_path.exists() else ""

    presets_path = base_dir / "presets.json"
    if presets_path.exists():
        from agent.core.presets import load_presets_from_json

        load_presets_from_json(str(presets_path))

    store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
    runner = AgentRunner(
        trace_store=store,
        llm_call=create_openrouter_llm_call(model=MAIN_CONFIG.model),
        skills_dir=SKILLS_DIR if Path(SKILLS_DIR).exists() else None,
        logger_name="agents.auto_put_ad_mini_test",
    )

    # NOTE(review): `config` is the imported MAIN_CONFIG object itself, so the
    # attribute assignments below modify that shared object.
    config = MAIN_CONFIG
    if system_prompt:
        config.system_prompt = system_prompt

    print(_BANNER)
    print(" [端到端验证] 广告智能调控助手 — 无运营模式")
    print(_BANNER)
    print()

    # Key instructions (test only): tell the Agent to skip the blocking wait.
    user_message = (
        "分析广告,执行完整的ROI计算和决策流程。请使用 2026-04-19(end_date=20260419)"
        "作为数据截止日期,因为 2026-04-20 的数据尚未回流。\n\n"
        "⚠️ 本次为端到端验证运行,运营当前不在:\n"
        "1. send_approval_request 必须用 wait_for_reply=False(不要阻塞等待飞书回复)\n"
        "2. 发送完审批请求后,直接调用 generate_report 生成报告收尾\n"
        "3. 不要调用 execute_decisions,因为没有实际审批通过\n"
        "4. 决策数量较多时,请严格按第四部分的 agent(task=[...]) 并发模式按 tier 拆分"
    )

    messages = [{"role": "user", "content": user_message}]
    config.trace_id = None

    step_count = 0
    last_approval_path = None
    trace_failed = False

    try:
        async for item in runner.run(messages=messages, config=config):
            if isinstance(item, Trace):
                if item.status == "completed":
                    print(f"\n✅ [Trace] 完成 id={item.trace_id}")
                elif item.status == "failed":
                    # Remembered so the final banner and exit status reflect it.
                    trace_failed = True
                    print(f"\n❌ [Trace] 失败 id={item.trace_id}")
            elif isinstance(item, Message):
                if item.role == "assistant" and item.content:
                    content = item.content
                    text = content.get("text", "") if isinstance(content, dict) else content
                    # Content that is not a plain string would break .strip()
                    # and abort the whole run, so show its string form instead.
                    if text and not isinstance(text, str):
                        text = str(text)
                    if text and text.strip():
                        print(f"\n💭 {_preview(text, _ASSISTANT_PREVIEW_CHARS)}\n")
                elif item.role == "tool" and item.content:
                    content = item.content
                    if isinstance(content, dict):
                        tool_name = content.get("tool_name", "unknown")
                        result = content.get("result", content.get("text", str(content)))
                        step_count += 1
                        marker = f"📌 步骤 {step_count}: {tool_name}"
                        print(f"\n{'='*70}\n{marker}\n{'='*70}")
                        text = result if isinstance(result, str) else str(result)
                        print(_preview(text, _TOOL_PREVIEW_CHARS))

                        # Remember the file path of the last approval message.
                        if tool_name == "send_approval_request":
                            approval_path = _approval_path_from_result(result)
                            if approval_path:
                                last_approval_path = approval_path
    except Exception as e:
        # Top-level boundary: report the error and signal failure to the caller.
        print(f"\n❌ 执行失败: {e}")
        traceback.print_exc()
        return 1

    print("\n" + _BANNER)
    # Do not claim success when the runner reported a failed trace.
    print("❌ 执行结束(Trace 失败)" if trace_failed else "✅ 执行完成")
    print(_BANNER)
    if last_approval_path:
        print(f"📂 审批消息文件: {last_approval_path}")

    return 1 if trace_failed else 0


if __name__ == "__main__":
    sys.exit(asyncio.run(main()))