Update DB migration for better memory and speed efficiency
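In brief: the one-off backfill is removed from the management command's handle(), and the data migration that previously just did `pass` over commented-out code now performs the backfill itself. Instead of loading Entry rows into Python and calling bulk_update(), the migration walks FileObjects in chunks of 1000 and attaches matching entries with a single queryset-level UPDATE per file, so Entry instances are never materialized in memory.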
@@ -16,52 +16,6 @@ class Command(BaseCommand):
         )
 
     def handle(self, *args, **options):
-        apply = options["apply"]
-
-        mode = "UPDATE" if apply else "DRY RUN"
-        self.stdout.write(f"[{mode}] Processing entries with null file_objects...")
-
-        # Create lookup dictionary of all file objects
-        file_objects_map = {(fo.user_id, fo.file_name): fo for fo in FileObject.objects.all()}
-
-        chunk_size = 1000
-        processed = 0
-        processed_entry_ids = set()
-
-        while True:
-            entries = list(
-                Entry.objects.select_related("user")
-                .filter(file_object__isnull=True)
-                .exclude(id__in=processed_entry_ids)
-                .only("id", "user", "file_path")[:chunk_size]
-            )
-
-            if not entries:
-                break
-
-            processed_entry_ids.update([entry.id for entry in entries])
-            entries_to_update = []
-
-            for entry in entries:
-                try:
-                    file_object = file_objects_map.get((entry.user_id, entry.file_path))
-                    if file_object:
-                        entry.file_object = file_object
-                        entries_to_update.append(entry)
-                except Exception as e:
-                    self.stdout.write(self.style.WARNING(f"Error processing entry {entry.id}: {str(e)}"))
-                    continue
-
-            if entries_to_update and apply:
-                with transaction.atomic():
-                    Entry.objects.bulk_update(entries_to_update, ["file_object"], batch_size=chunk_size)
-
-            processed += len(entries)
-            self.stdout.write(f"Processed {processed} entries")
-
-        action = "Updated" if apply else "Would update"
-        self.stdout.write(self.style.SUCCESS(f"{action} {len(processed_entry_ids)} entries"))
-
         # Find FileObjects with no related entries using subquery
         orphaned_files = FileObject.objects.annotate(
             has_entries=Exists(Entry.objects.filter(file_object=OuterRef("pk")))
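The hunk ends inside the orphaned-file query, so the remainder of handle() is not shown. For illustration only, an Exists/OuterRef annotation like the one above is typically consumed as in the sketch below; the .filter(has_entries=False) step, the delete/report branch, and the continued existence of an --apply flag are all assumptions about the unshown code, not part of this diff.

    # Hypothetical continuation -- the hunk cuts off after annotate().
    # Keep only FileObjects that no Entry references, then act on them.
    # Assumes the command still exposes an --apply flag.
    orphaned_files = FileObject.objects.annotate(
        has_entries=Exists(Entry.objects.filter(file_object=OuterRef("pk")))
    ).filter(has_entries=False)

    if apply:
        deleted_count, _ = orphaned_files.delete()
        self.stdout.write(self.style.SUCCESS(f"Deleted {deleted_count} orphaned file objects"))
    else:
        self.stdout.write(f"Would delete {orphaned_files.count()} orphaned file objects")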
@@ -5,50 +5,44 @@ from django.db import migrations, models
 
 
 def migrate_entry_objects(apps, schema_editor):
-    pass
-    # Entry = apps.get_model("database", "Entry")
-    # FileObject = apps.get_model("database", "FileObject")
-    # db_alias = schema_editor.connection.alias
-
-    # # Create lookup dictionary of all file objects
-    # file_objects_map = {(fo.user_id, fo.file_name): fo for fo in FileObject.objects.using(db_alias).all()}
-
-    # # Process entries in chunks of 1000
-    # chunk_size = 1000
-    # processed = 0
-
-    # processed_entry_ids = set()
-
-    # while True:
-    #     entries = list(
-    #         Entry.objects.using(db_alias)
-    #         .select_related("user")
-    #         .filter(file_object__isnull=True)
-    #         .exclude(id__in=processed_entry_ids)
-    #         .only("id", "user", "file_path")[:chunk_size]
-    #     )
-
-    #     if not entries:
-    #         break
-
-    #     processed_entry_ids.update([entry.id for entry in entries])
-
-    #     entries_to_update = []
-    #     for entry in entries:
-    #         try:
-    #             file_object = file_objects_map.get((entry.user_id, entry.file_path))
-    #             if file_object:
-    #                 entry.file_object = file_object
-    #                 entries_to_update.append(entry)
-    #         except Exception as e:
-    #             print(f"Error processing entry {entry.id}: {str(e)}")
-    #             continue
-
-    #     if entries_to_update:
-    #         Entry.objects.using(db_alias).bulk_update(entries_to_update, ["file_object"], batch_size=chunk_size)
-
-    #     processed += len(entries)
-    #     print(f"Processed {processed} entries")
+    Entry = apps.get_model("database", "Entry")
+    FileObject = apps.get_model("database", "FileObject")
+    db_alias = schema_editor.connection.alias
+
+    # Process file objects in chunks
+    chunk_size = 1000
+    processed = 0
+    processed_file_ids = set()
+
+    while True:
+        file_objects = list(
+            FileObject.objects.using(db_alias)
+            .exclude(id__in=processed_file_ids)
+            .select_related("user")
+            .only("id", "user", "file_name")[:chunk_size]
+        )
+
+        if not file_objects:
+            break
+
+        processed_file_ids.update([fo.id for fo in file_objects])
+
+        for file_object in file_objects:
+            try:
+                # Find all entries matching this file object
+                matching_entries = Entry.objects.using(db_alias).filter(
+                    user_id=file_object.user_id, file_path=file_object.file_name, file_object__isnull=True
+                )
+
+                if matching_entries.exists():
+                    # Update all matching entries in bulk
+                    matching_entries.update(file_object=file_object)
+            except Exception as e:
+                print(f"Error processing file object {file_object.id}: {str(e)}")
+                continue
+
+        processed += len(file_objects)
+        print(f"Processed {processed} file objects")
 
 
 def reverse_migration(apps, schema_editor):
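For context, Django executes data-migration functions like these through a RunPython operation (migrations is already imported per the hunk header). A minimal sketch of the wiring, assuming the "database" app label used by the get_model() calls above; the dependency name is a placeholder, since the Migration class sits outside this hunk:

    class Migration(migrations.Migration):
        dependencies = [
            # Placeholder -- the real dependency is not shown in this diff.
            ("database", "NNNN_previous_migration"),
        ]

        operations = [
            # Forward pass backfills Entry.file_object; reverse_migration undoes it.
            migrations.RunPython(migrate_entry_objects, reverse_migration),
        ]

The rewrite is what the commit title refers to: the old approach materialized every null-file_object Entry plus a dict of all FileObjects and pushed changes back with bulk_update(), while the new one holds at most chunk_size FileObject rows at a time and lets the database apply one UPDATE ... WHERE user_id = ... AND file_path = ... per file.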