# execute_once_test.py (5.9 KB)
"""End-to-end verification script: pipeline test for the scenario with no operator available.

Differences from execute_once.py:
- The user message explicitly tells the Agent "the operator is currently away;
  do not block waiting after sending the approval request".
- send_approval_request is used with wait_for_reply=False.
- execute_decisions is skipped (nothing is executed without an approval result).
- The run finishes by generating the report.
"""
import asyncio
import os
import sys
from pathlib import Path

# Proxy settings. setdefault only fills a variable in when it is not already
# set, so a value already present in the environment takes precedence.
os.environ.setdefault("HTTP_PROXY", "http://127.0.0.1:29758")
os.environ.setdefault("HTTPS_PROXY", "http://127.0.0.1:29758")

# Add the project root directory (two levels above this file's directory) to
# the front of the Python import path. This must run before the `agent.*` and
# `examples.*` imports below.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from dotenv import load_dotenv

# Called before the project imports below.
# NOTE(review): presumably so that values from a .env file are already in the
# environment when those modules are imported — confirm.
load_dotenv()

from agent.core.runner import AgentRunner
from agent.trace import FileSystemTraceStore, Trace, Message
from agent.llm import create_openrouter_llm_call
from agent.utils import setup_logging
from examples.auto_put_ad_mini.config import (
    MAIN_CONFIG, SKILLS_DIR, TRACE_STORE_PATH, LOG_LEVEL, LOG_FILE,
)

# Import the custom tools.
# NOTE(review): none of these names are referenced in the visible part of this
# file; they are presumably imported for a registration side effect — confirm
# before removing any of them.
from examples.auto_put_ad_mini.tools.data_query import fetch_creative_data, merge_creative_data
from examples.auto_put_ad_mini.tools.roi_calculator import calculate_roi_metrics
from examples.auto_put_ad_mini.tools.portfolio_metrics import calculate_portfolio_summary
from examples.auto_put_ad_mini.tools.ad_decision import get_ads_for_review, apply_decisions, query_ad_detail, modify_decisions
from examples.auto_put_ad_mini.tools.report_generator import generate_report
from examples.auto_put_ad_mini.tools.guardrails import validate_decisions
from examples.auto_put_ad_mini.tools.execution_engine import execute_decisions, check_execution_feedback
from examples.auto_put_ad_mini.tools.im_approval import send_approval_request, check_approval_status, send_feishu_text_message

try:
    from examples.auto_put_ad_mini.tools.feishu_doc import import_to_feishu
except ImportError:
    # Optional import: if it cannot be imported the error is ignored and the
    # script carries on without `import_to_feishu`.
    pass
  40. async def main():
  41. base_dir = Path(__file__).parent
  42. setup_logging(level=LOG_LEVEL, file=LOG_FILE)
  43. prompt_path = base_dir / "prompts" / "system.prompt"
  44. system_prompt = prompt_path.read_text(encoding="utf-8") if prompt_path.exists() else ""
  45. presets_path = base_dir / "presets.json"
  46. if presets_path.exists():
  47. from agent.core.presets import load_presets_from_json
  48. load_presets_from_json(str(presets_path))
  49. store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
  50. runner = AgentRunner(
  51. trace_store=store,
  52. llm_call=create_openrouter_llm_call(model=MAIN_CONFIG.model),
  53. skills_dir=SKILLS_DIR if Path(SKILLS_DIR).exists() else None,
  54. logger_name="agents.auto_put_ad_mini_test",
  55. )
  56. config = MAIN_CONFIG
  57. if system_prompt:
  58. config.system_prompt = system_prompt
  59. print("=" * 70)
  60. print(" [端到端验证] 广告智能调控助手 — 无运营模式")
  61. print("=" * 70)
  62. print()
  63. # 关键指令(测试用):告诉 Agent 跳过阻塞等待
  64. user_message = (
  65. "分析广告,执行完整的ROI计算和决策流程。请使用 2026-04-19(end_date=20260419)"
  66. "作为数据截止日期,因为 2026-04-20 的数据尚未回流。\n\n"
  67. "⚠️ 本次为端到端验证运行,运营当前不在:\n"
  68. "1. send_approval_request 必须用 wait_for_reply=False(不要阻塞等待飞书回复)\n"
  69. "2. 发送完审批请求后,直接调用 generate_report 生成报告收尾\n"
  70. "3. 不要调用 execute_decisions,因为没有实际审批通过\n"
  71. "4. 决策数量较多时,请严格按第四部分的 agent(task=[...]) 并发模式按 tier 拆分"
  72. )
  73. messages = [{"role": "user", "content": user_message}]
  74. config.trace_id = None
  75. step_count = 0
  76. last_approval_path = None
  77. try:
  78. async for item in runner.run(messages=messages, config=config):
  79. if isinstance(item, Trace):
  80. if item.status == "completed":
  81. print(f"\n✅ [Trace] 完成 id={item.trace_id}")
  82. elif item.status == "failed":
  83. print(f"\n❌ [Trace] 失败 id={item.trace_id}")
  84. elif isinstance(item, Message):
  85. if item.role == "assistant" and item.content:
  86. content = item.content
  87. text = content.get("text", "") if isinstance(content, dict) else content
  88. if text and text.strip():
  89. truncated = text[:300] + ("..." if len(text) > 300 else "")
  90. print(f"\n💭 {truncated}\n")
  91. elif item.role == "tool" and item.content:
  92. content = item.content
  93. if isinstance(content, dict):
  94. tool_name = content.get("tool_name", "unknown")
  95. result = content.get("result", content.get("text", str(content)))
  96. step_count += 1
  97. marker = f"📌 步骤 {step_count}: {tool_name}"
  98. print(f"\n{'='*70}\n{marker}\n{'='*70}")
  99. text = result if isinstance(result, str) else str(result)
  100. print(text[:400] + ("..." if len(text) > 400 else ""))
  101. # 记录最后一个审批消息文件路径
  102. if tool_name == "send_approval_request" and isinstance(result, str) and "req_" in result:
  103. import re
  104. m = re.search(r"req_\d+_\d+_[a-f0-9]+", result)
  105. if m:
  106. last_approval_path = f"outputs/approvals/{m.group(0)}.txt"
  107. print("\n" + "=" * 70)
  108. print("✅ 执行完成")
  109. print("=" * 70)
  110. if last_approval_path:
  111. print(f"📂 审批消息文件: {last_approval_path}")
  112. except Exception as e:
  113. print(f"\n❌ 执行失败: {e}")
  114. import traceback
  115. traceback.print_exc()
# Script entry point: only when this file is executed directly (not imported),
# run the async main() coroutine to completion via asyncio.run.
if __name__ == "__main__":
    asyncio.run(main())