diff --git a/src/khoj/processor/conversation/google/utils.py b/src/khoj/processor/conversation/google/utils.py
index 92f8cce7..d3776319 100644
--- a/src/khoj/processor/conversation/google/utils.py
+++ b/src/khoj/processor/conversation/google/utils.py
@@ -72,10 +72,52 @@ SAFETY_SETTINGS = [
 ]
 
 
+class GeminiRetryableClientError(Exception):
+    """Wrapper for retryable Gemini client errors that should surface a friendly message if retries are exhausted.
+
+    Stores the original exception plus a fallback `response_text` to return after retries are exhausted.
+    """
+
+    def __init__(self, original: gerrors.ClientError, response_text: str):
+        super().__init__(str(original))
+        self.original = original
+        self.response_text = response_text
+        # Expose the code attribute so the existing retry predicate can still inspect it if needed
+        self.code = getattr(original, "code", None)
+
+
+def _gemini_retry_error_callback(retry_state: RetryCallState):
+    """Produce a graceful fallback ResponseWithThought after all retry attempts fail.
+
+    Tenacity calls this when the stop condition is reached and reraise=False.
+    Extract our custom exception to build a ResponseWithThought with the stored friendly message.
+    """
+    exc = retry_state.outcome.exception() if retry_state.outcome else None
+    if isinstance(exc, GeminiRetryableClientError):
+        # Access the original call arguments to optionally record a trace
+        kwargs = retry_state.kwargs or {}
+        messages = kwargs.get("messages")
+        tracer = kwargs.get("tracer", {})
+        model_name = kwargs.get("model_name")
+        temperature = kwargs.get("temperature")
+        if tracer is not None:
+            tracer["chat_model"] = model_name
+            tracer["temperature"] = temperature
+        if messages and is_promptrace_enabled():
+            try:
+                commit_conversation_trace(messages, exc.response_text, tracer or {})
+            except Exception:
+                logger.debug("Failed to commit conversation trace on retry exhaustion", exc_info=True)
+        return ResponseWithThought(text=exc.response_text, thought=None, raw_content=[])
+    else:
+        # Propagate any other exception to the caller by re-raising it here.
+        raise exc
+
+
 def _is_retryable_error(exception: BaseException) -> bool:
     """Check if the exception is a retryable error"""
     # server errors
-    if isinstance(exception, (gerrors.APIError, gerrors.ClientError)):
+    if isinstance(exception, (gerrors.APIError, gerrors.ClientError, GeminiRetryableClientError)):
         return exception.code in [429, 502, 503, 504]
     # client errors
     if (
@@ -134,7 +176,8 @@ def _wait_with_gemini_delay(min_wait=4, max_wait=120, multiplier=1, fallback_wai
     wait=_wait_with_gemini_delay(min_wait=1, max_wait=10, fallback_wait=wait_random_exponential(min=1, max=10)),
     stop=stop_after_attempt(2),
     before_sleep=before_sleep_log(logger, logging.DEBUG),
-    reraise=True,
+    reraise=False,
+    retry_error_callback=_gemini_retry_error_callback,
 )
 def gemini_completion_with_backoff(
     messages: list[ChatMessage],
@@ -210,15 +253,17 @@ def gemini_completion_with_backoff(
         )
     except gerrors.ClientError as e:
         response = None
-        # Handle 429 rate limit errors directly
+        # For 429 rate-limit errors, raise a wrapped exception so tenacity can retry.
         if e.code == 429:
+            # Prepare a friendly message to return if retries are exhausted
            response_text = "My brain is exhausted. Can you please try again in a bit?"
-            # Log the full error details for debugging
-            logger.error(f"Gemini ClientError: {e.code} {e.status}. Details: {e.details}")
-        # Handle other errors
+            logger.warning(f"Retryable Gemini ClientError: {e.code} {e.status}. Details: {e.details}")
+            # Raise the wrapper so the retry callback can produce the final ResponseWithThought
+            raise GeminiRetryableClientError(e, response_text)
+        # Handle non-retryable client errors
         else:
+            # Respond with the reason for stopping
             response_text, _ = handle_gemini_response(e.args)
-            # Respond with reason for stopping
             logger.warning(
                 f"LLM Response Prevented for {model_name}: {response_text}.\n"
                 + f"Last Message by {messages[-1].role}: {messages[-1].content}"
@@ -240,7 +285,9 @@ def gemini_completion_with_backoff(
 
     # Validate the response. If empty, raise an error to retry.
     if is_none_or_empty(response_text):
-        logger.warning(f"No response by {model_name}\nLast Message by {messages[-1].role}: {messages[-1].content}.")
+        logger.warning(
+            f"No response by {model_name}\nLast Message by {messages[-1].role}: {messages[-1].content}. Retry."
+        )
         raise ValueError(f"Empty or no response by {model_name} over API. Retry if needed.")
 
     # Save conversation trace
@@ -373,14 +420,14 @@ def handle_gemini_response(
     elif isinstance(candidates[0], str):
         message = candidates[0]
         stopped = True
-    # Check if the response was blocked due to safety concerns with the generated content
-    elif candidates[0].finish_reason == gtypes.FinishReason.SAFETY:
-        message = generate_safety_response(candidates[0].safety_ratings)
-        stopped = True
     # Check if finish reason is empty, therefore generation is in progress
     elif not candidates[0].finish_reason:
         message = None
         stopped = False
+    # Check if the response was blocked due to safety concerns with the generated content
+    elif candidates[0].finish_reason == gtypes.FinishReason.SAFETY:
+        message = generate_safety_response(candidates[0].safety_ratings)
+        stopped = True
     # Check if the response was stopped due to reaching maximum token limit or other reasons
     elif candidates[0].finish_reason != gtypes.FinishReason.STOP:
         message = f"\nI can't talk further about that because of **{candidates[0].finish_reason.name} issue.**"