bash.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. """
  2. Bash Tool - 命令执行工具
  3. 参考 OpenCode bash.ts 完整实现。
  4. 核心功能:
  5. - 执行 shell 命令
  6. - 超时控制
  7. - 工作目录设置
  8. - 环境变量传递
  9. """
  10. import os
  11. import signal
  12. import asyncio
  13. from pathlib import Path
  14. from typing import Optional, Dict
  15. from agent.tools import tool, ToolResult, ToolContext
  16. # 常量
  17. DEFAULT_TIMEOUT = 120 # 2 分钟
  18. MAX_OUTPUT_LENGTH = 50000 # 最大输出长度
  19. GRACEFUL_KILL_WAIT = 3 # SIGTERM 后等几秒再 SIGKILL
  20. def _kill_process_tree(pid: int) -> None:
  21. """先 SIGTERM 整个进程组,等 GRACEFUL_KILL_WAIT 秒后 SIGKILL 兜底。"""
  22. import time
  23. try:
  24. pgid = os.getpgid(pid)
  25. except ProcessLookupError:
  26. return
  27. # 先优雅终止
  28. try:
  29. os.killpg(pgid, signal.SIGTERM)
  30. except ProcessLookupError:
  31. return
  32. # 等一小段时间让进程自行退出
  33. time.sleep(GRACEFUL_KILL_WAIT)
  34. # 强制杀
  35. try:
  36. os.killpg(pgid, signal.SIGKILL)
  37. except ProcessLookupError:
  38. pass # 已退出
  39. @tool(description="执行 bash 命令")
  40. async def bash_command(
  41. command: str,
  42. timeout: Optional[int] = None,
  43. workdir: Optional[str] = None,
  44. env: Optional[Dict[str, str]] = None,
  45. description: str = "",
  46. context: Optional[ToolContext] = None
  47. ) -> ToolResult:
  48. """
  49. 执行 bash 命令
  50. Args:
  51. command: 要执行的命令
  52. timeout: 超时时间(秒),默认 120 秒
  53. workdir: 工作目录,默认为当前目录
  54. env: 环境变量字典(会合并到系统环境变量)
  55. description: 命令描述(5-10 个词)
  56. context: 工具上下文
  57. Returns:
  58. ToolResult: 命令输出
  59. """
  60. # 参数验证
  61. if timeout is not None and timeout < 0:
  62. return ToolResult(
  63. title="参数错误",
  64. output=f"无效的 timeout 值: {timeout}。必须是正数。",
  65. error="Invalid timeout"
  66. )
  67. timeout_sec = timeout or DEFAULT_TIMEOUT
  68. # 工作目录
  69. cwd = Path(workdir) if workdir else Path.cwd()
  70. if not cwd.exists():
  71. return ToolResult(
  72. title="目录不存在",
  73. output=f"工作目录不存在: {workdir}",
  74. error="Directory not found"
  75. )
  76. # 准备环境变量
  77. process_env = os.environ.copy()
  78. if env:
  79. process_env.update(env)
  80. # 执行命令
  81. try:
  82. process = await asyncio.create_subprocess_shell(
  83. command,
  84. stdout=asyncio.subprocess.PIPE,
  85. stderr=asyncio.subprocess.PIPE,
  86. cwd=str(cwd),
  87. env=process_env,
  88. start_new_session=True, # 新进程组,超时时可杀整棵进程树
  89. )
  90. # 等待命令完成(带超时)
  91. try:
  92. stdout, stderr = await asyncio.wait_for(
  93. process.communicate(),
  94. timeout=timeout_sec
  95. )
  96. except asyncio.TimeoutError:
  97. # 超时,杀整个进程组(shell + 所有子进程)
  98. _kill_process_tree(process.pid)
  99. try:
  100. await asyncio.wait_for(process.wait(), timeout=GRACEFUL_KILL_WAIT + 2)
  101. except asyncio.TimeoutError:
  102. pass
  103. return ToolResult(
  104. title="命令超时",
  105. output=f"命令执行超时(>{timeout_sec}s): {command[:100]}",
  106. error="Timeout",
  107. metadata={"command": command, "timeout": timeout_sec}
  108. )
  109. # 解码输出
  110. stdout_text = stdout.decode('utf-8', errors='replace') if stdout else ""
  111. stderr_text = stderr.decode('utf-8', errors='replace') if stderr else ""
  112. # 截断过长输出
  113. truncated = False
  114. if len(stdout_text) > MAX_OUTPUT_LENGTH:
  115. stdout_text = stdout_text[:MAX_OUTPUT_LENGTH] + f"\n\n(输出被截断,总长度: {len(stdout_text)} 字符)"
  116. truncated = True
  117. # 组合输出
  118. output = ""
  119. if stdout_text:
  120. output += stdout_text
  121. if stderr_text:
  122. if output:
  123. output += "\n\n--- stderr ---\n"
  124. output += stderr_text
  125. if not output:
  126. output = "(命令无输出)"
  127. # 检查退出码
  128. exit_code = process.returncode
  129. success = exit_code == 0
  130. title = description or f"命令: {command[:50]}"
  131. if not success:
  132. title += f" (exit code: {exit_code})"
  133. return ToolResult(
  134. title=title,
  135. output=output,
  136. metadata={
  137. "exit_code": exit_code,
  138. "success": success,
  139. "truncated": truncated,
  140. "command": command,
  141. "cwd": str(cwd)
  142. },
  143. error=None if success else f"Command failed with exit code {exit_code}"
  144. )
  145. except Exception as e:
  146. return ToolResult(
  147. title="执行错误",
  148. output=f"命令执行失败: {str(e)}",
  149. error=str(e),
  150. metadata={"command": command}
  151. )