Extract events even when http chunk contains partial or multiple events

The previous logic was brittle: a simple unbalanced '{' or '}' present
in the event data was enough to break it. It tried to identify valid
JSON objects by counting braces, and could only treat an event as
either a JSON object or a raw string.

Now we buffer chunks until we see our Unicode magic delimiter, and only
then process the buffered event.

This is much less likely to break on event data, and the delimiter can
be tuned if we want to further reduce the likelihood of rendering
breakage.
This commit is contained in:
Debanjum Singh Solanky
2024-07-24 16:51:04 +05:30
parent 70201e8db8
commit 37b8fc5577
4 changed files with 43 additions and 113 deletions

View File

@@ -437,36 +437,6 @@ function finalizeChatBodyResponse(references, newResponseElement) {
document.getElementById("chat-input")?.removeAttribute("disabled"); document.getElementById("chat-input")?.removeAttribute("disabled");
} }
/**
 * Split a buffered stream chunk into brace-balanced object strings and raw text.
 *
 * Segments are returned in the order they appear in `chunk`. The text before
 * the first '{' is always the first entry (it may be an empty string). Raw text
 * found between two objects, or after the last complete object, is kept as its
 * own entry instead of being discarded.
 *
 * Braces are counted naively: a '{' or '}' inside a JSON string value still
 * affects the nesting depth.
 *
 * @param chunk Buffered text to split.
 * @returns `objects`: the ordered list of raw strings and brace-balanced
 *          object strings; `remainder`: the trailing, still-unclosed object
 *          text (empty string when every opened object was closed).
 */
