| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- """
- OpenCode Bun 适配器 - 通过子进程调用 opencode 工具
- 这个适配器真正调用 opencode 的 TypeScript 实现,
- 而不是 Python 重新实现。
- 使用场景:
- - 高级工具(LSP、CodeSearch 等)
- - 需要完整功能(9 种编辑策略)
- - 不在意性能开销(50-100ms per call)
- """
- import json
- import asyncio
- import subprocess
- from pathlib import Path
- from typing import Any, Dict, Optional
- from agent.tools.adapters.base import ToolAdapter
- from agent.tools.models import ToolResult, ToolContext
- class OpenCodeBunAdapter(ToolAdapter):
- """
- 通过 Bun 子进程调用 opencode 工具
- 需要安装 Bun: https://bun.sh/
- """
- def __init__(self):
- # wrapper 和 adapter 在同一目录
- self.wrapper_script = Path(__file__).parent / "opencode-wrapper.ts"
- self.opencode_path = Path(__file__).parent.parent.parent.parent / "vendor/opencode"
- # 检查 Bun 是否可用
- self._check_bun()
- def _check_bun(self):
- """检查 Bun 运行时是否可用"""
- try:
- result = subprocess.run(
- ["bun", "--version"],
- capture_output=True,
- timeout=5
- )
- if result.returncode != 0:
- raise RuntimeError("Bun is not available")
- except FileNotFoundError:
- raise RuntimeError(
- "Bun runtime not found. Install from https://bun.sh/\n"
- "Or use Python-based tools instead."
- )
- async def adapt_execute(
- self,
- tool_name: str, # 'read', 'edit', 'bash' 等
- args: Dict[str, Any],
- context: Optional[ToolContext] = None
- ) -> ToolResult:
- """
- 调用 opencode 工具
- Args:
- tool_name: opencode 工具名称
- args: 工具参数
- context: 上下文
- """
- # 构造命令
- cmd = [
- "bun", "run",
- str(self.wrapper_script),
- tool_name,
- json.dumps(args)
- ]
- # 执行
- try:
- process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- cwd=str(self.opencode_path)
- )
- stdout, stderr = await asyncio.wait_for(
- process.communicate(),
- timeout=30 # 30 秒超时
- )
- if process.returncode != 0:
- error_msg = stderr.decode('utf-8', errors='replace')
- return ToolResult(
- title="OpenCode Error",
- output=f"工具执行失败: {error_msg}",
- error=error_msg
- )
- # 解析结果
- result_data = json.loads(stdout.decode('utf-8'))
- # 转换为 ToolResult
- return ToolResult(
- title=result_data.get("title", ""),
- output=result_data.get("output", ""),
- metadata=result_data.get("metadata", {}),
- long_term_memory=self.extract_memory(result_data)
- )
- except asyncio.TimeoutError:
- return ToolResult(
- title="Timeout",
- output="OpenCode 工具执行超时",
- error="Timeout after 30s"
- )
- except Exception as e:
- return ToolResult(
- title="Execution Error",
- output=f"调用 OpenCode 失败: {str(e)}",
- error=str(e)
- )
- def adapt_schema(self, original_schema: Dict) -> Dict:
- """OpenCode 使用 OpenAI 格式,直接返回"""
- return original_schema
- def extract_memory(self, result: Dict) -> str:
- """从 opencode 结果提取记忆"""
- metadata = result.get("metadata", {})
- if metadata.get("truncated"):
- return f"输出被截断 (file: {result.get('title', '')})"
- if "diagnostics" in metadata:
- count = len(metadata["diagnostics"])
- if count > 0:
- return f"检测到 {count} 个诊断问题"
- return ""
|