- """
- 真实环境集成测试(Agent-main)。
- """
- import asyncio
- import json
- import os
- import sys
- from dataclasses import dataclass
- from pathlib import Path
- from tempfile import TemporaryDirectory
- from typing import Any, Dict, List
- # 避免 browser_use 在受限环境写 ~/.config 触发权限错误
- os.environ.setdefault("BROWSER_USE_CONFIG_DIR", "/tmp/browseruse-test")
- PROJECT_ROOT = Path(__file__).resolve().parents[2]
- sys.path.insert(0, str(PROJECT_ROOT))
@dataclass
class CheckResult:
    """Outcome of a single integration check."""

    # Human-readable check identifier (e.g. "write_file").
    name: str
    # Whether the check passed.
    ok: bool
    # Error text on failure, or a short success summary.
    detail: str
def record(results: List[CheckResult], name: str, ok: bool, detail: str) -> None:
    """Append a CheckResult to *results* and echo a PASS/FAIL line to stdout."""
    results.append(CheckResult(name=name, ok=ok, detail=detail))
    label = "PASS" if ok else "FAIL"
    print(f"[{label}] {name}: {detail}")
async def mock_llm_call(messages, model="gpt-4o", tools=None, **kwargs):
    """Deterministic mock LLM used by the integration tests.

    Behaviour:
    - When tools are available: the first round triggers ``bash_command``,
      the second round returns a plain-text conclusion.
    - For subagent-style prompts: returns a fixed text chosen by the
      prompt type found in the most recent user message.
    """

    def _text_reply(content, prompt_tokens=10, completion_tokens=10):
        # Shared shape for every plain-text (non-tool) response.
        return {
            "content": content,
            "tool_calls": None,
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "finish_reason": "stop",
            "cost": 0.0,
        }

    # Optional per-run call counter threaded through kwargs by the caller.
    shared_state = kwargs.get("_test_state")
    if isinstance(shared_state, dict):
        call_no = shared_state.get("call_no", 0)
        shared_state["call_no"] = call_no + 1
    else:
        call_no = 0

    # Most recent user message drives the canned responses.
    last_user = next(
        (str(m.get("content", "")) for m in reversed(messages) if m.get("role") == "user"),
        "",
    )

    if "# 评估任务" in last_user:
        return _text_reply("## 评估结论\n通过\n\n## 评估理由\n结果满足要求。")
    if "# 探索任务" in last_user:
        return _text_reply("探索结论:优先采用方案 1。")
    if any(kw in last_user for kw in ("委托", "实现", "继续", "优化")):
        return _text_reply("委托任务执行完成。")

    if call_no == 0 and tools:
        # First tool-enabled round: ask the runner to execute a bash command.
        return {
            "content": "",
            "tool_calls": [
                {
                    "id": "tc_1",
                    "type": "function",
                    "function": {
                        "name": "bash_command",
                        "arguments": json.dumps(
                            {
                                "command": "echo runner_run_ok",
                                "description": "integration",
                            }
                        ),
                    },
                }
            ],
            "prompt_tokens": 12,
            "completion_tokens": 8,
            "finish_reason": "tool_calls",
            "cost": 0.0,
        }

    return _text_reply("run_fallback_ok", prompt_tokens=8, completion_tokens=6)
def check_tool_registry(results: List[CheckResult]) -> None:
    """Verify that the core and browser tool names are registered."""
    from agent.tools import get_tool_registry

    registered = set(get_tool_registry().get_tool_names())

    # (check name, required tool names, success detail)
    expectations = [
        (
            "tool_registry_core",
            {
                "read_file",
                "edit_file",
                "write_file",
                "glob_files",
                "grep_content",
                "bash_command",
                "skill",
                "list_skills",
                "subagent",
            },
            "all core tools registered",
        ),
        (
            "tool_registry_browser",
            {
                "browser_search_web",
                "browser_navigate_to_url",
                "browser_screenshot",
            },
            "browser tools visible",
        ),
    ]

    for check_name, required, ok_detail in expectations:
        missing = sorted(required - registered)
        record(
            results,
            check_name,
            len(missing) == 0,
            ok_detail if not missing else f"missing: {missing}",
        )
async def check_file_tools(results: List[CheckResult]) -> None:
    """Exercise the file and bash tools end-to-end in a temp directory."""
    from agent.tools.builtin.file.write import write_file
    from agent.tools.builtin.file.read import read_file
    from agent.tools.builtin.file.edit import edit_file
    from agent.tools.builtin.file.glob import glob_files
    from agent.tools.builtin.file.grep import grep_content
    from agent.tools.builtin.bash import bash_command

    with TemporaryDirectory(prefix="agent-main-int-") as tmp_dir:
        root = Path(tmp_dir)
        notes = root / "notes.txt"

        # write -> read -> edit round trip on a single file.
        written = await write_file(file_path=str(notes), content="hello\npython\nagent\n")
        record(results, "write_file", written.error is None, written.error or "write success")

        read_back = await read_file(file_path=str(notes))
        record(
            results,
            "read_file",
            (read_back.error is None) and ("python" in read_back.output),
            read_back.error or "content contains python",
        )

        edited = await edit_file(file_path=str(notes), old_string="python", new_string="python3")
        record(results, "edit_file", edited.error is None, edited.error or "edit success")

        # Search tools must see the edited content.
        grepped = await grep_content(pattern="python3", path=str(root))
        record(
            results,
            "grep_content",
            grepped.error is None and "notes.txt" in grepped.output,
            grepped.error or "pattern found",
        )

        globbed = await glob_files(pattern="**/*.txt", path=str(root))
        record(
            results,
            "glob_files",
            globbed.error is None and "notes.txt" in globbed.output,
            globbed.error or "glob matched",
        )

        shelled = await bash_command(
            command="echo integration_ok",
            description="integration test",
            workdir=str(root),
        )
        record(
            results,
            "bash_command",
            shelled.error is None and "integration_ok" in shelled.output,
            shelled.error or "command output ok",
        )
