# --- Text Editor Actions ---
class TextEditorViewAction(BaseAction):
    """View contents of a file, optionally restricted to a line range."""

    type: Literal["text_editor_view"] = "text_editor_view"
    path: str
    view_range: Optional[List[int]] = None  # [start_line, end_line]


class TextEditorCreateAction(BaseAction):
    """Create a new file with specified contents."""

    type: Literal["text_editor_create"] = "text_editor_create"
    path: str
    file_text: str


class TextEditorStrReplaceAction(BaseAction):
    """Execute an exact string match replacement on a file."""

    type: Literal["text_editor_str_replace"] = "text_editor_str_replace"
    path: str
    old_str: str  # Text to look for in the file
    new_str: str  # Text that replaces old_str


class TextEditorInsertAction(BaseAction):
    """Insert new text after a specified line number."""

    type: Literal["text_editor_insert"] = "text_editor_insert"
    path: str
    insert_line: int  # Line number after which new_str is inserted
    new_str: str


# --- Terminal Actions ---
class TerminalAction(BaseAction):
    """Run a shell command in the terminal."""

    type: Literal["terminal"] = "terminal"
    command: str
    # NOTE(review): presumably asks for a fresh shell session before running
    # the command; confirm against the environment that handles this action.
    restart: bool = False
AnthropicOperatorAgent(OperatorAgent): logger.warning("Goto tool called without URL.") elif tool_name == "back": action_to_run = BackAction() + elif tool_name == self.model_default_tool("terminal")["name"]: + command = tool_input.get("command") + restart = tool_input.get("restart", False) + if command: + action_to_run = TerminalAction(command=command, restart=restart) + elif tool_name == "str_replace_based_edit_tool": + # Handle text editor tool calls + command = tool_input.get("command") + if command == "view": + path = tool_input.get("path") + view_range = tool_input.get("view_range") + if path: + action_to_run = TextEditorViewAction(path=path, view_range=view_range) + elif command == "create": + path = tool_input.get("path") + file_text = tool_input.get("file_text", "") + if path: + action_to_run = TextEditorCreateAction(path=path, file_text=file_text) + elif command == "str_replace": + path = tool_input.get("path") + old_str = tool_input.get("old_str") + new_str = tool_input.get("new_str") + if path and old_str is not None and new_str is not None: + action_to_run = TextEditorStrReplaceAction(path=path, old_str=old_str, new_str=new_str) + elif command == "insert": + path = tool_input.get("path") + insert_line = tool_input.get("insert_line") + new_str = tool_input.get("new_str") + if path and insert_line is not None and new_str is not None: + action_to_run = TextEditorInsertAction( + path=path, insert_line=insert_line, new_str=new_str + ) + else: + logger.warning(f"Unsupported text editor command: {command}") else: logger.warning(f"Unsupported Anthropic computer action type: {tool_name}") @@ -237,7 +271,11 @@ class AnthropicOperatorAgent(OperatorAgent): compiled_response.append(block.text) elif block.type == "tool_use": block_input = {"action": block.name} - if block.name == "computer": + if block.name in ( + self.model_default_tool("computer")["name"], + self.model_default_tool("editor")["name"], + self.model_default_tool("terminal")["name"], + ): block_input = 
block.input # Computer action details are in input dict elif block.name == "goto": block_input["url"] = block.input.get("url", "[Missing URL]") @@ -294,7 +332,34 @@ class AnthropicOperatorAgent(OperatorAgent): else: # Handle other actions render_texts += [f"{action.capitalize()}"] - + elif block.name == self.model_default_tool("editor")["name"]: + # Handle text editor actions + command = block.input.get("command") + if command == "view": + path = block.input.get("path") + view_range = block.input.get("view_range") + if path: + render_texts += [f"View file: {path} (lines {view_range})"] + elif command == "create": + path = block.input.get("path") + file_text = block.input.get("file_text", "") + if path: + render_texts += [f"Create file: {path} with content:\n{file_text}"] + elif command == "str_replace": + path = block.input.get("path") + old_str = block.input.get("old_str") + new_str = block.input.get("new_str") + if path and old_str is not None and new_str is not None: + render_texts += [f"File: {path}\n**Find**\n{old_str}\n**Replace**\n{new_str}'"] + elif command == "insert": + path = block.input.get("path") + insert_line = block.input.get("insert_line") + new_str = block.input.get("new_str") + if path and insert_line is not None and new_str is not None: + render_texts += [f"In file: {path} at line {insert_line} insert\n{new_str}"] + render_texts += [f"Edit file: {block.input['path']}"] + elif block.name == self.model_default_tool("terminal")["name"]: + render_texts += [f"Run command:\n{block.input['command']}"] # If screenshot is not available when screenshot action was requested if isinstance(block.input, dict) and block.input.get("action") == "screenshot" and not screenshot: render_texts += ["Failed to get screenshot"] @@ -369,8 +434,9 @@ class AnthropicOperatorAgent(OperatorAgent): original_messages = list(self.messages) messages_to_summarize = self.messages[: self.compress_length] # ensure last message isn't a tool call request - if 
def model_default_tool(self, tool_type: Literal["computer", "editor", "terminal"]) -> dict[str, str]:
    """Get the default tool of specified type for the given model.

    Args:
        tool_type: Kind of tool to look up: "computer", "editor" or "terminal".

    Returns:
        A dict with two keys: "name", the tool name, and "type", the
        versioned tool type identifier.

    Raises:
        ValueError: If the vision model is not a supported Claude model or
            the tool type is not known for that model.
    """
    model_name = self.vision_model.name

    if model_name.startswith("claude-3-7-sonnet"):
        default_tools = {
            "computer": {"name": "computer", "type": "computer_20250124"},
            "editor": {"name": "str_replace_editor", "type": "text_editor_20250124"},
            # The name is plain "bash"; the version suffix belongs on the type,
            # same as for the Claude 4 models below.
            "terminal": {"name": "bash", "type": "bash_20250124"},
        }
    elif model_name.startswith(("claude-sonnet-4", "claude-opus-4")):
        default_tools = {
            "computer": {"name": "computer", "type": "computer_20250124"},
            "editor": {"name": "str_replace_based_edit_tool", "type": "text_editor_20250429"},
            "terminal": {"name": "bash", "type": "bash_20250124"},
        }
    else:
        default_tools = {}

    if tool_type in default_tools:
        # Return a copy so callers cannot mutate a shared table entry.
        return dict(default_tools[tool_type])
    raise ValueError(f"Unsupported tool type for model '{self.vision_model.name}': {tool_type}")
