Show formatted action, env screenshot after action on each operator step

Show natural-language, formatted text for each action. Previously we
were just showing JSON dumps of the actions taken.

Pass the screenshot at each step for the OpenAI, Anthropic and binary operator agents.
Use the text and image fields in the JSON passed to the client to render both.

Show actions, env screenshot after actions applied in train of thought.
Showing the post action application screenshot seems more intuitive.

Previously we were showing the screenshot used to decide the next action.
This pre-action screenshot was being shown after the next action was
decided (in the train of thought), which was misleading as to the actual
ordering of events.

The rendered response is now a structured payload (dict) that passes the
image and text up from the operator to clients for rendering the train
of thought.
This commit is contained in:
Debanjum
2025-05-09 14:47:31 -06:00
parent a1d712e031
commit 21a9556b06
7 changed files with 128 additions and 68 deletions

View File

@@ -204,7 +204,7 @@ class GroundingAgent:
# Parse tool calls
grounding_message = grounding_response.choices[0].message
action_results = self._parse_action(grounding_message, instruction)
action_results = self._parse_action(grounding_message, instruction, current_state)
# Update usage by grounding model
self.tracer["usage"] = get_chat_usage_metrics(
@@ -262,7 +262,9 @@ back() # Use this to go back to the previous page.
)
return [{"role": "user", "content": grounding_messages_content}]
def _parse_action(self, grounding_message: ChatCompletionMessage, instruction: str) -> AgentActResult:
def _parse_action(
self, grounding_message: ChatCompletionMessage, instruction: str, current_state: EnvState
) -> AgentActResult:
"""Parse the tool calls from the grounding LLM response and convert them to action objects."""
actions: List[OperatorAction] = []
action_results: List[dict] = []
@@ -340,7 +342,10 @@ back() # Use this to go back to the previous page.
return AgentActResult(
actions=actions,
action_results=action_results,
rendered_response=rendered_response,
rendered_response={
"text": rendered_response,
"image": f"data:image/webp;base64,{current_state.screenshot}",
},
)
def reset(self):

View File

