Add processor to index entries from jsonl files for plugins

- Read, merge entries from input jsonl files and filters
- Mark new, modified entries for update
This commit is contained in:
Debanjum Singh Solanky
2023-02-23 22:29:07 -06:00
parent fcbbe8c759
commit 55a032e8c4
3 changed files with 182 additions and 0 deletions

View File

View File

@@ -0,0 +1,100 @@
# Standard Packages
import glob
import logging
from pathlib import Path
from typing import List
# Internal Packages
from khoj.processor.text_to_jsonl import TextToJsonl
from khoj.utils.helpers import get_absolute_path, timer
from khoj.utils.jsonl import load_jsonl, dump_jsonl, compress_jsonl_data
from khoj.utils.rawconfig import Entry
logger = logging.getLogger(__name__)
class JsonlToJsonl(TextToJsonl):
# Define Functions
def process(self, previous_entries=None):
# Extract required fields from config
input_jsonl_files, input_jsonl_filter, output_file = (
self.config.input_files,
self.config.input_filter,
self.config.compressed_jsonl,
)
# Get Jsonl Input Files to Process
all_input_jsonl_files = JsonlToJsonl.get_jsonl_files(input_jsonl_files, input_jsonl_filter)
# Extract Entries from specified jsonl files
with timer("Parse entries from jsonl files", logger):
input_jsons = JsonlToJsonl.extract_jsonl_entries(all_input_jsonl_files)
current_entries = list(map(Entry.from_dict, input_jsons))
# Split entries by max tokens supported by model
with timer("Split entries by max token size supported by model", logger):
current_entries = self.split_entries_by_max_tokens(current_entries, max_tokens=256)
# Identify, mark and merge any new entries with previous entries
with timer("Identify new or updated entries", logger):
if not previous_entries:
entries_with_ids = list(enumerate(current_entries))
else:
entries_with_ids = self.mark_entries_for_update(
current_entries,
previous_entries,
key="compiled",
logger=logger,
)
with timer("Write entries to JSONL file", logger):
# Process Each Entry from All Notes Files
entries = list(map(lambda entry: entry[1], entries_with_ids))
jsonl_data = JsonlToJsonl.convert_entries_to_jsonl(entries)
# Compress JSONL formatted Data
if output_file.suffix == ".gz":
compress_jsonl_data(jsonl_data, output_file)
elif output_file.suffix == ".jsonl":
dump_jsonl(jsonl_data, output_file)
return entries_with_ids
@staticmethod
def get_jsonl_files(jsonl_files=None, jsonl_file_filters=None):
"Get all jsonl files to process"
absolute_jsonl_files, filtered_jsonl_files = set(), set()
if jsonl_files:
absolute_jsonl_files = {get_absolute_path(jsonl_file) for jsonl_file in jsonl_files}
if jsonl_file_filters:
filtered_jsonl_files = {
filtered_file
for jsonl_file_filter in jsonl_file_filters
for filtered_file in glob.glob(get_absolute_path(jsonl_file_filter), recursive=True)
}
all_jsonl_files = sorted(absolute_jsonl_files | filtered_jsonl_files)
files_with_non_jsonl_extensions = {
jsonl_file for jsonl_file in all_jsonl_files if not jsonl_file.endswith(".jsonl")
}
if any(files_with_non_jsonl_extensions):
print(f"[Warning] There maybe non jsonl files in the input set: {files_with_non_jsonl_extensions}")
logger.info(f"Processing files: {all_jsonl_files}")
return all_jsonl_files
@staticmethod
def extract_jsonl_entries(jsonl_files):
"Extract entries from specified jsonl files"
entries = []
for jsonl_file in jsonl_files:
entries.extend(load_jsonl(Path(jsonl_file)))
return entries
@staticmethod
def convert_entries_to_jsonl(entries: List[Entry]):
"Convert each entry to JSON and collate as JSONL"
return "".join([f"{entry.to_json()}\n" for entry in entries])

View File

@@ -0,0 +1,82 @@
# Standard Packages
import json
# Internal Packages
from khoj.processor.jsonl.jsonl_to_jsonl import JsonlToJsonl
from khoj.utils.jsonl import load_jsonl
from khoj.utils.rawconfig import Entry
def test_process_entries_from_single_input_jsonl(tmp_path):
"Convert multiple jsonl entries from single file to entries."
# Arrange
input_jsonl = """{"raw": "raw input data 1", "compiled": "compiled input data 1", "file": "source/file/path1"}
{"raw": "raw input data 2", "compiled": "compiled input data 2", "file": "source/file/path2"}
"""
input_jsonl_file = create_file(tmp_path, input_jsonl)
# Act
# Process Each Entry from All Notes Files
input_jsons = JsonlToJsonl.extract_jsonl_entries([input_jsonl_file])
entries = list(map(Entry.from_dict, input_jsons))
output_jsonl = JsonlToJsonl.convert_entries_to_jsonl(entries)
# Assert
assert len(entries) == 2
assert output_jsonl == input_jsonl
def test_process_entries_from_multiple_input_jsonls(tmp_path):
"Convert multiple jsonl entries from single file to entries."
# Arrange
input_jsonl_1 = """{"raw": "raw input data 1", "compiled": "compiled input data 1", "file": "source/file/path1"}"""
input_jsonl_2 = """{"raw": "raw input data 2", "compiled": "compiled input data 2", "file": "source/file/path2"}"""
input_jsonl_file_1 = create_file(tmp_path, input_jsonl_1, filename="input1.jsonl")
input_jsonl_file_2 = create_file(tmp_path, input_jsonl_2, filename="input2.jsonl")
# Act
# Process Each Entry from All Notes Files
input_jsons = JsonlToJsonl.extract_jsonl_entries([input_jsonl_file_1, input_jsonl_file_2])
entries = list(map(Entry.from_dict, input_jsons))
output_jsonl = JsonlToJsonl.convert_entries_to_jsonl(entries)
# Assert
assert len(entries) == 2
assert output_jsonl == f"{input_jsonl_1}\n{input_jsonl_2}\n"
def test_get_jsonl_files(tmp_path):
"Ensure JSONL files specified via input-filter, input-files extracted"
# Arrange
# Include via input-filter globs
group1_file1 = create_file(tmp_path, filename="group1-file1.jsonl")
group1_file2 = create_file(tmp_path, filename="group1-file2.jsonl")
group2_file1 = create_file(tmp_path, filename="group2-file1.jsonl")
group2_file2 = create_file(tmp_path, filename="group2-file2.jsonl")
# Include via input-file field
file1 = create_file(tmp_path, filename="notes.jsonl")
# Not included by any filter
create_file(tmp_path, filename="not-included-jsonl.jsonl")
create_file(tmp_path, filename="not-included-text.txt")
expected_files = sorted(map(str, [group1_file1, group1_file2, group2_file1, group2_file2, file1]))
# Setup input-files, input-filters
input_files = [tmp_path / "notes.jsonl"]
input_filter = [tmp_path / "group1*.jsonl", tmp_path / "group2*.jsonl"]
# Act
extracted_org_files = JsonlToJsonl.get_jsonl_files(input_files, input_filter)
# Assert
assert len(extracted_org_files) == 5
assert extracted_org_files == expected_files
# Helper Functions
def create_file(tmp_path, entry=None, filename="test.jsonl"):
jsonl_file = tmp_path / filename
jsonl_file.touch()
if entry:
jsonl_file.write_text(entry)
return jsonl_file