diff --git a/src/khoj/processor/operator/grounding_agent.py b/src/khoj/processor/operator/grounding_agent.py index 1466dec3..a090c9df 100644 --- a/src/khoj/processor/operator/grounding_agent.py +++ b/src/khoj/processor/operator/grounding_agent.py @@ -204,7 +204,7 @@ class GroundingAgent: # Parse tool calls grounding_message = grounding_response.choices[0].message - action_results = self._parse_action(grounding_message, instruction) + action_results = self._parse_action(grounding_message, instruction, current_state) # Update usage by grounding model self.tracer["usage"] = get_chat_usage_metrics( @@ -262,7 +262,9 @@ back() # Use this to go back to the previous page. ) return [{"role": "user", "content": grounding_messages_content}] - def _parse_action(self, grounding_message: ChatCompletionMessage, instruction: str) -> AgentActResult: + def _parse_action( + self, grounding_message: ChatCompletionMessage, instruction: str, current_state: EnvState + ) -> AgentActResult: """Parse the tool calls from the grounding LLM response and convert them to action objects.""" actions: List[OperatorAction] = [] action_results: List[dict] = [] @@ -340,7 +342,10 @@ back() # Use this to go back to the previous page. return AgentActResult( actions=actions, action_results=action_results, - rendered_response=rendered_response, + rendered_response={ + "text": rendered_response, + "image": f"data:image/webp;base64,{current_state.screenshot}", + }, ) def reset(self): diff --git a/src/khoj/processor/operator/operate_browser.py b/src/khoj/processor/operator/operate_browser.py index 871f1288..c892e2a5 100644 --- a/src/khoj/processor/operator/operate_browser.py +++ b/src/khoj/processor/operator/operate_browser.py @@ -1,4 +1,5 @@ import asyncio +import json import logging from typing import Callable, List, Optional @@ -89,12 +90,6 @@ async def operate_browser( # 2. Agent decides action(s) agent_result = await operator_agent.act(browser_state) - # Render status update - rendered_response = agent_result.rendered_response - if send_status_func and rendered_response: - async for event in send_status_func(f"**Operating Browser**:\n{rendered_response}"): - yield {ChatEvent.STATUS: event} - # 3. Execute actions in the environment env_steps: List[EnvStepResult] = [] for action in agent_result.actions: @@ -110,6 +105,15 @@ async def operate_browser( env_step = await environment.step(action) env_steps.append(env_step) + # Render status update + latest_screenshot = f"data:image/webp;base64,{env_steps[-1].screenshot_base64 if env_steps else browser_state.screenshot}" + render_payload = agent_result.rendered_response + render_payload["image"] = latest_screenshot + render_content = f"**Action**: {json.dumps(render_payload)}" + if send_status_func: + async for event in send_status_func(f"**Operating Browser**:\n{render_content}"): + yield {ChatEvent.STATUS: event} + # Check if termination conditions are met task_completed = not agent_result.actions # No actions requested by agent trigger_iteration_limit = iterations == max_iterations diff --git a/src/khoj/processor/operator/operator_agent_anthropic.py b/src/khoj/processor/operator/operator_agent_anthropic.py index 4e332a6f..3c3b96ba 100644 --- a/src/khoj/processor/operator/operator_agent_anthropic.py +++ b/src/khoj/processor/operator/operator_agent_anthropic.py @@ -292,31 +292,60 @@ class AnthropicOperatorAgent(OperatorAgent): return "\n- ".join(filter(None, compiled_response)) # Filter out empty strings @staticmethod - async def _render_response(response_content: list[BetaContentBlock], screenshot: Optional[str] = None) -> str: + async def _render_response(response_content: list[BetaContentBlock], screenshot: Optional[str] = None) -> dict: """Render Anthropic response, potentially including actual screenshots.""" - rendered_response = [""] + render_texts = [] for block in deepcopy(response_content): # Use deepcopy to avoid modifying original - if block.type == "text": - rendered_response.append(block.text) - elif block.type == "tool_use": - block_input = {"action": block.name} - if block.name == "computer": - block_input = block.input - elif block.name == "goto": - block_input["url"] = block.input.get("url", "[Missing URL]") - - # If it's a screenshot action - if isinstance(block_input, dict) and block_input.get("action") == "screenshot": - # Render the screenshot data if available - if screenshot: - block_input["image"] = f"data:image/webp;base64,{screenshot}" - else: - block_input["image"] = "[Failed to get screenshot]" - - rendered_response.append(f"**Action**: {json.dumps(block_input)}") - elif block.type == "thinking": + if block.type == "thinking": thinking_content = getattr(block, "thinking", None) if thinking_content: - rendered_response.append(f"**Thought**: {thinking_content}") + render_texts += [f"**Thought**: {thinking_content}"] + elif block.type == "text": + render_texts += [block.text] + elif block.type == "tool_use": + if block.name == "goto": + render_texts += [f"Open URL: {block.input.get('url', '[Missing URL]')}"] + elif block.name == "back": + render_texts += ["Go back to the previous page."] + elif block.name == "computer": + block_input = block.input + if not isinstance(block_input, dict): + render_texts += [json.dumps(block_input)] + # Handle computer action details + elif "action" in block_input: + action = block_input["action"] + if action == "type": + text = block_input.get("text") + if text: + render_texts += [f'Type "{text}"'] + elif action == "key": + text: str = block_input.get("text") + if text: + render_texts += [f"Press {text}"] + elif action == "hold_key": + text = block_input.get("text") + duration = block_input.get("duration", 1.0) + if text: + render_texts += [f"Hold {text} for {duration} seconds"] + else: + # Handle other actions + render_texts += [f"{action.capitalize()}"] - return "\n- ".join(filter(None, rendered_response)) + # If screenshot is not available when screenshot action was requested + if isinstance(block.input, dict) and block.input.get("action") == "screenshot" and not screenshot: + render_texts += ["Failed to get screenshot"] + + # Do not show screenshot if no actions requested + if all([block.type != "tool_use" for block in response_content]): + # If all blocks are not tool_use, return None + screenshot = None + + # Create render payload + render_payload = { + # Combine text into a single string and filter out empty strings + "text": "\n- ".join(filter(None, render_texts)), + # Add screenshot data if available + "image": f"data:image/webp;base64,{screenshot}" if screenshot else None, + } + + return render_payload diff --git a/src/khoj/processor/operator/operator_agent_base.py b/src/khoj/processor/operator/operator_agent_base.py index 76ad4f54..0b6e0d6b 100644 --- a/src/khoj/processor/operator/operator_agent_base.py +++ b/src/khoj/processor/operator/operator_agent_base.py @@ -16,7 +16,7 @@ logger = logging.getLogger(__name__) class AgentActResult(BaseModel): actions: List[OperatorAction] = [] action_results: List[dict] = [] # Model-specific format - rendered_response: Optional[str] = None + rendered_response: Optional[dict] = None class AgentMessage(BaseModel): diff --git a/src/khoj/processor/operator/operator_agent_binary.py b/src/khoj/processor/operator/operator_agent_binary.py index 61f3de73..58c1d2ea 100644 --- a/src/khoj/processor/operator/operator_agent_binary.py +++ b/src/khoj/processor/operator/operator_agent_binary.py @@ -70,13 +70,13 @@ class BinaryOperatorAgent(OperatorAgent): return AgentActResult( actions=[], action_results=[], - rendered_response=natural_language_action, + rendered_response={"text": natural_language_action, "image": None}, ) elif reasoner_response["type"] == "done": return AgentActResult( actions=[], action_results=[], - rendered_response=natural_language_action, + rendered_response={"text": natural_language_action, "image": None}, ) # --- Step 2: Grounding LLM converts NL action to structured action --- @@ -185,15 +185,31 @@ Focus on the visual action and provide all necessary context. actions.append(WaitAction(duration=1.0)) rendered_parts += ["Could not process response."] else: - rendered_parts += [f"**Thought (Grounding)**: {grounding_response}"] + grounding_thoughts = grounding_response.rsplit("\nAction: ", 1)[0] + rendered_parts += [f"**Thought (Grounding)**: {grounding_thoughts}"] for action in actions: - rendered_parts += [f"**Action**: {action}"] + if action.type == "type": + rendered_parts += [f'**Action**: Type "{action.text}"'] + elif action.type == "keypress": + rendered_parts += [f'**Action**: Press "{action.keys}"'] + elif action.type == "hold_key": + rendered_parts += [f'**Action**: Hold "{action.text}" for {action.duration} seconds'] + elif action.type == "key_up": + rendered_parts += [f'**Action**: Release Key "{action.key}"'] + elif action.type == "key_down": + rendered_parts += [f'**Action**: Press Key "{action.key}"'] + elif action.type == "screenshot" and not current_state.screenshot: + rendered_parts += [f"**Error**: Failed to take screenshot"] + elif action.type == "goto": + rendered_parts += [f"**Action**: Open URL {action.url}"] + else: + rendered_parts += [f"**Action**: {action.type}"] action_results += [{"content": None}] # content set after environment step except Exception as e: logger.error(f"Error calling Grounding LLM: {e}") rendered_parts += [f"**Error**: Error contacting Grounding LLM: {e}"] - rendered_response = "\n- ".join(rendered_parts) + rendered_response = self._render_response(rendered_parts, current_state.screenshot) return AgentActResult( actions=actions, @@ -291,11 +307,13 @@ Focus on the visual action and provide all necessary context. # Fallback for unexpected types return str(response_content) - def _render_response(self, response: List, screenshot: Optional[str]) -> Optional[str]: - """Render response for display. Currently uses compile_response.""" - # TODO: Could potentially enhance rendering, e.g., showing vision thought + grounding actions distinctly. - # For now, rely on the structure built during the 'act' phase. - return response # The rendered_response is already built in act() + def _render_response(self, response: List, screenshot: str | None) -> dict: + """Render response for display""" + render_payload = { + "text": "\n- ".join(response), + "image": f"data:image/webp;base64,{screenshot}" if screenshot else None, + } + return render_payload def _get_message_text(self, message: AgentMessage) -> str: if isinstance(message.content, list): diff --git a/src/khoj/processor/operator/operator_agent_openai.py b/src/khoj/processor/operator/operator_agent_openai.py index 7eec9e3b..8ff0ae4a 100644 --- a/src/khoj/processor/operator/operator_agent_openai.py +++ b/src/khoj/processor/operator/operator_agent_openai.py @@ -273,35 +273,38 @@ class OpenAIOperatorAgent(OperatorAgent): return "\n- ".join(filter(None, compiled_response)) # Filter out empty strings @staticmethod - async def _render_response(response_content: list[ResponseOutputItem], screenshot: Optional[str] = None) -> str: + async def _render_response(response_content: list[ResponseOutputItem], screenshot: Optional[str] = None) -> dict: """Render OpenAI response for display, potentially including screenshots.""" - rendered_response = [""] + render_texts = [] for block in deepcopy(response_content): # Use deepcopy to avoid modifying original if block.type == "message": text_content = block.text if hasattr(block, "text") else block.model_dump_json() - rendered_response.append(text_content) + render_texts += [text_content] elif block.type == "function_call": block_input = {"action": block.name} if block.name == "goto": - try: - args = json.loads(block.arguments) - block_input["url"] = args.get("url", "[Missing URL]") - except json.JSONDecodeError: - block_input["arguments"] = block.arguments - rendered_response.append(f"**Action**: {json.dumps(block_input)}") + args = json.loads(block.arguments) + render_texts = [f'Open URL: {args.get("url", "[Missing URL]")}'] + else: + render_texts += [block.name] elif block.type == "computer_call": block_input = block.action - # If it's a screenshot action - if block_input.type == "screenshot": - # Render screenshot if available - block_input_render = block_input.model_dump() - if screenshot: - block_input_render["image"] = f"data:image/webp;base64,{screenshot}" - else: - block_input_render["image"] = "[Failed to get screenshot]" - rendered_response.append(f"**Action**: {json.dumps(block_input_render)}") + if block_input.type == "screenshot" and not screenshot: + render_texts += ["Failed to get screenshot"] + elif block_input.type == "type": + render_texts += [f'Type "{block_input.text}"'] + elif block_input.type == "keypress": + render_texts += [f"Press {'+'.join(block_input.keys)}"] else: - rendered_response.append(f"**Action**: {block_input.model_dump_json()}") + render_texts += [f"{block_input.type.capitalize()}"] elif block.type == "reasoning" and block.summary: - rendered_response.append(f"**Thought**: {block.summary}") - return "\n- ".join(filter(None, rendered_response)) + render_texts += [f"**Thought**: {block.summary}"] + + render_payload = { + # Combine text into a single string and filter out empty strings + "text": "\n- ".join(filter(None, render_texts)), + # Add screenshot data if available + "image": f"data:image/webp;base64,{screenshot}" if screenshot else None, + } + + return render_payload diff --git a/src/khoj/processor/operator/operator_environment_browser.py b/src/khoj/processor/operator/operator_environment_browser.py index 64db38ab..0be2ce14 100644 --- a/src/khoj/processor/operator/operator_environment_browser.py +++ b/src/khoj/processor/operator/operator_environment_browser.py @@ -133,7 +133,7 @@ class BrowserEnvironment(Environment): if not self.page or self.page.is_closed(): return EnvStepResult(error="Browser page is not available or closed.") - state = await self.get_state() + before_state = await self.get_state() output, error, step_type = None, None, "text" try: match action.type: @@ -232,7 +232,7 @@ class BrowserEnvironment(Environment): case "screenshot": step_type = "image" - output = {"image": state.screenshot, "url": state.url} + output = {"image": before_state.screenshot, "url": before_state.url} logger.debug(f"Action: {action.type}") case "move": @@ -324,12 +324,13 @@ class BrowserEnvironment(Environment): error = f"Error executing action {action.type}: {e}" logger.exception(f"Error during step execution for action: {action.model_dump_json()}") + after_state = await self.get_state() return EnvStepResult( type=step_type, output=output, error=error, - current_url=state.url, - screenshot_base64=state.screenshot, + current_url=after_state.url, + screenshot_base64=after_state.screenshot, ) def reset(self) -> None: