diff --git a/src/khoj/processor/conversation/openai/utils.py b/src/khoj/processor/conversation/openai/utils.py index fc3ebd90..50678139 100644 --- a/src/khoj/processor/conversation/openai/utils.py +++ b/src/khoj/processor/conversation/openai/utils.py @@ -527,7 +527,19 @@ async def adefault_stream_processor( Async generator to cast and return chunks from the standard openai chat completions stream. """ async for chunk in chat_stream: - yield ChatCompletionWithThoughtsChunk.model_validate(chunk.model_dump()) + try: + # Validate the chunk has the required fields before processing + chunk_data = chunk.model_dump() + + # Skip chunks that don't have the required object field or have invalid values + if not chunk_data.get("object") or chunk_data.get("object") != "chat.completion.chunk": + logger.warning(f"Skipping invalid chunk with object field: {chunk_data.get('object', 'missing')}") + continue + + yield ChatCompletionWithThoughtsChunk.model_validate(chunk_data) + except Exception as e: + logger.warning(f"Error processing chunk: {e}. Skipping malformed chunk.") + continue async def adeepseek_stream_processor( @@ -537,14 +549,26 @@ async def adeepseek_stream_processor( Async generator to cast and return chunks from the deepseek chat completions stream. """ async for chunk in chat_stream: - tchunk = ChatCompletionWithThoughtsChunk.model_validate(chunk.model_dump()) - if ( - len(tchunk.choices) > 0 - and hasattr(tchunk.choices[0].delta, "reasoning_content") - and tchunk.choices[0].delta.reasoning_content - ): - tchunk.choices[0].delta.thought = chunk.choices[0].delta.reasoning_content - yield tchunk + try: + # Validate the chunk has the required fields before processing + chunk_data = chunk.model_dump() + + # Skip chunks that don't have the required object field or have invalid values + if not chunk_data.get("object") or chunk_data.get("object") != "chat.completion.chunk": + logger.warning(f"Skipping invalid chunk with object field: {chunk_data.get('object', 'missing')}") + continue + + tchunk = ChatCompletionWithThoughtsChunk.model_validate(chunk_data) + if ( + len(tchunk.choices) > 0 + and hasattr(tchunk.choices[0].delta, "reasoning_content") + and tchunk.choices[0].delta.reasoning_content + ): + tchunk.choices[0].delta.thought = chunk.choices[0].delta.reasoning_content + yield tchunk + except Exception as e: + logger.warning(f"Error processing chunk: {e}. Skipping malformed chunk.") + continue def in_stream_thought_processor(