mirror of
https://github.com/khoaliber/khoj.git
synced 2026-03-07 13:23:15 +00:00
Improve Obsidian Sync for Large Vaults (#1078)
- Batch sync files by size to try not to exceed API request payload size limits
- Fix force sync of large vaults from Obsidian
- Add API endpoint to delete all indexed files by file type
- Fix to also delete file objects when calling the DELETE content source API
This commit is contained in:
@@ -87,7 +87,11 @@ export async function updateContentIndex(vault: Vault, setting: KhojSetting, las
|
|||||||
lastSync = lastSync.size > 0 ? lastSync : new Map<TFile, number>();
|
lastSync = lastSync.size > 0 ? lastSync : new Map<TFile, number>();
|
||||||
|
|
||||||
// Add all files to index as multipart form data
|
// Add all files to index as multipart form data
|
||||||
const fileData = [];
|
let fileData = [];
|
||||||
|
let currentBatchSize = 0;
|
||||||
|
const MAX_BATCH_SIZE = 10 * 1024 * 1024; // 10MB max batch size
|
||||||
|
let currentBatch = [];
|
||||||
|
|
||||||
for (const file of files) {
|
for (const file of files) {
|
||||||
// Only push files that have been modified since last sync if not regenerating
|
// Only push files that have been modified since last sync if not regenerating
|
||||||
if (!regenerate && file.stat.mtime < (lastSync.get(file) ?? 0)) {
|
if (!regenerate && file.stat.mtime < (lastSync.get(file) ?? 0)) {
|
||||||
@@ -98,31 +102,68 @@ export async function updateContentIndex(vault: Vault, setting: KhojSetting, las
|
|||||||
const encoding = supportedBinaryFileTypes.includes(file.extension) ? "binary" : "utf8";
|
const encoding = supportedBinaryFileTypes.includes(file.extension) ? "binary" : "utf8";
|
||||||
const mimeType = fileExtensionToMimeType(file.extension) + (encoding === "utf8" ? "; charset=UTF-8" : "");
|
const mimeType = fileExtensionToMimeType(file.extension) + (encoding === "utf8" ? "; charset=UTF-8" : "");
|
||||||
const fileContent = encoding == 'binary' ? await vault.readBinary(file) : await vault.read(file);
|
const fileContent = encoding == 'binary' ? await vault.readBinary(file) : await vault.read(file);
|
||||||
fileData.push({ blob: new Blob([fileContent], { type: mimeType }), path: file.path });
|
const fileItem = { blob: new Blob([fileContent], { type: mimeType }), path: file.path };
|
||||||
|
|
||||||
|
// Check if adding this file would exceed batch size
|
||||||
|
const fileSize = (typeof fileContent === 'string') ? new Blob([fileContent]).size : fileContent.byteLength;
|
||||||
|
if (currentBatchSize + fileSize > MAX_BATCH_SIZE && currentBatch.length > 0) {
|
||||||
|
fileData.push(currentBatch);
|
||||||
|
currentBatch = [];
|
||||||
|
currentBatchSize = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
currentBatch.push(fileItem);
|
||||||
|
currentBatchSize += fileSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add any previously synced files to be deleted to multipart form data
|
// Add any previously synced files to be deleted to final batch
|
||||||
let filesToDelete: TFile[] = [];
|
let filesToDelete: TFile[] = [];
|
||||||
for (const lastSyncedFile of lastSync.keys()) {
|
for (const lastSyncedFile of lastSync.keys()) {
|
||||||
if (!files.includes(lastSyncedFile)) {
|
if (!files.includes(lastSyncedFile)) {
|
||||||
countOfFilesToDelete++;
|
countOfFilesToDelete++;
|
||||||
let fileObj = new Blob([""], { type: filenameToMimeType(lastSyncedFile) });
|
let fileObj = new Blob([""], { type: filenameToMimeType(lastSyncedFile) });
|
||||||
fileData.push({ blob: fileObj, path: lastSyncedFile.path });
|
currentBatch.push({ blob: fileObj, path: lastSyncedFile.path });
|
||||||
filesToDelete.push(lastSyncedFile);
|
filesToDelete.push(lastSyncedFile);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterate through all indexable files in vault, 1000 at a time
|
// Add final batch if not empty
|
||||||
let responses: string[] = [];
|
if (currentBatch.length > 0) {
|
||||||
|
fileData.push(currentBatch);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete all files of enabled content types first if regenerating
|
||||||
let error_message = null;
|
let error_message = null;
|
||||||
for (let i = 0; i < fileData.length; i += 1000) {
|
const contentTypesToDelete = [];
|
||||||
const filesGroup = fileData.slice(i, i + 1000);
|
if (regenerate) {
|
||||||
|
// Mark content types to delete based on user sync file type settings
|
||||||
|
if (setting.syncFileType.markdown) contentTypesToDelete.push('markdown');
|
||||||
|
if (setting.syncFileType.pdf) contentTypesToDelete.push('pdf');
|
||||||
|
if (setting.syncFileType.images) contentTypesToDelete.push('image');
|
||||||
|
}
|
||||||
|
for (const contentType of contentTypesToDelete) {
|
||||||
|
const response = await fetch(`${setting.khojUrl}/api/content/type/${contentType}?client=obsidian`, {
|
||||||
|
method: "DELETE",
|
||||||
|
headers: {
|
||||||
|
'Authorization': `Bearer ${setting.khojApiKey}`,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if (!response.ok) {
|
||||||
|
error_message = "❗️Failed to clear existing content index";
|
||||||
|
fileData = [];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate through all indexable files in vault, one 10MB batch at a time
|
||||||
|
let responses: string[] = [];
|
||||||
|
for (const batch of fileData) {
|
||||||
|
// Create multipart form data with all files in batch
|
||||||
const formData = new FormData();
|
const formData = new FormData();
|
||||||
const method = regenerate ? "PUT" : "PATCH";
|
batch.forEach(fileItem => { formData.append('files', fileItem.blob, fileItem.path) });
|
||||||
filesGroup.forEach(fileItem => { formData.append('files', fileItem.blob, fileItem.path) });
|
|
||||||
// Call Khoj backend to update index with all markdown, pdf files
|
// Call Khoj backend to sync index with updated files in vault
|
||||||
const response = await fetch(`${setting.khojUrl}/api/content?client=obsidian`, {
|
const response = await fetch(`${setting.khojUrl}/api/content?client=obsidian`, {
|
||||||
method: method,
|
method: "PATCH",
|
||||||
headers: {
|
headers: {
|
||||||
'Authorization': `Bearer ${setting.khojApiKey}`,
|
'Authorization': `Bearer ${setting.khojApiKey}`,
|
||||||
},
|
},
|
||||||
@@ -167,7 +208,7 @@ export async function updateContentIndex(vault: Vault, setting: KhojSetting, las
|
|||||||
error_message = `❗️Could not connect to Khoj server. Ensure you can connect to it.`;
|
error_message = `❗️Could not connect to Khoj server. Ensure you can connect to it.`;
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
error_message = `❗️Failed to sync your content with Khoj server. Raise issue on Khoj Discord or Github\nError: ${response.statusText}`;
|
error_message = `❗️Failed to sync all your content with Khoj server. Raise issue on Khoj Discord or Github\nError: ${response.statusText}`;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
responses.push(await response.text());
|
responses.push(await response.text());
|
||||||
|
|||||||
@@ -595,48 +595,48 @@ export default function SettingsView() {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
const disconnectContent = async (type: string) => {
|
const disconnectContent = async (source: string) => {
|
||||||
try {
|
try {
|
||||||
const response = await fetch(`/api/content/${type}`, {
|
const response = await fetch(`/api/content/source/${source}`, {
|
||||||
method: "DELETE",
|
method: "DELETE",
|
||||||
headers: {
|
headers: {
|
||||||
"Content-Type": "application/json",
|
"Content-Type": "application/json",
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
if (!response.ok) throw new Error(`Failed to disconnect ${type}`);
|
if (!response.ok) throw new Error(`Failed to disconnect ${source}`);
|
||||||
|
|
||||||
// Set updated user settings
|
// Set updated user settings
|
||||||
if (userConfig) {
|
if (userConfig) {
|
||||||
let newUserConfig = userConfig;
|
let newUserConfig = userConfig;
|
||||||
if (type === "computer") {
|
if (source === "computer") {
|
||||||
newUserConfig.enabled_content_source.computer = false;
|
newUserConfig.enabled_content_source.computer = false;
|
||||||
} else if (type === "notion") {
|
} else if (source === "notion") {
|
||||||
newUserConfig.enabled_content_source.notion = false;
|
newUserConfig.enabled_content_source.notion = false;
|
||||||
newUserConfig.notion_token = null;
|
newUserConfig.notion_token = null;
|
||||||
setNotionToken(newUserConfig.notion_token);
|
setNotionToken(newUserConfig.notion_token);
|
||||||
} else if (type === "github") {
|
} else if (source === "github") {
|
||||||
newUserConfig.enabled_content_source.github = false;
|
newUserConfig.enabled_content_source.github = false;
|
||||||
}
|
}
|
||||||
setUserConfig(newUserConfig);
|
setUserConfig(newUserConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify user about disconnecting content source
|
// Notify user about disconnecting content source
|
||||||
if (type === "computer") {
|
if (source === "computer") {
|
||||||
toast({
|
toast({
|
||||||
title: `✅ Deleted Synced Files`,
|
title: `✅ Deleted Synced Files`,
|
||||||
description: "Your synced documents have been deleted.",
|
description: "Your synced documents have been deleted.",
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
toast({
|
toast({
|
||||||
title: `✅ Disconnected ${type}`,
|
title: `✅ Disconnected ${source}`,
|
||||||
description: `Your ${type} integration to Khoj has been disconnected.`,
|
description: `Your ${source} integration to Khoj has been disconnected.`,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error(`Error disconnecting ${type}:`, error);
|
console.error(`Error disconnecting ${source}:`, error);
|
||||||
toast({
|
toast({
|
||||||
title: `⚠️ Failed to Disconnect ${type}`,
|
title: `⚠️ Failed to Disconnect ${source}`,
|
||||||
description: `Failed to disconnect from ${type}. Try again or contact team@khoj.dev`,
|
description: `Failed to disconnect from ${source}. Try again or contact team@khoj.dev`,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -1549,6 +1549,11 @@ class FileObjectAdapters:
|
|||||||
async def adelete_file_object_by_name(user: KhojUser, file_name: str):
|
async def adelete_file_object_by_name(user: KhojUser, file_name: str):
|
||||||
return await FileObject.objects.filter(user=user, file_name=file_name).adelete()
|
return await FileObject.objects.filter(user=user, file_name=file_name).adelete()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
@arequire_valid_user
|
||||||
|
async def adelete_file_objects_by_names(user: KhojUser, file_names: List[str]):
|
||||||
|
return await FileObject.objects.filter(user=user, file_name__in=file_names).adelete()
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@arequire_valid_user
|
@arequire_valid_user
|
||||||
async def adelete_all_file_objects(user: KhojUser):
|
async def adelete_all_file_objects(user: KhojUser):
|
||||||
@@ -1678,6 +1683,15 @@ class EntryAdapters:
|
|||||||
.values_list("file_path", flat=True)
|
.values_list("file_path", flat=True)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
@require_valid_user
|
||||||
|
def get_all_filenames_by_type(user: KhojUser, file_type: str):
|
||||||
|
return (
|
||||||
|
Entry.objects.filter(user=user, file_type=file_type)
|
||||||
|
.distinct("file_path")
|
||||||
|
.values_list("file_path", flat=True)
|
||||||
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@require_valid_user
|
@require_valid_user
|
||||||
def get_size_of_indexed_data_in_mb(user: KhojUser):
|
def get_size_of_indexed_data_in_mb(user: KhojUser):
|
||||||
|
|||||||
@@ -401,6 +401,36 @@ async def get_file_object(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@api_content.delete("/type/{content_type}", status_code=200)
|
||||||
|
@requires(["authenticated"])
|
||||||
|
async def delete_content_type(
|
||||||
|
request: Request,
|
||||||
|
content_type: str,
|
||||||
|
client: Optional[str] = None,
|
||||||
|
):
|
||||||
|
user = request.user.object
|
||||||
|
if content_type not in {s.value for s in SearchType}:
|
||||||
|
raise ValueError(f"Unsupported content type: {content_type}")
|
||||||
|
if content_type == "all":
|
||||||
|
await EntryAdapters.adelete_all_entries(user)
|
||||||
|
else:
|
||||||
|
# Delete file objects of the given type
|
||||||
|
file_list = await sync_to_async(list)(EntryAdapters.get_all_filenames_by_type(user, content_type)) # type: ignore[call-arg]
|
||||||
|
await FileObjectAdapters.adelete_file_objects_by_names(user, file_list)
|
||||||
|
# Delete entries of the given type
|
||||||
|
await EntryAdapters.adelete_all_entries(user, file_type=content_type)
|
||||||
|
|
||||||
|
update_telemetry_state(
|
||||||
|
request=request,
|
||||||
|
telemetry_type="api",
|
||||||
|
api="delete_content_config",
|
||||||
|
client=client,
|
||||||
|
metadata={"content_type": content_type},
|
||||||
|
)
|
||||||
|
|
||||||
|
return {"status": "ok"}
|
||||||
|
|
||||||
|
|
||||||
@api_content.get("/{content_source}", response_model=List[str])
|
@api_content.get("/{content_source}", response_model=List[str])
|
||||||
@requires(["authenticated"])
|
@requires(["authenticated"])
|
||||||
async def get_content_source(
|
async def get_content_source(
|
||||||
@@ -420,7 +450,7 @@ async def get_content_source(
|
|||||||
return await sync_to_async(list)(EntryAdapters.get_all_filenames_by_source(user, content_source)) # type: ignore[call-arg]
|
return await sync_to_async(list)(EntryAdapters.get_all_filenames_by_source(user, content_source)) # type: ignore[call-arg]
|
||||||
|
|
||||||
|
|
||||||
@api_content.delete("/{content_source}", status_code=200)
|
@api_content.delete("/source/{content_source}", status_code=200)
|
||||||
@requires(["authenticated"])
|
@requires(["authenticated"])
|
||||||
async def delete_content_source(
|
async def delete_content_source(
|
||||||
request: Request,
|
request: Request,
|
||||||
@@ -434,7 +464,12 @@ async def delete_content_source(
|
|||||||
raise ValueError(f"Invalid content source: {content_source}")
|
raise ValueError(f"Invalid content source: {content_source}")
|
||||||
elif content_object != "Computer":
|
elif content_object != "Computer":
|
||||||
await content_object.objects.filter(user=user).adelete()
|
await content_object.objects.filter(user=user).adelete()
|
||||||
await sync_to_async(EntryAdapters.delete_all_entries)(user, file_source=content_source)
|
else:
|
||||||
|
# Delete file objects from the given source
|
||||||
|
file_list = await sync_to_async(list)(EntryAdapters.get_all_filenames_by_source(user, content_source)) # type: ignore[call-arg]
|
||||||
|
await FileObjectAdapters.adelete_file_objects_by_names(user, file_list)
|
||||||
|
# Delete entries from the given source
|
||||||
|
await EntryAdapters.adelete_all_entries(user, file_source=content_source)
|
||||||
|
|
||||||
if content_source == DbEntry.EntrySource.NOTION:
|
if content_source == DbEntry.EntrySource.NOTION:
|
||||||
await NotionConfig.objects.filter(user=user).adelete()
|
await NotionConfig.objects.filter(user=user).adelete()
|
||||||
@@ -449,7 +484,6 @@ async def delete_content_source(
|
|||||||
metadata={"content_source": content_source},
|
metadata={"content_source": content_source},
|
||||||
)
|
)
|
||||||
|
|
||||||
enabled_content = await sync_to_async(EntryAdapters.get_unique_file_types)(user)
|
|
||||||
return {"status": "ok"}
|
return {"status": "ok"}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user