diff --git a/src/khoj/database/management/commands/delete_orphaned_fileobjects.py b/src/khoj/database/management/commands/delete_orphaned_fileobjects.py index 09a8ac29..f7efa1dd 100644 --- a/src/khoj/database/management/commands/delete_orphaned_fileobjects.py +++ b/src/khoj/database/management/commands/delete_orphaned_fileobjects.py @@ -16,52 +16,6 @@ class Command(BaseCommand): ) def handle(self, *args, **options): - apply = options["apply"] - - mode = "UPDATE" if apply else "DRY RUN" - self.stdout.write(f"[{mode}] Processing entries with null file_objects...") - - # Create lookup dictionary of all file objects - file_objects_map = {(fo.user_id, fo.file_name): fo for fo in FileObject.objects.all()} - - chunk_size = 1000 - processed = 0 - processed_entry_ids = set() - - while True: - entries = list( - Entry.objects.select_related("user") - .filter(file_object__isnull=True) - .exclude(id__in=processed_entry_ids) - .only("id", "user", "file_path")[:chunk_size] - ) - - if not entries: - break - - processed_entry_ids.update([entry.id for entry in entries]) - entries_to_update = [] - - for entry in entries: - try: - file_object = file_objects_map.get((entry.user_id, entry.file_path)) - if file_object: - entry.file_object = file_object - entries_to_update.append(entry) - except Exception as e: - self.stdout.write(self.style.WARNING(f"Error processing entry {entry.id}: {str(e)}")) - continue - - if entries_to_update and apply: - with transaction.atomic(): - Entry.objects.bulk_update(entries_to_update, ["file_object"], batch_size=chunk_size) - - processed += len(entries) - self.stdout.write(f"Processed {processed} entries") - - action = "Updated" if apply else "Would update" - self.stdout.write(self.style.SUCCESS(f"{action} {len(processed_entry_ids)} entries")) - # Find FileObjects with no related entries using subquery orphaned_files = FileObject.objects.annotate( has_entries=Exists(Entry.objects.filter(file_object=OuterRef("pk"))) diff --git a/src/khoj/database/migrations/0079_entry_file_object.py b/src/khoj/database/migrations/0079_entry_file_object.py index d9a6166f..1b15b0a6 100644 --- a/src/khoj/database/migrations/0079_entry_file_object.py +++ b/src/khoj/database/migrations/0079_entry_file_object.py @@ -5,50 +5,44 @@ from django.db import migrations, models def migrate_entry_objects(apps, schema_editor): - pass - # Entry = apps.get_model("database", "Entry") - # FileObject = apps.get_model("database", "FileObject") - # db_alias = schema_editor.connection.alias + Entry = apps.get_model("database", "Entry") + FileObject = apps.get_model("database", "FileObject") + db_alias = schema_editor.connection.alias - # # Create lookup dictionary of all file objects - # file_objects_map = {(fo.user_id, fo.file_name): fo for fo in FileObject.objects.using(db_alias).all()} + # Process file objects in chunks + chunk_size = 1000 + processed = 0 + processed_file_ids = set() - # # Process entries in chunks of 1000 - # chunk_size = 1000 - # processed = 0 + while True: + file_objects = list( + FileObject.objects.using(db_alias) + .exclude(id__in=processed_file_ids) + .select_related("user") + .only("id", "user", "file_name")[:chunk_size] + ) - # processed_entry_ids = set() + if not file_objects: + break - # while True: - # entries = list( - # Entry.objects.using(db_alias) - # .select_related("user") - # .filter(file_object__isnull=True) - # .exclude(id__in=processed_entry_ids) - # .only("id", "user", "file_path")[:chunk_size] - # ) + processed_file_ids.update([fo.id for fo in file_objects]) - # if not entries: - # break + for file_object in file_objects: + try: + # Find all entries matching this file object + matching_entries = Entry.objects.using(db_alias).filter( + user_id=file_object.user_id, file_path=file_object.file_name, file_object__isnull=True + ) - # processed_entry_ids.update([entry.id for entry in entries]) + if matching_entries.exists(): + # Update all matching entries in bulk + matching_entries.update(file_object=file_object) + except Exception as e: + print(f"Error processing file object {file_object.id}: {str(e)}") + continue - # entries_to_update = [] - # for entry in entries: - # try: - # file_object = file_objects_map.get((entry.user_id, entry.file_path)) - # if file_object: - # entry.file_object = file_object - # entries_to_update.append(entry) - # except Exception as e: - # print(f"Error processing entry {entry.id}: {str(e)}") - # continue - - # if entries_to_update: - # Entry.objects.using(db_alias).bulk_update(entries_to_update, ["file_object"], batch_size=chunk_size) - - # processed += len(entries) - # print(f"Processed {processed} entries") + processed += len(file_objects) + print(f"Processed {processed} file objects") def reverse_migration(apps, schema_editor):