mirror of
https://github.com/khoaliber/khoj.git
synced 2026-03-04 21:29:12 +00:00
Add processor to index entries from jsonl files for plugins
- Read and merge entries from input jsonl files and filters
- Mark new and modified entries for update
This commit is contained in:
0
src/khoj/processor/jsonl/__init__.py
Normal file
0
src/khoj/processor/jsonl/__init__.py
Normal file
100
src/khoj/processor/jsonl/jsonl_to_jsonl.py
Normal file
100
src/khoj/processor/jsonl/jsonl_to_jsonl.py
Normal file
@@ -0,0 +1,100 @@
|
|||||||
|
# Standard Packages
|
||||||
|
import glob
|
||||||
|
import logging
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
# Internal Packages
|
||||||
|
from khoj.processor.text_to_jsonl import TextToJsonl
|
||||||
|
from khoj.utils.helpers import get_absolute_path, timer
|
||||||
|
from khoj.utils.jsonl import load_jsonl, dump_jsonl, compress_jsonl_data
|
||||||
|
from khoj.utils.rawconfig import Entry
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class JsonlToJsonl(TextToJsonl):
    """Process entries from plugin-provided jsonl files into Khoj's indexed jsonl format."""

    # Define Functions
    def process(self, previous_entries=None):
        """Extract entries from the configured jsonl files, mark new or updated
        entries relative to *previous_entries*, and write all current entries to
        the configured (optionally gzip-compressed) jsonl output file.

        Returns a list of (id, Entry) tuples for every current entry.
        """
        # Extract required fields from config
        input_jsonl_files, input_jsonl_filter, output_file = (
            self.config.input_files,
            self.config.input_filter,
            self.config.compressed_jsonl,
        )

        # Get Jsonl Input Files to Process
        all_input_jsonl_files = JsonlToJsonl.get_jsonl_files(input_jsonl_files, input_jsonl_filter)

        # Extract Entries from specified jsonl files
        with timer("Parse entries from jsonl files", logger):
            input_jsons = JsonlToJsonl.extract_jsonl_entries(all_input_jsonl_files)
            current_entries = list(map(Entry.from_dict, input_jsons))

        # Split entries by max tokens supported by model
        with timer("Split entries by max token size supported by model", logger):
            current_entries = self.split_entries_by_max_tokens(current_entries, max_tokens=256)

        # Identify, mark and merge any new entries with previous entries
        with timer("Identify new or updated entries", logger):
            if not previous_entries:
                # No baseline to diff against: every entry is treated as new
                entries_with_ids = list(enumerate(current_entries))
            else:
                entries_with_ids = self.mark_entries_for_update(
                    current_entries,
                    previous_entries,
                    key="compiled",
                    logger=logger,
                )

        with timer("Write entries to JSONL file", logger):
            # Process Each Entry from All Notes Files
            entries = list(map(lambda entry: entry[1], entries_with_ids))
            jsonl_data = JsonlToJsonl.convert_entries_to_jsonl(entries)

            # Compress JSONL formatted Data
            # Output format is chosen by file extension; other suffixes are silently skipped
            if output_file.suffix == ".gz":
                compress_jsonl_data(jsonl_data, output_file)
            elif output_file.suffix == ".jsonl":
                dump_jsonl(jsonl_data, output_file)

        return entries_with_ids

    @staticmethod
    def get_jsonl_files(jsonl_files=None, jsonl_file_filters=None):
        """Get all jsonl files to process.

        Combines explicitly listed files (*jsonl_files*) with files matched by
        glob patterns (*jsonl_file_filters*), de-duplicated and sorted.
        """
        absolute_jsonl_files, filtered_jsonl_files = set(), set()
        if jsonl_files:
            absolute_jsonl_files = {get_absolute_path(jsonl_file) for jsonl_file in jsonl_files}
        if jsonl_file_filters:
            filtered_jsonl_files = {
                filtered_file
                for jsonl_file_filter in jsonl_file_filters
                for filtered_file in glob.glob(get_absolute_path(jsonl_file_filter), recursive=True)
            }

        all_jsonl_files = sorted(absolute_jsonl_files | filtered_jsonl_files)

        files_with_non_jsonl_extensions = {
            jsonl_file for jsonl_file in all_jsonl_files if not jsonl_file.endswith(".jsonl")
        }
        # Warn (don't fail) on suspicious extensions: the files may still be valid jsonl.
        # Use the module logger rather than print for consistency with the rest of the class.
        if files_with_non_jsonl_extensions:
            logger.warning(f"There may be non jsonl files in the input set: {files_with_non_jsonl_extensions}")

        logger.info(f"Processing files: {all_jsonl_files}")

        return all_jsonl_files

    @staticmethod
    def extract_jsonl_entries(jsonl_files):
        """Extract entries from specified jsonl files as a single flat list of dicts."""
        entries = []
        for jsonl_file in jsonl_files:
            entries.extend(load_jsonl(Path(jsonl_file)))
        return entries

    @staticmethod
    def convert_entries_to_jsonl(entries: List[Entry]):
        """Convert each entry to JSON and collate as JSONL (one JSON object per line)."""
        return "".join([f"{entry.to_json()}\n" for entry in entries])
