From 3f5f418d0ea87205914c2c6d4fb9f534bb53a008 Mon Sep 17 00:00:00 2001 From: Debanjum Singh Solanky Date: Tue, 23 Jul 2024 15:02:31 +0530 Subject: [PATCH] Use new chat streaming API to show Khoj train of thought in Obsidian client --- src/interface/obsidian/src/chat_view.ts | 292 +++++++++++++++--------- 1 file changed, 179 insertions(+), 113 deletions(-) diff --git a/src/interface/obsidian/src/chat_view.ts b/src/interface/obsidian/src/chat_view.ts index 9ad187b0..121d0a87 100644 --- a/src/interface/obsidian/src/chat_view.ts +++ b/src/interface/obsidian/src/chat_view.ts @@ -12,6 +12,25 @@ export interface ChatJsonResult { inferredQueries?: string[]; } +interface ChunkResult { + objects: string[]; + remainder: string; +} + +interface MessageChunk { + type: string; + data: any; +} + +interface ChatMessageState { + newResponseTextEl: HTMLElement | null; + newResponseEl: HTMLElement | null; + loadingEllipsis: HTMLElement | null; + references: any; + rawResponse: string; + rawQuery: string; + isVoice: boolean; +} interface Location { region: string; @@ -26,6 +45,7 @@ export class KhojChatView extends KhojPaneView { waitingForLocation: boolean; location: Location; keyPressTimeout: NodeJS.Timeout | null = null; + chatMessageState: ChatMessageState; constructor(leaf: WorkspaceLeaf, setting: KhojSetting) { super(leaf, setting); @@ -410,7 +430,6 @@ export class KhojChatView extends KhojPaneView { // Convert the message to html, sanitize the message html and render it to the real DOM let chatMessageBodyTextEl = this.contentEl.createDiv(); - chatMessageBodyTextEl.className = "chat-message-text-response"; chatMessageBodyTextEl.innerHTML = this.markdownTextToSanitizedHtml(message, this); // Add a copy button to each chat message, if it doesn't already exist @@ -541,11 +560,7 @@ export class KhojChatView extends KhojPaneView { "data-meta": `🏮 Khoj at ${messageTime}`, class: `khoj-chat-message khoj` }, - }).createDiv({ - attr: { - class: `khoj-chat-message-text khoj` - }, - 
}).createDiv(); + }) // Scroll to bottom after inserting chat messages this.scrollChatToBottom(); @@ -554,14 +569,14 @@ export class KhojChatView extends KhojPaneView { } async renderIncrementalMessage(htmlElement: HTMLDivElement, additionalMessage: string) { - this.result += additionalMessage; + this.chatMessageState.rawResponse += additionalMessage; htmlElement.innerHTML = ""; // Sanitize the markdown to render - this.result = DOMPurify.sanitize(this.result); + this.chatMessageState.rawResponse = DOMPurify.sanitize(this.chatMessageState.rawResponse); // @ts-ignore - htmlElement.innerHTML = this.markdownTextToSanitizedHtml(this.result, this); + htmlElement.innerHTML = this.markdownTextToSanitizedHtml(this.chatMessageState.rawResponse, this); // Render action buttons for the message - this.renderActionButtons(this.result, htmlElement); + this.renderActionButtons(this.chatMessageState.rawResponse, htmlElement); // Scroll to bottom of modal, till the send message input box this.scrollChatToBottom(); } @@ -854,35 +869,147 @@ export class KhojChatView extends KhojPaneView { return true; } - async readChatStream(response: Response, responseElement: HTMLDivElement, isVoice: boolean = false): Promise<void> { + collectJsonsInBufferedMessageChunk(chunk: string): ChunkResult { + // Collect list of JSON objects and raw strings in the chunk + // Return the list of objects and the remaining raw string + let startIndex = chunk.indexOf('{'); + if (startIndex === -1) return { objects: [chunk], remainder: '' }; + const objects: string[] = [chunk.slice(0, startIndex)]; + let openBraces = 0; + let currentObject = ''; + + for (let i = startIndex; i < chunk.length; i++) { + if (chunk[i] === '{') { + if (openBraces === 0) startIndex = i; + openBraces++; + } + if (chunk[i] === '}') { + openBraces--; + if (openBraces === 0) { + currentObject = chunk.slice(startIndex, i + 1); + objects.push(currentObject); + currentObject = ''; + } + } + } + + return { + objects: objects, + remainder: openBraces
> 0 ? chunk.slice(startIndex) : '' + }; + } + + convertMessageChunkToJson(rawChunk: string): MessageChunk { + if (rawChunk?.startsWith("{") && rawChunk?.endsWith("}")) { + try { + let jsonChunk = JSON.parse(rawChunk); + if (!jsonChunk.type) + jsonChunk = {type: 'message', data: jsonChunk}; + return jsonChunk; + } catch (e) { + return {type: 'message', data: rawChunk}; + } + } else if (rawChunk.length > 0) { + return {type: 'message', data: rawChunk}; + } + return {type: '', data: ''}; + } + + processMessageChunk(rawChunk: string): void { + const chunk = this.convertMessageChunkToJson(rawChunk); + console.debug("Chunk:", chunk); + if (!chunk || !chunk.type) return; + if (chunk.type === 'status') { + console.log(`status: ${chunk.data}`); + const statusMessage = chunk.data; + this.handleStreamResponse(this.chatMessageState.newResponseTextEl, statusMessage, this.chatMessageState.loadingEllipsis, false); + } else if (chunk.type === 'start_llm_response') { + console.log("Started streaming", new Date()); + } else if (chunk.type === 'end_llm_response') { + console.log("Stopped streaming", new Date()); + + // Automatically respond with voice if the subscribed user has sent voice message + if (this.chatMessageState.isVoice && this.setting.userInfo?.is_active) + this.textToSpeech(this.chatMessageState.rawResponse); + + // Append any references after all the data has been streamed + this.finalizeChatBodyResponse(this.chatMessageState.references, this.chatMessageState.newResponseTextEl); + + const liveQuery = this.chatMessageState.rawQuery; + // Reset variables + this.chatMessageState = { + newResponseTextEl: null, + newResponseEl: null, + loadingEllipsis: null, + references: {}, + rawResponse: "", + rawQuery: liveQuery, + isVoice: false, + }; + } else if (chunk.type === "references") { + this.chatMessageState.references = {"notes": chunk.data.context, "online": chunk.data.online_results}; + } else if (chunk.type === 'message') { + const chunkData = chunk.data; + if (typeof 
chunkData === 'object' && chunkData !== null) { + // If chunkData is already a JSON object + this.handleJsonResponse(chunkData); + } else if (typeof chunkData === 'string' && chunkData.trim()?.startsWith("{") && chunkData.trim()?.endsWith("}")) { + // Try process chunk data as if it is a JSON object + try { + const jsonData = JSON.parse(chunkData.trim()); + this.handleJsonResponse(jsonData); + } catch (e) { + this.chatMessageState.rawResponse += chunkData; + this.handleStreamResponse(this.chatMessageState.newResponseTextEl, this.chatMessageState.rawResponse, this.chatMessageState.loadingEllipsis); + } + } else { + this.chatMessageState.rawResponse += chunkData; + this.handleStreamResponse(this.chatMessageState.newResponseTextEl, this.chatMessageState.rawResponse, this.chatMessageState.loadingEllipsis); + } + } + } + + handleJsonResponse(jsonData: any): void { + if (jsonData.image || jsonData.detail) { + this.chatMessageState.rawResponse = this.handleImageResponse(jsonData, this.chatMessageState.rawResponse); + } else if (jsonData.response) { + this.chatMessageState.rawResponse = jsonData.response; + } + + if (this.chatMessageState.newResponseTextEl) { + this.chatMessageState.newResponseTextEl.innerHTML = ""; + this.chatMessageState.newResponseTextEl.appendChild(this.formatHTMLMessage(this.chatMessageState.rawResponse)); + } + } + + async readChatStream(response: Response): Promise<void> { // Exit if response body is empty if (response.body == null) return; const reader = response.body.getReader(); const decoder = new TextDecoder(); + let buffer = ''; + let netBracketCount = 0; while (true) { const { value, done } = await reader.read(); if (done) { - // Automatically respond with voice if the subscribed user has sent voice message - if (isVoice && this.setting.userInfo?.is_active) this.textToSpeech(this.result); + this.processMessageChunk(buffer); + buffer = ''; // Break if the stream is done break; } - let responseText = decoder.decode(value); - if
(responseText.includes("### compiled references:")) { - // Render any references used to generate the response - const [additionalResponse, rawReference] = responseText.split("### compiled references:", 2); - await this.renderIncrementalMessage(responseElement, additionalResponse); + const chunk = decoder.decode(value, { stream: true }); + buffer += chunk; - const rawReferenceAsJson = JSON.parse(rawReference); - let references = this.extractReferences(rawReferenceAsJson); - responseElement.appendChild(this.createReferenceSection(references)); - } else { - // Render incremental chat response - await this.renderIncrementalMessage(responseElement, responseText); + // Check if the buffer contains (0 or more) complete JSON objects + netBracketCount += (chunk.match(/{/g) || []).length - (chunk.match(/}/g) || []).length; + if (netBracketCount === 0) { + let chunks = this.collectJsonsInBufferedMessageChunk(buffer); + chunks.objects.forEach((chunk) => this.processMessageChunk(chunk)); + buffer = chunks.remainder; } } } @@ -909,69 +1036,45 @@ export class KhojChatView extends KhojPaneView { // Get chat response from Khoj backend let encodedQuery = encodeURIComponent(query); - let chatUrl = `${this.setting.khojUrl}/api/chat?q=${encodedQuery}&n=${this.setting.resultsCount}&client=obsidian&stream=true&region=${this.location.region}&city=${this.location.city}&country=${this.location.countryName}&timezone=${this.location.timezone}`; - let responseElement = this.createKhojResponseDiv(); + let chatUrl = `${this.setting.khojUrl}/api/chat?q=${encodedQuery}&conversation_id=${conversationId}&n=${this.setting.resultsCount}&stream=true&client=obsidian`; + if (!!this.location) chatUrl += `&region=${this.location.region}&city=${this.location.city}&country=${this.location.countryName}&timezone=${this.location.timezone}`; + + let newResponseEl = this.createKhojResponseDiv(); + let newResponseTextEl = newResponseEl.createDiv(); + newResponseTextEl.classList.add("khoj-chat-message-text", "khoj"); //
Temporary status message to indicate that Khoj is thinking - this.result = ""; let loadingEllipsis = this.createLoadingEllipse(); - responseElement.appendChild(loadingEllipsis); + newResponseTextEl.appendChild(loadingEllipsis); + + // Set chat message state + this.chatMessageState = { + newResponseEl: newResponseEl, + newResponseTextEl: newResponseTextEl, + loadingEllipsis: loadingEllipsis, + references: {}, + rawQuery: query, + rawResponse: "", + isVoice: isVoice, + }; let response = await fetch(chatUrl, { method: "GET", headers: { - "Content-Type": "text/event-stream", + "Content-Type": "text/plain", "Authorization": `Bearer ${this.setting.khojApiKey}`, }, }) try { - if (response.body === null) { - throw new Error("Response body is null"); - } + if (response.body === null) throw new Error("Response body is null"); - // Clear loading status message - if (responseElement.getElementsByClassName("lds-ellipsis").length > 0 && loadingEllipsis) { - responseElement.removeChild(loadingEllipsis); - } - - // Reset collated chat result to empty string - this.result = ""; - responseElement.innerHTML = ""; - if (response.headers.get("content-type") === "application/json") { - let responseText = "" - try { - const responseAsJson = await response.json() as ChatJsonResult; - if (responseAsJson.image) { - // If response has image field, response is a generated image. 
- if (responseAsJson.intentType === "text-to-image") { - responseText += `![${query}](data:image/png;base64,${responseAsJson.image})`; - } else if (responseAsJson.intentType === "text-to-image2") { - responseText += `![${query}](${responseAsJson.image})`; - } else if (responseAsJson.intentType === "text-to-image-v3") { - responseText += `![${query}](data:image/webp;base64,${responseAsJson.image})`; - } - const inferredQuery = responseAsJson.inferredQueries?.[0]; - if (inferredQuery) { - responseText += `\n\n**Inferred Query**:\n\n${inferredQuery}`; - } - } else if (responseAsJson.detail) { - responseText = responseAsJson.detail; - } - } catch (error) { - // If the chunk is not a JSON object, just display it as is - responseText = await response.text(); - } finally { - await this.renderIncrementalMessage(responseElement, responseText); - } - } else { - // Stream and render chat response - await this.readChatStream(response, responseElement, isVoice); - } + // Stream and render chat response + await this.readChatStream(response); } catch (err) { - console.log(`Khoj chat response failed with\n${err}`); + console.error(`Khoj chat response failed with\n${err}`); let errorMsg = "Sorry, unable to get response from Khoj backend ❤️‍🩹. 
Retry or contact developers for help at team@khoj.dev or on Discord"; - responseElement.innerHTML = errorMsg + newResponseTextEl.textContent = errorMsg; } } @@ -1196,7 +1299,7 @@ export class KhojChatView extends KhojPaneView { handleStreamResponse(newResponseElement: HTMLElement | null, rawResponse: string, loadingEllipsis: HTMLElement | null, replace = true) { if (!newResponseElement) return; - if (newResponseElement.getElementsByClassName("lds-ellipsis").length > 0 && loadingEllipsis) { + if (replace && newResponseElement.getElementsByClassName("lds-ellipsis").length > 0 && loadingEllipsis) { newResponseElement.removeChild(loadingEllipsis); } if (replace) { @@ -1206,20 +1309,6 @@ export class KhojChatView extends KhojPaneView { this.scrollChatToBottom(); } - handleCompiledReferences(rawResponseElement: HTMLElement | null, chunk: string, references: any, rawResponse: string) { - if (!rawResponseElement || !chunk) return { rawResponse, references }; - - const [additionalResponse, rawReference] = chunk.split("### compiled references:", 2); - rawResponse += additionalResponse; - rawResponseElement.innerHTML = ""; - rawResponseElement.appendChild(this.formatHTMLMessage(rawResponse)); - - const rawReferenceAsJson = JSON.parse(rawReference); - references = this.extractReferences(rawReferenceAsJson); - - return { rawResponse, references }; - } - handleImageResponse(imageJson: any, rawResponse: string) { if (imageJson.image) { const inferredQuery = imageJson.inferredQueries?.[0] ?? "generated image"; @@ -1236,33 +1325,10 @@ export class KhojChatView extends KhojPaneView { rawResponse += `\n\n**Inferred Query**:\n\n${inferredQuery}`; } } - let references = {}; - if (imageJson.context && imageJson.context.length > 0) { - references = this.extractReferences(imageJson.context); - } - if (imageJson.detail) { - // If response has detail field, response is an error message. 
- rawResponse += imageJson.detail; - } - return { rawResponse, references }; - } + // If response has detail field, response is an error message. + if (imageJson.detail) rawResponse += imageJson.detail; - extractReferences(rawReferenceAsJson: any): object { - let references: any = {}; - if (rawReferenceAsJson instanceof Array) { - references["notes"] = rawReferenceAsJson; - } else if (typeof rawReferenceAsJson === "object" && rawReferenceAsJson !== null) { - references["online"] = rawReferenceAsJson; - } - return references; - } - - addMessageToChatBody(rawResponse: string, newResponseElement: HTMLElement | null, references: any) { - if (!newResponseElement) return; - newResponseElement.innerHTML = ""; - newResponseElement.appendChild(this.formatHTMLMessage(rawResponse)); - - this.finalizeChatBodyResponse(references, newResponseElement); + return rawResponse; } finalizeChatBodyResponse(references: object, newResponseElement: HTMLElement | null) {