async def check_runner(results: List[CheckResult]) -> None:
    """Check AgentRunner in both call mode and run (tool-loop) mode."""
    from agent.core.runner import AgentRunner
    from agent.trace.store import FileSystemTraceStore
    from agent.trace.models import Trace, Message

    with TemporaryDirectory(prefix="agent-main-runner-") as tmp:
        store = FileSystemTraceStore(base_path=tmp)

        # --- call mode ---
        caller = AgentRunner(trace_store=store, llm_call=mock_llm_call)
        outcome = await caller.call(messages=[{"role": "user", "content": "ping"}], trace=True)
        record(
            results,
            "runner_call",
            bool(outcome.trace_id) and isinstance(outcome.reply, str),
            f"trace_id={outcome.trace_id}, reply={outcome.reply}",
        )

        # --- run mode (first LLM round performs a tool call) ---
        shared = {"call_no": 0}

        async def stateful_llm(messages, model="gpt-4o", tools=None, **kwargs):
            # Thread the shared call counter through to the mock LLM.
            kwargs["_test_state"] = shared
            return await mock_llm_call(messages=messages, model=model, tools=tools, **kwargs)

        runner = AgentRunner(trace_store=store, llm_call=stateful_llm)
        collected: List[Any] = []
        async for emitted in runner.run(
            task="请执行一次bash并给出结果",
            system_prompt="你是测试助手",
            model="gpt-4o-mini",
        ):
            collected.append(emitted)

        # Last emitted Trace carries the final run status.
        final_trace = next((e for e in reversed(collected) if isinstance(e, Trace)), None)

        assistant_texts: List[str] = []
        for emitted in collected:
            if isinstance(emitted, Message) and emitted.role == "assistant":
                body = emitted.content
                text = body.get("text", "") if isinstance(body, dict) else str(body)
                if text:
                    assistant_texts.append(text)

        run_ok = (
            bool(final_trace)
            and final_trace.status == "completed"
            and "run_fallback_ok" in assistant_texts
        )
        record(
            results,
            "runner_run",
            run_ok,
            f"status={getattr(final_trace, 'status', 'n/a')}, assistant_count={len(assistant_texts)}",
        )
async def check_subagent(results: List[CheckResult]) -> None:
    """Check the unified subagent tool: delegate / explore / evaluate / continue."""
    from agent.core.runner import AgentRunner
    from agent.trace.store import FileSystemTraceStore
    from agent.trace.models import Trace
    from agent.trace.goal_models import GoalTree
    from agent.tools.builtin.subagent import subagent

    with TemporaryDirectory(prefix="agent-main-subagent-") as tmp:
        store = FileSystemTraceStore(base_path=tmp)
        runner = AgentRunner(trace_store=store, llm_call=mock_llm_call)

        # Set up a running main trace with a single focused goal as context.
        main_trace = Trace(
            trace_id="main-trace",
            mode="agent",
            task="主任务",
            agent_type="default",
            status="running",
        )
        await store.create_trace(main_trace)

        tree = GoalTree(mission="主任务")
        created = tree.add_goals(["验证 subagent 功能"])
        tree.focus(created[0].id)
        await store.update_goal_tree(main_trace.trace_id, tree)

        ctx = {
            "store": store,
            "trace_id": main_trace.trace_id,
            "goal_id": created[0].id,
            "runner": runner,
        }

        delegated = await subagent(mode="delegate", task="实现登录", context=ctx)
        explored = await subagent(mode="explore", branches=["方案A", "方案B"], background="请比较", context=ctx)
        evaluated = await subagent(
            mode="evaluate",
            target_goal_id=created[0].id,
            evaluation_input={"actual_result": "实现完成"},
            requirements="给出是否通过",
            context=ctx,
        )
        resumed = await subagent(
            mode="delegate", task="继续优化", continue_from=delegated["sub_trace_id"], context=ctx
        )

        statuses = [str(r.get("status", "")).strip() for r in (delegated, explored, evaluated, resumed)]
        same_trace = (
            str(resumed.get("sub_trace_id", "")).strip()
            == str(delegated.get("sub_trace_id", "")).strip()
        )
        ok = all(s == "completed" for s in statuses) and same_trace
        record(
            results,
            "subagent_unified",
            ok,
            (
                f"delegate={statuses[0]}, explore={statuses[1]}, "
                f"evaluate={statuses[2]}, continue={statuses[3]}, continue_same={same_trace}"
            ),
        )
async def main() -> int:
    """Run every check suite; return 0 when all checks pass, else 1."""
    results: List[CheckResult] = []
    try:
        check_tool_registry(results)
        await check_file_tools(results)
        await check_runner(results)
        await check_subagent(results)
    except Exception as exc:
        # Surface any unexpected crash as a failed check instead of aborting.
        record(results, "unexpected_exception", False, repr(exc))

    passed = sum(1 for r in results if r.ok)
    failed = len(results) - passed
    print("\n=== Integration Summary ===")
    print(f"Total: {len(results)}")
    print(f"Passed: {passed}")
    print(f"Failed: {failed}")
    return 0 if failed == 0 else 1
- if __name__ == "__main__":
- raise SystemExit(asyncio.run(main()))