opencode_bun_adapter.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
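"""
OpenCode Bun adapter - invokes opencode tools through a subprocess.

This adapter calls opencode's actual TypeScript implementation
rather than a Python re-implementation.

Use it when:
- advanced tools are needed (LSP, CodeSearch, etc.)
- the full feature set is required (9 edit strategies)
- the per-call overhead (50-100 ms) is acceptable
"""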
  10. import json
  11. import asyncio
  12. import subprocess
  13. from pathlib import Path
  14. from typing import Any, Dict, Optional
  15. from agent.tools.adapters.base import ToolAdapter
  16. from agent.tools.models import ToolResult, ToolContext
  17. class OpenCodeBunAdapter(ToolAdapter):
  18. """
  19. 通过 Bun 子进程调用 opencode 工具
  20. 需要安装 Bun: https://bun.sh/
  21. """
  22. def __init__(self):
  23. # wrapper 和 adapter 在同一目录
  24. self.wrapper_script = Path(__file__).parent / "opencode-wrapper.ts"
  25. self.opencode_path = Path(__file__).parent.parent.parent.parent / "vendor/opencode"
  26. # 检查 Bun 是否可用
  27. self._check_bun()
  28. def _check_bun(self):
  29. """检查 Bun 运行时是否可用"""
  30. try:
  31. result = subprocess.run(
  32. ["bun", "--version"],
  33. capture_output=True,
  34. timeout=5
  35. )
  36. if result.returncode != 0:
  37. raise RuntimeError("Bun is not available")
  38. except FileNotFoundError:
  39. raise RuntimeError(
  40. "Bun runtime not found. Install from https://bun.sh/\n"
  41. "Or use Python-based tools instead."
  42. )
  43. async def adapt_execute(
  44. self,
  45. tool_name: str, # 'read', 'edit', 'bash' 等
  46. args: Dict[str, Any],
  47. context: Optional[ToolContext] = None
  48. ) -> ToolResult:
  49. """
  50. 调用 opencode 工具
  51. Args:
  52. tool_name: opencode 工具名称
  53. args: 工具参数
  54. context: 上下文
  55. """
  56. # 构造命令
  57. cmd = [
  58. "bun", "run",
  59. str(self.wrapper_script),
  60. tool_name,
  61. json.dumps(args)
  62. ]
  63. # 执行
  64. try:
  65. process = await asyncio.create_subprocess_exec(
  66. *cmd,
  67. stdout=asyncio.subprocess.PIPE,
  68. stderr=asyncio.subprocess.PIPE,
  69. cwd=str(self.opencode_path)
  70. )
  71. stdout, stderr = await asyncio.wait_for(
  72. process.communicate(),
  73. timeout=30 # 30 秒超时
  74. )
  75. if process.returncode != 0:
  76. error_msg = stderr.decode('utf-8', errors='replace')
  77. return ToolResult(
  78. title="OpenCode Error",
  79. output=f"工具执行失败: {error_msg}",
  80. error=error_msg
  81. )
  82. # 解析结果
  83. result_data = json.loads(stdout.decode('utf-8'))
  84. # 转换为 ToolResult
  85. return ToolResult(
  86. title=result_data.get("title", ""),
  87. output=result_data.get("output", ""),
  88. metadata=result_data.get("metadata", {}),
  89. long_term_memory=self.extract_memory(result_data)
  90. )
  91. except asyncio.TimeoutError:
  92. return ToolResult(
  93. title="Timeout",
  94. output="OpenCode 工具执行超时",
  95. error="Timeout after 30s"
  96. )
  97. except Exception as e:
  98. return ToolResult(
  99. title="Execution Error",
  100. output=f"调用 OpenCode 失败: {str(e)}",
  101. error=str(e)
  102. )
  103. def adapt_schema(self, original_schema: Dict) -> Dict:
  104. """OpenCode 使用 OpenAI 格式,直接返回"""
  105. return original_schema
  106. def extract_memory(self, result: Dict) -> str:
  107. """从 opencode 结果提取记忆"""
  108. metadata = result.get("metadata", {})
  109. if metadata.get("truncated"):
  110. return f"输出被截断 (file: {result.get('title', '')})"
  111. if "diagnostics" in metadata:
  112. count = len(metadata["diagnostics"])
  113. if count > 0:
  114. return f"检测到 {count} 个诊断问题"
  115. return ""