From 61a50efcc3debe0eabf572149d440f62c30b9ecd Mon Sep 17 00:00:00 2001
From: Debanjum
Date: Fri, 2 May 2025 18:36:24 -0600
Subject: [PATCH] Parse DeepSeek reasoning model thoughts served via OpenAI compatible API

DeepSeek reasoners return reasoning in the reasoning_content field.
Create an async stream processor to parse the reasoning out when using
the deepseek reasoner model.

Note: the previous revision of this patch also deleted the line
`formatted_messages = updated_messages`, which made the back-to-back
message merging for deepseek-reasoner dead code (the merged messages
were never assigned back). That deletion is dropped here so the merge
logic keeps taking effect.
---
 src/khoj/processor/conversation/openai/utils.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/src/khoj/processor/conversation/openai/utils.py b/src/khoj/processor/conversation/openai/utils.py
index bec9997b..cb337362 100644
--- a/src/khoj/processor/conversation/openai/utils.py
+++ b/src/khoj/processor/conversation/openai/utils.py
@@ -184,6 +184,7 @@
         reasoning_effort = "high" if deepthought else "low"
         model_kwargs["reasoning_effort"] = reasoning_effort
     elif model_name.startswith("deepseek-reasoner"):
+        stream_processor = adeepseek_stream_processor
         # Two successive messages cannot be from the same role. Should merge any back-to-back messages from the same role.
         # The first message should always be a user message (except system message).
         updated_messages: List[dict] = []
@@ -349,6 +350,19 @@ async def adefault_stream_processor(
         yield ChatCompletionWithThoughtsChunk.model_validate(chunk.model_dump())
 
 
+async def adeepseek_stream_processor(
+    chat_stream: openai.AsyncStream[ChatCompletionChunk],
+) -> AsyncGenerator[ChatCompletionWithThoughtsChunk, None]:
+    """
+    Async generator to cast and return chunks from the deepseek chat completions stream.
+    """
+    async for chunk in chat_stream:
+        tchunk = ChatCompletionWithThoughtsChunk.model_validate(chunk.model_dump())
+        if len(tchunk.choices) > 0 and tchunk.choices[0].delta.reasoning_content:
+            tchunk.choices[0].delta.thought = chunk.choices[0].delta.reasoning_content
+        yield tchunk
+
+
 def in_stream_thought_processor(
     chat_stream: openai.Stream[ChatCompletionChunk], thought_tag="think"
 ) -> Generator[ChatCompletionStreamWithThoughtEvent, None, None]: