From 37b8fc5577ad8b1dd154faba47fbf4d0aacd2819 Mon Sep 17 00:00:00 2001 From: Debanjum Singh Solanky Date: Wed, 24 Jul 2024 16:51:04 +0530 Subject: [PATCH] Extract events even when http chunk contains partial or multiple events Previous logic was brittle: it could break when a simple unbalanced '{' or '}' string was present in the event data. This method of trying to identify valid json obj was fairly brittle. It only allowed json objects or processed event as raw strings. Now we buffer chunk until we see our unicode magic delimiter and only then process it. This is much less likely to break based on event data and the delimiter is more tunable if we want to reduce rendering breakage likelihood further. --- src/interface/desktop/chatutils.js | 49 ++++++----------------- src/interface/obsidian/src/chat_view.ts | 49 ++++++----------------- src/khoj/interface/web/chat.html | 52 +++++++------------------ src/khoj/routers/api_chat.py | 6 ++- 4 files changed, 43 insertions(+), 113 deletions(-) diff --git a/src/interface/desktop/chatutils.js b/src/interface/desktop/chatutils.js index 4f4fb64e..5213979f 100644 --- a/src/interface/desktop/chatutils.js +++ b/src/interface/desktop/chatutils.js @@ -437,36 +437,6 @@ function finalizeChatBodyResponse(references, newResponseElement) { document.getElementById("chat-input")?.removeAttribute("disabled"); } -function collectJsonsInBufferedMessageChunk(chunk) { - // Collect list of JSON objects and raw strings in the chunk - // Return the list of objects and the remaining raw string - let startIndex = chunk.indexOf('{'); - if (startIndex === -1) return { objects: [chunk], remainder: '' }; - const objects = [chunk.slice(0, startIndex)]; - let openBraces = 0; - let currentObject = ''; - - for (let i = startIndex; i < chunk.length; i++) { - if (chunk[i] === '{') { - if (openBraces === 0) startIndex = i; - openBraces++; - } - if (chunk[i] === '}') { - openBraces--; - if (openBraces === 0) { - currentObject = chunk.slice(startIndex, i + 1); - 
objects.push(currentObject); - currentObject = ''; - } - } - } - - return { - objects: objects, - remainder: openBraces > 0 ? chunk.slice(startIndex) : '' - }; -} - function convertMessageChunkToJson(rawChunk) { // Split the chunk into lines if (rawChunk?.startsWith("{") && rawChunk?.endsWith("}")) { @@ -554,8 +524,8 @@ async function readChatStream(response) { if (!response.body) return; const reader = response.body.getReader(); const decoder = new TextDecoder(); + const eventDelimiter = '␃🔚␗'; let buffer = ''; - let netBracketCount = 0; while (true) { const { value, done } = await reader.read(); @@ -569,14 +539,19 @@ async function readChatStream(response) { // Read chunk from stream and append it to the buffer const chunk = decoder.decode(value, { stream: true }); + console.debug("Raw Chunk:", chunk) + // Start buffering chunks until complete event is received buffer += chunk; - // Check if the buffer contains (0 or more) complete JSON objects - netBracketCount += (chunk.match(/{/g) || []).length - (chunk.match(/}/g) || []).length; - if (netBracketCount === 0) { - let chunks = collectJsonsInBufferedMessageChunk(buffer); - chunks.objects.forEach((chunk) => processMessageChunk(chunk)); - buffer = chunks.remainder; + // Once the buffer contains a complete event + let newEventIndex; + while ((newEventIndex = buffer.indexOf(eventDelimiter)) !== -1) { + // Extract the event from the buffer + const event = buffer.slice(0, newEventIndex); + buffer = buffer.slice(newEventIndex + eventDelimiter.length); + + // Process the event + if (event) processMessageChunk(event); } } } diff --git a/src/interface/obsidian/src/chat_view.ts b/src/interface/obsidian/src/chat_view.ts index a6c62fd5..cbd0f7bf 100644 --- a/src/interface/obsidian/src/chat_view.ts +++ b/src/interface/obsidian/src/chat_view.ts @@ -869,36 +869,6 @@ export class KhojChatView extends KhojPaneView { return true; } - collectJsonsInBufferedMessageChunk(chunk: string): ChunkResult { - // Collect list of JSON objects 
and raw strings in the chunk - // Return the list of objects and the remaining raw string - let startIndex = chunk.indexOf('{'); - if (startIndex === -1) return { objects: [chunk], remainder: '' }; - const objects: string[] = [chunk.slice(0, startIndex)]; - let openBraces = 0; - let currentObject = ''; - - for (let i = startIndex; i < chunk.length; i++) { - if (chunk[i] === '{') { - if (openBraces === 0) startIndex = i; - openBraces++; - } - if (chunk[i] === '}') { - openBraces--; - if (openBraces === 0) { - currentObject = chunk.slice(startIndex, i + 1); - objects.push(currentObject); - currentObject = ''; - } - } - } - - return { - objects: objects, - remainder: openBraces > 0 ? chunk.slice(startIndex) : '' - }; - } - convertMessageChunkToJson(rawChunk: string): MessageChunk { if (rawChunk?.startsWith("{") && rawChunk?.endsWith("}")) { try { @@ -988,8 +958,8 @@ export class KhojChatView extends KhojPaneView { const reader = response.body.getReader(); const decoder = new TextDecoder(); + const eventDelimiter = '␃🔚␗'; let buffer = ''; - let netBracketCount = 0; while (true) { const { value, done } = await reader.read(); @@ -1002,14 +972,19 @@ export class KhojChatView extends KhojPaneView { } const chunk = decoder.decode(value, { stream: true }); + console.debug("Raw Chunk:", chunk) + // Start buffering chunks until complete event is received buffer += chunk; - // Check if the buffer contains (0 or more) complete JSON objects - netBracketCount += (chunk.match(/{/g) || []).length - (chunk.match(/}/g) || []).length; - if (netBracketCount === 0) { - let chunks = this.collectJsonsInBufferedMessageChunk(buffer); - chunks.objects.forEach((chunk) => this.processMessageChunk(chunk)); - buffer = chunks.remainder; + // Once the buffer contains a complete event + let newEventIndex; + while ((newEventIndex = buffer.indexOf(eventDelimiter)) !== -1) { + // Extract the event from the buffer + const event = buffer.slice(0, newEventIndex); + buffer = buffer.slice(newEventIndex + 
eventDelimiter.length); + + // Process the event + if (event) this.processMessageChunk(event); } } } diff --git a/src/khoj/interface/web/chat.html b/src/khoj/interface/web/chat.html index 616e66bc..024af9ad 100644 --- a/src/khoj/interface/web/chat.html +++ b/src/khoj/interface/web/chat.html @@ -756,38 +756,9 @@ To get started, just start typing below. You can also type / to see a list of co document.getElementById("chat-input")?.removeAttribute("disabled"); } - function collectJsonsInBufferedMessageChunk(chunk) { - // Collect list of JSON objects and raw strings in the chunk - // Return the list of objects and the remaining raw string - let startIndex = chunk.indexOf('{'); - if (startIndex === -1) return { objects: [chunk], remainder: '' }; - const objects = [chunk.slice(0, startIndex)]; - let openBraces = 0; - let currentObject = ''; - - for (let i = startIndex; i < chunk.length; i++) { - if (chunk[i] === '{') { - if (openBraces === 0) startIndex = i; - openBraces++; - } - if (chunk[i] === '}') { - openBraces--; - if (openBraces === 0) { - currentObject = chunk.slice(startIndex, i + 1); - objects.push(currentObject); - currentObject = ''; - } - } - } - - return { - objects: objects, - remainder: openBraces > 0 ? chunk.slice(startIndex) : '' - }; - } - function convertMessageChunkToJson(rawChunk) { // Split the chunk into lines + console.debug("Raw Event:", rawChunk); if (rawChunk?.startsWith("{") && rawChunk?.endsWith("}")) { try { let jsonChunk = JSON.parse(rawChunk); @@ -804,7 +775,7 @@ To get started, just start typing below. You can also type / to see a list of co function processMessageChunk(rawChunk) { const chunk = convertMessageChunkToJson(rawChunk); - console.debug("Chunk:", chunk); + console.debug("Json Event:", chunk); if (!chunk || !chunk.type) return; if (chunk.type ==='status') { console.log(`status: ${chunk.data}`); @@ -873,8 +844,8 @@ To get started, just start typing below. 
You can also type / to see a list of co if (!response.body) return; const reader = response.body.getReader(); const decoder = new TextDecoder(); + const eventDelimiter = '␃🔚␗'; let buffer = ''; - let netBracketCount = 0; while (true) { const { value, done } = await reader.read(); @@ -888,14 +859,19 @@ To get started, just start typing below. You can also type / to see a list of co // Read chunk from stream and append it to the buffer const chunk = decoder.decode(value, { stream: true }); + console.debug("Raw Chunk:", chunk) + // Start buffering chunks until complete event is received buffer += chunk; - // Check if the buffer contains (0 or more) complete JSON objects - netBracketCount += (chunk.match(/{/g) || []).length - (chunk.match(/}/g) || []).length; - if (netBracketCount === 0) { - let chunks = collectJsonsInBufferedMessageChunk(buffer); - chunks.objects.forEach((chunk) => processMessageChunk(chunk)); - buffer = chunks.remainder; + // Once the buffer contains a complete event + let newEventIndex; + while ((newEventIndex = buffer.indexOf(eventDelimiter)) !== -1) { + // Extract the event from the buffer + const event = buffer.slice(0, newEventIndex); + buffer = buffer.slice(newEventIndex + eventDelimiter.length); + + // Process the event + if (event) processMessageChunk(event); } } } diff --git a/src/khoj/routers/api_chat.py b/src/khoj/routers/api_chat.py index 22fb4f03..9154bff8 100644 --- a/src/khoj/routers/api_chat.py +++ b/src/khoj/routers/api_chat.py @@ -548,6 +548,7 @@ async def chat( chat_metadata: dict = {} connection_alive = True user: KhojUser = request.user.object + event_delimiter = "␃🔚␗" q = unquote(q) async def send_event(event_type: str, data: str | dict): @@ -564,7 +565,7 @@ async def chat( if event_type == "message": yield data elif event_type == "references" or stream: - yield json.dumps({"type": event_type, "data": data}) + yield json.dumps({"type": event_type, "data": data}, ensure_ascii=False) except asyncio.CancelledError: connection_alive 
= False logger.warn(f"User {user} disconnected from {common.client} client") @@ -573,6 +574,9 @@ async def chat( connection_alive = False logger.error(f"Failed to stream chat API response to {user} on {common.client}: {e}", exc_info=True) return + finally: + if stream: + yield event_delimiter async def send_llm_response(response: str): async for result in send_event("start_llm_response", ""):