diff --git a/src/khoj/processor/operator/operator_agent_anthropic.py b/src/khoj/processor/operator/operator_agent_anthropic.py index 7374e6ad..3128f718 100644 --- a/src/khoj/processor/operator/operator_agent_anthropic.py +++ b/src/khoj/processor/operator/operator_agent_anthropic.py @@ -1,3 +1,4 @@ +import ast import json import logging from copy import deepcopy @@ -98,6 +99,9 @@ class AnthropicOperatorAgent(OperatorAgent): for block in response.content: if block.type == "tool_use": + content = None + is_error = False + action_to_run: Optional[OperatorAction] = None tool_input = block.input tool_name = block.input.get("action") if block.name == "computer" else block.name @@ -105,34 +109,34 @@ class AnthropicOperatorAgent(OperatorAgent): try: if tool_name == "mouse_move": - coord = tool_input.get("coordinate") + coord = self.get_coordinates(tool_input) if coord: action_to_run = MoveAction(x=coord[0], y=coord[1]) elif tool_name == "left_click": - coord = tool_input.get("coordinate") + coord = self.get_coordinates(tool_input) if coord: action_to_run = ClickAction( x=coord[0], y=coord[1], button="left", modifier=tool_input.get("text") ) elif tool_name == "right_click": - coord = tool_input.get("coordinate") + coord = self.get_coordinates(tool_input) if coord: action_to_run = ClickAction(x=coord[0], y=coord[1], button="right") elif tool_name == "middle_click": - coord = tool_input.get("coordinate") + coord = self.get_coordinates(tool_input) if coord: action_to_run = ClickAction(x=coord[0], y=coord[1], button="middle") elif tool_name == "double_click": - coord = tool_input.get("coordinate") + coord = self.get_coordinates(tool_input) if coord: action_to_run = DoubleClickAction(x=coord[0], y=coord[1]) elif tool_name == "triple_click": - coord = tool_input.get("coordinate") + coord = self.get_coordinates(tool_input) if coord: action_to_run = TripleClickAction(x=coord[0], y=coord[1]) elif tool_name == "left_click_drag": - start_coord = tool_input.get("start_coordinate") - end_coord = tool_input.get("coordinate") + start_coord = self.get_coordinates(tool_input, key="start_coordinate") + end_coord = self.get_coordinates(tool_input) if start_coord and end_coord: action_to_run = DragAction(path=[Point(x=p[0], y=p[1]) for p in [start_coord, end_coord]]) elif tool_name == "left_mouse_down": @@ -145,8 +149,8 @@ class AnthropicOperatorAgent(OperatorAgent): action_to_run = TypeAction(text=text) elif tool_name == "scroll": direction = tool_input.get("scroll_direction") - amount = tool_input.get("scroll_amount", 5) - coord = tool_input.get("coordinate") + amount = int(tool_input.get("scroll_amount", 5)) + coord = self.get_coordinates(tool_input) x = coord[0] if coord else None y = coord[1] if coord else None if direction: @@ -179,7 +183,11 @@ class AnthropicOperatorAgent(OperatorAgent): logger.warning(f"Unsupported Anthropic computer action type: {tool_name}") except Exception as e: - logger.error(f"Error converting Anthropic action {tool_name} ({tool_input}): {e}") + error_msg = f"Error converting Anthropic action {tool_name} ({tool_input}): {e}" + logger.error(error_msg) + content = error_msg + is_error = True + action_to_run = NoopAction() if action_to_run: actions.append(action_to_run) @@ -187,8 +195,8 @@ class AnthropicOperatorAgent(OperatorAgent): { "type": "tool_result", "tool_use_id": tool_use_id, - "content": None, # Updated after environment step - "is_error": False, # Updated after environment step + "content": content, # Updated after environment step + "is_error": is_error, # Updated after environment step } ) @@ -356,3 +364,20 @@ class AnthropicOperatorAgent(OperatorAgent): } return render_payload + + def get_coordinates(self, tool_input: dict, key: str = "coordinate") -> Optional[list | tuple]: + """Get coordinates from tool input.""" + raw_coord = tool_input.get(key) + if not raw_coord: + return None + try: + coord = ast.literal_eval(raw_coord) if isinstance(raw_coord, str) else raw_coord + except (ValueError, SyntaxError): + logger.warning(f"Could not parse coordinate from value: {raw_coord}") + return None + + if not isinstance(coord, (list, tuple)) or not len(coord) == 2: + logger.warning(f"Parsed coordinate string '{raw_coord}' is not a 2-element list/tuple: {coord}") + return None + + return coord