Use tenacity retry decorator to retry executing code in sandbox

This commit is contained in:
Debanjum
2025-02-12 23:09:11 +05:30
parent 4a28714a08
commit ecc2f79571

View File

@@ -4,11 +4,18 @@ import datetime
import logging
import mimetypes
import os
from functools import wraps
from pathlib import Path
from typing import Any, Callable, List, NamedTuple, Optional
import aiohttp
from httpx import RemoteProtocolError
from tenacity import (
before_sleep_log,
retry,
retry_if_exception_type,
stop_after_attempt,
wait_random_exponential,
)
from khoj.database.adapters import FileObjectAdapters
from khoj.database.models import Agent, FileObject, KhojUser
@@ -92,6 +99,8 @@ async def run_code(
cleaned_result = truncate_code_context({"cleaned": {"results": result}})["cleaned"]["results"]
logger.info(f"Executed Code\n----\n{code}\n----\nResult\n----\n{cleaned_result}\n----")
yield {query: {"code": code, "results": result}}
except asyncio.TimeoutError as e:
raise ValueError(f"Failed to run code for {query} with Timeout error: {e}")
except Exception as e:
raise ValueError(f"Failed to run code for {query} with error: {e}")
@@ -146,28 +155,19 @@ async def generate_python_code(
return GeneratedCode(code, input_files, input_links)
def async_retry_with_backoff(retries=3, backoff_in_seconds=1):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
retry_count = 0
while retry_count < retries:
try:
return await func(*args, **kwargs)
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
retry_count += 1
if retry_count == retries:
raise e
wait_time = backoff_in_seconds * (2 ** (retry_count - 1)) # exponential backoff
await asyncio.sleep(wait_time)
return await func(*args, **kwargs)
return wrapper
return decorator
@async_retry_with_backoff(retries=3, backoff_in_seconds=1)
@retry(
retry=(
retry_if_exception_type(aiohttp.ClientError)
| retry_if_exception_type(aiohttp.ClientTimeout)
| retry_if_exception_type(asyncio.TimeoutError)
| retry_if_exception_type(ConnectionError)
| retry_if_exception_type(RemoteProtocolError)
),
wait=wait_random_exponential(min=1, max=5),
stop=stop_after_attempt(3),
before_sleep=before_sleep_log(logger, logging.DEBUG),
reraise=True,
)
async def execute_sandboxed_python(code: str, input_data: list[dict], sandbox_url: str = SANDBOX_URL) -> dict[str, Any]:
"""
Takes code to run as a string and calls the terrarium API to execute it.