diff --git a/src/interface/web/app/share/chat/page.tsx b/src/interface/web/app/share/chat/page.tsx index d270c1f0..f02ed1a1 100644 --- a/src/interface/web/app/share/chat/page.tsx +++ b/src/interface/web/app/share/chat/page.tsx @@ -10,12 +10,12 @@ import Loading from '../../components/loading/loading'; import 'katex/dist/katex.min.css'; -import { welcomeConsole } from '../../common/utils'; +import { useIPLocationData, welcomeConsole } from '../../common/utils'; import { useAuthenticatedData } from '@/app/common/auth'; import ChatInputArea, { ChatOptions } from '@/app/components/chatInputArea/chatInputArea'; import { StreamMessage } from '@/app/components/chatMessage/chatMessage'; -import { handleCompiledReferences, handleImageResponse, setupWebSocket } from '@/app/common/chatFunctions'; +import { convertMessageChunkToJson, handleCompiledReferences, handleImageResponse, RawReferenceData, setupWebSocket } from '@/app/common/chatFunctions'; import { AgentData } from '@/app/agents/page'; @@ -95,7 +95,6 @@ export default function SharedChat() { const [isLoading, setLoading] = useState(true); const [title, setTitle] = useState('Khoj AI - Chat'); const [conversationId, setConversationID] = useState(undefined); - const [chatWS, setChatWS] = useState(null); const [messages, setMessages] = useState([]); const [queryToProcess, setQueryToProcess] = useState(''); const [processQuerySignal, setProcessQuerySignal] = useState(false); @@ -103,76 +102,11 @@ export default function SharedChat() { const [isMobileWidth, setIsMobileWidth] = useState(false); const [paramSlug, setParamSlug] = useState(undefined); + const locationData = useIPLocationData(); const authenticatedData = useAuthenticatedData(); welcomeConsole(); - const handleWebSocketMessage = (event: MessageEvent) => { - let chunk = event.data; - - let currentMessage = messages.find(message => !message.completed); - - if (!currentMessage) { - console.error("No current message found"); - return; - } - - // Process WebSocket streamed data - if (chunk === "start_llm_response") { - console.log("Started streaming", new Date()); - } else if (chunk === "end_llm_response") { - currentMessage.completed = true; - } else { - // Get the current message - // Process and update state with the new message - if (chunk.includes("application/json")) { - chunk = JSON.parse(chunk); - } - - const contentType = chunk["content-type"]; - if (contentType === "application/json") { - try { - if (chunk.image || chunk.detail) { - let responseWithReference = handleImageResponse(chunk); - console.log("Image response", responseWithReference); - if (responseWithReference.response) currentMessage.rawResponse = responseWithReference.response; - if (responseWithReference.online) currentMessage.onlineContext = responseWithReference.online; - if (responseWithReference.context) currentMessage.context = responseWithReference.context; - } else if (chunk.type == "status") { - currentMessage.trainOfThought.push(chunk.message); - } else if (chunk.type == "rate_limit") { - console.log("Rate limit message", chunk); - currentMessage.rawResponse = chunk.message; - } else { - console.log("any message", chunk); - } - } catch (error) { - console.error("Error processing message", error); - currentMessage.completed = true; - } finally { - // no-op - } - - } else { - // Update the current message with the new chunk - if (chunk && chunk.includes("### compiled references:")) { - let responseWithReference = handleCompiledReferences(chunk, ""); - currentMessage.rawResponse += responseWithReference.response; - - if (responseWithReference.response) currentMessage.rawResponse = responseWithReference.response; - if (responseWithReference.online) currentMessage.onlineContext = responseWithReference.online; - if (responseWithReference.context) currentMessage.context = responseWithReference.context; - } else { - // If the chunk is not a JSON object, just display it as is - currentMessage.rawResponse += chunk; - } - - } - }; - // Update the state with the new message, currentMessage - setMessages([...messages]); - } - useEffect(() => { fetch('/api/chat/options') @@ -201,6 +135,7 @@ export default function SharedChat() { useEffect(() => { if (queryToProcess && !conversationId) { + // If the user has not yet started conversing in the chat, create a new conversation fetch(`/api/chat/share/fork?public_conversation_slug=${paramSlug}`, { method: 'POST', headers: { @@ -219,7 +154,7 @@ export default function SharedChat() { } - if (chatWS && queryToProcess) { + if (queryToProcess) { // Add a new object to the state const newStreamMessage: StreamMessage = { rawResponse: "", @@ -232,40 +167,68 @@ export default function SharedChat() { } setMessages(prevMessages => [...prevMessages, newStreamMessage]); setProcessQuerySignal(true); - } else { - if (!chatWS) { - console.error("No WebSocket connection available"); - } - if (!queryToProcess) { - console.error("No query to process"); - } } }, [queryToProcess]); useEffect(() => { - if (processQuerySignal && chatWS) { - setProcessQuerySignal(false); - chatWS.onmessage = handleWebSocketMessage; - chatWS?.send(queryToProcess); + if (processQuerySignal) { + chat(); } }, [processQuerySignal]); - useEffect(() => { - if (chatWS) { - chatWS.onmessage = handleWebSocketMessage; + + async function readChatStream(response: Response) { + if (!response.ok) throw new Error(response.statusText); + if (!response.body) throw new Error("Response body is null"); + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + const eventDelimiter = '␃🔚␗'; + let buffer = ""; + + while (true) { + + const { done, value } = await reader.read(); + if (done) { + setQueryToProcess(''); + setProcessQuerySignal(false); + break; + } + + const chunk = decoder.decode(value, { stream: true }); + + buffer += chunk; + + let newEventIndex; + while ((newEventIndex = buffer.indexOf(eventDelimiter)) !== -1) { + const event = buffer.slice(0, newEventIndex); + buffer = buffer.slice(newEventIndex + eventDelimiter.length); + if (event) { + processMessageChunk(event); + } + } + } - }, [chatWS]); + } + + async function chat() { + if (!queryToProcess || !conversationId) return; + let chatAPI = `/api/chat?q=${encodeURIComponent(queryToProcess)}&conversation_id=${conversationId}&stream=true&client=web`; + if (locationData) { + chatAPI += `®ion=${locationData.region}&country=${locationData.country}&city=${locationData.city}&timezone=${locationData.timezone}`; + } + + const response = await fetch(chatAPI); + try { + await readChatStream(response); + } catch (err) { + console.log(err); + } + } useEffect(() => { (async () => { if (conversationId) { - const newWS = await setupWebSocket(conversationId, queryToProcess); - if (!newWS) { - console.error("No WebSocket connection available"); - return; - } - setChatWS(newWS); - // Add a new object to the state const newStreamMessage: StreamMessage = { rawResponse: "", @@ -276,11 +239,66 @@ export default function SharedChat() { timestamp: (new Date()).toISOString(), rawQuery: queryToProcess || "", } + setProcessQuerySignal(true); setMessages(prevMessages => [...prevMessages, newStreamMessage]); } })(); }, [conversationId]); + function processMessageChunk(rawChunk: string) { + const chunk = convertMessageChunkToJson(rawChunk); + const currentMessage = messages.find(message => !message.completed); + + if (!currentMessage) { + return; + } + + if (!chunk || !chunk.type) { + return; + } + + if (chunk.type === "status") { + const statusMessage = chunk.data as string; + currentMessage.trainOfThought.push(statusMessage); + } else if (chunk.type === "references") { + const references = chunk.data as RawReferenceData; + + if (references.context) { + currentMessage.context = references.context; + } + + if (references.onlineContext) { + currentMessage.onlineContext = references.onlineContext; + } + } else if (chunk.type === "message") { + const chunkData = chunk.data; + + if (chunkData !== null && typeof chunkData === 'object') { + try { + const jsonData = chunkData as any; + if (jsonData.image || jsonData.detail) { + let responseWithReference = handleImageResponse(chunk.data, true); + if (responseWithReference.response) currentMessage.rawResponse = responseWithReference.response; + if (responseWithReference.online) currentMessage.onlineContext = responseWithReference.online; + if (responseWithReference.context) currentMessage.context = responseWithReference.context; + } else if (jsonData.response) { + currentMessage.rawResponse = jsonData.response; + } + else { + console.log("any message", chunk); + } + } catch (e) { + currentMessage.rawResponse += chunkData; + } + } else { + currentMessage.rawResponse += chunkData; + } + } else if (chunk.type === "end_llm_response") { + currentMessage.completed = true; + } + setMessages([...messages]); + } + if (isLoading) { return ; } @@ -301,7 +319,6 @@ export default function SharedChat() {