Implement Claude's built-in file editor and terminal tools

This should improve how the Anthropic operator views and edits files
and views terminal command output.
This commit is contained in:
Debanjum
2025-05-30 04:21:45 -07:00
parent 21bf7f1d6d
commit e0ea151f20
3 changed files with 293 additions and 9 deletions

View File

@@ -125,6 +125,49 @@ class NoopAction(BaseAction):
type: Literal["noop"] = "noop"
# --- Text Editor Actions ---
class TextEditorViewAction(BaseAction):
    """View contents of a file."""

    # Literal tag identifying this action; the environment dispatches on it.
    type: Literal["text_editor_view"] = "text_editor_view"
    # Path to view. The computer environment lists the directory instead
    # when this path is a directory.
    path: str
    # Optional line window; when left as None the whole file is shown.
    view_range: Optional[List[int]] = None  # [start_line, end_line]
class TextEditorCreateAction(BaseAction):
    """Create a new file with specified contents."""

    # Literal tag identifying this action; the environment dispatches on it.
    type: Literal["text_editor_create"] = "text_editor_create"
    # Path of the file to create.
    path: str
    # Full text to write into the new file.
    file_text: str
class TextEditorStrReplaceAction(BaseAction):
    """Execute an exact string match replacement on a file."""

    # Literal tag identifying this action; the environment dispatches on it.
    type: Literal["text_editor_str_replace"] = "text_editor_str_replace"
    # Path of the file to edit.
    path: str
    # Text to search for in the file.
    old_str: str
    # Text that replaces each match of old_str.
    new_str: str
class TextEditorInsertAction(BaseAction):
    """Insert new text after a specified line number."""

    # Literal tag identifying this action; the environment dispatches on it.
    type: Literal["text_editor_insert"] = "text_editor_insert"
    # Path of the file to edit.
    path: str
    # Line number after which new_str is inserted.
    insert_line: int
    # Text to insert.
    new_str: str
class TerminalAction(BaseAction):
    """Run a shell command in the terminal."""

    # Literal tag identifying this action; the environment dispatches on it.
    type: Literal["terminal"] = "terminal"
    # Shell command line to execute.
    command: str
    # Restart flag forwarded from the tool call; defaults to False.
    restart: bool = False