|
||||||
82
tests/test_jsonl_to_jsonl.py
Normal file
82
tests/test_jsonl_to_jsonl.py
Normal file
@@ -0,0 +1,82 @@
|
|||||||
|
# Standard Packages
|
||||||
|
import json
|
||||||
|
|
||||||
|
# Internal Packages
|
||||||
|
from khoj.processor.jsonl.jsonl_to_jsonl import JsonlToJsonl
|
||||||
|
from khoj.utils.jsonl import load_jsonl
|
||||||
|
from khoj.utils.rawconfig import Entry
|
||||||
|
|
||||||
|
|
||||||
|
def test_process_entries_from_single_input_jsonl(tmp_path):
    """Entries read from one jsonl file round-trip back to identical jsonl."""
    # Arrange
    entry_lines = [
        '{"raw": "raw input data 1", "compiled": "compiled input data 1", "file": "source/file/path1"}',
        '{"raw": "raw input data 2", "compiled": "compiled input data 2", "file": "source/file/path2"}',
    ]
    input_jsonl = "".join(f"{line}\n" for line in entry_lines)
    input_jsonl_file = create_file(tmp_path, input_jsonl)

    # Act
    # Extract raw dicts, hydrate them into Entry objects, then re-serialize
    extracted_jsons = JsonlToJsonl.extract_jsonl_entries([input_jsonl_file])
    hydrated_entries = [Entry.from_dict(parsed) for parsed in extracted_jsons]
    output_jsonl = JsonlToJsonl.convert_entries_to_jsonl(hydrated_entries)

    # Assert
    assert len(hydrated_entries) == 2
    assert output_jsonl == input_jsonl
|
||||||
|
|
||||||
|
|
||||||
|
def test_process_entries_from_multiple_input_jsonls(tmp_path):
    """Convert jsonl entries from multiple input files to entries.

    Fixes the copy-pasted docstring that incorrectly claimed a single input file.
    """
    # Arrange
    input_jsonl_1 = """{"raw": "raw input data 1", "compiled": "compiled input data 1", "file": "source/file/path1"}"""
    input_jsonl_2 = """{"raw": "raw input data 2", "compiled": "compiled input data 2", "file": "source/file/path2"}"""
    input_jsonl_file_1 = create_file(tmp_path, input_jsonl_1, filename="input1.jsonl")
    input_jsonl_file_2 = create_file(tmp_path, input_jsonl_2, filename="input2.jsonl")

    # Act
    # Process each entry from both input files
    input_jsons = JsonlToJsonl.extract_jsonl_entries([input_jsonl_file_1, input_jsonl_file_2])
    entries = list(map(Entry.from_dict, input_jsons))
    output_jsonl = JsonlToJsonl.convert_entries_to_jsonl(entries)

    # Assert
    # Entries from both files are collated, in input order, one JSON object per line
    assert len(entries) == 2
    assert output_jsonl == f"{input_jsonl_1}\n{input_jsonl_2}\n"
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_jsonl_files(tmp_path):
    """JSONL files specified via input-filter globs and input-files are both extracted."""
    # Arrange
    # Files that should be matched by the input-filter globs
    glob_matched_files = [
        create_file(tmp_path, filename="group1-file1.jsonl"),
        create_file(tmp_path, filename="group1-file2.jsonl"),
        create_file(tmp_path, filename="group2-file1.jsonl"),
        create_file(tmp_path, filename="group2-file2.jsonl"),
    ]
    # File that should be picked up via the input-file field
    explicit_file = create_file(tmp_path, filename="notes.jsonl")
    # Files that no filter should include
    create_file(tmp_path, filename="not-included-jsonl.jsonl")
    create_file(tmp_path, filename="not-included-text.txt")

    expected_files = sorted(str(f) for f in glob_matched_files + [explicit_file])

    # Setup input-files, input-filters
    input_files = [tmp_path / "notes.jsonl"]
    input_filter = [tmp_path / "group1*.jsonl", tmp_path / "group2*.jsonl"]

    # Act
    extracted_org_files = JsonlToJsonl.get_jsonl_files(input_files, input_filter)

    # Assert
    assert len(extracted_org_files) == 5
    assert extracted_org_files == expected_files
|
||||||
|
|
||||||
|
|
||||||
|
# Helper Functions
|
||||||
|
def create_file(tmp_path, entry=None, filename="test.jsonl"):
    """Create an (optionally pre-populated) file under tmp_path and return its path."""
    target = tmp_path / filename
    # Ensure the file exists even when no content is supplied
    target.touch()
    if entry:
        target.write_text(entry)
    return target
|
||||||
Reference in New Issue
Block a user