mirror of
https://github.com/khoaliber/khoj.git
synced 2026-03-02 21:19:12 +00:00
Add processor to index entries from jsonl files for plugins
- Read, merge entries from input jsonl files and filters - Mark new, modified entries for update
This commit is contained in:
0
src/khoj/processor/jsonl/__init__.py
Normal file
0
src/khoj/processor/jsonl/__init__.py
Normal file
100
src/khoj/processor/jsonl/jsonl_to_jsonl.py
Normal file
100
src/khoj/processor/jsonl/jsonl_to_jsonl.py
Normal file
@@ -0,0 +1,100 @@
|
||||
# Standard Packages
|
||||
import glob
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
# Internal Packages
|
||||
from khoj.processor.text_to_jsonl import TextToJsonl
|
||||
from khoj.utils.helpers import get_absolute_path, timer
|
||||
from khoj.utils.jsonl import load_jsonl, dump_jsonl, compress_jsonl_data
|
||||
from khoj.utils.rawconfig import Entry
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class JsonlToJsonl(TextToJsonl):
|
||||
# Define Functions
|
||||
def process(self, previous_entries=None):
|
||||
# Extract required fields from config
|
||||
input_jsonl_files, input_jsonl_filter, output_file = (
|
||||
self.config.input_files,
|
||||
self.config.input_filter,
|
||||
self.config.compressed_jsonl,
|
||||
)
|
||||
|
||||
# Get Jsonl Input Files to Process
|
||||
all_input_jsonl_files = JsonlToJsonl.get_jsonl_files(input_jsonl_files, input_jsonl_filter)
|
||||
|
||||
# Extract Entries from specified jsonl files
|
||||
with timer("Parse entries from jsonl files", logger):
|
||||
input_jsons = JsonlToJsonl.extract_jsonl_entries(all_input_jsonl_files)
|
||||
current_entries = list(map(Entry.from_dict, input_jsons))
|
||||
|
||||
# Split entries by max tokens supported by model
|
||||
with timer("Split entries by max token size supported by model", logger):
|
||||
current_entries = self.split_entries_by_max_tokens(current_entries, max_tokens=256)
|
||||
|
||||
# Identify, mark and merge any new entries with previous entries
|
||||
with timer("Identify new or updated entries", logger):
|
||||
if not previous_entries:
|
||||
entries_with_ids = list(enumerate(current_entries))
|
||||
else:
|
||||
entries_with_ids = self.mark_entries_for_update(
|
||||
current_entries,
|
||||
previous_entries,
|
||||
key="compiled",
|
||||
logger=logger,
|
||||
)
|
||||
|
||||
with timer("Write entries to JSONL file", logger):
|
||||
# Process Each Entry from All Notes Files
|
||||
entries = list(map(lambda entry: entry[1], entries_with_ids))
|
||||
jsonl_data = JsonlToJsonl.convert_entries_to_jsonl(entries)
|
||||
|
||||
# Compress JSONL formatted Data
|
||||
if output_file.suffix == ".gz":
|
||||
compress_jsonl_data(jsonl_data, output_file)
|
||||
elif output_file.suffix == ".jsonl":
|
||||
dump_jsonl(jsonl_data, output_file)
|
||||
|
||||
return entries_with_ids
|
||||
|
||||
@staticmethod
|
||||
def get_jsonl_files(jsonl_files=None, jsonl_file_filters=None):
|
||||
"Get all jsonl files to process"
|
||||
absolute_jsonl_files, filtered_jsonl_files = set(), set()
|
||||
if jsonl_files:
|
||||
absolute_jsonl_files = {get_absolute_path(jsonl_file) for jsonl_file in jsonl_files}
|
||||
if jsonl_file_filters:
|
||||
filtered_jsonl_files = {
|
||||
filtered_file
|
||||
for jsonl_file_filter in jsonl_file_filters
|
||||
for filtered_file in glob.glob(get_absolute_path(jsonl_file_filter), recursive=True)
|
||||
}
|
||||
|
||||
all_jsonl_files = sorted(absolute_jsonl_files | filtered_jsonl_files)
|
||||
|
||||
files_with_non_jsonl_extensions = {
|
||||
jsonl_file for jsonl_file in all_jsonl_files if not jsonl_file.endswith(".jsonl")
|
||||
}
|
||||
if any(files_with_non_jsonl_extensions):
|
||||
print(f"[Warning] There maybe non jsonl files in the input set: {files_with_non_jsonl_extensions}")
|
||||
|
||||
logger.info(f"Processing files: {all_jsonl_files}")
|
||||
|
||||
return all_jsonl_files
|
||||
|
||||
@staticmethod
|
||||
def extract_jsonl_entries(jsonl_files):
|
||||
"Extract entries from specified jsonl files"
|
||||
entries = []
|
||||
for jsonl_file in jsonl_files:
|
||||
entries.extend(load_jsonl(Path(jsonl_file)))
|
||||
return entries
|
||||
|
||||
@staticmethod
|
||||
def convert_entries_to_jsonl(entries: List[Entry]):
|
||||
"Convert each entry to JSON and collate as JSONL"
|
||||
return "".join([f"{entry.to_json()}\n" for entry in entries])
|
||||
82
tests/test_jsonl_to_jsonl.py
Normal file
82
tests/test_jsonl_to_jsonl.py
Normal file
@@ -0,0 +1,82 @@
|
||||
# Standard Packages
|
||||
import json
|
||||
|
||||
# Internal Packages
|
||||
from khoj.processor.jsonl.jsonl_to_jsonl import JsonlToJsonl
|
||||
from khoj.utils.jsonl import load_jsonl
|
||||
from khoj.utils.rawconfig import Entry
|
||||
|
||||
|
||||
def test_process_entries_from_single_input_jsonl(tmp_path):
|
||||
"Convert multiple jsonl entries from single file to entries."
|
||||
# Arrange
|
||||
input_jsonl = """{"raw": "raw input data 1", "compiled": "compiled input data 1", "file": "source/file/path1"}
|
||||
{"raw": "raw input data 2", "compiled": "compiled input data 2", "file": "source/file/path2"}
|
||||
"""
|
||||
input_jsonl_file = create_file(tmp_path, input_jsonl)
|
||||
|
||||
# Act
|
||||
# Process Each Entry from All Notes Files
|
||||
input_jsons = JsonlToJsonl.extract_jsonl_entries([input_jsonl_file])
|
||||
entries = list(map(Entry.from_dict, input_jsons))
|
||||
output_jsonl = JsonlToJsonl.convert_entries_to_jsonl(entries)
|
||||
|
||||
# Assert
|
||||
assert len(entries) == 2
|
||||
assert output_jsonl == input_jsonl
|
||||
|
||||
|
||||
def test_process_entries_from_multiple_input_jsonls(tmp_path):
|
||||
"Convert multiple jsonl entries from single file to entries."
|
||||
# Arrange
|
||||
input_jsonl_1 = """{"raw": "raw input data 1", "compiled": "compiled input data 1", "file": "source/file/path1"}"""
|
||||
input_jsonl_2 = """{"raw": "raw input data 2", "compiled": "compiled input data 2", "file": "source/file/path2"}"""
|
||||
input_jsonl_file_1 = create_file(tmp_path, input_jsonl_1, filename="input1.jsonl")
|
||||
input_jsonl_file_2 = create_file(tmp_path, input_jsonl_2, filename="input2.jsonl")
|
||||
|
||||
# Act
|
||||
# Process Each Entry from All Notes Files
|
||||
input_jsons = JsonlToJsonl.extract_jsonl_entries([input_jsonl_file_1, input_jsonl_file_2])
|
||||
entries = list(map(Entry.from_dict, input_jsons))
|
||||
output_jsonl = JsonlToJsonl.convert_entries_to_jsonl(entries)
|
||||
|
||||
# Assert
|
||||
assert len(entries) == 2
|
||||
assert output_jsonl == f"{input_jsonl_1}\n{input_jsonl_2}\n"
|
||||
|
||||
|
||||
def test_get_jsonl_files(tmp_path):
|
||||
"Ensure JSONL files specified via input-filter, input-files extracted"
|
||||
# Arrange
|
||||
# Include via input-filter globs
|
||||
group1_file1 = create_file(tmp_path, filename="group1-file1.jsonl")
|
||||
group1_file2 = create_file(tmp_path, filename="group1-file2.jsonl")
|
||||
group2_file1 = create_file(tmp_path, filename="group2-file1.jsonl")
|
||||
group2_file2 = create_file(tmp_path, filename="group2-file2.jsonl")
|
||||
# Include via input-file field
|
||||
file1 = create_file(tmp_path, filename="notes.jsonl")
|
||||
# Not included by any filter
|
||||
create_file(tmp_path, filename="not-included-jsonl.jsonl")
|
||||
create_file(tmp_path, filename="not-included-text.txt")
|
||||
|
||||
expected_files = sorted(map(str, [group1_file1, group1_file2, group2_file1, group2_file2, file1]))
|
||||
|
||||
# Setup input-files, input-filters
|
||||
input_files = [tmp_path / "notes.jsonl"]
|
||||
input_filter = [tmp_path / "group1*.jsonl", tmp_path / "group2*.jsonl"]
|
||||
|
||||
# Act
|
||||
extracted_org_files = JsonlToJsonl.get_jsonl_files(input_files, input_filter)
|
||||
|
||||
# Assert
|
||||
assert len(extracted_org_files) == 5
|
||||
assert extracted_org_files == expected_files
|
||||
|
||||
|
||||
# Helper Functions
|
||||
def create_file(tmp_path, entry=None, filename="test.jsonl"):
|
||||
jsonl_file = tmp_path / filename
|
||||
jsonl_file.touch()
|
||||
if entry:
|
||||
jsonl_file.write_text(entry)
|
||||
return jsonl_file
|
||||
Reference in New Issue
Block a user