""" 自动化投放 Agent 系统 — 主入口 运行方式: cd /Users/liulidong/project/agent/Agent python examples/auto_put_ad/run.py """ import asyncio import os import sys from pathlib import Path # 代理设置 os.environ.setdefault("HTTP_PROXY", "http://127.0.0.1:29758") os.environ.setdefault("HTTPS_PROXY", "http://127.0.0.1:29758") # 添加项目根目录到 Python 路径 sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from dotenv import load_dotenv load_dotenv() from agent.core.runner import AgentRunner from agent.trace import FileSystemTraceStore, Trace, Message from agent.llm import create_openrouter_llm_call from agent.utils import setup_logging # 导入配置(使用绝对路径导入) from examples.auto_put_ad.config import MAIN_CONFIG, SKILLS_DIR, TRACE_STORE_PATH, LOG_LEVEL, LOG_FILE # 导入自定义工具(触发 @tool 注册) from examples.auto_put_ad.tools.ad_api import ( account_get_info, ad_batch_update_status, ad_create, ad_get_list, ad_get_report, ad_update, asset_get_list, audience_get_list, creative_create, creative_get_report, creative_update, ) from examples.auto_put_ad.tools.audience_tools import ( audience_build_targeting, audience_recommend_targeting, ) from examples.auto_put_ad.tools.budget_calc import ( get_ad_performance, get_account_summary, compute_budget_thresholds, classify_ads, compute_bid_adjustment, bid_adjustment_execute, ) from examples.auto_put_ad.tools.strategy_config import ( load_strategy_config, update_strategy_config, get_config_history, ) from examples.auto_put_ad.tools.data_query import ( data_aggregate, data_query, get_ad_current_status, ) from examples.auto_put_ad.tools.monitor_tools import ( monitor_check_metrics, monitor_circuit_break, ) async def init_project_env(messages=None): """供 api_server 可视化调用:返回 (runner, messages, config)""" base_dir = Path(__file__).parent # 读取 main.prompt main_prompt_path = base_dir / "prompts" / "main.prompt" system_prompt = "" if main_prompt_path.exists(): system_prompt = main_prompt_path.read_text(encoding="utf-8") # 加载 presets(必须在创建 runner 之前) presets_path = base_dir / "presets.json" if presets_path.exists(): from agent.core.presets import load_presets_from_json load_presets_from_json(str(presets_path)) store = FileSystemTraceStore(base_path=TRACE_STORE_PATH) runner = AgentRunner( trace_store=store, llm_call=create_openrouter_llm_call(model=MAIN_CONFIG.model), skills_dir=SKILLS_DIR if Path(SKILLS_DIR).exists() else None, logger_name="agents.auto_put_ad", ) config = MAIN_CONFIG if system_prompt: config.system_prompt = system_prompt # 如果前端没传 messages,给个默认的 if not messages: messages = [{"role": "user", "content": "今天小程序预算10w"}] # 注入 system prompt 到 messages(run_api.py 的 config 合并会丢失 system_prompt, # 通过 messages 传递确保 system prompt 被 agent 接收) if system_prompt: has_system = any(m.get("role") == "system" for m in messages) if not has_system: messages = [{"role": "system", "content": system_prompt}] + messages return runner, messages, config async def main(): """主函数""" base_dir = Path(__file__).parent # 初始化日志 setup_logging(level=LOG_LEVEL, file=LOG_FILE) # 读取 main.prompt main_prompt_path = base_dir / "prompts" / "main.prompt" system_prompt = "" if main_prompt_path.exists(): system_prompt = main_prompt_path.read_text(encoding="utf-8") # 加载 presets(必须在创建 runner 之前) presets_path = base_dir / "presets.json" if presets_path.exists(): from agent.core.presets import load_presets_from_json load_presets_from_json(str(presets_path)) # 创建 Runner(启用多 Agent) store = FileSystemTraceStore(base_path=TRACE_STORE_PATH) runner = AgentRunner( trace_store=store, llm_call=create_openrouter_llm_call(model=MAIN_CONFIG.model), skills_dir=SKILLS_DIR if Path(SKILLS_DIR).exists() else None, logger_name="agents.auto_put_ad", ) # 如果有 system_prompt,注入到 config config = MAIN_CONFIG if system_prompt: config.system_prompt = system_prompt print("=" * 60) print(" 自动化投放 Agent 系统已启动") print("=" * 60) print("请输入投放任务(输入 'exit' 退出):") print("示例:") print(" - 今天小程序预算10w") print(" - 分析账户昨日投放效果,优化预算分配") print(" - 查询账户余额和今日消耗") print() while True: try: user_input = input("\n> ").strip() if not user_input: continue if user_input.lower() in ("exit", "quit", "q"): print("退出系统") break # 构建消息 messages = [{"role": "user", "content": user_input}] # 每次对话用新的 trace config.trace_id = None print(f"\n🚀 执行任务: {user_input}\n") # 执行 Agent async for item in runner.run(messages=messages, config=config): if isinstance(item, Trace): print(f"[Trace] 状态: {item.status}") elif isinstance(item, Message): if item.role == "assistant" and item.content: content = item.content if isinstance(content, dict): text = content.get("text", "") else: text = content if text and text.strip(): print(f"\n💭 {text}\n") elif item.role == "tool" and item.content: content = item.content if isinstance(content, str): text = content elif isinstance(content, dict): text = content.get("text", str(content)) else: text = str(content) # 只打印前 500 字符避免刷屏 if len(text) > 500: text = text[:500] + "..." print(f" [Tool] {text}") print("\n" + "=" * 60) print("✅ 任务完成") print("=" * 60) except KeyboardInterrupt: print("\n用户中断,退出系统") break except Exception as e: print(f"\n❌ 执行失败: {e}") import traceback traceback.print_exc() if __name__ == "__main__": asyncio.run(main())