Temporarily move the logic that associates Entry and FileObject records into the management command, out of automatic migrations

This commit is contained in:
sabaimran
2025-01-22 19:50:22 -08:00
parent fd90842d38
commit 5a3a897080
2 changed files with 82 additions and 34 deletions

View File

@@ -1,4 +1,5 @@
from django.core.management.base import BaseCommand
from django.db import transaction
from django.db.models import Exists, OuterRef
from khoj.database.models import Entry, FileObject
@@ -15,6 +16,52 @@ class Command(BaseCommand):
)
# NOTE(review): this is a web diff render — original indentation was stripped by
# extraction. Code below is kept byte-identical; only comments are added.
# Purpose (from the visible code): backfill Entry.file_object for entries that
# have none, by matching (user_id, file_path) against FileObject.file_name.
def handle(self, *args, **options):
# --apply selects real writes; without it this is a read-only dry run.
apply = options["apply"]
mode = "UPDATE" if apply else "DRY RUN"
self.stdout.write(f"[{mode}] Processing entries with null file_objects...")
# Create lookup dictionary of all file objects
# NOTE(review): materializes every FileObject in memory at once — assumes the
# table fits in RAM; confirm before running against a large deployment.
file_objects_map = {(fo.user_id, fo.file_name): fo for fo in FileObject.objects.all()}
chunk_size = 1000
processed = 0
processed_entry_ids = set()
# Chunked scan so the full Entry table is never loaded at once.
while True:
entries = list(
Entry.objects.select_related("user")
.filter(file_object__isnull=True)
.exclude(id__in=processed_entry_ids)
.only("id", "user", "file_path")[:chunk_size]
)
if not entries:
break
# Tracking seen ids is what makes the loop terminate in dry-run mode,
# where file_object is never actually written.
processed_entry_ids.update([entry.id for entry in entries])
entries_to_update = []
for entry in entries:
try:
# Entry.file_path is matched against FileObject.file_name (see map key above).
file_object = file_objects_map.get((entry.user_id, entry.file_path))
if file_object:
entry.file_object = file_object
entries_to_update.append(entry)
except Exception as e:
self.stdout.write(self.style.WARNING(f"Error processing entry {entry.id}: {str(e)}"))
continue
# Writes happen only with --apply; each chunk is committed atomically.
if entries_to_update and apply:
with transaction.atomic():
Entry.objects.bulk_update(entries_to_update, ["file_object"], batch_size=chunk_size)
processed += len(entries)
self.stdout.write(f"Processed {processed} entries")
action = "Updated" if apply else "Would update"
# NOTE(review): len(processed_entry_ids) counts every entry scanned, including
# those with no matching FileObject — the "Updated N" figure overstates actual
# updates; consider accumulating a matched-entry counter instead.
self.stdout.write(self.style.SUCCESS(f"{action} {len(processed_entry_ids)} entries"))
# Find FileObjects with no related entries using subquery
# NOTE(review): the hunk is truncated here — the orphaned-files query (and
# whatever is done with it) continues past the visible diff.
orphaned_files = FileObject.objects.annotate(
has_entries=Exists(Entry.objects.filter(file_object=OuterRef("pk")))

View File

@@ -5,49 +5,50 @@ from django.db import migrations, models
# NOTE(review): web diff render of the migration file — removed (previously
# active) lines are interleaved with their added replacements (the `# `-prefixed
# copies). Net effect of this hunk: the migration body is reduced to `pass`,
# and the original backfill logic survives only as commented-out code, having
# been moved to the management command in the same commit. Kept byte-identical;
# only these review comments are added.
def migrate_entry_objects(apps, schema_editor):
Entry = apps.get_model("database", "Entry")
FileObject = apps.get_model("database", "FileObject")
db_alias = schema_editor.connection.alias
# (added) the new migration body is a no-op:
pass
# Entry = apps.get_model("database", "Entry")
# FileObject = apps.get_model("database", "FileObject")
# db_alias = schema_editor.connection.alias
# Create lookup dictionary of all file objects
file_objects_map = {(fo.user_id, fo.file_name): fo for fo in FileObject.objects.using(db_alias).all()}
# # Create lookup dictionary of all file objects
# file_objects_map = {(fo.user_id, fo.file_name): fo for fo in FileObject.objects.using(db_alias).all()}
# Process entries in chunks of 1000
chunk_size = 1000
processed = 0
# # Process entries in chunks of 1000
# chunk_size = 1000
# processed = 0
processed_entry_ids = set()
# processed_entry_ids = set()
while True:
entries = list(
Entry.objects.using(db_alias)
.select_related("user")
.filter(file_object__isnull=True)
.exclude(id__in=processed_entry_ids)
.only("id", "user", "file_path")[:chunk_size]
)
# while True:
# entries = list(
# Entry.objects.using(db_alias)
# .select_related("user")
# .filter(file_object__isnull=True)
# .exclude(id__in=processed_entry_ids)
# .only("id", "user", "file_path")[:chunk_size]
# )
if not entries:
break
# if not entries:
# break
processed_entry_ids.update([entry.id for entry in entries])
# processed_entry_ids.update([entry.id for entry in entries])
entries_to_update = []
for entry in entries:
try:
file_object = file_objects_map.get((entry.user_id, entry.file_path))
if file_object:
entry.file_object = file_object
entries_to_update.append(entry)
except Exception as e:
print(f"Error processing entry {entry.id}: {str(e)}")
continue
# entries_to_update = []
# for entry in entries:
# try:
# file_object = file_objects_map.get((entry.user_id, entry.file_path))
# if file_object:
# entry.file_object = file_object
# entries_to_update.append(entry)
# except Exception as e:
# print(f"Error processing entry {entry.id}: {str(e)}")
# continue
if entries_to_update:
Entry.objects.using(db_alias).bulk_update(entries_to_update, ["file_object"], batch_size=chunk_size)
# if entries_to_update:
# Entry.objects.using(db_alias).bulk_update(entries_to_update, ["file_object"], batch_size=chunk_size)
processed += len(entries)
print(f"Processed {processed} entries")
# processed += len(entries)
# print(f"Processed {processed} entries")
def reverse_migration(apps, schema_editor):