diff --git a/src/interface/desktop/main.js b/src/interface/desktop/main.js index ec3e6fa4..a98b15c8 100644 --- a/src/interface/desktop/main.js +++ b/src/interface/desktop/main.js @@ -114,8 +114,8 @@ function processDirectory(filesToPush, folder) { const files = fs.readdirSync(folder.path, { withFileTypes: true, recursive: true }); for (const file of files) { - console.log(file); if (file.isFile() && validFileTypes.includes(file.name.split('.').pop())) { + console.log(`Add ${file.name} in ${folder.path} for indexing`); filesToPush.push(path.join(folder.path, file.name)); } @@ -151,7 +151,7 @@ function pushDataToKhoj (regenerate = false) { } const lastSync = store.get('lastSync') || []; - const formData = new FormData(); + const filesDataToPush = []; for (const file of filesToPush) { const stats = fs.statSync(file); if (!regenerate) { @@ -167,7 +167,7 @@ function pushDataToKhoj (regenerate = false) { let mimeType = filenameToMimeType(file) + (encoding === "utf8" ? "; charset=UTF-8" : ""); let fileContent = Buffer.from(fs.readFileSync(file, { encoding: encoding }), encoding); let fileObj = new Blob([fileContent], { type: mimeType }); - formData.append('files', fileObj, file); + filesDataToPush.push({blob: fileObj, path: file}); state[file] = { success: true, } @@ -184,52 +184,51 @@ function pushDataToKhoj (regenerate = false) { for (const syncedFile of lastSync) { if (!filesToPush.includes(syncedFile.path)) { fileObj = new Blob([""], { type: filenameToMimeType(syncedFile.path) }); - formData.append('files', fileObj, syncedFile.path); + filesDataToPush.push({blob: fileObj, path: syncedFile.path}); } } // Send collected files to Khoj server for indexing - if (!!formData?.entries()?.next().value) { - const hostURL = store.get('hostURL') || KHOJ_URL; - const headers = { - 'Authorization': `Bearer ${store.get("khojToken")}` - }; - axios.post(`${hostURL}/api/v1/index/update?force=${regenerate}&client=desktop`, formData, { headers }) - .then(response => { - console.log(response.data); - let lastSync = []; - for (const file of filesToPush) { - lastSync.push({ - path: file, - datetime: new Date().toISOString() - }); - } - store.set('lastSync', lastSync); - }) - .catch(error => { - console.error(error); - if (error.code == 'ECONNREFUSED') { - const win = BrowserWindow.getAllWindows()[0]; - if (win) win.webContents.send('update-state', state); - } else if (error.response.status == 429) { - const win = BrowserWindow.getAllWindows()[0]; - if (win) win.webContents.send('needsSubscription', true); - if (win) win.webContents.send('update-state', state); - } - state['completed'] = false - }) - .finally(() => { - // Syncing complete - syncing = false; - const win = BrowserWindow.getAllWindows()[0]; - if (win) win.webContents.send('update-state', state); - }); - } else { + const hostURL = store.get('hostURL') || KHOJ_URL; + const headers = { 'Authorization': `Bearer ${store.get("khojToken")}` }; + let requests = []; + + // Request indexing files on server. With upto 1000 files in each request + for (let i = 0; i < filesDataToPush.length; i += 1000) { + const filesDataGroup = filesDataToPush.slice(i, i + 1000); + const formData = new FormData(); + filesDataGroup.forEach(fileData => { formData.append('files', fileData.blob, fileData.path) }); + let request = axios.post(`${hostURL}/api/v1/index/update?force=${regenerate}&client=desktop`, formData, { headers }); + requests.push(request); + } + + // Wait for requests batch to finish + Promise + .all(requests) + .then(responses => { + const lastSync = filesToPush + .filter(file => responses.find(response => response.data.includes(file))) + .map(file => ({ path: file, datetime: new Date().toISOString() })); + store.set('lastSync', lastSync); + }) + .catch(error => { + console.error(error); + state["completed"] = false; + if (error?.response?.status === 429 && (win = BrowserWindow.getAllWindows()[0])) { + state["error"] = `Looks like you're out of space to sync your files. Upgrade your plan to unlock more space.`; + } else if (error?.code === 'ECONNREFUSED') { + state["error"] = `Could not connect to Khoj server. Ensure you can connect to it at ${error.address}:${error.port}.`; + } else { + state["error"] = `Sync was unsuccessful at ${currentTime.toLocaleTimeString()}. Contact team@khoj.dev to report this issue.`; + } + }) + .finally(() => { // Syncing complete syncing = false; - const win = BrowserWindow.getAllWindows()[0]; - if (win) win.webContents.send('update-state', state); - } + if (win = BrowserWindow.getAllWindows()[0]) { + win.webContents.send('update-state', state); + } + }); } pushDataToKhoj(); diff --git a/src/interface/desktop/renderer.js b/src/interface/desktop/renderer.js index 16df6d2f..02f92163 100644 --- a/src/interface/desktop/renderer.js +++ b/src/interface/desktop/renderer.js @@ -158,7 +158,7 @@ window.updateStateAPI.onUpdateState((event, state) => { nextSyncTime = new Date(); nextSyncTime.setMinutes(Math.ceil((nextSyncTime.getMinutes() + 1) / 10) * 10); if (state.completed == false) { - syncStatusElement.innerHTML = `Sync was unsuccessful at ${currentTime.toLocaleTimeString()}. Contact team@khoj.dev to report this issue.`; + if (state.error) syncStatusElement.innerHTML = state.error; return; } const options = { hour: '2-digit', minute: '2-digit' }; diff --git a/src/interface/obsidian/src/utils.ts b/src/interface/obsidian/src/utils.ts index 7eda8a47..1f309ff6 100644 --- a/src/interface/obsidian/src/utils.ts +++ b/src/interface/obsidian/src/utils.ts @@ -31,40 +31,59 @@ function fileExtensionToMimeType (extension: string): string { export async function updateContentIndex(vault: Vault, setting: KhojSetting, lastSyncedFiles: TFile[], regenerate: boolean = false): Promise { // Get all markdown, pdf files in the vault console.log(`Khoj: Updating Khoj content index...`) - const files = vault.getFiles().filter(file => file.extension === 'md' || file.extension === 'pdf'); - const binaryFileTypes = ['pdf', 'png', 'jpg', 'jpeg'] + const files = vault.getFiles().filter(file => file.extension === 'md' || file.extension === 'markdown' || file.extension === 'pdf'); + const binaryFileTypes = ['pdf'] let countOfFilesToIndex = 0; let countOfFilesToDelete = 0; // Add all files to index as multipart form data - const formData = new FormData(); + const fileData = []; for (const file of files) { countOfFilesToIndex++; const encoding = binaryFileTypes.includes(file.extension) ? "binary" : "utf8"; const mimeType = fileExtensionToMimeType(file.extension) + (encoding === "utf8" ? "; charset=UTF-8" : ""); const fileContent = encoding == 'binary' ? await vault.readBinary(file) : await vault.read(file); - formData.append('files', new Blob([fileContent], { type: mimeType }), file.path); + fileData.push({blob: new Blob([fileContent], { type: mimeType }), path: file.path}); } // Add any previously synced files to be deleted to multipart form data for (const lastSyncedFile of lastSyncedFiles) { if (!files.includes(lastSyncedFile)) { countOfFilesToDelete++; - formData.append('files', new Blob([]), lastSyncedFile.path); + fileData.push({blob: new Blob([]), path: lastSyncedFile.path}); } } - // Call Khoj backend to update index with all markdown, pdf files - const response = await fetch(`${setting.khojUrl}/api/v1/index/update?force=${regenerate}&client=obsidian`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${setting.khojApiKey}`, - }, - body: formData, - }); + // Iterate through all indexable files in vault, 1000 at a time + let error_message = null; + for (let i = 0; i < fileData.length; i += 1000) { + const filesGroup = fileData.slice(i, i + 1000); + const formData = new FormData(); + filesGroup.forEach(fileItem => { formData.append('files', fileItem.blob, fileItem.path) }); + // Call Khoj backend to update index with all markdown, pdf files + const response = await fetch(`${setting.khojUrl}/api/v1/index/update?force=${regenerate}&client=obsidian`, { + method: 'POST', + headers: { + 'Authorization': `Bearer ${setting.khojApiKey}`, + }, + body: formData, + }); - if (!response.ok) { - new Notice(`❗️Failed to update Khoj content index. Ensure Khoj server connected or raise issue on Khoj Discord/Github\nError: ${response.statusText}`); + if (!response.ok) { + if (response.status === 429) { + error_message = `❗️Failed to sync your content with Khoj server. Requests were throttled. Upgrade your subscription or try again later.`; + break; + } else if (response.status === 404) { + error_message = `❗️Could not connect to Khoj server. Ensure you can connect to it.`; + break; + } else { + error_message = `❗️Failed to sync your content with Khoj server. Raise issue on Khoj Discord or Github\nError: ${response.statusText}`; + } + } + } + + if (error_message) { + new Notice(error_message); } else { console.log(`✅ Refreshed Khoj content index. Updated: ${countOfFilesToIndex} files, Deleted: ${countOfFilesToDelete} files.`); } @@ -92,7 +111,7 @@ export async function createNote(name: string, newLeaf = false): Promise { console.error('Khoj: Could not create note.\n' + (e as any).message); throw e } - } +} export async function createNoteAndCloseModal(query: string, modal: Modal, opt?: { newLeaf: boolean }): Promise { try { diff --git a/src/khoj/routers/indexer.py b/src/khoj/routers/indexer.py index c7d4c01d..de80aed4 100644 --- a/src/khoj/routers/indexer.py +++ b/src/khoj/routers/indexer.py @@ -63,37 +63,23 @@ async def update( ), ): user = request.user.object + index_files: Dict[str, Dict[str, str]] = {"org": {}, "markdown": {}, "pdf": {}, "plaintext": {}} try: logger.info(f"📬 Updating content index via API call by {client} client") - org_files: Dict[str, str] = {} - markdown_files: Dict[str, str] = {} - pdf_files: Dict[str, bytes] = {} - plaintext_files: Dict[str, str] = {} - for file in files: file_type, encoding = get_file_type(file.content_type) - dict_to_update = None - if file_type == "org": - dict_to_update = org_files - elif file_type == "markdown": - dict_to_update = markdown_files - elif file_type == "pdf": - dict_to_update = pdf_files # type: ignore - elif file_type == "plaintext": - dict_to_update = plaintext_files - - if dict_to_update is not None: - dict_to_update[file.filename] = ( + if file_type in index_files: + index_files[file_type][file.filename] = ( file.file.read().decode("utf-8") if encoding == "utf-8" else file.file.read() # type: ignore ) else: logger.warning(f"Skipped indexing unsupported file type sent by {client} client: {file.filename}") indexer_input = IndexerInput( - org=org_files, - markdown=markdown_files, - pdf=pdf_files, - plaintext=plaintext_files, + org=index_files["org"], + markdown=index_files["markdown"], + pdf=index_files["pdf"], + plaintext=index_files["plaintext"], ) if state.config == None: @@ -143,10 +129,10 @@ async def update( return Response(content="Failed", status_code=500) indexing_metadata = { - "num_org": len(org_files), - "num_markdown": len(markdown_files), - "num_pdf": len(pdf_files), - "num_plaintext": len(plaintext_files), + "num_org": len(index_files["org"]), + "num_markdown": len(index_files["markdown"]), + "num_pdf": len(index_files["pdf"]), + "num_plaintext": len(index_files["plaintext"]), } update_telemetry_state( @@ -162,7 +148,8 @@ async def update( logger.info(f"📪 Content index updated via API call by {client} client") - return Response(content="OK", status_code=200) + indexed_filenames = ",".join(file for ctype in index_files for file in index_files[ctype]) or "" + return Response(content=indexed_filenames, status_code=200) def configure_search(search_models: SearchModels, search_config: Optional[SearchConfig]) -> Optional[SearchModels]: diff --git a/tests/test_client.py b/tests/test_client.py index 0bc3c02f..0a25bea2 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -169,6 +169,7 @@ def test_index_update_normal_file_unsubscribed(client, api_user4: KhojApiUser): assert response.status_code == 200 +# ---------------------------------------------------------------------------------------------------- @pytest.mark.django_db(transaction=True) def test_index_update_big_files_no_billing(client): # Arrange @@ -197,6 +198,26 @@ def test_index_update(client): assert response.status_code == 200 +# ---------------------------------------------------------------------------------------------------- +@pytest.mark.django_db(transaction=True) +def test_index_update_fails_if_more_than_1000_files(client, api_user4: KhojApiUser): + # Arrange + api_token = api_user4.token + state.billing_enabled = True + files = [("files", (f"path/to/filename{i}.org", f"Symphony No {i}", "text/org")) for i in range(1001)] + + headers = {"Authorization": f"Bearer {api_token}"} + + # Act + ok_response = client.post("/api/v1/index/update", files=files[:1000], headers=headers) + bad_response = client.post("/api/v1/index/update", files=files, headers=headers) + + # Assert + assert ok_response.status_code == 200 + assert bad_response.status_code == 400 + assert bad_response.content.decode("utf-8") == '{"detail":"Too many files. Maximum number of files is 1000."}' + + # ---------------------------------------------------------------------------------------------------- @pytest.mark.django_db(transaction=True) def test_regenerate_with_valid_content_type(client):