From 21a9556b06c52cb1b5a6d472249312a885ee0e89 Mon Sep 17 00:00:00 2001 From: Debanjum Date: Fri, 9 May 2025 14:47:31 -0600 Subject: [PATCH] Show formatted action, env screenshot after action on each operator step Show natural language, formatted text for each action. Previously we were just showing json dumps of the actions taken. Pass screenshot at each step for openai, anthropic and binary operator agents Use text and image field in json passed to client for rendering both. Show actions, env screenshot after actions applied in train of thought. Showing the post action application screenshot seems more intuitive. Previously we were showing the screenshot used to decide next action. This pre action application screenshot was being shown after next action decided (in train of thought). This was anyway misleading to the actual ordering of event. Rendered response is now a structured payload (dict) passing image and text to be rendered up from operator to clients for rendering of train of thought. --- .../processor/operator/grounding_agent.py | 11 ++- .../processor/operator/operate_browser.py | 16 ++-- .../operator/operator_agent_anthropic.py | 75 +++++++++++++------ .../processor/operator/operator_agent_base.py | 2 +- .../operator/operator_agent_binary.py | 38 +++++++--- .../operator/operator_agent_openai.py | 45 +++++------ .../operator/operator_environment_browser.py | 9 ++- 7 files changed, 128 insertions(+), 68 deletions(-) diff --git a/src/khoj/processor/operator/grounding_agent.py b/src/khoj/processor/operator/grounding_agent.py index 1466dec3..a090c9df 100644 --- a/src/khoj/processor/operator/grounding_agent.py +++ b/src/khoj/processor/operator/grounding_agent.py @@ -204,7 +204,7 @@ class GroundingAgent: # Parse tool calls grounding_message = grounding_response.choices[0].message - action_results = self._parse_action(grounding_message, instruction) + action_results = self._parse_action(grounding_message, instruction, current_state) # Update usage by grounding model self.tracer["usage"] = get_chat_usage_metrics( @@ -262,7 +262,9 @@ back() # Use this to go back to the previous page. ) return [{"role": "user", "content": grounding_messages_content}] - def _parse_action(self, grounding_message: ChatCompletionMessage, instruction: str) -> AgentActResult: + def _parse_action( + self, grounding_message: ChatCompletionMessage, instruction: str, current_state: EnvState + ) -> AgentActResult: """Parse the tool calls from the grounding LLM response and convert them to action objects.""" actions: List[OperatorAction] = [] action_results: List[dict] = [] @@ -340,7 +342,10 @@ back() # Use this to go back to the previous page. return AgentActResult( actions=actions, action_results=action_results, - rendered_response=rendered_response, + rendered_response={ + "text": rendered_response, + "image": f"data:image/webp;base64,{current_state.screenshot}", + }, ) def reset(self): diff --git a/src/khoj/processor/operator/operate_browser.py b/src/khoj/processor/operator/operate_browser.py index 871f1288..c892e2a5 100644 --- a/src/khoj/processor/operator/operate_browser.py +++ b/src/khoj/processor/operator/operate_browser.py @@ -1,4 +1,5 @@ import asyncio +import json import logging from typing import Callable, List, Optional @@ -89,12 +90,6 @@ async def operate_browser( # 2. Agent decides action(s) agent_result = await operator_agent.act(browser_state) - # Render status update - rendered_response = agent_result.rendered_response - if send_status_func and rendered_response: - async for event in send_status_func(f"**Operating Browser**:\n{rendered_response}"): - yield {ChatEvent.STATUS: event} - # 3. Execute actions in the environment env_steps: List[EnvStepResult] = [] for action in agent_result.actions: @@ -110,6 +105,15 @@ async def operate_browser( env_step = await environment.step(action) env_steps.append(env_step) + # Render status update + latest_screenshot = f"data:image/webp;base64,{env_steps[-1].screenshot_base64 if env_steps else browser_state.screenshot}" + render_payload = agent_result.rendered_response + render_payload["image"] = latest_screenshot + render_content = f"**Action**: {json.dumps(render_payload)}" + if send_status_func: + async for event in send_status_func(f"**Operating Browser**:\n{render_content}"): + yield {ChatEvent.STATUS: event} + # Check if termination conditions are met task_completed = not agent_result.actions # No actions requested by agent trigger_iteration_limit = iterations == max_iterations diff --git a/src/khoj/processor/operator/operator_agent_anthropic.py b/src/khoj/processor/operator/operator_agent_anthropic.py index 4e332a6f..3c3b96ba 100644 --- a/src/khoj/processor/operator/operator_agent_anthropic.py +++ b/src/khoj/processor/operator/operator_agent_anthropic.py @@ -292,31 +292,60 @@ class AnthropicOperatorAgent(OperatorAgent): return "\n- ".join(filter(None, compiled_response)) # Filter out empty strings @staticmethod - async def _render_response(response_content: list[BetaContentBlock], screenshot: Optional[str] = None) -> str: + async def _render_response(response_content: list[BetaContentBlock], screenshot: Optional[str] = None) -> dict: """Render Anthropic response, potentially including actual screenshots.""" - rendered_response = [""] + render_texts = [] for block in deepcopy(response_content): # Use deepcopy to avoid modifying original - if block.type == "text": - rendered_response.append(block.text) - elif block.type == "tool_use": - block_input = {"action": block.name} - if block.name == "computer": - block_input = block.input - elif block.name == "goto": - block_input["url"] = block.input.get("url", "[Missing URL]") - - # If it's a screenshot action - if isinstance(block_input, dict) and block_input.get("action") == "screenshot": - # Render the screenshot data if available - if screenshot: - block_input["image"] = f"data:image/webp;base64,{screenshot}" - else: - block_input["image"] = "[Failed to get screenshot]" - - rendered_response.append(f"**Action**: {json.dumps(block_input)}") - elif block.type == "thinking": + if block.type == "thinking": thinking_content = getattr(block, "thinking", None) if thinking_content: - rendered_response.append(f"**Thought**: {thinking_content}") + render_texts += [f"**Thought**: {thinking_content}"] + elif block.type == "text": + render_texts += [block.text] + elif block.type == "tool_use": + if block.name == "goto": + render_texts += [f"Open URL: {block.input.get('url', '[Missing URL]')}"] + elif block.name == "back": + render_texts += ["Go back to the previous page."] + elif block.name == "computer": + block_input = block.input + if not isinstance(block_input, dict): + render_texts += [json.dumps(block_input)] + # Handle computer action details + elif "action" in block_input: + action = block_input["action"] + if action == "type": + text = block_input.get("text") + if text: + render_texts += [f'Type "{text}"'] + elif action == "key": + text: str = block_input.get("text") + if text: + render_texts += [f"Press {text}"] + elif action == "hold_key": + text = block_input.get("text") + duration = block_input.get("duration", 1.0) + if text: + render_texts += [f"Hold {text} for {duration} seconds"] + else: + # Handle other actions + render_texts += [f"{action.capitalize()}"] - return "\n- ".join(filter(None, rendered_response)) + # If screenshot is not available when screenshot action was requested + if isinstance(block.input, dict) and block.input.get("action") == "screenshot" and not screenshot: + render_texts += ["Failed to get screenshot"] + + # Do not show screenshot if no actions requested + if all([block.type != "tool_use" for block in response_content]): + # If all blocks are not tool_use, return None + screenshot = None + + # Create render payload + render_payload = { + # Combine text into a single string and filter out empty strings + "text": "\n- ".join(filter(None, render_texts)), + # Add screenshot data if available + "image": f"data:image/webp;base64,{screenshot}" if screenshot else None, + } + + return render_payload diff --git a/src/khoj/processor/operator/operator_agent_base.py b/src/khoj/processor/operator/operator_agent_base.py index 76ad4f54..0b6e0d6b 100644 --- a/src/khoj/processor/operator/operator_agent_base.py +++ b/src/khoj/processor/operator/operator_agent_base.py @@ -16,7 +16,7 @@ logger = logging.getLogger(__name__) class AgentActResult(BaseModel): actions: List[OperatorAction] = [] action_results: List[dict] = [] # Model-specific format - rendered_response: Optional[str] = None + rendered_response: Optional[dict] = None class AgentMessage(BaseModel): diff --git a/src/khoj/processor/operator/operator_agent_binary.py b/src/khoj/processor/operator/operator_agent_binary.py index 61f3de73..58c1d2ea 100644 --- a/src/khoj/processor/operator/operator_agent_binary.py +++ b/src/khoj/processor/operator/operator_agent_binary.py @@ -70,13 +70,13 @@ class BinaryOperatorAgent(OperatorAgent): return AgentActResult( actions=[], action_results=[], - rendered_response=natural_language_action, + rendered_response={"text": natural_language_action, "image": None}, ) elif reasoner_response["type"] == "done": return AgentActResult( actions=[], action_results=[], - rendered_response=natural_language_action, + rendered_response={"text": natural_language_action, "image": None}, ) # --- Step 2: Grounding LLM converts NL action to structured action --- @@ -185,15 +185,31 @@ Focus on the visual action and provide all necessary context. actions.append(WaitAction(duration=1.0)) rendered_parts += ["Could not process response."] else: - rendered_parts += [f"**Thought (Grounding)**: {grounding_response}"] + grounding_thoughts = grounding_response.rsplit("\nAction: ", 1)[0] + rendered_parts += [f"**Thought (Grounding)**: {grounding_thoughts}"] for action in actions: - rendered_parts += [f"**Action**: {action}"] + if action.type == "type": + rendered_parts += [f'**Action**: Type "{action.text}"'] + elif action.type == "keypress": + rendered_parts += [f'**Action**: Press "{action.keys}"'] + elif action.type == "hold_key": + rendered_parts += [f'**Action**: Hold "{action.text}" for {action.duration} seconds'] + elif action.type == "key_up": + rendered_parts += [f'**Action**: Release Key "{action.key}"'] + elif action.type == "key_down": + rendered_parts += [f'**Action**: Press Key "{action.key}"'] + elif action.type == "screenshot" and not current_state.screenshot: + rendered_parts += [f"**Error**: Failed to take screenshot"] + elif action.type == "goto": + rendered_parts += [f"**Action**: Open URL {action.url}"] + else: + rendered_parts += [f"**Action**: {action.type}"] action_results += [{"content": None}] # content set after environment step except Exception as e: logger.error(f"Error calling Grounding LLM: {e}") rendered_parts += [f"**Error**: Error contacting Grounding LLM: {e}"] - rendered_response = "\n- ".join(rendered_parts) + rendered_response = self._render_response(rendered_parts, current_state.screenshot) return AgentActResult( actions=actions, @@ -291,11 +307,13 @@ Focus on the visual action and provide all necessary context. # Fallback for unexpected types return str(response_content) - def _render_response(self, response: List, screenshot: Optional[str]) -> Optional[str]: - """Render response for display. Currently uses compile_response.""" - # TODO: Could potentially enhance rendering, e.g., showing vision thought + grounding actions distinctly. - # For now, rely on the structure built during the 'act' phase. - return response # The rendered_response is already built in act() + def _render_response(self, response: List, screenshot: str | None) -> dict: + """Render response for display""" + render_payload = { + "text": "\n- ".join(response), + "image": f"data:image/webp;base64,{screenshot}" if screenshot else None, + } + return render_payload def _get_message_text(self, message: AgentMessage) -> str: if isinstance(message.content, list): diff --git a/src/khoj/processor/operator/operator_agent_openai.py b/src/khoj/processor/operator/operator_agent_openai.py index 7eec9e3b..8ff0ae4a 100644 --- a/src/khoj/processor/operator/operator_agent_openai.py +++ b/src/khoj/processor/operator/operator_agent_openai.py @@ -273,35 +273,38 @@ class OpenAIOperatorAgent(OperatorAgent): return "\n- ".join(filter(None, compiled_response)) # Filter out empty strings @staticmethod - async def _render_response(response_content: list[ResponseOutputItem], screenshot: Optional[str] = None) -> str: + async def _render_response(response_content: list[ResponseOutputItem], screenshot: Optional[str] = None) -> dict: """Render OpenAI response for display, potentially including screenshots.""" - rendered_response = [""] + render_texts = [] for block in deepcopy(response_content): # Use deepcopy to avoid modifying original if block.type == "message": text_content = block.text if hasattr(block, "text") else block.model_dump_json() - rendered_response.append(text_content) + render_texts += [text_content] elif block.type == "function_call": block_input = {"action": block.name} if block.name == "goto": - try: - args = json.loads(block.arguments) - block_input["url"] = args.get("url", "[Missing URL]") - except json.JSONDecodeError: - block_input["arguments"] = block.arguments - rendered_response.append(f"**Action**: {json.dumps(block_input)}") + args = json.loads(block.arguments) + render_texts = [f'Open URL: {args.get("url", "[Missing URL]")}'] + else: + render_texts += [block.name] elif block.type == "computer_call": block_input = block.action - # If it's a screenshot action - if block_input.type == "screenshot": - # Render screenshot if available - block_input_render = block_input.model_dump() - if screenshot: - block_input_render["image"] = f"data:image/webp;base64,{screenshot}" - else: - block_input_render["image"] = "[Failed to get screenshot]" - rendered_response.append(f"**Action**: {json.dumps(block_input_render)}") + if block_input.type == "screenshot" and not screenshot: + render_texts += ["Failed to get screenshot"] + elif block_input.type == "type": + render_texts += [f'Type "{block_input.text}"'] + elif block_input.type == "keypress": + render_texts += [f"Press {'+'.join(block_input.keys)}"] else: - rendered_response.append(f"**Action**: {block_input.model_dump_json()}") + render_texts += [f"{block_input.type.capitalize()}"] elif block.type == "reasoning" and block.summary: - rendered_response.append(f"**Thought**: {block.summary}") - return "\n- ".join(filter(None, rendered_response)) + render_texts += [f"**Thought**: {block.summary}"] + + render_payload = { + # Combine text into a single string and filter out empty strings + "text": "\n- ".join(filter(None, render_texts)), + # Add screenshot data if available + "image": f"data:image/webp;base64,{screenshot}" if screenshot else None, + } + + return render_payload diff --git a/src/khoj/processor/operator/operator_environment_browser.py b/src/khoj/processor/operator/operator_environment_browser.py index 64db38ab..0be2ce14 100644 --- a/src/khoj/processor/operator/operator_environment_browser.py +++ b/src/khoj/processor/operator/operator_environment_browser.py @@ -133,7 +133,7 @@ class BrowserEnvironment(Environment): if not self.page or self.page.is_closed(): return EnvStepResult(error="Browser page is not available or closed.") - state = await self.get_state() + before_state = await self.get_state() output, error, step_type = None, None, "text" try: match action.type: @@ -232,7 +232,7 @@ class BrowserEnvironment(Environment): case "screenshot": step_type = "image" - output = {"image": state.screenshot, "url": state.url} + output = {"image": before_state.screenshot, "url": before_state.url} logger.debug(f"Action: {action.type}") case "move": @@ -324,12 +324,13 @@ class BrowserEnvironment(Environment): error = f"Error executing action {action.type}: {e}" logger.exception(f"Error during step execution for action: {action.model_dump_json()}") + after_state = await self.get_state() return EnvStepResult( type=step_type, output=output, error=error, - current_url=state.url, - screenshot_base64=state.screenshot, + current_url=after_state.url, + screenshot_base64=after_state.screenshot, ) def reset(self) -> None: