Implement docker computer environment for operator

- Generalize building pyautogui into executable python code snippet.
  This should work across docker and local. And should be easier to
  extend to operate a remote computer over the network as well.

- Create dockerfile for pyautogui operate-able containerized computer
This commit is contained in:
Debanjum
2025-05-27 15:22:07 -07:00
parent e117f57f64
commit 771909f76a
2 changed files with 310 additions and 77 deletions

129
computer.Dockerfile Normal file
View File

@@ -0,0 +1,129 @@
FROM ubuntu:24.04
ENV DEBIAN_FRONTEND=noninteractive
# Install System Dependencies
RUN apt update \
&& apt install -y \
ca-certificates \
gnupg \
xfce4 \
xfce4-goodies \
x11vnc \
xvfb \
xdotool \
imagemagick \
x11-apps \
dbus-x11 \
sudo \
python3-pip \
python3-tk \
python3-dev \
build-essential \
scrot \
gnome-screenshot \
net-tools \
libx11-dev \
libxext-dev \
libxtst-dev \
libxinerama-dev \
libxmu-dev \
libxrandr-dev \
libxfixes-dev \
software-properties-common \
&& add-apt-repository ppa:mozillateam/ppa && apt update \
&& apt install -y --no-install-recommends \
# Desktop apps
firefox-esr \
libreoffice \
x11-apps \
xpdf \
gedit \
xpaint \
tint2 \
galculator \
pcmanfm \
unzip \
# Terminal apps like file editors, viewers, git, wget/curl etc.
less \
nano \
neovim \
vim \
git \
curl \
wget \
procps \
# Python/pyenv dependencies
libssl-dev \
zlib1g-dev \
libbz2-dev \
libreadline-dev \
libsqlite3-dev \
libncursesw5-dev \
xz-utils \
tk-dev \
libxml2-dev \
libxmlsec1-dev \
libffi-dev \
liblzma-dev \
# set default browser
&& update-alternatives --set x-www-browser /usr/bin/firefox-esr \
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
# remove screen locks, power managers
&& apt remove -y light-locker xfce4-screensaver xfce4-power-manager || true
# Create Computer User
ENV USERNAME=operator
ENV HOME=/home/$USERNAME
RUN useradd -m -s /bin/bash -d $HOME -g $USERNAME $USERNAME && echo "${USERNAME} ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers
USER $USERNAME
WORKDIR $HOME
# Setup Python
RUN git clone https://github.com/pyenv/pyenv.git ~/.pyenv && \
cd ~/.pyenv && src/configure && make -C src && cd .. && \
echo 'export PYENV_ROOT="$HOME/.pyenv"' >> ~/.bashrc && \
echo 'command -v pyenv >/dev/null || export PATH="$PYENV_ROOT/bin:$PATH"' >> ~/.bashrc && \
echo 'eval "$(pyenv init -)"' >> ~/.bashrc
ENV PYENV_ROOT="$HOME/.pyenv"
ENV PATH="$PYENV_ROOT/bin:$PATH"
ENV PYENV_VERSION_MAJOR=3
ENV PYENV_VERSION_MINOR=11
ENV PYENV_VERSION_PATCH=6
ENV PYENV_VERSION=$PYENV_VERSION_MAJOR.$PYENV_VERSION_MINOR.$PYENV_VERSION_PATCH
RUN eval "$(pyenv init -)" && \
pyenv install $PYENV_VERSION && \
pyenv global $PYENV_VERSION && \
pyenv rehash
ENV PATH="$HOME/.pyenv/shims:$HOME/.pyenv/bin:$PATH"
# Install Python Packages
RUN python3 -m pip install --no-cache-dir \
pyautogui \
Pillow \
pyperclip \
pygetwindow
# Setup VNC
RUN x11vnc -storepasswd secret /home/operator/.vncpass
ARG WIDTH=1024
ARG HEIGHT=768
ARG DISPLAY_NUM=99
ENV WIDTH=$WIDTH
ENV HEIGHT=$HEIGHT
ENV DISPLAY_NUM=$DISPLAY_NUM
ENV DISPLAY=":$DISPLAY_NUM"
# Expose VNC on port 5900
# run Xvfb, x11vnc, Xfce (no login manager)
EXPOSE 5900
CMD ["/bin/sh", "-c", " export XDG_RUNTIME_DIR=/run/user/$(id -u); \
mkdir -p $XDG_RUNTIME_DIR && chown $USERNAME:$USERNAME $XDG_RUNTIME_DIR && chmod 0700 $XDG_RUNTIME_DIR; \
Xvfb $DISPLAY -screen 0 ${WIDTH}x${HEIGHT}x24 -dpi 96 -auth /home/$USERNAME/.Xauthority >/dev/null 2>&1 & \
sleep 1; \
xauth add $DISPLAY . $(mcookie); \
x11vnc -display $DISPLAY -forever -rfbauth /home/$USERNAME/.vncpass -listen 0.0.0.0 -rfbport 5900 >/dev/null 2>&1 & \
eval $(dbus-launch --sh-syntax) && \
startxfce4 & \
sleep 2 && echo 'Container running!' && \
tail -f /dev/null "]

View File

@@ -1,9 +1,11 @@
import ast
import asyncio import asyncio
import base64 import base64
import io import io
import logging import logging
import platform # To help with key mapping import platform
from typing import Optional, Union import subprocess
from typing import Literal, Optional, Union
from PIL import Image, ImageDraw from PIL import Image, ImageDraw
@@ -17,51 +19,79 @@ from khoj.utils.helpers import convert_image_to_webp
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
try:
import pyautogui
except ImportError:
pyautogui = None
logging.getLogger(__name__).warning(
"Pyautogui not found. ComputerEnvironment will not be available. " "Install with 'pip install pyautogui'."
)
# --- Concrete Computer Environment --- # --- Concrete Computer Environment ---
class ComputerEnvironment(Environment): class ComputerEnvironment(Environment):
def __init__(self): def __init__(
if pyautogui is None: self,
raise ImportError("Pyautogui is not installed. ComputerEnvironment cannot be initialized.") provider: Literal["local", "docker"] = "local",
docker_display: str = ":99",
docker_container_name: str = "khoj-computer",
):
self.provider = provider
self.docker_display = docker_display
self.docker_container_name = docker_container_name
self.width: int = 0 self.width: int = 0
self.height: int = 0 self.height: int = 0
self.mouse_pos: Point = Point(x=0, y=0) self.mouse_pos: Point = Point(x=0, y=0)
pyautogui.FAILSAFE = True # Abort by moving mouse to a corner
# pyautogui.PAUSE = 0.05 # Optional: slight pause after each pyautogui call
async def _execute(self, func, *args, **kwargs): async def _execute(self, func_name, *args, **kwargs):
""" """
Executes a pyautogui function, abstracting the execution context. Executes a pyautogui function, abstracting the execution context.
Currently runs locally using asyncio.to_thread. Currently runs locally using asyncio.to_thread.
""" """
# TODO: Support executing in local/remote docker container or remote computer python_command_str = self.generate_pyautogui_command(func_name, *args, **kwargs)
if pyautogui: # Docker execution
if self.provider == "docker":
try: try:
# Use asyncio.to_thread to have pyautogui calls not block the event loop output_str = await self.docker_execute(python_command_str)
return await asyncio.to_thread(func, *args, **kwargs) except RuntimeError as e: # Catch other Docker execution errors
except pyautogui.FailSafeException as e: logger.error(f"Error during Docker execution of {func_name}: {e}")
raise KeyboardInterrupt("User interrupt") from e raise # Re-raise as a general error for the caller to handle
# Local execution
else:
process = await asyncio.to_thread(
subprocess.run,
["python3", "-c", python_command_str],
capture_output=True,
text=True,
check=False, # We check returncode manually
)
output_str = process.stdout.strip()
if process.returncode != 0:
if "FailSafeException" in process.stderr or "FailSafeException" in process.stdout:
# Extract the message if possible, otherwise use generic
fs_msg = process.stderr or process.stdout
raise KeyboardInterrupt(fs_msg)
else:
error_msg = (
f'Local script execution failed:\nCmd: python3 -c "{python_command_str[:200]}...{python_command_str[-200:]}\n'
f"Return Code: {process.returncode}\nStderr: {process.stderr}\nStdout: {process.stdout}"
)
logger.error(error_msg)
raise RuntimeError(f"Local script execution error: {process.stderr or process.stdout}")
if not output_str or output_str == "None":
return None
try:
return ast.literal_eval(output_str)
except (ValueError, SyntaxError):
# If not a literal (e.g., some other string output), return as is
return output_str
async def start(self, width: int, height: int) -> None: async def start(self, width: int, height: int) -> None:
""" """
Initializes the computer environment. Initializes the computer environment.
The width and height parameters are logged, but actual screen dimensions are used. The width and height parameters are logged, but actual screen dimensions are used.
""" """
screen_width, screen_height = await self._execute(pyautogui.size) screen_width, screen_height = await self._execute("size")
self.width = screen_width self.width = screen_width
self.height = screen_height self.height = screen_height
# Initialize mouse position to center, or current if available # Initialize mouse position to center, or current if available
try: try:
current_x, current_y = await self._execute(pyautogui.position) current_x, current_y = await self._execute("position")
self.mouse_pos = Point(x=current_x, y=current_y) self.mouse_pos = Point(x=current_x, y=current_y)
except Exception: # Fallback if position cannot be obtained initially except Exception: # Fallback if position cannot be obtained initially
self.mouse_pos = Point(x=self.width / 2, y=self.height / 2) self.mouse_pos = Point(x=self.width / 2, y=self.height / 2)
@@ -74,25 +104,22 @@ class ComputerEnvironment(Environment):
async def _get_screenshot(self) -> Optional[str]: async def _get_screenshot(self) -> Optional[str]:
try: try:
screenshot_pil = await self._execute(pyautogui.screenshot) # Get screenshot
base64_png_str = await self._execute("screenshot")
screenshot_bytes = base64.b64decode(base64_png_str)
img_byte_arr = io.BytesIO() # Get current mouse position
screenshot_pil.save(img_byte_arr, format="PNG") current_mouse_x, current_mouse_y = await self._execute("position")
screenshot_bytes = img_byte_arr.getvalue() draw_pos = Point(x=current_mouse_x, y=current_mouse_y)
# Get current mouse position to draw accurately # Add mouse position to screenshot
try: screenshot_bytes_with_mouse = await self._draw_mouse_position(screenshot_bytes, draw_pos)
current_mouse_x, current_mouse_y = await self._execute(pyautogui.position) screenshot_webp_bytes = convert_image_to_webp(screenshot_bytes_with_mouse)
draw_pos = Point(x=current_mouse_x, y=current_mouse_y)
except Exception: # Fallback to stored mouse_pos
draw_pos = self.mouse_pos
screenshot_bytes = await self._draw_mouse_position(screenshot_bytes, draw_pos)
screenshot_webp_bytes = convert_image_to_webp(screenshot_bytes)
return base64.b64encode(screenshot_webp_bytes).decode("utf-8") return base64.b64encode(screenshot_webp_bytes).decode("utf-8")
except KeyboardInterrupt: # Propagate keyboard interrupts
raise
except Exception as e: except Exception as e:
logger.error(f"Failed to get screenshot: {e}") logger.error(f"Failed to get screenshot: {e}", exc_info=True)
return None return None
async def _draw_mouse_position(self, screenshot_bytes: bytes, mouse_pos: Point) -> bytes: async def _draw_mouse_position(self, screenshot_bytes: bytes, mouse_pos: Point) -> bytes:
@@ -131,46 +158,45 @@ class ComputerEnvironment(Environment):
x, y, button_name = action.x, action.y, action.button x, y, button_name = action.x, action.y, action.button
modifiers_to_press = self.parse_key_combination(action.modifiers) if action.modifiers else [] modifiers_to_press = self.parse_key_combination(action.modifiers) if action.modifiers else []
for mod_key in modifiers_to_press: for mod_key in modifiers_to_press:
await self._execute(pyautogui.keyDown, mod_key) await self._execute("keyDown", mod_key)
if button_name == "wheel": if button_name == "wheel":
# Perform a small scroll action at this position (e.g., one "tick" down) # Perform a small scroll action at this position (e.g., one "tick" down)
# Pyautogui scroll: positive up, negative down. # Pyautogui scroll: positive up, negative down.
# Let's make it scroll down by a small amount (e.g. 3 units for pyautogui) await self._execute("scroll", -1, x=x, y=y)
await self._execute(pyautogui.scroll, -3, x=x, y=y)
output = f"Scrolled wheel at ({x}, {y})" output = f"Scrolled wheel at ({x}, {y})"
else: else:
pyautogui_button = button_name.lower() if button_name else "left" pyautogui_button = button_name.lower() if button_name else "left"
await self._execute(pyautogui.click, x=x, y=y, button=pyautogui_button) await self._execute("click", x=x, y=y, button=pyautogui_button)
output = f"{button_name.capitalize() if button_name else 'Left'} clicked at ({x}, {y})" output = f"{button_name.capitalize() if button_name else 'Left'} clicked at ({x}, {y})"
for mod_key in reversed(modifiers_to_press): for mod_key in reversed(modifiers_to_press):
await self._execute(pyautogui.keyUp, mod_key) await self._execute("keyUp", mod_key)
self.mouse_pos = Point(x=x, y=y) self.mouse_pos = Point(x=x, y=y)
logger.debug(f"Action: {action.type} {button_name} at ({x},{y}) with modifiers {action.modifiers}") logger.debug(f"Action: {action.type} {button_name} at ({x},{y}) with modifiers {action.modifiers}")
case "double_click": case "double_click":
x, y = action.x, action.y x, y = action.x, action.y
await self._execute(pyautogui.doubleClick, x=x, y=y) await self._execute("doubleClick", x=x, y=y)
self.mouse_pos = Point(x=x, y=y) self.mouse_pos = Point(x=x, y=y)
output = f"Double clicked at ({x}, {y})" output = f"Double clicked at ({x}, {y})"
logger.debug(f"Action: {action.type} at ({x},{y})") logger.debug(f"Action: {action.type} at ({x},{y})")
case "triple_click": case "triple_click":
x, y = action.x, action.y x, y = action.x, action.y
await self._execute(pyautogui.click, x=x, y=y, clicks=3) await self._execute("click", x=x, y=y, clicks=3)
self.mouse_pos = Point(x=x, y=y) self.mouse_pos = Point(x=x, y=y)
output = f"Triple clicked at ({x}, {y})" output = f"Triple clicked at ({x}, {y})"
logger.debug(f"Action: {action.type} at ({x},{y})") logger.debug(f"Action: {action.type} at ({x},{y})")
case "scroll": case "scroll":
current_x_pos, current_y_pos = await self._execute(pyautogui.position) current_x_pos, current_y_pos = await self._execute("position")
target_x = action.x if action.x is not None else current_x_pos target_x = action.x if action.x is not None else current_x_pos
target_y = action.y if action.y is not None else current_y_pos target_y = action.y if action.y is not None else current_y_pos
if target_x != current_x_pos or target_y != current_y_pos: if target_x != current_x_pos or target_y != current_y_pos:
await self._execute(pyautogui.moveTo, target_x, target_y) await self._execute("moveTo", target_x, target_y)
self.mouse_pos = Point(x=target_x, y=target_y) # Update mouse pos to scroll location self.mouse_pos = Point(x=target_x, y=target_y) # Update mouse pos to scroll location
@@ -179,34 +205,30 @@ class ComputerEnvironment(Environment):
scroll_y_amount = action.scroll_y or 0 scroll_y_amount = action.scroll_y or 0
if scroll_x_amount != 0: if scroll_x_amount != 0:
await self._execute(pyautogui.hscroll, scroll_x_amount) # pyautogui.hscroll: positive right await self._execute("hscroll", scroll_x_amount)
if scroll_y_amount != 0: if scroll_y_amount != 0:
await self._execute(pyautogui.scroll, -scroll_y_amount) # pyautogui.scroll: positive up # pyautogui scroll: positive up, so negate for typical "scroll down" meaning positive y
await self._execute("scroll", -scroll_y_amount)
output = f"Scrolled by (x:{scroll_x_amount}, y:{scroll_y_amount}) at ({target_x}, {target_y})" output = f"Scrolled by (x:{scroll_x_amount}, y:{scroll_y_amount}) at ({target_x}, {target_y})"
logger.debug(
f"Action: {action.type} by ({scroll_x_amount},{scroll_y_amount}) at ({target_x},{target_y})"
)
elif action.scroll_direction: elif action.scroll_direction:
# Define scroll unit (number of pyautogui scroll 'clicks') # Define scroll unit (number of pyautogui scroll 'clicks')
# This might need tuning based on desired sensitivity. # This might need tuning based on desired sensitivity.
pyautogui_scroll_clicks_per_unit = 20 pyautogui_scroll_clicks_per_unit = 1
amount = action.scroll_amount or 1 amount = action.scroll_amount or 1
total_scroll_clicks = pyautogui_scroll_clicks_per_unit * amount total_scroll_clicks = pyautogui_scroll_clicks_per_unit * amount
if action.scroll_direction == "up": if action.scroll_direction == "up":
await self._execute(pyautogui.scroll, total_scroll_clicks) await self._execute("scroll", total_scroll_clicks)
elif action.scroll_direction == "down": elif action.scroll_direction == "down":
await self._execute(pyautogui.scroll, -total_scroll_clicks) await self._execute("scroll", -total_scroll_clicks)
elif action.scroll_direction == "left": elif action.scroll_direction == "left":
await self._execute(pyautogui.hscroll, -total_scroll_clicks) await self._execute("hscroll", -total_scroll_clicks)
elif action.scroll_direction == "right": elif action.scroll_direction == "right":
await self._execute(pyautogui.hscroll, total_scroll_clicks) await self._execute("hscroll", total_scroll_clicks)
output = f"Scrolled {action.scroll_direction} by {amount} units at ({target_x}, {target_y})" output = f"Scrolled {action.scroll_direction} by {amount} units at ({target_x}, {target_y})"
logger.debug(
f"Action: {action.type} {action.scroll_direction} by {amount} at ({target_x},{target_y})"
)
else: else:
error = "Scroll action requires either scroll_x/y or scroll_direction" error = "Scroll action requires either scroll_x/y or scroll_direction"
logger.debug(f"Action: {action.type} details: {output or error}")
case "keypress": case "keypress":
mapped_keys = [self.CUA_KEY_TO_PYAUTOGUI_KEY.get(k.lower(), k) for k in action.keys] mapped_keys = [self.CUA_KEY_TO_PYAUTOGUI_KEY.get(k.lower(), k) for k in action.keys]
@@ -214,10 +236,10 @@ class ComputerEnvironment(Environment):
if not mapped_keys: if not mapped_keys:
error = "Keypress action requires at least one key" error = "Keypress action requires at least one key"
elif len(mapped_keys) > 1: elif len(mapped_keys) > 1:
await self._execute(pyautogui.hotkey, *mapped_keys) await self._execute("hotkey", *mapped_keys)
key_string = "+".join(mapped_keys) key_string = "+".join(mapped_keys)
else: else:
await self._execute(pyautogui.press, mapped_keys[0]) await self._execute("press", mapped_keys[0])
key_string = mapped_keys[0] key_string = mapped_keys[0]
if not error: if not error:
output = f"Pressed key(s): {key_string}" output = f"Pressed key(s): {key_string}"
@@ -225,7 +247,7 @@ class ComputerEnvironment(Environment):
case "type": case "type":
text_to_type = action.text text_to_type = action.text
await self._execute(pyautogui.typewrite, text_to_type, interval=0.02) # Small interval await self._execute("typewrite", text_to_type, interval=0.02) # Small interval
output = f"Typed text: {text_to_type}" output = f"Typed text: {text_to_type}"
logger.debug(f"Action: {action.type} '{text_to_type}'") logger.debug(f"Action: {action.type} '{text_to_type}'")
@@ -243,7 +265,7 @@ class ComputerEnvironment(Environment):
case "move": case "move":
x, y = action.x, action.y x, y = action.x, action.y
await self._execute(pyautogui.moveTo, x, y, duration=0.2) # Small duration for smooth move await self._execute("moveTo", x, y, duration=0.2) # Small duration for smooth move
self.mouse_pos = Point(x=x, y=y) self.mouse_pos = Point(x=x, y=y)
output = f"Moved mouse to ({x}, {y})" output = f"Moved mouse to ({x}, {y})"
logger.debug(f"Action: {action.type} to ({x},{y})") logger.debug(f"Action: {action.type} to ({x},{y})")
@@ -254,24 +276,24 @@ class ComputerEnvironment(Environment):
error = "Missing path for drag action" error = "Missing path for drag action"
else: else:
start_x, start_y = path[0].x, path[0].y start_x, start_y = path[0].x, path[0].y
await self._execute(pyautogui.moveTo, start_x, start_y, duration=0.1) await self._execute("moveTo", start_x, start_y, duration=0.1)
await self._execute(pyautogui.mouseDown) await self._execute("mouseDown")
for point in path[1:]: for point in path[1:]:
await self._execute(pyautogui.moveTo, point.x, point.y, duration=0.05) await self._execute("moveTo", point.x, point.y, duration=0.05)
await self._execute(pyautogui.mouseUp) await self._execute("mouseUp")
self.mouse_pos = Point(x=path[-1].x, y=path[-1].y) self.mouse_pos = Point(x=path[-1].x, y=path[-1].y)
output = f"Drag along path starting at ({start_x},{start_y})" output = f"Drag along path starting at ({start_x},{start_y})"
logger.debug(f"Action: {action.type} with {len(path)} points") logger.debug(f"Action: {action.type} with {len(path)} points")
case "mouse_down": case "mouse_down":
pyautogui_button = action.button.lower() if action.button else "left" pyautogui_button = action.button.lower() if action.button else "left"
await self._execute(pyautogui.mouseDown, button=pyautogui_button) await self._execute("mouseDown", button=pyautogui_button)
output = f"{action.button.capitalize() if action.button else 'Left'} mouse button down" output = f"{action.button.capitalize() if action.button else 'Left'} mouse button down"
logger.debug(f"Action: {action.type} {action.button}") logger.debug(f"Action: {action.type} {action.button}")
case "mouse_up": case "mouse_up":
pyautogui_button = action.button.lower() if action.button else "left" pyautogui_button = action.button.lower() if action.button else "left"
await self._execute(pyautogui.mouseUp, button=pyautogui_button) await self._execute("mouseUp", button=pyautogui_button)
output = f"{action.button.capitalize() if action.button else 'Left'} mouse button up" output = f"{action.button.capitalize() if action.button else 'Left'} mouse button up"
logger.debug(f"Action: {action.type} {action.button}") logger.debug(f"Action: {action.type} {action.button}")
@@ -283,10 +305,10 @@ class ComputerEnvironment(Environment):
error = f"No valid keys found in '{keys_to_hold_str}' for hold_key" error = f"No valid keys found in '{keys_to_hold_str}' for hold_key"
else: else:
for key_to_hold in parsed_keys: for key_to_hold in parsed_keys:
await self._execute(pyautogui.keyDown, key_to_hold) await self._execute("keyDown", key_to_hold)
await asyncio.sleep(duration) # Non-pyautogui, direct sleep await asyncio.sleep(duration) # Non-pyautogui, direct sleep
for key_to_hold in reversed(parsed_keys): # Release in reverse order for key_to_hold in reversed(parsed_keys): # Release in reverse order
await self._execute(pyautogui.keyUp, key_to_hold) await self._execute("keyUp", key_to_hold)
output = ( output = (
f"Held key{'s' if len(parsed_keys) > 1 else ''} {keys_to_hold_str} for {duration} seconds" f"Held key{'s' if len(parsed_keys) > 1 else ''} {keys_to_hold_str} for {duration} seconds"
) )
@@ -294,18 +316,18 @@ class ComputerEnvironment(Environment):
case "key_down": case "key_down":
key_to_press = self.CUA_KEY_TO_PYAUTOGUI_KEY.get(action.key.lower(), action.key) key_to_press = self.CUA_KEY_TO_PYAUTOGUI_KEY.get(action.key.lower(), action.key)
await self._execute(pyautogui.keyDown, key_to_press) await self._execute("keyDown", key_to_press)
output = f"Key down: {key_to_press}" output = f"Key down: {key_to_press}"
logger.debug(f"Action: {action.type} {key_to_press}") logger.debug(f"Action: {action.type} {key_to_press}")
case "key_up": case "key_up":
key_to_release = self.CUA_KEY_TO_PYAUTOGUI_KEY.get(action.key.lower(), action.key) key_to_release = self.CUA_KEY_TO_PYAUTOGUI_KEY.get(action.key.lower(), action.key)
await self._execute(pyautogui.keyUp, key_to_release) await self._execute("keyUp", key_to_release)
output = f"Key up: {key_to_release}" output = f"Key up: {key_to_release}"
logger.debug(f"Action: {action.type} {key_to_release}") logger.debug(f"Action: {action.type} {key_to_release}")
case "cursor_position": case "cursor_position":
pos_x, pos_y = await self._execute(pyautogui.position) pos_x, pos_y = await self._execute("position")
self.mouse_pos = Point(x=pos_x, y=pos_y) self.mouse_pos = Point(x=pos_x, y=pos_y)
output = f"Cursor position is ({pos_x}, {pos_y})" output = f"Cursor position is ({pos_x}, {pos_y})"
logger.debug(f"Action: {action.type}, position: ({pos_x},{pos_y})") logger.debug(f"Action: {action.type}, position: ({pos_x},{pos_y})")
@@ -321,7 +343,7 @@ class ComputerEnvironment(Environment):
case _: case _:
error = f"Unrecognized action type: {action.type}" error = f"Unrecognized action type: {action.type}"
logger.warning(error) logger.warning(error)
except KeyboardInterrupt as e: except KeyboardInterrupt:
error = "User interrupt. Operation aborted." error = "User interrupt. Operation aborted."
logger.error(error) logger.error(error)
except Exception as e: except Exception as e:
@@ -381,3 +403,85 @@ class ComputerEnvironment(Environment):
# Use the mapped key if found, otherwise use the string itself (e.g. 'a', '1') # Use the mapped key if found, otherwise use the string itself (e.g. 'a', '1')
mapped_keys.append(ComputerEnvironment.CUA_KEY_TO_PYAUTOGUI_KEY.get(k_str.strip(), k_str.strip())) mapped_keys.append(ComputerEnvironment.CUA_KEY_TO_PYAUTOGUI_KEY.get(k_str.strip(), k_str.strip()))
return mapped_keys return mapped_keys
def generate_pyautogui_command(self, func_name: str, *args, **kwargs) -> str:
args_repr = [repr(arg) for arg in args]
kwargs_repr = [f"{k}={repr(v)}" for k, v in kwargs.items()]
all_params_repr = ", ".join(args_repr + kwargs_repr)
# Base script setup
script_lines = [
"import os",
"import pyautogui",
]
if self.provider == "docker":
script_lines.extend(
[
# Display export for Docker.
f"os.environ['DISPLAY']='{self.docker_display}'",
# Disable failsafe in Docker to avoid accidental exits
"pyautogui.FAILSAFE = False",
]
)
# Function-specific logic
if func_name == "screenshot":
script_lines.extend(
[
"import io",
"import base64",
"img = pyautogui.screenshot()",
"buf = io.BytesIO()",
"img.save(buf, format='PNG')",
"print(base64.b64encode(buf.getvalue()).decode('utf-8'))",
]
)
elif func_name == "size":
script_lines.extend(["size = pyautogui.size()", "print(f'({size.width}, {size.height})')"])
elif func_name == "position":
script_lines.extend(["pos = pyautogui.position()", "print(f'({pos.x}, {pos.y})')"])
else: # General command structure
script_lines.extend(
[f"result = pyautogui.{func_name}({all_params_repr})", "print(result if result is not None else '')"]
)
return "; ".join(script_lines)
async def docker_execute(self, python_command_str: str) -> Optional[str]:
if not self.docker_container_name or not self.docker_display:
logger.error("Container name or Docker display not set for Docker execution.")
return None
safe_python_cmd = python_command_str.replace('"', '\\"')
docker_full_cmd = (
f'docker exec -e DISPLAY={self.docker_display} "{self.docker_container_name}" '
f'python3 -c "{safe_python_cmd}"'
)
try:
process = await asyncio.to_thread(
subprocess.run,
docker_full_cmd,
shell=True,
capture_output=True,
text=True,
check=False, # We check returncode manually
)
if process.returncode != 0:
if "FailSafeException" in process.stderr or "FailSafeException" in process.stdout:
raise KeyboardInterrupt(process.stderr or process.stdout)
else:
error_msg = (
f"Docker command failed:\nCmd: {docker_full_cmd}\n"
f"Return Code: {process.returncode}\nStderr: {process.stderr}\nStdout: {process.stdout}"
)
logger.error(error_msg)
raise RuntimeError(f"Docker exec error: {process.stderr or process.stdout}")
return process.stdout.strip()
except KeyboardInterrupt: # Re-raise if caught from above
raise
except Exception as e:
logger.error(f"Unexpected error running command in Docker '{docker_full_cmd}': {e}")
# Encapsulate as RuntimeError to avoid leaking subprocess errors directly
raise RuntimeError(f"Unexpected Docker error: {e}") from e