| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- """
- 端到端验证脚本 — 用于无运营场景的 pipeline 测试
- 与 execute_once.py 的区别:
- - 用户消息里明确告诉 Agent"运营当前不在,发完审批请求不要阻塞等待"
- - send_approval_request 用 wait_for_reply=False
- - 跳过 execute_decisions(没审批结果不执行)
- - 生成报告收尾
- """
- import asyncio
- import os
- import sys
- from pathlib import Path
- # 代理设置
- os.environ.setdefault("HTTP_PROXY", "http://127.0.0.1:29758")
- os.environ.setdefault("HTTPS_PROXY", "http://127.0.0.1:29758")
- # 添加项目根目录到 Python 路径
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from dotenv import load_dotenv
- load_dotenv()
- from agent.core.runner import AgentRunner
- from agent.trace import FileSystemTraceStore, Trace, Message
- from agent.llm import create_openrouter_llm_call
- from agent.utils import setup_logging
- from examples.auto_put_ad_mini.config import (
- MAIN_CONFIG, SKILLS_DIR, TRACE_STORE_PATH, LOG_LEVEL, LOG_FILE,
- )
- # 导入自定义工具
- from examples.auto_put_ad_mini.tools.data_query import fetch_creative_data, merge_creative_data
- from examples.auto_put_ad_mini.tools.roi_calculator import calculate_roi_metrics
- from examples.auto_put_ad_mini.tools.portfolio_metrics import calculate_portfolio_summary
- from examples.auto_put_ad_mini.tools.ad_decision import get_ads_for_review, apply_decisions, query_ad_detail, modify_decisions
- from examples.auto_put_ad_mini.tools.report_generator import generate_report
- from examples.auto_put_ad_mini.tools.guardrails import validate_decisions
- from examples.auto_put_ad_mini.tools.execution_engine import execute_decisions, check_execution_feedback
- from examples.auto_put_ad_mini.tools.im_approval import send_approval_request, check_approval_status, send_feishu_text_message
- try:
- from examples.auto_put_ad_mini.tools.feishu_doc import import_to_feishu
- except ImportError:
- pass
- async def main():
- base_dir = Path(__file__).parent
- setup_logging(level=LOG_LEVEL, file=LOG_FILE)
- prompt_path = base_dir / "prompts" / "system.prompt"
- system_prompt = prompt_path.read_text(encoding="utf-8") if prompt_path.exists() else ""
- presets_path = base_dir / "presets.json"
- if presets_path.exists():
- from agent.core.presets import load_presets_from_json
- load_presets_from_json(str(presets_path))
- store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
- runner = AgentRunner(
- trace_store=store,
- llm_call=create_openrouter_llm_call(model=MAIN_CONFIG.model),
- skills_dir=SKILLS_DIR if Path(SKILLS_DIR).exists() else None,
- logger_name="agents.auto_put_ad_mini_test",
- )
- config = MAIN_CONFIG
- if system_prompt:
- config.system_prompt = system_prompt
- print("=" * 70)
- print(" [端到端验证] 广告智能调控助手 — 无运营模式")
- print("=" * 70)
- print()
- # 关键指令(测试用):告诉 Agent 跳过阻塞等待
- user_message = (
- "分析广告,执行完整的ROI计算和决策流程。请使用 2026-04-19(end_date=20260419)"
- "作为数据截止日期,因为 2026-04-20 的数据尚未回流。\n\n"
- "⚠️ 本次为端到端验证运行,运营当前不在:\n"
- "1. send_approval_request 必须用 wait_for_reply=False(不要阻塞等待飞书回复)\n"
- "2. 发送完审批请求后,直接调用 generate_report 生成报告收尾\n"
- "3. 不要调用 execute_decisions,因为没有实际审批通过\n"
- "4. 决策数量较多时,请严格按第四部分的 agent(task=[...]) 并发模式按 tier 拆分"
- )
- messages = [{"role": "user", "content": user_message}]
- config.trace_id = None
- step_count = 0
- last_approval_path = None
- try:
- async for item in runner.run(messages=messages, config=config):
- if isinstance(item, Trace):
- if item.status == "completed":
- print(f"\n✅ [Trace] 完成 id={item.trace_id}")
- elif item.status == "failed":
- print(f"\n❌ [Trace] 失败 id={item.trace_id}")
- elif isinstance(item, Message):
- if item.role == "assistant" and item.content:
- content = item.content
- text = content.get("text", "") if isinstance(content, dict) else content
- if text and text.strip():
- truncated = text[:300] + ("..." if len(text) > 300 else "")
- print(f"\n💭 {truncated}\n")
- elif item.role == "tool" and item.content:
- content = item.content
- if isinstance(content, dict):
- tool_name = content.get("tool_name", "unknown")
- result = content.get("result", content.get("text", str(content)))
- step_count += 1
- marker = f"📌 步骤 {step_count}: {tool_name}"
- print(f"\n{'='*70}\n{marker}\n{'='*70}")
- text = result if isinstance(result, str) else str(result)
- print(text[:400] + ("..." if len(text) > 400 else ""))
- # 记录最后一个审批消息文件路径
- if tool_name == "send_approval_request" and isinstance(result, str) and "req_" in result:
- import re
- m = re.search(r"req_\d+_\d+_[a-f0-9]+", result)
- if m:
- last_approval_path = f"outputs/approvals/{m.group(0)}.txt"
- print("\n" + "=" * 70)
- print("✅ 执行完成")
- print("=" * 70)
- if last_approval_path:
- print(f"📂 审批消息文件: {last_approval_path}")
- except Exception as e:
- print(f"\n❌ 执行失败: {e}")
- import traceback
- traceback.print_exc()
- if __name__ == "__main__":
- asyncio.run(main())
|