diff --git a/src/interface/desktop/chatutils.js b/src/interface/desktop/chatutils.js index 4f4fb64e..5213979f 100644 --- a/src/interface/desktop/chatutils.js +++ b/src/interface/desktop/chatutils.js @@ -437,36 +437,6 @@ function finalizeChatBodyResponse(references, newResponseElement) { document.getElementById("chat-input")?.removeAttribute("disabled"); } -function collectJsonsInBufferedMessageChunk(chunk) { - // Collect list of JSON objects and raw strings in the chunk - // Return the list of objects and the remaining raw string - let startIndex = chunk.indexOf('{'); - if (startIndex === -1) return { objects: [chunk], remainder: '' }; - const objects = [chunk.slice(0, startIndex)]; - let openBraces = 0; - let currentObject = ''; - - for (let i = startIndex; i < chunk.length; i++) { - if (chunk[i] === '{') { - if (openBraces === 0) startIndex = i; - openBraces++; - } - if (chunk[i] === '}') { - openBraces--; - if (openBraces === 0) { - currentObject = chunk.slice(startIndex, i + 1); - objects.push(currentObject); - currentObject = ''; - } - } - } - - return { - objects: objects, - remainder: openBraces > 0 ? chunk.slice(startIndex) : '' - }; -} - function convertMessageChunkToJson(rawChunk) { // Split the chunk into lines if (rawChunk?.startsWith("{") && rawChunk?.endsWith("}")) { @@ -554,8 +524,8 @@ async function readChatStream(response) { if (!response.body) return; const reader = response.body.getReader(); const decoder = new TextDecoder(); + const eventDelimiter = '␃🔚␗'; let buffer = ''; - let netBracketCount = 0; while (true) { const { value, done } = await reader.read(); @@ -569,14 +539,19 @@ async function readChatStream(response) { // Read chunk from stream and append it to the buffer const chunk = decoder.decode(value, { stream: true }); + console.debug("Raw Chunk:", chunk) + // Start buffering chunks until complete event is received buffer += chunk; - // Check if the buffer contains (0 or more) complete JSON objects - netBracketCount += (chunk.match(/{/g) || []).length - (chunk.match(/}/g) || []).length; - if (netBracketCount === 0) { - let chunks = collectJsonsInBufferedMessageChunk(buffer); - chunks.objects.forEach((chunk) => processMessageChunk(chunk)); - buffer = chunks.remainder; + // Once the buffer contains a complete event + let newEventIndex; + while ((newEventIndex = buffer.indexOf(eventDelimiter)) !== -1) { + // Extract the event from the buffer + const event = buffer.slice(0, newEventIndex); + buffer = buffer.slice(newEventIndex + eventDelimiter.length); + + // Process the event + if (event) processMessageChunk(event); } } } diff --git a/src/interface/obsidian/src/chat_view.ts b/src/interface/obsidian/src/chat_view.ts index a6c62fd5..cbd0f7bf 100644 --- a/src/interface/obsidian/src/chat_view.ts +++ b/src/interface/obsidian/src/chat_view.ts @@ -869,36 +869,6 @@ export class KhojChatView extends KhojPaneView { return true; } - collectJsonsInBufferedMessageChunk(chunk: string): ChunkResult { - // Collect list of JSON objects and raw strings in the chunk - // Return the list of objects and the remaining raw string - let startIndex = chunk.indexOf('{'); - if (startIndex === -1) return { objects: [chunk], remainder: '' }; - const objects: string[] = [chunk.slice(0, startIndex)]; - let openBraces = 0; - let currentObject = ''; - - for (let i = startIndex; i < chunk.length; i++) { - if (chunk[i] === '{') { - if (openBraces === 0) startIndex = i; - openBraces++; - } - if (chunk[i] === '}') { - openBraces--; - if (openBraces === 0) { - currentObject = chunk.slice(startIndex, i + 1); - objects.push(currentObject); - currentObject = ''; - } - } - } - - return { - objects: objects, - remainder: openBraces > 0 ? chunk.slice(startIndex) : '' - }; - } - convertMessageChunkToJson(rawChunk: string): MessageChunk { if (rawChunk?.startsWith("{") && rawChunk?.endsWith("}")) { try { @@ -988,8 +958,8 @@ export class KhojChatView extends KhojPaneView { const reader = response.body.getReader(); const decoder = new TextDecoder(); + const eventDelimiter = '␃🔚␗'; let buffer = ''; - let netBracketCount = 0; while (true) { const { value, done } = await reader.read(); @@ -1002,14 +972,19 @@ export class KhojChatView extends KhojPaneView { } const chunk = decoder.decode(value, { stream: true }); + console.debug("Raw Chunk:", chunk) + // Start buffering chunks until complete event is received buffer += chunk; - // Check if the buffer contains (0 or more) complete JSON objects - netBracketCount += (chunk.match(/{/g) || []).length - (chunk.match(/}/g) || []).length; - if (netBracketCount === 0) { - let chunks = this.collectJsonsInBufferedMessageChunk(buffer); - chunks.objects.forEach((chunk) => this.processMessageChunk(chunk)); - buffer = chunks.remainder; + // Once the buffer contains a complete event + let newEventIndex; + while ((newEventIndex = buffer.indexOf(eventDelimiter)) !== -1) { + // Extract the event from the buffer + const event = buffer.slice(0, newEventIndex); + buffer = buffer.slice(newEventIndex + eventDelimiter.length); + + // Process the event + if (event) this.processMessageChunk(event); } } } diff --git a/src/khoj/interface/web/chat.html b/src/khoj/interface/web/chat.html index 616e66bc..024af9ad 100644 --- a/src/khoj/interface/web/chat.html +++ b/src/khoj/interface/web/chat.html @@ -756,38 +756,9 @@ To get started, just start typing below. You can also type / to see a list of co document.getElementById("chat-input")?.removeAttribute("disabled"); } - function collectJsonsInBufferedMessageChunk(chunk) { - // Collect list of JSON objects and raw strings in the chunk - // Return the list of objects and the remaining raw string - let startIndex = chunk.indexOf('{'); - if (startIndex === -1) return { objects: [chunk], remainder: '' }; - const objects = [chunk.slice(0, startIndex)]; - let openBraces = 0; - let currentObject = ''; - - for (let i = startIndex; i < chunk.length; i++) { - if (chunk[i] === '{') { - if (openBraces === 0) startIndex = i; - openBraces++; - } - if (chunk[i] === '}') { - openBraces--; - if (openBraces === 0) { - currentObject = chunk.slice(startIndex, i + 1); - objects.push(currentObject); - currentObject = ''; - } - } - } - - return { - objects: objects, - remainder: openBraces > 0 ? chunk.slice(startIndex) : '' - }; - } - function convertMessageChunkToJson(rawChunk) { // Split the chunk into lines + console.debug("Raw Event:", rawChunk); if (rawChunk?.startsWith("{") && rawChunk?.endsWith("}")) { try { let jsonChunk = JSON.parse(rawChunk); @@ -804,7 +775,7 @@ To get started, just start typing below. You can also type / to see a list of co function processMessageChunk(rawChunk) { const chunk = convertMessageChunkToJson(rawChunk); - console.debug("Chunk:", chunk); + console.debug("Json Event:", chunk); if (!chunk || !chunk.type) return; if (chunk.type ==='status') { console.log(`status: ${chunk.data}`); @@ -873,8 +844,8 @@ To get started, just start typing below. You can also type / to see a list of co if (!response.body) return; const reader = response.body.getReader(); const decoder = new TextDecoder(); + const eventDelimiter = '␃🔚␗'; let buffer = ''; - let netBracketCount = 0; while (true) { const { value, done } = await reader.read(); @@ -888,14 +859,19 @@ To get started, just start typing below. You can also type / to see a list of co // Read chunk from stream and append it to the buffer const chunk = decoder.decode(value, { stream: true }); + console.debug("Raw Chunk:", chunk) + // Start buffering chunks until complete event is received buffer += chunk; - // Check if the buffer contains (0 or more) complete JSON objects - netBracketCount += (chunk.match(/{/g) || []).length - (chunk.match(/}/g) || []).length; - if (netBracketCount === 0) { - let chunks = collectJsonsInBufferedMessageChunk(buffer); - chunks.objects.forEach((chunk) => processMessageChunk(chunk)); - buffer = chunks.remainder; + // Once the buffer contains a complete event + let newEventIndex; + while ((newEventIndex = buffer.indexOf(eventDelimiter)) !== -1) { + // Extract the event from the buffer + const event = buffer.slice(0, newEventIndex); + buffer = buffer.slice(newEventIndex + eventDelimiter.length); + + // Process the event + if (event) processMessageChunk(event); } } } diff --git a/src/khoj/routers/api_chat.py b/src/khoj/routers/api_chat.py index 22fb4f03..9154bff8 100644 --- a/src/khoj/routers/api_chat.py +++ b/src/khoj/routers/api_chat.py @@ -548,6 +548,7 @@ async def chat( chat_metadata: dict = {} connection_alive = True user: KhojUser = request.user.object + event_delimiter = "␃🔚␗" q = unquote(q) async def send_event(event_type: str, data: str | dict): @@ -564,7 +565,7 @@ async def chat( if event_type == "message": yield data elif event_type == "references" or stream: - yield json.dumps({"type": event_type, "data": data}) + yield json.dumps({"type": event_type, "data": data}, ensure_ascii=False) except asyncio.CancelledError: connection_alive = False logger.warn(f"User {user} disconnected from {common.client} client") @@ -573,6 +574,9 @@ async def chat( connection_alive = False logger.error(f"Failed to stream chat API response to {user} on {common.client}: {e}", exc_info=True) return + finally: + if stream: + yield event_delimiter async def send_llm_response(response: str): async for result in send_event("start_llm_response", ""):