function collectJsonsInBufferedMessageChunk(chunk) {
    const firstBrace = chunk.indexOf('{');
    // No object at all: the whole chunk is a single raw string
    if (firstBrace === -1) return { objects: [chunk], remainder: '' };

    const objects = [chunk.slice(0, firstBrace)];
    let depth = 0;
    let objectStart = firstBrace;
    // Start of the raw text that has not yet been emitted (outside any object)
    let rawStart = firstBrace;

    for (let i = firstBrace; i < chunk.length; i++) {
        const ch = chunk[i];
        if (ch === '{') {
            if (depth === 0) {
                // Flush raw text sitting between the previous object and this one
                if (i > rawStart) objects.push(chunk.slice(rawStart, i));
                objectStart = i;
            }
            depth++;
        } else if (ch === '}' && depth > 0) {
            // A '}' seen at depth 0 is left in the raw text rather than
            // driving the depth negative and hiding later objects
            depth--;
            if (depth === 0) {
                objects.push(chunk.slice(objectStart, i + 1));
                rawStart = i + 1;
            }
        }
    }

    // An object is still open: hand it back so the caller can keep buffering
    if (depth > 0) return { objects: objects, remainder: chunk.slice(objectStart) };

    // Keep raw text that trails the last complete object
    if (rawStart < chunk.length) objects.push(chunk.slice(rawStart));
    return { objects: objects, remainder: '' };
}
function convertMessageChunkToJson(rawChunk) { function convertMessageChunkToJson(rawChunk) {
// Split the chunk into lines // Split the chunk into lines
if (rawChunk?.startsWith("{") && rawChunk?.endsWith("}")) { if (rawChunk?.startsWith("{") && rawChunk?.endsWith("}")) {
@@ -554,8 +524,8 @@ async function readChatStream(response) {
if (!response.body) return; if (!response.body) return;
const reader = response.body.getReader(); const reader = response.body.getReader();
const decoder = new TextDecoder(); const decoder = new TextDecoder();
const eventDelimiter = '␃🔚␗';
let buffer = ''; let buffer = '';
let netBracketCount = 0;
while (true) { while (true) {
const { value, done } = await reader.read(); const { value, done } = await reader.read();
@@ -569,14 +539,19 @@ async function readChatStream(response) {
// Read chunk from stream and append it to the buffer // Read chunk from stream and append it to the buffer
const chunk = decoder.decode(value, { stream: true }); const chunk = decoder.decode(value, { stream: true });
console.debug("Raw Chunk:", chunk)
// Start buffering chunks until complete event is received
buffer += chunk; buffer += chunk;
// Check if the buffer contains (0 or more) complete JSON objects // Once the buffer contains a complete event
netBracketCount += (chunk.match(/{/g) || []).length - (chunk.match(/}/g) || []).length; let newEventIndex;
if (netBracketCount === 0) { while ((newEventIndex = buffer.indexOf(eventDelimiter)) !== -1) {
let chunks = collectJsonsInBufferedMessageChunk(buffer); // Extract the event from the buffer
chunks.objects.forEach((chunk) => processMessageChunk(chunk)); const event = buffer.slice(0, newEventIndex);
buffer = chunks.remainder; buffer = buffer.slice(newEventIndex + eventDelimiter.length);
// Process the event
if (event) processMessageChunk(event);
} }
} }
} }

View File

@@ -869,36 +869,6 @@ export class KhojChatView extends KhojPaneView {
return true; return true;
} }
/**
 * Split a buffered stream chunk into brace-balanced object strings and raw text.
 *
 * Segments are returned in the order they appear in `chunk`. The text before
 * the first '{' is always the first entry (it may be an empty string). Raw text
 * found between two objects, or after the last complete object, is kept as its
 * own entry instead of being discarded.
 *
 * Braces are counted naively: a '{' or '}' inside a JSON string value still
 * affects the nesting depth.
 *
 * @param chunk Buffered text to split.
 * @returns `objects`: the ordered list of raw strings and brace-balanced
 *          object strings; `remainder`: the trailing, still-unclosed object
 *          text (empty string when every opened object was closed).
 */
collectJsonsInBufferedMessageChunk(chunk: string): ChunkResult {
    const firstBrace = chunk.indexOf('{');
    // No object at all: the whole chunk is a single raw string
    if (firstBrace === -1) return { objects: [chunk], remainder: '' };

    const objects: string[] = [chunk.slice(0, firstBrace)];
    let depth = 0;
    let objectStart = firstBrace;
    // Start of the raw text that has not yet been emitted (outside any object)
    let rawStart = firstBrace;

    for (let i = firstBrace; i < chunk.length; i++) {
        const ch = chunk[i];
        if (ch === '{') {
            if (depth === 0) {
                // Flush raw text sitting between the previous object and this one
                if (i > rawStart) objects.push(chunk.slice(rawStart, i));
                objectStart = i;
            }
            depth++;
        } else if (ch === '}' && depth > 0) {
            // A '}' seen at depth 0 is left in the raw text rather than
            // driving the depth negative and hiding later objects
            depth--;
            if (depth === 0) {
                objects.push(chunk.slice(objectStart, i + 1));
                rawStart = i + 1;
            }
        }
    }

    // An object is still open: hand it back so the caller can keep buffering
    if (depth > 0) return { objects: objects, remainder: chunk.slice(objectStart) };

    // Keep raw text that trails the last complete object
    if (rawStart < chunk.length) objects.push(chunk.slice(rawStart));
    return { objects: objects, remainder: '' };
}
convertMessageChunkToJson(rawChunk: string): MessageChunk { convertMessageChunkToJson(rawChunk: string): MessageChunk {
if (rawChunk?.startsWith("{") && rawChunk?.endsWith("}")) { if (rawChunk?.startsWith("{") && rawChunk?.endsWith("}")) {
try { try {
@@ -988,8 +958,8 @@ export class KhojChatView extends KhojPaneView {
const reader = response.body.getReader(); const reader = response.body.getReader();
const decoder = new TextDecoder(); const decoder = new TextDecoder();
const eventDelimiter = '␃🔚␗';
let buffer = ''; let buffer = '';
let netBracketCount = 0;
while (true) { while (true) {
const { value, done } = await reader.read(); const { value, done } = await reader.read();
@@ -1002,14 +972,19 @@ export class KhojChatView extends KhojPaneView {
} }
const chunk = decoder.decode(value, { stream: true }); const chunk = decoder.decode(value, { stream: true });
console.debug("Raw Chunk:", chunk)
// Start buffering chunks until complete event is received
buffer += chunk; buffer += chunk;
// Check if the buffer contains (0 or more) complete JSON objects // Once the buffer contains a complete event
netBracketCount += (chunk.match(/{/g) || []).length - (chunk.match(/}/g) || []).length; let newEventIndex;
if (netBracketCount === 0) { while ((newEventIndex = buffer.indexOf(eventDelimiter)) !== -1) {
let chunks = this.collectJsonsInBufferedMessageChunk(buffer); // Extract the event from the buffer
chunks.objects.forEach((chunk) => this.processMessageChunk(chunk)); const event = buffer.slice(0, newEventIndex);
buffer = chunks.remainder; buffer = buffer.slice(newEventIndex + eventDelimiter.length);
// Process the event
if (event) this.processMessageChunk(event);
} }
} }
} }

View File

@@ -756,38 +756,9 @@ To get started, just start typing below. You can also type / to see a list of co
document.getElementById("chat-input")?.removeAttribute("disabled"); document.getElementById("chat-input")?.removeAttribute("disabled");
} }
/**
 * Split a buffered stream chunk into brace-balanced object strings and raw text.
 *
 * Segments are returned in the order they appear in `chunk`. The text before
 * the first '{' is always the first entry (it may be an empty string). Raw text
 * found between two objects, or after the last complete object, is kept as its
 * own entry instead of being discarded.
 *
 * Braces are counted naively: a '{' or '}' inside a JSON string value still
 * affects the nesting depth.
 *
 * @param chunk Buffered text to split.
 * @returns `objects`: the ordered list of raw strings and brace-balanced
 *          object strings; `remainder`: the trailing, still-unclosed object
 *          text (empty string when every opened object was closed).
 */
function collectJsonsInBufferedMessageChunk(chunk) {
    const firstBrace = chunk.indexOf('{');
    // No object at all: the whole chunk is a single raw string
    if (firstBrace === -1) return { objects: [chunk], remainder: '' };

    const objects = [chunk.slice(0, firstBrace)];
    let depth = 0;
    let objectStart = firstBrace;
    // Start of the raw text that has not yet been emitted (outside any object)
    let rawStart = firstBrace;

    for (let i = firstBrace; i < chunk.length; i++) {
        const ch = chunk[i];
        if (ch === '{') {
            if (depth === 0) {
                // Flush raw text sitting between the previous object and this one
                if (i > rawStart) objects.push(chunk.slice(rawStart, i));
                objectStart = i;
            }
            depth++;
        } else if (ch === '}' && depth > 0) {
            // A '}' seen at depth 0 is left in the raw text rather than
            // driving the depth negative and hiding later objects
            depth--;
            if (depth === 0) {
                objects.push(chunk.slice(objectStart, i + 1));
                rawStart = i + 1;
            }
        }
    }

    // An object is still open: hand it back so the caller can keep buffering
    if (depth > 0) return { objects: objects, remainder: chunk.slice(objectStart) };

    // Keep raw text that trails the last complete object
    if (rawStart < chunk.length) objects.push(chunk.slice(rawStart));
    return { objects: objects, remainder: '' };
}
function convertMessageChunkToJson(rawChunk) { function convertMessageChunkToJson(rawChunk) {
// Split the chunk into lines // Split the chunk into lines
console.debug("Raw Event:", rawChunk);
if (rawChunk?.startsWith("{") && rawChunk?.endsWith("}")) { if (rawChunk?.startsWith("{") && rawChunk?.endsWith("}")) {
try { try {
let jsonChunk = JSON.parse(rawChunk); let jsonChunk = JSON.parse(rawChunk);
@@ -804,7 +775,7 @@ To get started, just start typing below. You can also type / to see a list of co
function processMessageChunk(rawChunk) { function processMessageChunk(rawChunk) {
const chunk = convertMessageChunkToJson(rawChunk); const chunk = convertMessageChunkToJson(rawChunk);
console.debug("Chunk:", chunk); console.debug("Json Event:", chunk);
if (!chunk || !chunk.type) return; if (!chunk || !chunk.type) return;
if (chunk.type ==='status') { if (chunk.type ==='status') {
console.log(`status: ${chunk.data}`); console.log(`status: ${chunk.data}`);
@@ -873,8 +844,8 @@ To get started, just start typing below. You can also type / to see a list of co
if (!response.body) return; if (!response.body) return;
const reader = response.body.getReader(); const reader = response.body.getReader();
const decoder = new TextDecoder(); const decoder = new TextDecoder();
const eventDelimiter = '␃🔚␗';
let buffer = ''; let buffer = '';
let netBracketCount = 0;
while (true) { while (true) {
const { value, done } = await reader.read(); const { value, done } = await reader.read();
@@ -888,14 +859,19 @@ To get started, just start typing below. You can also type / to see a list of co
// Read chunk from stream and append it to the buffer // Read chunk from stream and append it to the buffer
const chunk = decoder.decode(value, { stream: true }); const chunk = decoder.decode(value, { stream: true });
console.debug("Raw Chunk:", chunk)
// Start buffering chunks until complete event is received
buffer += chunk; buffer += chunk;
// Check if the buffer contains (0 or more) complete JSON objects // Once the buffer contains a complete event
netBracketCount += (chunk.match(/{/g) || []).length - (chunk.match(/}/g) || []).length; let newEventIndex;
if (netBracketCount === 0) { while ((newEventIndex = buffer.indexOf(eventDelimiter)) !== -1) {
let chunks = collectJsonsInBufferedMessageChunk(buffer); // Extract the event from the buffer
chunks.objects.forEach((chunk) => processMessageChunk(chunk)); const event = buffer.slice(0, newEventIndex);
buffer = chunks.remainder; buffer = buffer.slice(newEventIndex + eventDelimiter.length);
// Process the event
if (event) processMessageChunk(event);
} }
} }
} }

View File

@@ -548,6 +548,7 @@ async def chat(
chat_metadata: dict = {} chat_metadata: dict = {}
connection_alive = True connection_alive = True
user: KhojUser = request.user.object user: KhojUser = request.user.object
event_delimiter = "␃🔚␗"
q = unquote(q) q = unquote(q)
async def send_event(event_type: str, data: str | dict): async def send_event(event_type: str, data: str | dict):
@@ -564,7 +565,7 @@ async def chat(
if event_type == "message": if event_type == "message":
yield data yield data
elif event_type == "references" or stream: elif event_type == "references" or stream:
yield json.dumps({"type": event_type, "data": data}) yield json.dumps({"type": event_type, "data": data}, ensure_ascii=False)
except asyncio.CancelledError: except asyncio.CancelledError:
connection_alive = False connection_alive = False
logger.warn(f"User {user} disconnected from {common.client} client") logger.warn(f"User {user} disconnected from {common.client} client")
@@ -573,6 +574,9 @@ async def chat(
connection_alive = False connection_alive = False
logger.error(f"Failed to stream chat API response to {user} on {common.client}: {e}", exc_info=True) logger.error(f"Failed to stream chat API response to {user} on {common.client}: {e}", exc_info=True)
return return
finally:
if stream:
yield event_delimiter
async def send_llm_response(response: str): async def send_llm_response(response: str):
async for result in send_event("start_llm_response", ""): async for result in send_event("start_llm_response", ""):