From 3141035f48ee90856fa4a52407ce9871b6ae54ad Mon Sep 17 00:00:00 2001 From: Debanjum Date: Wed, 9 Jul 2025 17:49:52 -0700 Subject: [PATCH] Handle unexpected chunks streamed from Openai (compatible) APIs --- .../processor/conversation/openai/utils.py | 42 +++++++++++++++---- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/src/khoj/processor/conversation/openai/utils.py b/src/khoj/processor/conversation/openai/utils.py index fc3ebd90..50678139 100644 --- a/src/khoj/processor/conversation/openai/utils.py +++ b/src/khoj/processor/conversation/openai/utils.py @@ -527,7 +527,19 @@ async def adefault_stream_processor( Async generator to cast and return chunks from the standard openai chat completions stream. """ async for chunk in chat_stream: - yield ChatCompletionWithThoughtsChunk.model_validate(chunk.model_dump()) + try: + # Validate the chunk has the required fields before processing + chunk_data = chunk.model_dump() + + # Skip chunks that don't have the required object field or have invalid values + if not chunk_data.get("object") or chunk_data.get("object") != "chat.completion.chunk": + logger.warning(f"Skipping invalid chunk with object field: {chunk_data.get('object', 'missing')}") + continue + + yield ChatCompletionWithThoughtsChunk.model_validate(chunk_data) + except Exception as e: + logger.warning(f"Error processing chunk: {e}. Skipping malformed chunk.") + continue async def adeepseek_stream_processor( @@ -537,14 +549,26 @@ async def adeepseek_stream_processor( Async generator to cast and return chunks from the deepseek chat completions stream. """ async for chunk in chat_stream: - tchunk = ChatCompletionWithThoughtsChunk.model_validate(chunk.model_dump()) - if ( - len(tchunk.choices) > 0 - and hasattr(tchunk.choices[0].delta, "reasoning_content") - and tchunk.choices[0].delta.reasoning_content - ): - tchunk.choices[0].delta.thought = chunk.choices[0].delta.reasoning_content - yield tchunk + try: + # Validate the chunk has the required fields before processing + chunk_data = chunk.model_dump() + + # Skip chunks that don't have the required object field or have invalid values + if not chunk_data.get("object") or chunk_data.get("object") != "chat.completion.chunk": + logger.warning(f"Skipping invalid chunk with object field: {chunk_data.get('object', 'missing')}") + continue + + tchunk = ChatCompletionWithThoughtsChunk.model_validate(chunk_data) + if ( + len(tchunk.choices) > 0 + and hasattr(tchunk.choices[0].delta, "reasoning_content") + and tchunk.choices[0].delta.reasoning_content + ): + tchunk.choices[0].delta.thought = chunk.choices[0].delta.reasoning_content + yield tchunk + except Exception as e: + logger.warning(f"Error processing chunk: {e}. Skipping malformed chunk.") + continue def in_stream_thought_processor(