diff --git a/src/khoj/database/management/commands/delete_orphaned_fileobjects.py b/src/khoj/database/management/commands/delete_orphaned_fileobjects.py
index 99d45c6f..09a8ac29 100644
--- a/src/khoj/database/management/commands/delete_orphaned_fileobjects.py
+++ b/src/khoj/database/management/commands/delete_orphaned_fileobjects.py
@@ -1,4 +1,5 @@
 from django.core.management.base import BaseCommand
+from django.db import transaction
 from django.db.models import Exists, OuterRef
 
 from khoj.database.models import Entry, FileObject
@@ -15,6 +16,52 @@ class Command(BaseCommand):
         )
 
     def handle(self, *args, **options):
+        apply = options["apply"]
+
+        mode = "UPDATE" if apply else "DRY RUN"
+        self.stdout.write(f"[{mode}] Processing entries with null file_objects...")
+
+        # Create lookup dictionary of all file objects
+        file_objects_map = {(fo.user_id, fo.file_name): fo for fo in FileObject.objects.all()}
+
+        chunk_size = 1000
+        processed = 0
+        processed_entry_ids = set()
+
+        while True:
+            entries = list(
+                Entry.objects.select_related("user")
+                .filter(file_object__isnull=True)
+                .exclude(id__in=processed_entry_ids)
+                .only("id", "user", "file_path")[:chunk_size]
+            )
+
+            if not entries:
+                break
+
+            processed_entry_ids.update([entry.id for entry in entries])
+            entries_to_update = []
+
+            for entry in entries:
+                try:
+                    file_object = file_objects_map.get((entry.user_id, entry.file_path))
+                    if file_object:
+                        entry.file_object = file_object
+                        entries_to_update.append(entry)
+                except Exception as e:
+                    self.stdout.write(self.style.WARNING(f"Error processing entry {entry.id}: {str(e)}"))
+                    continue
+
+            if entries_to_update and apply:
+                with transaction.atomic():
+                    Entry.objects.bulk_update(entries_to_update, ["file_object"], batch_size=chunk_size)
+
+            processed += len(entries)
+            self.stdout.write(f"Processed {processed} entries")
+
+        action = "Updated" if apply else "Would update"
+        self.stdout.write(self.style.SUCCESS(f"{action} {len(processed_entry_ids)} entries"))
+
         # Find FileObjects with no related entries using subquery
         orphaned_files = FileObject.objects.annotate(
             has_entries=Exists(Entry.objects.filter(file_object=OuterRef("pk")))
diff --git a/src/khoj/database/migrations/0079_entry_file_object.py b/src/khoj/database/migrations/0079_entry_file_object.py
index 3846dd9d..d9a6166f 100644
--- a/src/khoj/database/migrations/0079_entry_file_object.py
+++ b/src/khoj/database/migrations/0079_entry_file_object.py
@@ -5,49 +5,50 @@ from django.db import migrations, models
 
 
 def migrate_entry_objects(apps, schema_editor):
-    Entry = apps.get_model("database", "Entry")
-    FileObject = apps.get_model("database", "FileObject")
-    db_alias = schema_editor.connection.alias
+    pass
+    # Entry = apps.get_model("database", "Entry")
+    # FileObject = apps.get_model("database", "FileObject")
+    # db_alias = schema_editor.connection.alias
 
-    # Create lookup dictionary of all file objects
-    file_objects_map = {(fo.user_id, fo.file_name): fo for fo in FileObject.objects.using(db_alias).all()}
+    # # Create lookup dictionary of all file objects
+    # file_objects_map = {(fo.user_id, fo.file_name): fo for fo in FileObject.objects.using(db_alias).all()}
 
-    # Process entries in chunks of 1000
-    chunk_size = 1000
-    processed = 0
+    # # Process entries in chunks of 1000
+    # chunk_size = 1000
+    # processed = 0
 
-    processed_entry_ids = set()
+    # processed_entry_ids = set()
 
-    while True:
-        entries = list(
-            Entry.objects.using(db_alias)
-            .select_related("user")
-            .filter(file_object__isnull=True)
-            .exclude(id__in=processed_entry_ids)
-            .only("id", "user", "file_path")[:chunk_size]
-        )
+    # while True:
+    #     entries = list(
+    #         Entry.objects.using(db_alias)
+    #         .select_related("user")
+    #         .filter(file_object__isnull=True)
+    #         .exclude(id__in=processed_entry_ids)
+    #         .only("id", "user", "file_path")[:chunk_size]
+    #     )
 
-        if not entries:
-            break
+    #     if not entries:
+    #         break
 
-        processed_entry_ids.update([entry.id for entry in entries])
+    #     processed_entry_ids.update([entry.id for entry in entries])
 
-        entries_to_update = []
-        for entry in entries:
-            try:
-                file_object = file_objects_map.get((entry.user_id, entry.file_path))
-                if file_object:
-                    entry.file_object = file_object
-                    entries_to_update.append(entry)
-            except Exception as e:
-                print(f"Error processing entry {entry.id}: {str(e)}")
-                continue
+    #     entries_to_update = []
+    #     for entry in entries:
+    #         try:
+    #             file_object = file_objects_map.get((entry.user_id, entry.file_path))
+    #             if file_object:
+    #                 entry.file_object = file_object
+    #                 entries_to_update.append(entry)
+    #         except Exception as e:
+    #             print(f"Error processing entry {entry.id}: {str(e)}")
+    #             continue
 
-        if entries_to_update:
-            Entry.objects.using(db_alias).bulk_update(entries_to_update, ["file_object"], batch_size=chunk_size)
+    #     if entries_to_update:
+    #         Entry.objects.using(db_alias).bulk_update(entries_to_update, ["file_object"], batch_size=chunk_size)
 
-        processed += len(entries)
-        print(f"Processed {processed} entries")
+    #     processed += len(entries)
+    #     print(f"Processed {processed} entries")
 
 
 def reverse_migration(apps, schema_editor):