mirror of
https://github.com/khoaliber/khoj.git
synced 2026-03-09 13:25:11 +00:00
Handle unexpected chunks streamed from Openai (compatible) APIs
This commit is contained in:
@@ -527,7 +527,19 @@ async def adefault_stream_processor(
|
|||||||
Async generator to cast and return chunks from the standard openai chat completions stream.
|
Async generator to cast and return chunks from the standard openai chat completions stream.
|
||||||
"""
|
"""
|
||||||
async for chunk in chat_stream:
|
async for chunk in chat_stream:
|
||||||
yield ChatCompletionWithThoughtsChunk.model_validate(chunk.model_dump())
|
try:
|
||||||
|
# Validate the chunk has the required fields before processing
|
||||||
|
chunk_data = chunk.model_dump()
|
||||||
|
|
||||||
|
# Skip chunks that don't have the required object field or have invalid values
|
||||||
|
if not chunk_data.get("object") or chunk_data.get("object") != "chat.completion.chunk":
|
||||||
|
logger.warning(f"Skipping invalid chunk with object field: {chunk_data.get('object', 'missing')}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
yield ChatCompletionWithThoughtsChunk.model_validate(chunk_data)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Error processing chunk: {e}. Skipping malformed chunk.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
|
||||||
async def adeepseek_stream_processor(
|
async def adeepseek_stream_processor(
|
||||||
@@ -537,7 +549,16 @@ async def adeepseek_stream_processor(
|
|||||||
Async generator to cast and return chunks from the deepseek chat completions stream.
|
Async generator to cast and return chunks from the deepseek chat completions stream.
|
||||||
"""
|
"""
|
||||||
async for chunk in chat_stream:
|
async for chunk in chat_stream:
|
||||||
tchunk = ChatCompletionWithThoughtsChunk.model_validate(chunk.model_dump())
|
try:
|
||||||
|
# Validate the chunk has the required fields before processing
|
||||||
|
chunk_data = chunk.model_dump()
|
||||||
|
|
||||||
|
# Skip chunks that don't have the required object field or have invalid values
|
||||||
|
if not chunk_data.get("object") or chunk_data.get("object") != "chat.completion.chunk":
|
||||||
|
logger.warning(f"Skipping invalid chunk with object field: {chunk_data.get('object', 'missing')}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
tchunk = ChatCompletionWithThoughtsChunk.model_validate(chunk_data)
|
||||||
if (
|
if (
|
||||||
len(tchunk.choices) > 0
|
len(tchunk.choices) > 0
|
||||||
and hasattr(tchunk.choices[0].delta, "reasoning_content")
|
and hasattr(tchunk.choices[0].delta, "reasoning_content")
|
||||||
@@ -545,6 +566,9 @@ async def adeepseek_stream_processor(
|
|||||||
):
|
):
|
||||||
tchunk.choices[0].delta.thought = chunk.choices[0].delta.reasoning_content
|
tchunk.choices[0].delta.thought = chunk.choices[0].delta.reasoning_content
|
||||||
yield tchunk
|
yield tchunk
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Error processing chunk: {e}. Skipping malformed chunk.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
|
||||||
def in_stream_thought_processor(
|
def in_stream_thought_processor(
|
||||||
|
|||||||
Reference in New Issue
Block a user