OperatorAction = Union[
ClickAction,
DoubleClickAction,
@@ -146,4 +189,9 @@ OperatorAction = Union[
BackAction,
RequestUserAction,
NoopAction,
TextEditorViewAction,
TextEditorCreateAction,
TextEditorStrReplaceAction,
TextEditorInsertAction,
TerminalAction,
]

View File

@@ -135,6 +135,40 @@ class AnthropicOperatorAgent(OperatorAgent):
logger.warning("Goto tool called without URL.")
elif tool_name == "back":
action_to_run = BackAction()
elif tool_name == self.model_default_tool("terminal")["name"]:
command = tool_input.get("command")
restart = tool_input.get("restart", False)
if command:
action_to_run = TerminalAction(command=command, restart=restart)
elif tool_name == "str_replace_based_edit_tool":
# Handle text editor tool calls
command = tool_input.get("command")
if command == "view":
path = tool_input.get("path")
view_range = tool_input.get("view_range")
if path:
action_to_run = TextEditorViewAction(path=path, view_range=view_range)
elif command == "create":
path = tool_input.get("path")
file_text = tool_input.get("file_text", "")
if path:
action_to_run = TextEditorCreateAction(path=path, file_text=file_text)
elif command == "str_replace":
path = tool_input.get("path")
old_str = tool_input.get("old_str")
new_str = tool_input.get("new_str")
if path and old_str is not None and new_str is not None:
action_to_run = TextEditorStrReplaceAction(path=path, old_str=old_str, new_str=new_str)
elif command == "insert":
path = tool_input.get("path")
insert_line = tool_input.get("insert_line")
new_str = tool_input.get("new_str")
if path and insert_line is not None and new_str is not None:
action_to_run = TextEditorInsertAction(
path=path, insert_line=insert_line, new_str=new_str
)
else:
logger.warning(f"Unsupported text editor command: {command}")
else:
logger.warning(f"Unsupported Anthropic computer action type: {tool_name}")
@@ -237,7 +271,11 @@ class AnthropicOperatorAgent(OperatorAgent):
compiled_response.append(block.text)
elif block.type == "tool_use":
block_input = {"action": block.name}
if block.name == "computer":
if block.name in (
self.model_default_tool("computer")["name"],
self.model_default_tool("editor")["name"],
self.model_default_tool("terminal")["name"],
):
block_input = block.input # Computer action details are in input dict
elif block.name == "goto":
block_input["url"] = block.input.get("url", "[Missing URL]")
@@ -294,7 +332,34 @@ class AnthropicOperatorAgent(OperatorAgent):
else:
# Handle other actions
render_texts += [f"{action.capitalize()}"]
elif block.name == self.model_default_tool("editor")["name"]:
# Handle text editor actions
command = block.input.get("command")
if command == "view":
path = block.input.get("path")
view_range = block.input.get("view_range")
if path:
render_texts += [f"View file: {path} (lines {view_range})"]
elif command == "create":
path = block.input.get("path")
file_text = block.input.get("file_text", "")
if path:
render_texts += [f"Create file: {path} with content:\n{file_text}"]
elif command == "str_replace":
path = block.input.get("path")
old_str = block.input.get("old_str")
new_str = block.input.get("new_str")
if path and old_str is not None and new_str is not None:
render_texts += [f"File: {path}\n**Find**\n{old_str}\n**Replace**\n{new_str}'"]
elif command == "insert":
path = block.input.get("path")
insert_line = block.input.get("insert_line")
new_str = block.input.get("new_str")
if path and insert_line is not None and new_str is not None:
render_texts += [f"In file: {path} at line {insert_line} insert\n{new_str}"]
render_texts += [f"Edit file: {block.input['path']}"]
elif block.name == self.model_default_tool("terminal")["name"]:
render_texts += [f"Run command:\n{block.input['command']}"]
# If screenshot is not available when screenshot action was requested
if isinstance(block.input, dict) and block.input.get("action") == "screenshot" and not screenshot:
render_texts += ["Failed to get screenshot"]
@@ -369,8 +434,9 @@ class AnthropicOperatorAgent(OperatorAgent):
original_messages = list(self.messages)
messages_to_summarize = self.messages[: self.compress_length]
# ensure last message isn't a tool call request
if messages_to_summarize[-1].role == "assistant" and any(
isinstance(block, BetaToolUseBlock) for block in messages_to_summarize[-1].content
if messages_to_summarize[-1].role == "assistant" and (
any(isinstance(block, BetaToolUseBlock) for block in messages_to_summarize[-1].content)
or any(block["type"] == "tool_use" for block in messages_to_summarize[-1].content)
):
messages_to_summarize.pop()
@@ -429,14 +495,22 @@ class AnthropicOperatorAgent(OperatorAgent):
return coord
def model_default_tool(self, tool_type: Literal["computer", "editor", "terminal"]) -> dict[str, str]:
    """Get the default tool of specified type for the given model.

    Args:
        tool_type: Which built-in tool to look up: "computer", "editor" or
            "terminal".

    Returns:
        A dict with the tool's ``name`` (the identifier the model uses when
        calling the tool) and its versioned ``type``.

    Raises:
        ValueError: If the vision model or the tool type is not supported.
    """
    model_name = self.vision_model.name
    if model_name.startswith("claude-3-7-sonnet"):
        if tool_type == "computer":
            return {"name": "computer", "type": "computer_20250124"}
        elif tool_type == "editor":
            return {"name": "str_replace_editor", "type": "text_editor_20250124"}
        elif tool_type == "terminal":
            # The tool name is "bash"; the dated identifier is the tool type.
            return {"name": "bash", "type": "bash_20250124"}
    elif model_name.startswith(("claude-sonnet-4", "claude-opus-4")):
        if tool_type == "computer":
            return {"name": "computer", "type": "computer_20250124"}
        elif tool_type == "editor":
            return {"name": "str_replace_based_edit_tool", "type": "text_editor_20250429"}
        elif tool_type == "terminal":
            return {"name": "bash", "type": "bash_20250124"}
    raise ValueError(f"Unsupported tool type for model '{self.vision_model.name}': {tool_type}")
def model_default_headers(self) -> list[str]:
@@ -498,11 +572,19 @@ class AnthropicOperatorAgent(OperatorAgent):
"""Return the tools available for the Anthropic operator."""
tools = [
{
"type": self.model_default_tool("computer"),
"type": self.model_default_tool("computer")["type"],
"name": "computer",
"display_width_px": current_state.width,
"display_height_px": current_state.height,
}
},
{
"type": self.model_default_tool("editor")["type"],
"name": self.model_default_tool("editor")["name"],
},
{
"type": self.model_default_tool("terminal")["type"],
"name": self.model_default_tool("terminal")["name"],
},
]
if environment == "browser":

View File

@@ -5,6 +5,7 @@ import io
import logging
import platform
import subprocess
from pathlib import Path
from typing import Literal, Optional, Union
from PIL import Image, ImageDraw
@@ -340,6 +341,116 @@ class ComputerEnvironment(Environment):
output = "Back action is not applicable for ComputerEnvironment."
logger.warning(f"Unsupported action: {action.type} for ComputerEnvironment.")
case "terminal":
# Execute terminal command
result = await self._execute_shell_command(action.command)
if result["success"]:
output = f"Command executed successfully:\n{result['output']}"
else:
error = f"Command execution failed: {result['error']}"
logger.debug(f"Action: {action.type} with command '{action.command}'")
case "text_editor_view":
# View file contents
path = action.path
view_range = action.view_range
escaped_path = path.replace("'", "'\"'\"'")
is_dir = await self._execute("os.path.isdir", escaped_path)
if is_dir:
cmd = rf"find {escaped_path} -maxdepth 2 -not -path '*/\.*'"
elif view_range:
# Use head/tail to view specific line range
start_line, end_line = view_range
lines_to_show = end_line - start_line + 1
cmd = f"head -n {end_line} '{escaped_path}' | tail -n {lines_to_show}"
else:
# View entire file
cmd = f"cat '{escaped_path}'"
result = await self._execute_shell_command(cmd)
MAX_OUTPUT_LENGTH = 15000 # Limit output length to avoid excessive data
if len(result["output"]) > MAX_OUTPUT_LENGTH:
result["output"] = f"{result['output'][:MAX_OUTPUT_LENGTH]}..."
if result["success"]:
if is_dir:
output = f"Here's the files and directories up to 2 levels deep in {path}, excluding hidden items:\n{result['output']}"
else:
output = f"File contents of {path}:\n{result['output']}"
else:
error = f"Failed to view file {path}: {result['error']}"
logger.debug(f"Action: {action.type} for file {path}")
case "text_editor_create":
# Create new file with contents
path = action.path
file_text = action.file_text
escaped_path = path.replace("'", "'\"'\"'")
escaped_content = file_text.replace("\t", " ").replace(
"'", "'\"'\"'"
) # Escape single quotes for shell
cmd = f"echo '{escaped_content}' > '{escaped_path}'"
result = await self._execute_shell_command(cmd)
if result["success"]:
output = f"Created file {path} with {len(file_text)} characters"
else:
error = f"Failed to create file {path}: {result['error']}"
logger.debug(f"Action: {action.type} created file {path}")
case "text_editor_str_replace":
# Execute string replacement
path = action.path
old_str = action.old_str
new_str = action.new_str
# Use sed for string replacement, escaping special characters
escaped_path = path.replace("'", "'\"'\"'")
escaped_old = (
old_str.replace("\t", " ")
.replace("\\", "\\\\")
.replace("\n", "\\n")
.replace("/", "\\/")
.replace("'", "'\"'\"'")
)
escaped_new = (
new_str.replace("\t", " ")
.replace("\\", "\\\\")
.replace("\n", "\\n")
.replace("&", "\\&")
.replace("/", "\\/")
.replace("'", "'\"'\"'")
)
cmd = f"sed -i.bak 's/{escaped_old}/{escaped_new}/g' '{escaped_path}'"
result = await self._execute_shell_command(cmd)
if result["success"]:
output = f"Replaced '{old_str[:50]}...' with '{new_str[:50]}...' in {path}"
else:
error = f"Failed to replace text in {path}: {result['error']}"
logger.debug(f"Action: {action.type} in file {path}")
case "text_editor_insert":
# Insert text after specified line
path = action.path
insert_line = action.insert_line
new_str = action.new_str
escaped_path = path.replace("'", "'\"'\"'")
escaped_content = (
new_str.replace("\t", " ")
.replace("\\", "\\\\")
.replace("'", "'\"'\"'")
.replace("\n", "\\\n")
)
cmd = f"sed -i.bak '{insert_line}a\\{escaped_content}' '{escaped_path}'"
result = await self._execute_shell_command(cmd)
if result["success"]:
output = f"Inserted text after line {insert_line} in {path}"
else:
error = f"Failed to insert text in {path}: {result['error']}"
logger.debug(f"Action: {action.type} at line {insert_line} in file {path}")
case _:
error = f"Unrecognized action type: {action.type}"
logger.warning(error)
@@ -365,6 +476,49 @@ class ComputerEnvironment(Environment):
screenshot_base64=after_state.screenshot,
)
async def _execute_shell_command(self, command: str, new: bool = True, timeout: float = 120) -> dict:
    """Execute a shell command and return the result.

    With the "docker" provider the command is run inside the configured
    container via ``docker exec <container> bash -c <command>``; otherwise it
    is run through the local shell.

    Args:
        command: Shell command line to run.
        new: Passed as ``start_new_session`` for local execution; not used
            by the docker branch.
        timeout: Seconds to wait for the command before giving up.

    Returns:
        A dict with keys ``success`` (True when the exit code is 0),
        ``output`` (captured stdout) and ``error`` (captured stderr on a
        non-zero exit, a message on timeout or other failure, None on
        success).
    """
    try:
        if self.provider == "docker":
            # Execute command in Docker container
            docker_args = [
                "docker",
                "exec",
                self.docker_container_name,
                "bash",
                "-c",
                command,  # The command string is passed as a single argument to bash -c
            ]
            process = await asyncio.to_thread(
                subprocess.run,
                docker_args,
                capture_output=True,
                text=True,
                check=False,
                timeout=timeout,
            )
        else:
            # Execute command locally
            process = await asyncio.to_thread(
                subprocess.run,
                command,
                shell=True,
                capture_output=True,
                text=True,
                check=False,
                start_new_session=new,
                timeout=timeout,
            )

        if process.returncode == 0:
            return {"success": True, "output": process.stdout, "error": None}
        else:
            return {"success": False, "output": process.stdout, "error": process.stderr}
    except subprocess.TimeoutExpired:
        # subprocess.run signals an expired timeout with TimeoutExpired;
        # asyncio.TimeoutError is never raised on this code path.
        return {"success": False, "output": "", "error": f"Command timed out after {timeout} seconds."}
    except Exception as e:
        return {"success": False, "output": "", "error": str(e)}
async def close(self) -> None:
logger.debug("Computer environment closed. No specific resources to release for PyAutoGUI.")