diff --git a/src/khoj/processor/conversation/openai/utils.py b/src/khoj/processor/conversation/openai/utils.py
index bec9997b..cb337362 100644
--- a/src/khoj/processor/conversation/openai/utils.py
+++ b/src/khoj/processor/conversation/openai/utils.py
@@ -184,6 +184,7 @@ async def chat_completion_with_backoff(
         reasoning_effort = "high" if deepthought else "low"
         model_kwargs["reasoning_effort"] = reasoning_effort
     elif model_name.startswith("deepseek-reasoner"):
+        stream_processor = adeepseek_stream_processor
         # Two successive messages cannot be from the same role. Should merge any back-to-back messages from the same role.
         # The first message should always be a user message (except system message).
         updated_messages: List[dict] = []
@@ -194,7 +195,6 @@ async def chat_completion_with_backoff(
                 updated_messages[-1]["content"] += " " + message["content"]
             else:
                 updated_messages.append(message)
 
-        formatted_messages = updated_messages
     elif is_qwen_reasoning_model(model_name, api_base_url):
         stream_processor = partial(ain_stream_thought_processor, thought_tag="think")
@@ -349,6 +349,19 @@ async def adefault_stream_processor(
         yield ChatCompletionWithThoughtsChunk.model_validate(chunk.model_dump())
 
 
+async def adeepseek_stream_processor(
+    chat_stream: openai.AsyncStream[ChatCompletionChunk],
+) -> AsyncGenerator[ChatCompletionWithThoughtsChunk, None]:
+    """
+    Async generator to cast and return chunks from the deepseek chat completions stream.
+    """
+    async for chunk in chat_stream:
+        tchunk = ChatCompletionWithThoughtsChunk.model_validate(chunk.model_dump())
+        if len(tchunk.choices) > 0 and tchunk.choices[0].delta.reasoning_content:
+            tchunk.choices[0].delta.thought = chunk.choices[0].delta.reasoning_content
+        yield tchunk
+
+
 def in_stream_thought_processor(
     chat_stream: openai.Stream[ChatCompletionChunk], thought_tag="think"
 ) -> Generator[ChatCompletionStreamWithThoughtEvent, None, None]:
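
Note: the core idea of adeepseek_stream_processor is that deepseek-reasoner emits its chain of thought in a separate delta.reasoning_content field, which the processor copies into the Khoj-side delta.thought field so downstream consumers handle all reasoning models uniformly. The following is a minimal, self-contained sketch of that mapping; the Delta/Choice/Chunk dataclasses and fake_deepseek_stream are simplified stand-ins invented for illustration, not Khoj's ChatCompletionWithThoughtsChunk or the real openai.AsyncStream.

    import asyncio
    from dataclasses import dataclass, field
    from typing import AsyncGenerator, List, Optional


    @dataclass
    class Delta:
        content: Optional[str] = None
        reasoning_content: Optional[str] = None  # populated by deepseek-reasoner
        thought: Optional[str] = None  # field the processor fills in


    @dataclass
    class Choice:
        delta: Delta


    @dataclass
    class Chunk:
        choices: List[Choice] = field(default_factory=list)


    async def fake_deepseek_stream() -> AsyncGenerator[Chunk, None]:
        # Simulates a deepseek-reasoner stream: reasoning chunks arrive
        # first, then the final answer chunks with plain content.
        yield Chunk([Choice(Delta(reasoning_content="Let me think..."))])
        yield Chunk([Choice(Delta(content="The answer is 42."))])


    async def deepseek_stream_processor(stream) -> AsyncGenerator[Chunk, None]:
        # Mirrors the mapping in adeepseek_stream_processor: copy
        # reasoning_content into thought before yielding each chunk.
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.reasoning_content:
                chunk.choices[0].delta.thought = chunk.choices[0].delta.reasoning_content
            yield chunk


    async def main() -> None:
        async for chunk in deepseek_stream_processor(fake_deepseek_stream()):
            delta = chunk.choices[0].delta
            print(f"thought={delta.thought!r} content={delta.content!r}")


    asyncio.run(main())

Under these assumptions the sketch prints the reasoning chunk with thought set and the answer chunk with content set, which is the behavior the diff relies on when rendering model thoughts separately from the final reply.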