current_state.height, - } + }, + { + "type": self.model_default_tool("editor")["type"], + "name": self.model_default_tool("editor")["name"], + }, + { + "type": self.model_default_tool("terminal")["type"], + "name": self.model_default_tool("terminal")["name"], + }, ] if environment == "browser": diff --git a/src/khoj/processor/operator/operator_environment_computer.py b/src/khoj/processor/operator/operator_environment_computer.py index f833e085..59062017 100644 --- a/src/khoj/processor/operator/operator_environment_computer.py +++ b/src/khoj/processor/operator/operator_environment_computer.py @@ -5,6 +5,7 @@ import io import logging import platform import subprocess +from pathlib import Path from typing import Literal, Optional, Union from PIL import Image, ImageDraw @@ -340,6 +341,116 @@ class ComputerEnvironment(Environment): output = "Back action is not applicable for ComputerEnvironment." logger.warning(f"Unsupported action: {action.type} for ComputerEnvironment.") + case "terminal": + # Execute terminal command + result = await self._execute_shell_command(action.command) + if result["success"]: + output = f"Command executed successfully:\n{result['output']}" + else: + error = f"Command execution failed: {result['error']}" + logger.debug(f"Action: {action.type} with command '{action.command}'") + + case "text_editor_view": + # View file contents + path = action.path + view_range = action.view_range + escaped_path = path.replace("'", "'\"'\"'") + is_dir = await self._execute("os.path.isdir", escaped_path) + if is_dir: + cmd = rf"find {escaped_path} -maxdepth 2 -not -path '*/\.*'" + elif view_range: + # Use head/tail to view specific line range + start_line, end_line = view_range + lines_to_show = end_line - start_line + 1 + cmd = f"head -n {end_line} '{escaped_path}' | tail -n {lines_to_show}" + else: + # View entire file + cmd = f"cat '{escaped_path}'" + + result = await self._execute_shell_command(cmd) + MAX_OUTPUT_LENGTH = 15000 # Limit output length to avoid 
excessive data + if len(result["output"]) > MAX_OUTPUT_LENGTH: + result["output"] = f"{result['output'][:MAX_OUTPUT_LENGTH]}..." + if result["success"]: + if is_dir: + output = f"Here's the files and directories up to 2 levels deep in {path}, excluding hidden items:\n{result['output']}" + else: + output = f"File contents of {path}:\n{result['output']}" + else: + error = f"Failed to view file {path}: {result['error']}" + logger.debug(f"Action: {action.type} for file {path}") + + case "text_editor_create": + # Create new file with contents + path = action.path + file_text = action.file_text + escaped_path = path.replace("'", "'\"'\"'") + escaped_content = file_text.replace("\t", " ").replace( + "'", "'\"'\"'" + ) # Escape single quotes for shell + cmd = f"echo '{escaped_content}' > '{escaped_path}'" + + result = await self._execute_shell_command(cmd) + if result["success"]: + output = f"Created file {path} with {len(file_text)} characters" + else: + error = f"Failed to create file {path}: {result['error']}" + logger.debug(f"Action: {action.type} created file {path}") + + case "text_editor_str_replace": + # Execute string replacement + path = action.path + old_str = action.old_str + new_str = action.new_str + + # Use sed for string replacement, escaping special characters + escaped_path = path.replace("'", "'\"'\"'") + escaped_old = ( + old_str.replace("\t", " ") + .replace("\\", "\\\\") + .replace("\n", "\\n") + .replace("/", "\\/") + .replace("'", "'\"'\"'") + ) + escaped_new = ( + new_str.replace("\t", " ") + .replace("\\", "\\\\") + .replace("\n", "\\n") + .replace("&", "\\&") + .replace("/", "\\/") + .replace("'", "'\"'\"'") + ) + cmd = f"sed -i.bak 's/{escaped_old}/{escaped_new}/g' '{escaped_path}'" + + result = await self._execute_shell_command(cmd) + if result["success"]: + output = f"Replaced '{old_str[:50]}...' with '{new_str[:50]}...' 
in {path}" + else: + error = f"Failed to replace text in {path}: {result['error']}" + logger.debug(f"Action: {action.type} in file {path}") + + case "text_editor_insert": + # Insert text after specified line + path = action.path + insert_line = action.insert_line + new_str = action.new_str + + escaped_path = path.replace("'", "'\"'\"'") + escaped_content = ( + new_str.replace("\t", " ") + .replace("\\", "\\\\") + .replace("'", "'\"'\"'") + .replace("\n", "\\\n") + ) + cmd = f"sed -i.bak '{insert_line}a\\{escaped_content}' '{escaped_path}'" + + result = await self._execute_shell_command(cmd) + if result["success"]: + output = f"Inserted text after line {insert_line} in {path}" + else: + error = f"Failed to insert text in {path}: {result['error']}" + logger.debug(f"Action: {action.type} at line {insert_line} in file {path}") + case _: error = f"Unrecognized action type: {action.type}" logger.warning(error) @@ -365,6 +476,49 @@ class ComputerEnvironment(Environment): screenshot_base64=after_state.screenshot, ) + async def _execute_shell_command(self, command: str, new: bool = True) -> dict: + """Execute a shell command and return the result.""" + try: + if self.provider == "docker": + # Execute command in Docker container + docker_args = [ + "docker", + "exec", + self.docker_container_name, + "bash", + "-c", + command, # The command string is passed as a single argument to bash -c + ] + process = await asyncio.to_thread( + subprocess.run, + docker_args, + capture_output=True, + text=True, + check=False, + timeout=120, + ) + else: + # Execute command locally + process = await asyncio.to_thread( + subprocess.run, + command, + shell=True, + capture_output=True, + text=True, + check=False, + start_new_session=new, + timeout=120, + ) + + if process.returncode == 0: + return {"success": True, "output": process.stdout, "error": None} + else: + return {"success": False, "output": process.stdout, "error": process.stderr} + except asyncio.TimeoutError: + return {"success": 
False, "output": "", "error": f"Command timed out after 120 seconds."} + except Exception as e: + return {"success": False, "output": "", "error": str(e)} + async def close(self) -> None: logger.debug("Computer environment closed. No specific resources to release for PyAutoGUI.")