| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- """
- 执行 Agent — 运行入口
- 运行方式:
- cd /Users/liulidong/project/agent/Agent
- python examples/auto_put_ad/run_execute.py
- 用途:
- 读取运营确认后的调整方案(Excel 文件),执行出价调整和广告关停。
- 与 run.py 的区别:
- - run.py → 分析 Agent,只输出方案不执行
- - run_execute.py → 执行 Agent,读取确认方案并调用 API
- """
- import asyncio
- import os
- import sys
- from pathlib import Path
- # 添加项目根目录到 Python 路径
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from dotenv import load_dotenv
- load_dotenv()
- from agent.core.runner import AgentRunner
- from agent.trace import FileSystemTraceStore, Trace, Message
- from agent.llm import create_openrouter_llm_call
- from agent.utils import setup_logging
- # 导入配置(使用执行 Agent 配置)
- from examples.auto_put_ad.config import EXECUTE_CONFIG, SKILLS_DIR, TRACE_STORE_PATH, LOG_LEVEL, LOG_FILE
- # 导入自定义工具(触发 @tool 注册)
- from examples.auto_put_ad.tools.ad_api import (
- account_get_info, ad_batch_update_status, ad_create, ad_get_list,
- ad_get_report, ad_update, asset_get_list, audience_get_list,
- creative_create, creative_get_report, creative_update,
- )
- from examples.auto_put_ad.tools.audience_tools import (
- audience_build_targeting, audience_recommend_targeting,
- )
- from examples.auto_put_ad.tools.budget_calc import (
- account_evaluate, bid_adjustment_execute, budget_calculate_from_data,
- )
- from examples.auto_put_ad.tools.data_query import (
- data_aggregate, data_query, get_ad_current_status,
- )
- from examples.auto_put_ad.tools.monitor_tools import (
- monitor_check_metrics, monitor_circuit_break,
- )
- # 导入执行 Agent 工具
- from examples.auto_put_ad.tools.execute_agent import (
- execute_adjustment_plan,
- )
- def _find_latest_excel() -> str:
- """找到 outputs/ 目录下最新的调整方案 Excel 文件"""
- outputs_dir = Path(__file__).parent / "outputs"
- if not outputs_dir.exists():
- return ""
- xlsx_files = sorted(outputs_dir.glob("adjustment_plan_*.xlsx"), reverse=True)
- return str(xlsx_files[0]) if xlsx_files else ""
- async def init_project_env(messages=None):
- """供 api_server 可视化调用:返回 (runner, messages, config)"""
- base_dir = Path(__file__).parent
- # 读取 execute prompt
- prompt_path = base_dir / "prompts" / "execute.prompt"
- system_prompt = ""
- if prompt_path.exists():
- system_prompt = prompt_path.read_text(encoding="utf-8")
- store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
- runner = AgentRunner(
- trace_store=store,
- llm_call=create_openrouter_llm_call(model=EXECUTE_CONFIG.model),
- skills_dir=SKILLS_DIR if Path(SKILLS_DIR).exists() else None,
- logger_name="agents.auto_put_ad.execute",
- )
- config = EXECUTE_CONFIG
- if system_prompt:
- config.system_prompt = system_prompt
- if not messages:
- latest_excel = _find_latest_excel()
- if latest_excel:
- messages = [{
- "role": "user",
- "content": f"请执行以下调整方案(dry-run 模式先验证):\n文件路径: {latest_excel}",
- }]
- else:
- messages = [{"role": "user", "content": "没有找到调整方案文件,请先运行分析 Agent 生成方案"}]
- return runner, messages, config
- async def main():
- """主函数"""
- base_dir = Path(__file__).parent
- # 初始化日志
- setup_logging(level=LOG_LEVEL, file=LOG_FILE)
- # 读取 execute prompt
- prompt_path = base_dir / "prompts" / "execute.prompt"
- system_prompt = ""
- if prompt_path.exists():
- system_prompt = prompt_path.read_text(encoding="utf-8")
- # 创建 Runner
- store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
- runner = AgentRunner(
- trace_store=store,
- llm_call=create_openrouter_llm_call(model=EXECUTE_CONFIG.model),
- skills_dir=SKILLS_DIR if Path(SKILLS_DIR).exists() else None,
- logger_name="agents.auto_put_ad.execute",
- )
- config = EXECUTE_CONFIG
- if system_prompt:
- config.system_prompt = system_prompt
- print("=" * 60)
- print(" 执行 Agent 已启动")
- print("=" * 60)
- # 查找最新方案文件
- latest_excel = _find_latest_excel()
- if latest_excel:
- print(f"最新方案文件: {latest_excel}")
- else:
- print("未找到方案文件,请先运行 run.py 生成方案")
- print()
- print("请输入执行指令(输入 'exit' 退出):")
- print("示例:")
- print(f" - 执行最新方案(dry-run)")
- print(f" - 执行方案 outputs/adjustment_plan_xxx.xlsx")
- print(f" - 确认关停")
- print()
- while True:
- try:
- user_input = input("\n> ").strip()
- if not user_input:
- continue
- if user_input.lower() in ("exit", "quit", "q"):
- print("退出系统")
- break
- # 自动注入最新 Excel 路径(如果用户没指定)
- if "最新" in user_input and latest_excel:
- user_input += f"\n\n最新方案文件路径: {latest_excel}"
- messages = [{"role": "user", "content": user_input}]
- config.trace_id = None
- print(f"\n执行任务: {user_input[:80]}...\n")
- async for item in runner.run(messages=messages, config=config):
- if isinstance(item, Trace):
- print(f"[Trace] 状态: {item.status}")
- elif isinstance(item, Message):
- if item.role == "assistant" and item.content:
- content = item.content
- if isinstance(content, dict):
- text = content.get("text", "")
- else:
- text = content
- if text and text.strip():
- print(f"\n{text}\n")
- elif item.role == "tool" and item.content:
- content = item.content
- if isinstance(content, str):
- text = content
- elif isinstance(content, dict):
- text = content.get("text", str(content))
- else:
- text = str(content)
- if len(text) > 500:
- text = text[:500] + "..."
- print(f" [Tool] {text}")
- print("\n" + "=" * 60)
- print("任务完成")
- print("=" * 60)
- except KeyboardInterrupt:
- print("\n用户中断,退出系统")
- break
- except Exception as e:
- print(f"\n执行失败: {e}")
- import traceback
- traceback.print_exc()
- if __name__ == "__main__":
- asyncio.run(main())
|