From 29403551b299e98ec9a97dfbbae8201367f4c054 Mon Sep 17 00:00:00 2001 From: Debanjum Date: Sun, 12 Jan 2025 10:49:25 +0700 Subject: [PATCH 1/4] Batch sync files by size to not exceed API request payload size limits This may help mitigate the issue #970 --- src/interface/obsidian/src/utils.ts | 39 ++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/src/interface/obsidian/src/utils.ts b/src/interface/obsidian/src/utils.ts index 27903af4..39f33ab9 100644 --- a/src/interface/obsidian/src/utils.ts +++ b/src/interface/obsidian/src/utils.ts @@ -88,6 +88,10 @@ export async function updateContentIndex(vault: Vault, setting: KhojSetting, las // Add all files to index as multipart form data const fileData = []; + let currentBatchSize = 0; + const MAX_BATCH_SIZE = 10 * 1024 * 1024; // 10MB max batch size + let currentBatch = []; + for (const file of files) { // Only push files that have been modified since last sync if not regenerating if (!regenerate && file.stat.mtime < (lastSync.get(file) ?? 0)) { @@ -98,29 +102,46 @@ export async function updateContentIndex(vault: Vault, setting: KhojSetting, las const encoding = supportedBinaryFileTypes.includes(file.extension) ? "binary" : "utf8"; const mimeType = fileExtensionToMimeType(file.extension) + (encoding === "utf8" ? "; charset=UTF-8" : ""); const fileContent = encoding == 'binary' ? await vault.readBinary(file) : await vault.read(file); - fileData.push({ blob: new Blob([fileContent], { type: mimeType }), path: file.path }); + const fileItem = { blob: new Blob([fileContent], { type: mimeType }), path: file.path }; + + // Check if adding this file would exceed batch size + const fileSize = (typeof fileContent === 'string') ? new Blob([fileContent]).size : fileContent.byteLength; + if (currentBatchSize + fileSize > MAX_BATCH_SIZE && currentBatch.length > 0) { + fileData.push(currentBatch); + currentBatch = []; + currentBatchSize = 0; + } + + currentBatch.push(fileItem); + currentBatchSize += fileSize; } - // Add any previously synced files to be deleted to multipart form data + // Add any previously synced files to be deleted to final batch let filesToDelete: TFile[] = []; for (const lastSyncedFile of lastSync.keys()) { if (!files.includes(lastSyncedFile)) { countOfFilesToDelete++; let fileObj = new Blob([""], { type: filenameToMimeType(lastSyncedFile) }); - fileData.push({ blob: fileObj, path: lastSyncedFile.path }); + currentBatch.push({ blob: fileObj, path: lastSyncedFile.path }); filesToDelete.push(lastSyncedFile); } } - // Iterate through all indexable files in vault, 1000 at a time + // Add final batch if not empty + if (currentBatch.length > 0) { + fileData.push(currentBatch); + } + + // Iterate through all indexable files in vault, 10Mb batch at a time let responses: string[] = []; let error_message = null; - for (let i = 0; i < fileData.length; i += 1000) { - const filesGroup = fileData.slice(i, i + 1000); + for (const batch of fileData) { + // Create multipart form data with all files in batch const formData = new FormData(); + batch.forEach(fileItem => { formData.append('files', fileItem.blob, fileItem.path) }); + + // Call Khoj backend to sync index with updated files in vault const method = regenerate ? "PUT" : "PATCH"; - filesGroup.forEach(fileItem => { formData.append('files', fileItem.blob, fileItem.path) }); - // Call Khoj backend to update index with all markdown, pdf files const response = await fetch(`${setting.khojUrl}/api/content?client=obsidian`, { method: method, headers: { @@ -167,7 +188,7 @@ export async function updateContentIndex(vault: Vault, setting: KhojSetting, las error_message = `❗️Could not connect to Khoj server. Ensure you can connect to it.`; break; } else { - error_message = `❗️Failed to sync your content with Khoj server. Raise issue on Khoj Discord or Github\nError: ${response.statusText}`; + error_message = `❗️Failed to sync all your content with Khoj server. Raise issue on Khoj Discord or Github\nError: ${response.statusText}`; } } else { responses.push(await response.text()); From b692e690b45d1bf8fb4810a96e499a974c363d37 Mon Sep 17 00:00:00 2001 From: Debanjum Date: Fri, 7 Mar 2025 13:25:17 +0530 Subject: [PATCH 2/4] Rename and fix the delete content source API endpoint - Delete file objects on deleting content by source via API Previously only entries were deleted, not the associated file objects - Add new db adapter to delete multiple file objects (by name) --- src/interface/web/app/settings/page.tsx | 24 ++++++++++++------------ src/khoj/database/adapters/__init__.py | 5 +++++ src/khoj/routers/api_content.py | 10 +++++++--- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/src/interface/web/app/settings/page.tsx b/src/interface/web/app/settings/page.tsx index 9b845987..14929fa0 100644 --- a/src/interface/web/app/settings/page.tsx +++ b/src/interface/web/app/settings/page.tsx @@ -595,48 +595,48 @@ export default function SettingsView() { } }; - const disconnectContent = async (type: string) => { + const disconnectContent = async (source: string) => { try { - const response = await fetch(`/api/content/${type}`, { + const response = await fetch(`/api/content/source/${source}`, { method: "DELETE", headers: { "Content-Type": "application/json", }, }); - if (!response.ok) throw new Error(`Failed to disconnect ${type}`); + if (!response.ok) throw new Error(`Failed to disconnect ${source}`); // Set updated user settings if (userConfig) { let newUserConfig = userConfig; - if (type === "computer") { + if (source === "computer") { newUserConfig.enabled_content_source.computer = false; - } else if (type === "notion") { + } else if (source === "notion") { newUserConfig.enabled_content_source.notion = false; newUserConfig.notion_token = null; setNotionToken(newUserConfig.notion_token); - } else if (type === "github") { + } else if (source === "github") { newUserConfig.enabled_content_source.github = false; } setUserConfig(newUserConfig); } // Notify user about disconnecting content source - if (type === "computer") { + if (source === "computer") { toast({ title: `✅ Deleted Synced Files`, description: "Your synced documents have been deleted.", }); } else { toast({ - title: `✅ Disconnected ${type}`, - description: `Your ${type} integration to Khoj has been disconnected.`, + title: `✅ Disconnected ${source}`, + description: `Your ${source} integration to Khoj has been disconnected.`, }); } } catch (error) { - console.error(`Error disconnecting ${type}:`, error); + console.error(`Error disconnecting ${source}:`, error); toast({ - title: `⚠️ Failed to Disconnect ${type}`, - description: `Failed to disconnect from ${type}. Try again or contact team@khoj.dev`, + title: `⚠️ Failed to Disconnect ${source}`, + description: `Failed to disconnect from ${source}. Try again or contact team@khoj.dev`, }); } }; diff --git a/src/khoj/database/adapters/__init__.py b/src/khoj/database/adapters/__init__.py index ccbbcd99..ae979a8a 100644 --- a/src/khoj/database/adapters/__init__.py +++ b/src/khoj/database/adapters/__init__.py @@ -1549,6 +1549,11 @@ class FileObjectAdapters: async def adelete_file_object_by_name(user: KhojUser, file_name: str): return await FileObject.objects.filter(user=user, file_name=file_name).adelete() + @staticmethod + @arequire_valid_user + async def adelete_file_objects_by_names(user: KhojUser, file_names: List[str]): + return await FileObject.objects.filter(user=user, file_name__in=file_names).adelete() + @staticmethod @arequire_valid_user async def adelete_all_file_objects(user: KhojUser): diff --git a/src/khoj/routers/api_content.py b/src/khoj/routers/api_content.py index d9412629..8762681f 100644 --- a/src/khoj/routers/api_content.py +++ b/src/khoj/routers/api_content.py @@ -420,7 +420,7 @@ async def get_content_source( return await sync_to_async(list)(EntryAdapters.get_all_filenames_by_source(user, content_source)) # type: ignore[call-arg] -@api_content.delete("/{content_source}", status_code=200) +@api_content.delete("/source/{content_source}", status_code=200) @requires(["authenticated"]) async def delete_content_source( request: Request, @@ -434,7 +434,12 @@ async def delete_content_source( raise ValueError(f"Invalid content source: {content_source}") elif content_object != "Computer": await content_object.objects.filter(user=user).adelete() - await sync_to_async(EntryAdapters.delete_all_entries)(user, file_source=content_source) + else: + # Delete file objects from the given source + file_list = await sync_to_async(list)(EntryAdapters.get_all_filenames_by_source(user, content_source)) # type: ignore[call-arg] + await FileObjectAdapters.adelete_file_objects_by_names(user, file_list) + # Delete entries from the given source + await EntryAdapters.adelete_all_entries(user, file_source=content_source) if content_source == DbEntry.EntrySource.NOTION: await NotionConfig.objects.filter(user=user).adelete() @@ -449,7 +454,6 @@ async def delete_content_source( metadata={"content_source": content_source}, ) - enabled_content = await sync_to_async(EntryAdapters.get_unique_file_types)(user) return {"status": "ok"} From 86fa528a73b7d0b5e667bf87ef9e47c2ae2c4b81 Mon Sep 17 00:00:00 2001 From: Debanjum Date: Sun, 12 Jan 2025 12:11:42 +0700 Subject: [PATCH 3/4] Add API endpoint to delete all indexed files by file type --- src/khoj/database/adapters/__init__.py | 9 ++++++++ src/khoj/routers/api_content.py | 30 ++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/src/khoj/database/adapters/__init__.py b/src/khoj/database/adapters/__init__.py index ae979a8a..33e879aa 100644 --- a/src/khoj/database/adapters/__init__.py +++ b/src/khoj/database/adapters/__init__.py @@ -1683,6 +1683,15 @@ class EntryAdapters: .values_list("file_path", flat=True) ) + @staticmethod + @require_valid_user + def get_all_filenames_by_type(user: KhojUser, file_type: str): + return ( + Entry.objects.filter(user=user, file_type=file_type) + .distinct("file_path") + .values_list("file_path", flat=True) + ) + @staticmethod @require_valid_user def get_size_of_indexed_data_in_mb(user: KhojUser): diff --git a/src/khoj/routers/api_content.py b/src/khoj/routers/api_content.py index 8762681f..90e4b498 100644 --- a/src/khoj/routers/api_content.py +++ b/src/khoj/routers/api_content.py @@ -401,6 +401,36 @@ async def get_file_object( ) +@api_content.delete("/type/{content_type}", status_code=200) +@requires(["authenticated"]) +async def delete_content_type( + request: Request, + content_type: str, + client: Optional[str] = None, +): + user = request.user.object + if content_type not in {s.value for s in SearchType}: + raise ValueError(f"Unsupported content type: {content_type}") + if content_type == "all": + await EntryAdapters.adelete_all_entries(user) + else: + # Delete file objects of the given type + file_list = await sync_to_async(list)(EntryAdapters.get_all_filenames_by_type(user, content_type)) # type: ignore[call-arg] + await FileObjectAdapters.adelete_file_objects_by_names(user, file_list) + # Delete entries of the given type + await EntryAdapters.adelete_all_entries(user, file_type=content_type) + + update_telemetry_state( + request=request, + telemetry_type="api", + api="delete_content_config", + client=client, + metadata={"content_type": content_type}, + ) + + return {"status": "ok"} + + @api_content.get("/{content_source}", response_model=List[str]) @requires(["authenticated"]) async def get_content_source( From 043de068ff5e218ea05d582740b997d408c8dd77 Mon Sep 17 00:00:00 2001 From: Debanjum Date: Sun, 12 Jan 2025 11:12:43 +0700 Subject: [PATCH 4/4] Fix force sync of large vaults from Obsidian Previously if you tried to force sync a vault with more than 1000 files it would only end up keeping the last batch because the PUT API call would delete all previous entries. This change calls DELETE for all previously indexed data first, followed by a PATCH to index current vault on a force sync (regenerate) request. This ensures that files from previous batches are not deleted. --- src/interface/obsidian/src/utils.ts | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/src/interface/obsidian/src/utils.ts b/src/interface/obsidian/src/utils.ts index 39f33ab9..468e8c88 100644 --- a/src/interface/obsidian/src/utils.ts +++ b/src/interface/obsidian/src/utils.ts @@ -87,7 +87,7 @@ export async function updateContentIndex(vault: Vault, setting: KhojSetting, las lastSync = lastSync.size > 0 ? lastSync : new Map(); // Add all files to index as multipart form data - const fileData = []; + let fileData = []; let currentBatchSize = 0; const MAX_BATCH_SIZE = 10 * 1024 * 1024; // 10MB max batch size let currentBatch = []; @@ -132,18 +132,38 @@ export async function updateContentIndex(vault: Vault, setting: KhojSetting, las fileData.push(currentBatch); } + // Delete all files of enabled content types first if regenerating + let error_message = null; + const contentTypesToDelete = []; + if (regenerate) { + // Mark content types to delete based on user sync file type settings + if (setting.syncFileType.markdown) contentTypesToDelete.push('markdown'); + if (setting.syncFileType.pdf) contentTypesToDelete.push('pdf'); + if (setting.syncFileType.images) contentTypesToDelete.push('image'); + } + for (const contentType of contentTypesToDelete) { + const response = await fetch(`${setting.khojUrl}/api/content/type/${contentType}?client=obsidian`, { + method: "DELETE", + headers: { + 'Authorization': `Bearer ${setting.khojApiKey}`, + } + }); + if (!response.ok) { + error_message = "❗️Failed to clear existing content index"; + fileData = []; + } + } + // Iterate through all indexable files in vault, 10Mb batch at a time let responses: string[] = []; - let error_message = null; for (const batch of fileData) { // Create multipart form data with all files in batch const formData = new FormData(); batch.forEach(fileItem => { formData.append('files', fileItem.blob, fileItem.path) }); // Call Khoj backend to sync index with updated files in vault - const method = regenerate ? "PUT" : "PATCH"; const response = await fetch(`${setting.khojUrl}/api/content?client=obsidian`, { - method: method, + method: "PATCH", headers: { 'Authorization': `Bearer ${setting.khojApiKey}`, },