mirror of
https://github.com/khoaliber/khoj.git
synced 2026-03-07 13:23:15 +00:00
Extract logic to mark entries for embeddings update into helper function
- This could be re-used by other text_to_jsonl converters like markdown, beancount
This commit is contained in:
@@ -7,12 +7,11 @@ import argparse
|
|||||||
import pathlib
|
import pathlib
|
||||||
import glob
|
import glob
|
||||||
import logging
|
import logging
|
||||||
import hashlib
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
# Internal Packages
|
# Internal Packages
|
||||||
from src.processor.org_mode import orgnode
|
from src.processor.org_mode import orgnode
|
||||||
from src.utils.helpers import get_absolute_path, is_none_or_empty
|
from src.utils.helpers import get_absolute_path, is_none_or_empty, mark_entries_for_update
|
||||||
from src.utils.jsonl import dump_jsonl, compress_jsonl_data
|
from src.utils.jsonl import dump_jsonl, compress_jsonl_data
|
||||||
from src.utils import state
|
from src.utils import state
|
||||||
|
|
||||||
@@ -46,37 +45,7 @@ def org_to_jsonl(org_files, org_file_filter, output_file, previous_entries=None)
|
|||||||
if not previous_entries:
|
if not previous_entries:
|
||||||
entries_with_ids = list(enumerate(current_entries))
|
entries_with_ids = list(enumerate(current_entries))
|
||||||
else:
|
else:
|
||||||
# Hash all current and previous entries to identify new entries
|
entries_with_ids = mark_entries_for_update(current_entries, previous_entries, key='compiled', logger=logger)
|
||||||
start = time.time()
|
|
||||||
current_entry_hashes = list(map(lambda e: hashlib.md5(bytes(e['compiled'], encoding='utf-8')).hexdigest(), current_entries))
|
|
||||||
previous_entry_hashes = list(map(lambda e: hashlib.md5(bytes(e['compiled'], encoding='utf-8')).hexdigest(), previous_entries))
|
|
||||||
end = time.time()
|
|
||||||
logger.debug(f"Hash previous, current entries: {end - start} seconds")
|
|
||||||
|
|
||||||
start = time.time()
|
|
||||||
hash_to_current_entries = dict(zip(current_entry_hashes, current_entries))
|
|
||||||
hash_to_previous_entries = dict(zip(previous_entry_hashes, previous_entries))
|
|
||||||
|
|
||||||
# All entries that did not exist in the previous set are to be added
|
|
||||||
new_entry_hashes = set(current_entry_hashes) - set(previous_entry_hashes)
|
|
||||||
# All entries that exist in both current and previous sets are kept
|
|
||||||
existing_entry_hashes = set(current_entry_hashes) & set(previous_entry_hashes)
|
|
||||||
|
|
||||||
# Mark new entries with no ids for later embeddings generation
|
|
||||||
new_entries = [
|
|
||||||
(None, hash_to_current_entries[entry_hash])
|
|
||||||
for entry_hash in new_entry_hashes
|
|
||||||
]
|
|
||||||
# Set id of existing entries to their previous ids to reuse their existing encoded embeddings
|
|
||||||
existing_entries = [
|
|
||||||
(previous_entry_hashes.index(entry_hash), hash_to_previous_entries[entry_hash])
|
|
||||||
for entry_hash in existing_entry_hashes
|
|
||||||
]
|
|
||||||
|
|
||||||
existing_entries_sorted = sorted(existing_entries, key=lambda e: e[0])
|
|
||||||
entries_with_ids = existing_entries_sorted + new_entries
|
|
||||||
end = time.time()
|
|
||||||
logger.debug(f"Identify, Mark, Combine new, existing entries: {end - start} seconds")
|
|
||||||
|
|
||||||
# Process Each Entry from All Notes Files
|
# Process Each Entry from All Notes Files
|
||||||
start = time.time()
|
start = time.time()
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
# Standard Packages
|
# Standard Packages
|
||||||
import pathlib
|
import pathlib
|
||||||
import sys
|
import sys
|
||||||
|
import time
|
||||||
|
import hashlib
|
||||||
from os.path import join
|
from os.path import join
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
|
||||||
@@ -79,3 +81,39 @@ class LRU(OrderedDict):
|
|||||||
if len(self) > self.capacity:
|
if len(self) > self.capacity:
|
||||||
oldest = next(iter(self))
|
oldest = next(iter(self))
|
||||||
del self[oldest]
|
del self[oldest]
|
||||||
|
|
||||||
|
|
||||||
|
def mark_entries_for_update(current_entries, previous_entries, key='compiled', logger=None):
    """Pair each entry with an id indicating whether it can reuse a previous position.

    Entries are compared by the MD5 hex digest of the UTF-8 bytes of ``entry[key]``.

    Args:
        current_entries: Iterable of dict-like entries from the current run.
        previous_entries: Iterable of dict-like entries from the previous run.
        key: Name of the string field in each entry that is hashed for comparison.
        logger: Optional object with a ``debug`` method used for timing messages.
            When None, no timing messages are emitted.

    Returns:
        A list of ``(id, entry)`` tuples:
        - First, one tuple per hash present in both inputs. ``id`` is the index of
          the first previous entry with that hash and ``entry`` is the last previous
          entry with that hash. These are sorted by ``id``.
        - Then, one tuple per hash present only in ``current_entries``. ``id`` is
          None and ``entry`` is the last current entry with that hash. These appear
          in the order their hash first occurs in ``current_entries``.
    """
    def hash_entry(entry):
        return hashlib.md5(bytes(entry[key], encoding='utf-8')).hexdigest()

    def log_debug(message):
        # logger defaults to None, so logging has to be optional
        if logger is not None:
            logger.debug(message)

    # Hash all current and previous entries to identify new entries
    start = time.time()
    current_entry_hashes = [hash_entry(entry) for entry in current_entries]
    previous_entry_hashes = [hash_entry(entry) for entry in previous_entries]
    end = time.time()
    log_debug(f"Hash previous, current entries: {end - start} seconds")

    start = time.time()
    # When several entries share a hash, the last one wins as the dict value,
    # while the key keeps the position of its first occurrence
    hash_to_current_entries = dict(zip(current_entry_hashes, current_entries))
    hash_to_previous_entries = dict(zip(previous_entry_hashes, previous_entries))

    # Index of the first previous entry for each hash (same result as list.index,
    # without rescanning the list for every lookup)
    hash_to_previous_id = {}
    for previous_id, entry_hash in enumerate(previous_entry_hashes):
        hash_to_previous_id.setdefault(entry_hash, previous_id)

    # Mark new entries with no ids for later embeddings generation.
    # Iterating the dict (not a set) keeps their order stable across runs
    new_entries = [
        (None, entry)
        for entry_hash, entry in hash_to_current_entries.items()
        if entry_hash not in hash_to_previous_id
    ]
    # Set id of existing entries to their previous ids to reuse their existing encoded embeddings
    existing_entries = [
        (hash_to_previous_id[entry_hash], hash_to_previous_entries[entry_hash])
        for entry_hash in hash_to_current_entries
        if entry_hash in hash_to_previous_id
    ]

    existing_entries_sorted = sorted(existing_entries, key=lambda e: e[0])
    entries_with_ids = existing_entries_sorted + new_entries
    end = time.time()
    log_debug(f"Identify, Mark, Combine new, existing entries: {end - start} seconds")

    return entries_with_ids
|
||||||
Reference in New Issue
Block a user