diff --git a/src/khoj/processor/jsonl/__init__.py b/src/khoj/processor/jsonl/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/khoj/processor/jsonl/jsonl_to_jsonl.py b/src/khoj/processor/jsonl/jsonl_to_jsonl.py new file mode 100644 index 00000000..0268a659 --- /dev/null +++ b/src/khoj/processor/jsonl/jsonl_to_jsonl.py @@ -0,0 +1,100 @@ +# Standard Packages +import glob +import logging +from pathlib import Path +from typing import List + +# Internal Packages +from khoj.processor.text_to_jsonl import TextToJsonl +from khoj.utils.helpers import get_absolute_path, timer +from khoj.utils.jsonl import load_jsonl, dump_jsonl, compress_jsonl_data +from khoj.utils.rawconfig import Entry + + +logger = logging.getLogger(__name__) + + +class JsonlToJsonl(TextToJsonl): + # Define Functions + def process(self, previous_entries=None): + # Extract required fields from config + input_jsonl_files, input_jsonl_filter, output_file = ( + self.config.input_files, + self.config.input_filter, + self.config.compressed_jsonl, + ) + + # Get Jsonl Input Files to Process + all_input_jsonl_files = JsonlToJsonl.get_jsonl_files(input_jsonl_files, input_jsonl_filter) + + # Extract Entries from specified jsonl files + with timer("Parse entries from jsonl files", logger): + input_jsons = JsonlToJsonl.extract_jsonl_entries(all_input_jsonl_files) + current_entries = list(map(Entry.from_dict, input_jsons)) + + # Split entries by max tokens supported by model + with timer("Split entries by max token size supported by model", logger): + current_entries = self.split_entries_by_max_tokens(current_entries, max_tokens=256) + + # Identify, mark and merge any new entries with previous entries + with timer("Identify new or updated entries", logger): + if not previous_entries: + entries_with_ids = list(enumerate(current_entries)) + else: + entries_with_ids = self.mark_entries_for_update( + current_entries, + previous_entries, + key="compiled", + logger=logger, + ) + + with timer("Write entries to JSONL file", logger): + # Process Each Entry from All Notes Files + entries = list(map(lambda entry: entry[1], entries_with_ids)) + jsonl_data = JsonlToJsonl.convert_entries_to_jsonl(entries) + + # Compress JSONL formatted Data + if output_file.suffix == ".gz": + compress_jsonl_data(jsonl_data, output_file) + elif output_file.suffix == ".jsonl": + dump_jsonl(jsonl_data, output_file) + + return entries_with_ids + + @staticmethod + def get_jsonl_files(jsonl_files=None, jsonl_file_filters=None): + "Get all jsonl files to process" + absolute_jsonl_files, filtered_jsonl_files = set(), set() + if jsonl_files: + absolute_jsonl_files = {get_absolute_path(jsonl_file) for jsonl_file in jsonl_files} + if jsonl_file_filters: + filtered_jsonl_files = { + filtered_file + for jsonl_file_filter in jsonl_file_filters + for filtered_file in glob.glob(get_absolute_path(jsonl_file_filter), recursive=True) + } + + all_jsonl_files = sorted(absolute_jsonl_files | filtered_jsonl_files) + + files_with_non_jsonl_extensions = { + jsonl_file for jsonl_file in all_jsonl_files if not jsonl_file.endswith(".jsonl") + } + if any(files_with_non_jsonl_extensions): + print(f"[Warning] There maybe non jsonl files in the input set: {files_with_non_jsonl_extensions}") + + logger.info(f"Processing files: {all_jsonl_files}") + + return all_jsonl_files + + @staticmethod + def extract_jsonl_entries(jsonl_files): + "Extract entries from specified jsonl files" + entries = [] + for jsonl_file in jsonl_files: + entries.extend(load_jsonl(Path(jsonl_file))) + return entries + + @staticmethod + def convert_entries_to_jsonl(entries: List[Entry]): + "Convert each entry to JSON and collate as JSONL" + return "".join([f"{entry.to_json()}\n" for entry in entries]) diff --git a/tests/test_jsonl_to_jsonl.py b/tests/test_jsonl_to_jsonl.py new file mode 100644 index 00000000..eb25d579 --- /dev/null +++ b/tests/test_jsonl_to_jsonl.py @@ -0,0 +1,82 @@ +# Standard Packages +import json + +# Internal Packages +from khoj.processor.jsonl.jsonl_to_jsonl import JsonlToJsonl +from khoj.utils.jsonl import load_jsonl +from khoj.utils.rawconfig import Entry + + +def test_process_entries_from_single_input_jsonl(tmp_path): + "Convert multiple jsonl entries from single file to entries." + # Arrange + input_jsonl = """{"raw": "raw input data 1", "compiled": "compiled input data 1", "file": "source/file/path1"} +{"raw": "raw input data 2", "compiled": "compiled input data 2", "file": "source/file/path2"} +""" + input_jsonl_file = create_file(tmp_path, input_jsonl) + + # Act + # Process Each Entry from All Notes Files + input_jsons = JsonlToJsonl.extract_jsonl_entries([input_jsonl_file]) + entries = list(map(Entry.from_dict, input_jsons)) + output_jsonl = JsonlToJsonl.convert_entries_to_jsonl(entries) + + # Assert + assert len(entries) == 2 + assert output_jsonl == input_jsonl + + +def test_process_entries_from_multiple_input_jsonls(tmp_path): + "Convert multiple jsonl entries from single file to entries." + # Arrange + input_jsonl_1 = """{"raw": "raw input data 1", "compiled": "compiled input data 1", "file": "source/file/path1"}""" + input_jsonl_2 = """{"raw": "raw input data 2", "compiled": "compiled input data 2", "file": "source/file/path2"}""" + input_jsonl_file_1 = create_file(tmp_path, input_jsonl_1, filename="input1.jsonl") + input_jsonl_file_2 = create_file(tmp_path, input_jsonl_2, filename="input2.jsonl") + + # Act + # Process Each Entry from All Notes Files + input_jsons = JsonlToJsonl.extract_jsonl_entries([input_jsonl_file_1, input_jsonl_file_2]) + entries = list(map(Entry.from_dict, input_jsons)) + output_jsonl = JsonlToJsonl.convert_entries_to_jsonl(entries) + + # Assert + assert len(entries) == 2 + assert output_jsonl == f"{input_jsonl_1}\n{input_jsonl_2}\n" + + +def test_get_jsonl_files(tmp_path): + "Ensure JSONL files specified via input-filter, input-files extracted" + # Arrange + # Include via input-filter globs + group1_file1 = create_file(tmp_path, filename="group1-file1.jsonl") + group1_file2 = create_file(tmp_path, filename="group1-file2.jsonl") + group2_file1 = create_file(tmp_path, filename="group2-file1.jsonl") + group2_file2 = create_file(tmp_path, filename="group2-file2.jsonl") + # Include via input-file field + file1 = create_file(tmp_path, filename="notes.jsonl") + # Not included by any filter + create_file(tmp_path, filename="not-included-jsonl.jsonl") + create_file(tmp_path, filename="not-included-text.txt") + + expected_files = sorted(map(str, [group1_file1, group1_file2, group2_file1, group2_file2, file1])) + + # Setup input-files, input-filters + input_files = [tmp_path / "notes.jsonl"] + input_filter = [tmp_path / "group1*.jsonl", tmp_path / "group2*.jsonl"] + + # Act + extracted_org_files = JsonlToJsonl.get_jsonl_files(input_files, input_filter) + + # Assert + assert len(extracted_org_files) == 5 + assert extracted_org_files == expected_files + + +# Helper Functions +def create_file(tmp_path, entry=None, filename="test.jsonl"): + jsonl_file = tmp_path / filename + jsonl_file.touch() + if entry: + jsonl_file.write_text(entry) + return jsonl_file