# 安装基础依赖
pip install -r requirements.txt
# 安装 browser 模块依赖(如果需要浏览器工具)
pip install dbutils pymysql
# 配置环境变量
cp .env.example .env
# 编辑 .env 填入你的 API Key
创建 my_agent/run.py:
import asyncio
import sys
from pathlib import Path
# 添加项目根目录到 Python 路径
sys.path.insert(0, str(Path(__file__).parent.parent))
from agent import AgentRunner, RunConfig, FileSystemTraceStore
from agent.llm import create_openrouter_llm_call
async def main():
# 1. 初始化存储
trace_store = FileSystemTraceStore(base_path=".cache/traces")
# 2. 初始化 LLM
llm_call = create_openrouter_llm_call(
model="anthropic/claude-sonnet-4.5"
)
# 3. 创建 Runner
runner = AgentRunner(
llm_call=llm_call,
trace_store=trace_store,
)
# 4. 运行 Agent
async for item in runner.run(
messages=[{"role": "user", "content": "你的任务描述"}],
config=RunConfig(
model="anthropic/claude-sonnet-4.5",
max_iterations=30,
),
):
print(item)
if __name__ == "__main__":
asyncio.run(main())
python my_agent/run.py
from agent import AgentRunner, RunConfig
runner = AgentRunner(
llm_call=llm_call, # LLM 调用函数
trace_store=trace_store, # Trace 存储
memory_store=memory_store, # 记忆存储(可选)
skills_dir="./skills", # Skills 目录(可选)
)
框架支持三种 LLM 提供商:
# OpenRouter(推荐,支持多种模型)
from agent.llm import create_openrouter_llm_call
llm_call = create_openrouter_llm_call(
model="anthropic/claude-sonnet-4.5",
api_key="your-api-key", # 或从环境变量读取
)
# Gemini
from agent.llm import create_gemini_llm_call
llm_call = create_gemini_llm_call(
model="gemini-2.0-flash-exp",
api_key="your-api-key",
)
# Yescode(内部)
from agent.llm import create_yescode_llm_call
llm_call = create_yescode_llm_call(
model="gpt-4o",
api_key="your-api-key",
)
使用 @tool 装饰器注册工具:
from agent import tool, ToolResult, ToolContext
@tool(description="查询产品库存")
async def check_inventory(
product_id: str,
warehouse: str = "default",
ctx: ToolContext = None,
) -> ToolResult:
"""
查询指定仓库的产品库存
Args:
product_id: 产品唯一标识符
warehouse: 仓库编码,默认为主仓库
ctx: 工具上下文(自动注入)
"""
# 你的业务逻辑
stock = await query_database(product_id, warehouse)
return ToolResult(
title="库存查询成功",
output=f"产品 {product_id} 在 {warehouse} 仓库的库存: {stock}",
data={"product_id": product_id, "stock": stock},
)
重要:确保定义工具的模块在 runner.run() 之前被 import。
创建 skills/my-skill.md:
---
name: my-skill
description: 我的自定义技能
category: custom
---
# 我的技能
## 何时使用
- 场景1:当需要...
- 场景2:当遇到...
## 使用指南
1. 首先...
2. 然后...
3. 最后...
## 最佳实践
- 建议1
- 建议2
加载 Skills:
runner = AgentRunner(
llm_call=llm_call,
trace_store=trace_store,
skills_dir="./skills", # 指定 skills 目录
)
# 或在 RunConfig 中指定
config = RunConfig(
skills=["my-skill", "planning"], # 只加载指定的 skills
)
使用内置预设:
from agent import RunConfig
# 默认 Agent(全部工具权限)
config = RunConfig(agent_type="default")
# 探索型 Agent(只读权限)
config = RunConfig(agent_type="explore")
# 评估型 Agent(只读+评估)
config = RunConfig(agent_type="evaluate")
自定义预设:
from agent import AgentPreset
from agent.core.presets import register_preset
# 定义预设
my_preset = AgentPreset(
allowed_tools=["read_file", "grep_content", "my_custom_tool"],
denied_tools=["bash_command"],
max_iterations=20,
skills=["my-skill"],
description="我的自定义 Agent",
)
# 注册预设
register_preset("my-agent", my_preset)
# 使用预设
config = RunConfig(agent_type="my-agent")
参考 examples/content_finder/:
# 1. 定义工具
@tool(description="从抖音搜索视频")
async def douyin_search(keywords: str, ctx: ToolContext = None) -> ToolResult:
results = await call_douyin_api(keywords)
return ToolResult(output=f"找到 {len(results)} 条内容", data=results)
# 2. 定义 Skill
# skills/content-finder.md
# 3. 运行 Agent
runner = AgentRunner(
llm_call=llm_call,
trace_store=trace_store,
skills_dir="./skills",
)
async for item in runner.run(
messages=[{"role": "user", "content": "搜索美食类视频"}],
config=RunConfig(skills=["content-finder"]),
):
if isinstance(item, Message):
print(item.content)
@tool(description="查询数据库")
async def query_db(sql: str, ctx: ToolContext = None) -> ToolResult:
results = await execute_sql(sql)
return ToolResult(
title="查询成功",
output=f"返回 {len(results)} 条记录",
data=results,
)
@tool(description="生成图表")
async def create_chart(data: list, chart_type: str, ctx: ToolContext = None) -> ToolResult:
chart_url = await generate_chart(data, chart_type)
return ToolResult(
title="图表已生成",
output=f"图表类型: {chart_type}",
data={"url": chart_url},
)
# 运行
async for item in runner.run(
messages=[{"role": "user", "content": "分析最近一周的销售数据并生成图表"}],
config=RunConfig(
skills=["data-analysis"],
max_iterations=50,
),
):
pass
@tool(description="运行测试用例")
async def run_tests(test_path: str, ctx: ToolContext = None) -> ToolResult:
result = await execute_tests(test_path)
return ToolResult(
title="测试完成",
output=f"通过: {result.passed}, 失败: {result.failed}",
data=result.to_dict(),
)
@tool(description="生成测试报告")
async def generate_report(test_results: dict, ctx: ToolContext = None) -> ToolResult:
report_path = await create_html_report(test_results)
return ToolResult(
title="报告已生成",
output=f"报告路径: {report_path}",
)
创建子 Agent 处理子任务:
from agent.tools.builtin.subagent import agent
# Agent 会自动调用 agent 工具创建子 Agent
# 在 system prompt 中说明何时使用子 Agent
使用记忆存储跨会话数据:
from agent.memory.stores import MemoryMemoryStore
memory_store = MemoryMemoryStore()
runner = AgentRunner(
llm_call=llm_call,
trace_store=trace_store,
memory_store=memory_store,
)
Agent 自动使用 goal 工具管理计划:
# Agent 会自动创建和更新 Goal
# 查看 Goal Tree
from agent.trace import FileSystemTraceStore
trace_store = FileSystemTraceStore(base_path=".cache/traces")
trace = await trace_store.get_trace(trace_id)
goal_tree = trace.goal_tree
从指定消息后继续执行:
# 续跑
async for item in runner.run(
messages=[],
config=RunConfig(
trace_id="existing-trace-id",
after_sequence=10, # 从第10条消息后继续
),
):
pass
# 回溯重跑
async for item in runner.run(
messages=[],
config=RunConfig(
trace_id="existing-trace-id",
after_sequence=5, # 回到第5条消息重新执行
),
):
pass
启动 API Server 查看执行过程:
python api_server.py
访问:
http://localhost:8000/api/traces - 查看所有 Traceshttp://localhost:8000/api/traces/{id} - 查看 Trace 详情ws://localhost:8000/api/traces/{id}/watch - 实时监控@tool(description="发送邮件")
async def send_email(
to: str,
subject: str,
body: str,
ctx: ToolContext = None,
) -> ToolResult:
"""
发送邮件
Args:
to: 收件人邮箱地址
subject: 邮件主题
body: 邮件正文
"""
try:
await email_service.send(to, subject, body)
return ToolResult(
title="邮件发送成功",
output=f"已发送到 {to}",
)
except Exception as e:
return ToolResult(
title="邮件发送失败",
output=f"错误: {str(e)}",
error=str(e),
)
# 1. 限制迭代次数
config = RunConfig(max_iterations=30)
# 2. 使用合适的模型
llm_call = create_openrouter_llm_call(
model="anthropic/claude-haiku-4.5", # 更快更便宜
)
# 3. 启用 Prompt Caching(Claude 模型)
config = RunConfig(enable_prompt_caching=True)
# 4. 限制工具权限
config = RunConfig(
agent_type="explore", # 只读权限,更快
)
try:
async for item in runner.run(messages=messages, config=config):
if isinstance(item, Message):
# 处理消息
pass
except Exception as e:
logger.error(f"Agent 执行失败: {e}")
# 错误处理逻辑
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
# 在关键位置添加日志
logger.info("开始执行 Agent")
logger.debug(f"配置: {config}")
# 1. 打印所有消息
async for item in runner.run(messages=messages, config=config):
print(f"[{type(item).__name__}] {item}")
# 2. 查看 Trace
trace = await trace_store.get_trace(trace_id)
for msg in trace.messages:
print(f"{msg.role}: {msg.content}")
# 3. 启动 API Server 可视化
python api_server.py
检查:
@tool 装饰器)allowed_tools)# 1. 限制迭代次数
config = RunConfig(max_iterations=20)
# 2. 在 Skill 中明确终止条件
# 3. 检查工具返回是否有明确的成功/失败标识
# 使用异步处理
import asyncio
async def long_running_task():
async for item in runner.run(messages=messages, config=config):
# 定期保存状态
if isinstance(item, Trace):
await save_checkpoint(item)
# 支持中断和恢复
config = RunConfig(
trace_id=last_trace_id,
after_sequence=last_sequence,
)
agent/README.mdagent/docs/architecture.mdagent/docs/tools.mdagent/docs/skills.mdexamples/
examples/content_finder/ - 内容寻找 Agentexamples/how/ - 完整示例examples/research/ - 研究型 Agentexamples/content_finder/demo.py 开始