""" 执行 Agent — 运行入口 运行方式: cd /Users/liulidong/project/agent/Agent python examples/auto_put_ad/run_execute.py 用途: 读取运营确认后的调整方案(Excel 文件),执行出价调整和广告关停。 与 run.py 的区别: - run.py → 分析 Agent,只输出方案不执行 - run_execute.py → 执行 Agent,读取确认方案并调用 API """ import asyncio import os import sys from pathlib import Path # 添加项目根目录到 Python 路径 sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from dotenv import load_dotenv load_dotenv() from agent.core.runner import AgentRunner from agent.trace import FileSystemTraceStore, Trace, Message from agent.llm import create_openrouter_llm_call from agent.utils import setup_logging # 导入配置(使用执行 Agent 配置) from examples.auto_put_ad.config import EXECUTE_CONFIG, SKILLS_DIR, TRACE_STORE_PATH, LOG_LEVEL, LOG_FILE # 导入自定义工具(触发 @tool 注册) from examples.auto_put_ad.tools.ad_api import ( account_get_info, ad_batch_update_status, ad_create, ad_get_list, ad_get_report, ad_update, asset_get_list, audience_get_list, creative_create, creative_get_report, creative_update, ) from examples.auto_put_ad.tools.audience_tools import ( audience_build_targeting, audience_recommend_targeting, ) from examples.auto_put_ad.tools.budget_calc import ( account_evaluate, bid_adjustment_execute, budget_calculate_from_data, ) from examples.auto_put_ad.tools.data_query import ( data_aggregate, data_query, get_ad_current_status, ) from examples.auto_put_ad.tools.monitor_tools import ( monitor_check_metrics, monitor_circuit_break, ) # 导入执行 Agent 工具 from examples.auto_put_ad.tools.execute_agent import ( execute_adjustment_plan, ) def _find_latest_excel() -> str: """找到 outputs/ 目录下最新的调整方案 Excel 文件""" outputs_dir = Path(__file__).parent / "outputs" if not outputs_dir.exists(): return "" xlsx_files = sorted(outputs_dir.glob("adjustment_plan_*.xlsx"), reverse=True) return str(xlsx_files[0]) if xlsx_files else "" async def init_project_env(messages=None): """供 api_server 可视化调用:返回 (runner, messages, config)""" base_dir = Path(__file__).parent # 读取 execute prompt prompt_path = base_dir / "prompts" / "execute.prompt" system_prompt = "" if prompt_path.exists(): system_prompt = prompt_path.read_text(encoding="utf-8") store = FileSystemTraceStore(base_path=TRACE_STORE_PATH) runner = AgentRunner( trace_store=store, llm_call=create_openrouter_llm_call(model=EXECUTE_CONFIG.model), skills_dir=SKILLS_DIR if Path(SKILLS_DIR).exists() else None, logger_name="agents.auto_put_ad.execute", ) config = EXECUTE_CONFIG if system_prompt: config.system_prompt = system_prompt if not messages: latest_excel = _find_latest_excel() if latest_excel: messages = [{ "role": "user", "content": f"请执行以下调整方案(dry-run 模式先验证):\n文件路径: {latest_excel}", }] else: messages = [{"role": "user", "content": "没有找到调整方案文件,请先运行分析 Agent 生成方案"}] return runner, messages, config async def main(): """主函数""" base_dir = Path(__file__).parent # 初始化日志 setup_logging(level=LOG_LEVEL, file=LOG_FILE) # 读取 execute prompt prompt_path = base_dir / "prompts" / "execute.prompt" system_prompt = "" if prompt_path.exists(): system_prompt = prompt_path.read_text(encoding="utf-8") # 创建 Runner store = FileSystemTraceStore(base_path=TRACE_STORE_PATH) runner = AgentRunner( trace_store=store, llm_call=create_openrouter_llm_call(model=EXECUTE_CONFIG.model), skills_dir=SKILLS_DIR if Path(SKILLS_DIR).exists() else None, logger_name="agents.auto_put_ad.execute", ) config = EXECUTE_CONFIG if system_prompt: config.system_prompt = system_prompt print("=" * 60) print(" 执行 Agent 已启动") print("=" * 60) # 查找最新方案文件 latest_excel = _find_latest_excel() if latest_excel: print(f"最新方案文件: {latest_excel}") else: print("未找到方案文件,请先运行 run.py 生成方案") print() print("请输入执行指令(输入 'exit' 退出):") print("示例:") print(f" - 执行最新方案(dry-run)") print(f" - 执行方案 outputs/adjustment_plan_xxx.xlsx") print(f" - 确认关停") print() while True: try: user_input = input("\n> ").strip() if not user_input: continue if user_input.lower() in ("exit", "quit", "q"): print("退出系统") break # 自动注入最新 Excel 路径(如果用户没指定) if "最新" in user_input and latest_excel: user_input += f"\n\n最新方案文件路径: {latest_excel}" messages = [{"role": "user", "content": user_input}] config.trace_id = None print(f"\n执行任务: {user_input[:80]}...\n") async for item in runner.run(messages=messages, config=config): if isinstance(item, Trace): print(f"[Trace] 状态: {item.status}") elif isinstance(item, Message): if item.role == "assistant" and item.content: content = item.content if isinstance(content, dict): text = content.get("text", "") else: text = content if text and text.strip(): print(f"\n{text}\n") elif item.role == "tool" and item.content: content = item.content if isinstance(content, str): text = content elif isinstance(content, dict): text = content.get("text", str(content)) else: text = str(content) if len(text) > 500: text = text[:500] + "..." print(f" [Tool] {text}") print("\n" + "=" * 60) print("任务完成") print("=" * 60) except KeyboardInterrupt: print("\n用户中断,退出系统") break except Exception as e: print(f"\n执行失败: {e}") import traceback traceback.print_exc() if __name__ == "__main__": asyncio.run(main())