@@ -1,4 +1,5 @@
import asyncio
import json
import logging
from typing import Callable, List, Optional
@@ -89,12 +90,6 @@ async def operate_browser(
# 2. Agent decides action(s)
agent_result = await operator_agent.act(browser_state)
# Render status update
rendered_response = agent_result.rendered_response
if send_status_func and rendered_response:
async for event in send_status_func(f"**Operating Browser**:\n{rendered_response}"):
yield {ChatEvent.STATUS: event}
# 3. Execute actions in the environment
env_steps: List[EnvStepResult] = []
for action in agent_result.actions:
@@ -110,6 +105,15 @@ async def operate_browser(
env_step = await environment.step(action)
env_steps.append(env_step)
# Render status update
latest_screenshot = f"data:image/webp;base64,{env_steps[-1].screenshot_base64 if env_steps else browser_state.screenshot}"
render_payload = agent_result.rendered_response
render_payload["image"] = latest_screenshot
render_content = f"**Action**: {json.dumps(render_payload)}"
if send_status_func:
async for event in send_status_func(f"**Operating Browser**:\n{render_content}"):
yield {ChatEvent.STATUS: event}
# Check if termination conditions are met
task_completed = not agent_result.actions # No actions requested by agent
trigger_iteration_limit = iterations == max_iterations

View File

@@ -292,31 +292,60 @@ class AnthropicOperatorAgent(OperatorAgent):
return "\n- ".join(filter(None, compiled_response)) # Filter out empty strings
@staticmethod
async def _render_response(response_content: list[BetaContentBlock], screenshot: Optional[str] = None) -> str:
async def _render_response(response_content: list[BetaContentBlock], screenshot: Optional[str] = None) -> dict:
"""Render Anthropic response, potentially including actual screenshots."""
rendered_response = [""]
render_texts = []
for block in deepcopy(response_content): # Use deepcopy to avoid modifying original
if block.type == "text":
rendered_response.append(block.text)
elif block.type == "tool_use":
block_input = {"action": block.name}
if block.name == "computer":
block_input = block.input
elif block.name == "goto":
block_input["url"] = block.input.get("url", "[Missing URL]")
# If it's a screenshot action
if isinstance(block_input, dict) and block_input.get("action") == "screenshot":
# Render the screenshot data if available
if screenshot:
block_input["image"] = f"data:image/webp;base64,{screenshot}"
else:
block_input["image"] = "[Failed to get screenshot]"
rendered_response.append(f"**Action**: {json.dumps(block_input)}")
elif block.type == "thinking":
if block.type == "thinking":
thinking_content = getattr(block, "thinking", None)
if thinking_content:
rendered_response.append(f"**Thought**: {thinking_content}")
render_texts += [f"**Thought**: {thinking_content}"]
elif block.type == "text":
render_texts += [block.text]
elif block.type == "tool_use":
if block.name == "goto":
render_texts += [f"Open URL: {block.input.get('url', '[Missing URL]')}"]
elif block.name == "back":
render_texts += ["Go back to the previous page."]
elif block.name == "computer":
block_input = block.input
if not isinstance(block_input, dict):
render_texts += [json.dumps(block_input)]
# Handle computer action details
elif "action" in block_input:
action = block_input["action"]
if action == "type":
text = block_input.get("text")
if text:
render_texts += [f'Type "{text}"']
elif action == "key":
text: str = block_input.get("text")
if text:
render_texts += [f"Press {text}"]
elif action == "hold_key":
text = block_input.get("text")
duration = block_input.get("duration", 1.0)
if text:
render_texts += [f"Hold {text} for {duration} seconds"]
else:
# Handle other actions
render_texts += [f"{action.capitalize()}"]
return "\n- ".join(filter(None, rendered_response))
# If screenshot is not available when screenshot action was requested
if isinstance(block.input, dict) and block.input.get("action") == "screenshot" and not screenshot:
render_texts += ["Failed to get screenshot"]
# Do not show screenshot if no actions requested
if all([block.type != "tool_use" for block in response_content]):
# If all blocks are not tool_use, return None
screenshot = None
# Create render payload
render_payload = {
# Combine text into a single string and filter out empty strings
"text": "\n- ".join(filter(None, render_texts)),
# Add screenshot data if available
"image": f"data:image/webp;base64,{screenshot}" if screenshot else None,
}
return render_payload

View File

@@ -16,7 +16,7 @@ logger = logging.getLogger(__name__)
class AgentActResult(BaseModel):
actions: List[OperatorAction] = []
action_results: List[dict] = [] # Model-specific format
rendered_response: Optional[str] = None
rendered_response: Optional[dict] = None
class AgentMessage(BaseModel):

View File

@@ -70,13 +70,13 @@ class BinaryOperatorAgent(OperatorAgent):
return AgentActResult(
actions=[],
action_results=[],
rendered_response=natural_language_action,
rendered_response={"text": natural_language_action, "image": None},
)
elif reasoner_response["type"] == "done":
return AgentActResult(
actions=[],
action_results=[],
rendered_response=natural_language_action,
rendered_response={"text": natural_language_action, "image": None},
)
# --- Step 2: Grounding LLM converts NL action to structured action ---
@@ -185,15 +185,31 @@ Focus on the visual action and provide all necessary context.
actions.append(WaitAction(duration=1.0))
rendered_parts += ["Could not process response."]
else:
rendered_parts += [f"**Thought (Grounding)**: {grounding_response}"]
grounding_thoughts = grounding_response.rsplit("\nAction: ", 1)[0]
rendered_parts += [f"**Thought (Grounding)**: {grounding_thoughts}"]
for action in actions:
rendered_parts += [f"**Action**: {action}"]
if action.type == "type":
rendered_parts += [f'**Action**: Type "{action.text}"']
elif action.type == "keypress":
rendered_parts += [f'**Action**: Press "{action.keys}"']
elif action.type == "hold_key":
rendered_parts += [f'**Action**: Hold "{action.text}" for {action.duration} seconds']
elif action.type == "key_up":
rendered_parts += [f'**Action**: Release Key "{action.key}"']
elif action.type == "key_down":
rendered_parts += [f'**Action**: Press Key "{action.key}"']
elif action.type == "screenshot" and not current_state.screenshot:
rendered_parts += [f"**Error**: Failed to take screenshot"]
elif action.type == "goto":
rendered_parts += [f"**Action**: Open URL {action.url}"]
else:
rendered_parts += [f"**Action**: {action.type}"]
action_results += [{"content": None}] # content set after environment step
except Exception as e:
logger.error(f"Error calling Grounding LLM: {e}")
rendered_parts += [f"**Error**: Error contacting Grounding LLM: {e}"]
rendered_response = "\n- ".join(rendered_parts)
rendered_response = self._render_response(rendered_parts, current_state.screenshot)
return AgentActResult(
actions=actions,
@@ -291,11 +307,13 @@ Focus on the visual action and provide all necessary context.
# Fallback for unexpected types
return str(response_content)
def _render_response(self, response: List, screenshot: Optional[str]) -> Optional[str]:
"""Render response for display. Currently uses compile_response."""
# TODO: Could potentially enhance rendering, e.g., showing vision thought + grounding actions distinctly.
# For now, rely on the structure built during the 'act' phase.
return response # The rendered_response is already built in act()
def _render_response(self, response: List, screenshot: str | None) -> dict:
"""Render response for display"""
render_payload = {
"text": "\n- ".join(response),
"image": f"data:image/webp;base64,{screenshot}" if screenshot else None,
}
return render_payload
def _get_message_text(self, message: AgentMessage) -> str:
if isinstance(message.content, list):

View File

@@ -273,35 +273,38 @@ class OpenAIOperatorAgent(OperatorAgent):
return "\n- ".join(filter(None, compiled_response)) # Filter out empty strings
@staticmethod
async def _render_response(response_content: list[ResponseOutputItem], screenshot: Optional[str] = None) -> str:
async def _render_response(response_content: list[ResponseOutputItem], screenshot: Optional[str] = None) -> dict:
"""Render OpenAI response for display, potentially including screenshots."""
rendered_response = [""]
render_texts = []
for block in deepcopy(response_content): # Use deepcopy to avoid modifying original
if block.type == "message":
text_content = block.text if hasattr(block, "text") else block.model_dump_json()
rendered_response.append(text_content)
render_texts += [text_content]
elif block.type == "function_call":
block_input = {"action": block.name}
if block.name == "goto":
try:
args = json.loads(block.arguments)
block_input["url"] = args.get("url", "[Missing URL]")
except json.JSONDecodeError:
block_input["arguments"] = block.arguments
rendered_response.append(f"**Action**: {json.dumps(block_input)}")
args = json.loads(block.arguments)
render_texts = [f'Open URL: {args.get("url", "[Missing URL]")}']
else:
render_texts += [block.name]
elif block.type == "computer_call":
block_input = block.action
# If it's a screenshot action
if block_input.type == "screenshot":
# Render screenshot if available
block_input_render = block_input.model_dump()
if screenshot:
block_input_render["image"] = f"data:image/webp;base64,{screenshot}"
else:
block_input_render["image"] = "[Failed to get screenshot]"
rendered_response.append(f"**Action**: {json.dumps(block_input_render)}")
if block_input.type == "screenshot" and not screenshot:
render_texts += ["Failed to get screenshot"]
elif block_input.type == "type":
render_texts += [f'Type "{block_input.text}"']
elif block_input.type == "keypress":
render_texts += [f"Press {'+'.join(block_input.keys)}"]
else:
rendered_response.append(f"**Action**: {block_input.model_dump_json()}")
render_texts += [f"{block_input.type.capitalize()}"]
elif block.type == "reasoning" and block.summary:
rendered_response.append(f"**Thought**: {block.summary}")
return "\n- ".join(filter(None, rendered_response))
render_texts += [f"**Thought**: {block.summary}"]
render_payload = {
# Combine text into a single string and filter out empty strings
"text": "\n- ".join(filter(None, render_texts)),
# Add screenshot data if available
"image": f"data:image/webp;base64,{screenshot}" if screenshot else None,
}
return render_payload

View File

@@ -133,7 +133,7 @@ class BrowserEnvironment(Environment):
if not self.page or self.page.is_closed():
return EnvStepResult(error="Browser page is not available or closed.")
state = await self.get_state()
before_state = await self.get_state()
output, error, step_type = None, None, "text"
try:
match action.type:
@@ -232,7 +232,7 @@ class BrowserEnvironment(Environment):
case "screenshot":
step_type = "image"
output = {"image": state.screenshot, "url": state.url}
output = {"image": before_state.screenshot, "url": before_state.url}
logger.debug(f"Action: {action.type}")
case "move":
@@ -324,12 +324,13 @@ class BrowserEnvironment(Environment):
error = f"Error executing action {action.type}: {e}"
logger.exception(f"Error during step execution for action: {action.model_dump_json()}")
after_state = await self.get_state()
return EnvStepResult(
type=step_type,
output=output,
error=error,
current_url=state.url,
screenshot_base64=state.screenshot,
current_url=after_state.url,
screenshot_base64=after_state.screenshot,
)
def reset(self) -> None: