Support operating web browser with Anthropic models

- Add back() and goto(url) helper functions to operate browser
- Cache operator messages to Anthropic API for speed and cost savings
This commit is contained in:
Debanjum
2025-03-24 15:18:20 +05:30
parent 2e86141575
commit b9ea538b02
3 changed files with 509 additions and 5 deletions

View File

@@ -144,6 +144,7 @@ async def converse_anthropic(
user_query,
online_results: Optional[Dict[str, Dict]] = None,
code_results: Optional[Dict[str, Dict]] = None,
operator_results: Optional[Dict[str, Dict]] = None,
conversation_log={},
model: Optional[str] = "claude-3-7-sonnet-latest",
api_key: Optional[str] = None,
@@ -214,6 +215,10 @@ async def converse_anthropic(
context_message += (
f"{prompts.code_executed_context.format(code_results=truncate_code_context(code_results))}\n\n"
)
if ConversationCommand.Operator in conversation_commands and not is_none_or_empty(operator_results):
context_message += (
f"{prompts.operator_execution_context.format(operator_results=yaml_dump(operator_results))}\n\n"
)
context_message = context_message.strip()
# Setup Prompt with Primer or Conversation History

View File

@@ -1,18 +1,28 @@
import asyncio
import base64
import json
import logging
import time
from typing import Any, Callable, Dict, List, Optional
from datetime import datetime
from typing import Callable, List, Literal, Optional
import requests
from anthropic.types.beta import BetaContentBlock
from langchain.schema import ChatMessage
from openai.types.responses import ResponseOutputItem, response_computer_tool_call
from playwright.async_api import Page, async_playwright
from pydantic import BaseModel
from khoj.database.adapters import AgentAdapters, ConversationAdapters
from khoj.database.models import Agent, ChatModel, KhojUser
from khoj.processor.conversation.utils import commit_conversation_trace
from khoj.routers.helpers import ChatEvent
from khoj.utils.helpers import get_openai_async_client, timer
from khoj.utils.helpers import (
get_anthropic_async_client,
get_chat_usage_metrics,
get_openai_async_client,
is_promptrace_enabled,
timer,
)
from khoj.utils.rawconfig import LocationData
logger = logging.getLogger(__name__)
@@ -34,7 +44,7 @@ async def operate_browser(
# chat_model = await ConversationAdapters.aget_chat_model(user)
agent_chat_model = await AgentAdapters.aget_agent_chat_model(agent, user) if agent else None
chat_model: ChatModel = await ConversationAdapters.aget_default_chat_model(user, agent_chat_model)
supported_operator_model_types = [ChatModel.ModelType.OPENAI]
supported_operator_model_types = [ChatModel.ModelType.OPENAI, ChatModel.ModelType.ANTHROPIC]
if not chat_model or chat_model.model_type not in supported_operator_model_types:
# If a computer use capable model has not configured, return an unsupported on server error
raise ValueError(
@@ -71,7 +81,22 @@ async def operate_browser(
else:
response, safety_check_message = result
elif chat_model.model_type == ChatModel.ModelType.ANTHROPIC:
pass
async for result in browser_use_anthropic(
message,
chat_model,
page,
width,
height,
max_iterations=max_iterations,
send_status_func=send_status_func,
user=user,
agent=agent,
tracer=tracer,
):
if isinstance(result, dict) and ChatEvent.STATUS in result:
yield result
else:
response, safety_check_message = result
except requests.RequestException as e:
raise ValueError(f"Browser use with {chat_model.model_type} model failed due to a network error: {e}")
except Exception as e:
@@ -300,6 +325,219 @@ async def browser_use_openai(
yield (final_response, safety_check)
async def browser_use_anthropic(
query: str,
chat_model: ChatModel,
page: Page,
width: int = 1024,
height: int = 768,
max_tokens: int = 4096,
thinking_budget: int | None = 1024,
max_iterations: int = 40, # Add iteration limit to prevent infinite loops
send_status_func: Optional[Callable] = None,
user: KhojUser = None,
agent: Agent = None,
tracer: dict = {},
):
"""
A simple agent loop for Claude computer use interactions.
This function handles the back-and-forth between:
1. Sending user messages to Claude
2. Claude requesting to use browser use tools
3. Playwright executing those tools in browser.
4. Sending tool results back to Claude
"""
# Set up tools and API parameters
client = get_anthropic_async_client(chat_model.ai_model_api.api_key, chat_model.ai_model_api.api_base_url)
messages = [{"role": "user", "content": query}]
tool_version = "2025-01-24"
betas = [f"computer-use-{tool_version}", "token-efficient-tools-2025-02-19"]
temperature = 1.0
safety_check = None
system_prompt = f"""<SYSTEM_CAPABILITY>
* You are Khoj, a smart web browser operating assistant. You help the users accomplish tasks using a web browser.
* You operate a Chromium browser using Playwright.
* You cannot access the OS or filesystem.
* You can interact with the web browser to perform tasks like clicking, typing, scrolling, and more.
* You can use the additional back() and goto() helper functions to ease navigating the browser. If you see nothing, try goto duckduckgo.com
* When viewing a webpage it can be helpful to zoom out so that you can see everything on the page. Either that, or make sure you scroll down to see everything before deciding something isn't available.
* When using your computer function calls, they take a while to run and send back to you. Where possible/feasible, try to chain multiple of these calls all into one function calls request.
* Perform web searches using DuckDuckGo. Don't use Google even if requested as the query will fail.
* The current date is {datetime.today().strftime('%A, %B %-d, %Y')}.
</SYSTEM_CAPABILITY>
<IMPORTANT>
* You are allowed upto {max_iterations} iterations to complete the task.
* Do not loop on wait, screenshot for too many turns without taking any action.
* After initialization if the browser is blank, enter a website URL using the goto() function instead of waiting
</IMPORTANT>
"""
# Configure tools available to Claude
tools = [
{"type": f"computer_20250124", "name": "computer", "display_width_px": width, "display_height_px": height},
{
"name": "back",
"description": "Go back to the previous page.",
"input_schema": {
"type": "object",
"properties": {},
},
},
{
"name": "goto",
"description": "Go to a specific URL.",
"input_schema": {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "Fully qualified URL to navigate to.",
},
},
"required": ["url"],
},
},
# {"type": f"text_editor_20250124", "name": "str_replace_editor"},
# {"type": f"bash_20250124", "name": "bash"}
]
if agent:
agent.chat_model = chat_model
# Main agent loop (with iteration limit to prevent runaway API costs)
compiled_operator_messages: List[ChatMessage] = []
run_summarize = False
task_completed = False
iterations = 0
while iterations < max_iterations:
iterations += 1
# Set up optional thinking parameter (for Claude 3.7 Sonnet)
thinking = {"type": "disabled"}
if chat_model.name.startswith("claude-3-7") and thinking_budget:
thinking = {"type": "enabled", "budget_tokens": thinking_budget}
# Call the Claude API
response = await client.beta.messages.create(
messages=messages,
model=chat_model.name,
system=system_prompt,
tools=tools,
betas=betas,
thinking=thinking,
max_tokens=max_tokens,
temperature=temperature,
)
response_content = response.content
compiled_response = compile_claude_response(response_content)
# Add Claude's response to the conversation history
messages.append({"role": "assistant", "content": response_content})
compiled_operator_messages.append(ChatMessage(role="assistant", content=compiled_response))
logger.debug(f"Claude response: {response.model_dump_json()}")
if send_status_func:
async for event in send_status_func(f"**Operating Browser**:\n{compiled_response}"):
yield {ChatEvent.STATUS: event}
# Check if Claude used any tools
tool_results = []
for block in response_content:
if block.type == "tool_use":
if hasattr(block, "name") and block.name == "goto":
block_input = {"action": block.name, "url": block.input.get("url")}
elif hasattr(block, "name") and block.name == "back":
block_input = {"action": block.name}
else:
block_input = block.input
result = await handle_browser_action_anthropic(page, block_input)
content = result.get("output") or result.get("error")
if isinstance(content, str):
compiled_operator_messages.append(ChatMessage(role="browser", content=content))
elif isinstance(content, list) and content[0]["type"] == "image":
# Handle the case where the content is an image
compiled_operator_messages.append(
ChatMessage(role="browser", content="[placeholder for screenshot]")
)
# Format the result for Claude
tool_results.append(
{
"type": "tool_result",
"tool_use_id": block.id,
"content": content,
"is_error": "error" in result,
}
)
# Calculate cost of chat
input_tokens = response.usage.input_tokens
output_tokens = response.usage.output_tokens
cache_read_tokens = response.usage.cache_read_input_tokens
cache_write_tokens = response.usage.cache_creation_input_tokens
tracer["usage"] = get_chat_usage_metrics(
chat_model.name,
input_tokens,
output_tokens,
cache_read_tokens,
cache_write_tokens,
usage=tracer.get("usage"),
)
logger.debug(f"Operator usage by Claude: {tracer['usage']}")
# Save conversation trace
tracer["chat_model"] = chat_model.name
tracer["temperature"] = temperature
if is_promptrace_enabled():
commit_conversation_trace(compiled_operator_messages[:-1], compiled_operator_messages[-1].content, tracer)
# Run one last iteration to collate results of browser use, if no tool use requested or iteration limit reached.
if not tool_results and not run_summarize:
iterations = max_iterations - 1
run_summarize = True
task_completed = True
tool_results.append(
{
"type": "text",
"text": f"Collate all relevant information from your research so far to answer the target query:\n{query}.",
}
)
elif iterations == max_iterations and not run_summarize:
iterations = max_iterations - 1
run_summarize = True
task_completed = not tool_results
tool_results.append(
{
"type": "text",
"text": f"Collate all relevant information from your research so far to answer the target query:\n{query}.",
}
)
# Mark the final tool result as a cache break point
if tool_results:
tool_results[-1]["cache_control"] = {"type": "ephemeral"}
# Remove all previous cache break point
for message in messages:
if isinstance(message["content"], list):
for tool_result in message["content"]:
if isinstance(tool_result, dict) and "cache_control" in tool_result:
del tool_result["cache_control"]
elif isinstance(message["content"], dict) and "cache_control" in message["content"]:
del message["content"]["cache_control"]
# Add tool results to messages for the next iteration with Claude
messages.append({"role": "user", "content": tool_results})
if task_completed:
final_response = compiled_response
else:
final_response = f"Operator hit iteration limit. If the results seems incomplete try again, assign a smaller task or try a different approach.\nThese were the results till now:\n{compiled_response}"
yield (final_response, safety_check)
# Mapping of CUA keys to Playwright keys
CUA_KEY_TO_PLAYWRIGHT_KEY = {
"/": "Divide",
"\\": "Backslash",
@@ -315,6 +553,7 @@ CUA_KEY_TO_PLAYWRIGHT_KEY = {
"delete": "Delete",
"end": "End",
"enter": "Enter",
"return": "Enter",
"esc": "Escape",
"home": "Home",
"insert": "Insert",
@@ -329,6 +568,26 @@ CUA_KEY_TO_PLAYWRIGHT_KEY = {
}
AnthropicAction = Literal[
"key",
"type",
"mouse_move",
"left_click",
"left_click_drag",
"right_click",
"middle_click",
"double_click",
"screenshot",
"cursor_position",
"left_mouse_down",
"left_mouse_up",
"scroll",
"hold_key",
"wait",
"triple_click",
]
def parse_key_combination(text: str) -> list[str]:
"""
Parse an xdotool-style key combination (e.g., "ctrl+o", "shift+tab")
@@ -490,6 +749,245 @@ def compile_openai_response(response_content: list[ResponseOutputItem]) -> str:
return "\n- ".join(compiled_response)
async def handle_browser_action_anthropic(page: Page, action_input: dict):
"""
Given a computer action from Anthropic's computer use API,
execute the corresponding operation on the Playwright page.
"""
action_type = action_input.get("action")
if not action_type:
return {"error": "Missing action type in input"}
try:
logger.debug(f"Anthropic Action: {action_type} with input: {action_input}")
match action_type:
case "mouse_move":
if not action_input.get("coordinate"):
return {"error": "Missing coordinate for mouse_move"}
x, y = action_input["coordinate"]
await page.mouse.move(x, y)
return {"output": f"Moved mouse to ({x}, {y})"}
case "left_click":
coordinate = action_input.get("coordinate")
if coordinate:
x, y = coordinate
await page.mouse.move(x, y)
# Handle optional key modifier
key = action_input.get("key")
if key:
mapped_key = CUA_KEY_TO_PLAYWRIGHT_KEY.get(key.lower(), key)
await page.keyboard.down(mapped_key)
await page.mouse.click(x, y, button="left")
if key:
await page.keyboard.up(mapped_key)
return {"output": f"Left clicked at ({x}, {y})"}
case "right_click":
coordinate = action_input.get("coordinate")
if coordinate:
x, y = coordinate
await page.mouse.move(x, y)
await page.mouse.click(x, y, button="right")
return {"output": f"Right clicked at ({x}, {y})"}
case "middle_click":
coordinate = action_input.get("coordinate")
if coordinate:
coordinate = json.loads(coordinate) if isinstance(coordinate, str) else coordinate
x, y = coordinate
await page.mouse.move(x, y)
await page.mouse.click(x, y, button="middle")
return {"output": f"Middle clicked at ({x}, {y})"}
case "double_click":
coordinate = action_input.get("coordinate")
if coordinate:
coordinate = json.loads(coordinate) if isinstance(coordinate, str) else coordinate
x, y = coordinate
await page.mouse.move(x, y)
await page.mouse.dblclick(x, y)
return {"output": f"Double clicked at ({x}, {y})"}
case "triple_click":
coordinate = action_input.get("coordinate")
if coordinate:
coordinate = json.loads(coordinate) if isinstance(coordinate, str) else coordinate
x, y = coordinate
await page.mouse.move(x, y)
# Playwright doesn't have a triple-click method, so we implement it manually
await page.mouse.click(x, y, click_count=3)
return {"output": f"Triple clicked at ({x}, {y})"}
case "left_click_drag":
if not action_input.get("coordinate"):
return {"error": "Missing coordinate for left_click_drag"}
coordinate = action_input["coordinate"]
coordinate = json.loads(coordinate) if isinstance(coordinate, str) else coordinate
x, y = action_input["coordinate"]
await page.mouse.move(x, y)
await page.mouse.down()
# We'd need to get the target position, but it's not clear from the reference how this is passed
# For now, we'll end the drag at the same position
await page.mouse.up()
return {"output": f"Drag operation from ({x}, {y})"}
case "left_mouse_down":
await page.mouse.down(button="left")
return {"output": "Left mouse button down"}
case "left_mouse_up":
await page.mouse.up(button="left")
return {"output": "Left mouse button up"}
case "type":
text = action_input.get("text")
if not text:
return {"error": "Missing text for type action"}
await page.keyboard.type(text)
return {"output": f"Typed text: {text}"}
case "scroll":
scroll_direction = action_input.get("scroll_direction")
scroll_amount = action_input.get("scroll_amount", 1)
if not scroll_direction:
return {"error": "Missing scroll_direction for scroll action"}
coordinate = action_input.get("coordinate")
if coordinate:
coordinate = json.loads(coordinate) if isinstance(coordinate, str) else coordinate
x, y = coordinate
await page.mouse.move(x, y)
# Convert direction to x/y values
dx, dy = 0, 0
if scroll_direction == "up":
dy = -100 * scroll_amount
elif scroll_direction == "down":
dy = 100 * scroll_amount
elif scroll_direction == "left":
dx = -100 * scroll_amount
elif scroll_direction == "right":
dx = 100 * scroll_amount
await page.mouse.wheel(dx, dy)
return {"output": f"Scrolled {scroll_direction} by {scroll_amount}"}
case "key":
text = action_input.get("text")
if not text:
return {"error": "Missing text for key action"}
# Parse xdotool style key combinations
keys = parse_key_combination(text)
if len(keys) > 1:
# For key combinations (e.g., ctrl+a)
# Press all modifier keys down
for key in keys[:-1]:
await page.keyboard.down(key)
# Press and release the last key
await page.keyboard.press(keys[-1])
# Release all modifier keys in reverse order
for key in reversed(keys[:-1]):
await page.keyboard.up(key)
else:
# For single keys
await page.keyboard.press(keys[0])
logger.debug(f"Pressed key: {keys} from original: {text}")
return {"output": f"Pressed key: {text}"}
case "hold_key":
text = action_input.get("text")
duration = action_input.get("duration", 1)
if not text:
return {"error": "Missing text for hold_key action"}
# Parse and handle xdotool style key combinations
keys = parse_key_combination(text)
# Press all keys down
for key in keys:
await page.keyboard.down(key)
# Hold for duration
await asyncio.sleep(duration)
# Release all keys in reverse order
for key in reversed(keys):
await page.keyboard.up(key)
return {"output": f"Held key{'s' if len(keys) > 1 else ''} {text} for {duration} seconds"}
case "wait":
duration = action_input.get("duration", 1)
await asyncio.sleep(duration)
return {"output": f"Waited for {duration} seconds"}
case "screenshot":
screenshot_base64 = await get_screenshot(page)
return {
"output": [
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/webp",
"data": screenshot_base64,
},
}
]
}
case "cursor_position":
mouse_position = await page.evaluate("({x: window.mouseX, y: window.mouseY})")
return {"output": f"Cursor position: {mouse_position}"}
case "goto":
url = action_input.get("url")
if not url:
return {"error": "Missing URL for goto action"}
await page.goto(url)
return {"output": f"Navigated to {url}"}
case "back":
await page.go_back()
return {"output": "Navigated back to the previous page."}
case _:
return {"error": f"Unrecognized action: {action_type}"}
except Exception as e:
error_message = f"Error handling action {action_type}: {str(e)}"
logger.error(error_message)
return {"error": error_message}
def compile_claude_response(response_content: list[BetaContentBlock]) -> str:
"""
Compile the response from Anthropic AI model into a single string.
"""
compiled_response = [""]
for block in response_content:
if block.type == "text":
compiled_response.append(block.text)
elif block.type == "tool_use":
if hasattr(block, "name") and block.name == "goto":
block_input = {"action": block.name, "url": block.input.get("url")}
elif hasattr(block, "name") and block.name == "back":
block_input = {"action": block.name}
else:
block_input = block.input
compiled_response.append(f"**Action**: {json.dumps(block_input)}")
elif block.type == "thinking":
compiled_response.append(f"**Thought**: {block.thinking}")
return "\n- ".join(compiled_response)
async def get_screenshot(page: Page):
"""
Take a viewport screenshot using Playwright and return as base64 encoded webp image.

View File

@@ -1481,6 +1481,7 @@ async def agenerate_chat_response(
query_images=query_images,
online_results=online_results,
code_results=code_results,
operator_results=operator_results,
conversation_log=meta_log,
model=chat_model.name,
api_key=api_key,