Parse markdown parent entries as a single entry if they fit within max tokens

These changes improve context available to the search model.
Specifically, this should improve entry context from short knowledge trees,
that is, knowledge bases with sparse, short heading/entry trees.

Previously we'd always split markdown files by headings, even if a
parent entry was small enough to fit entirely within the max token
limits of the search model. This used to reduce the context available
to the search model to select appropriate entries for a query,
especially from short entry trees

Revert to using regex to parse through the markdown file instead of
using MarkdownHeaderTextSplitter. It was easier to implement the
logical split using regexes rather than bend MarkdownHeaderTextSplitter
to implement it.
- DFS traverse the markdown knowledge tree, prefix ancestry to each entry
This commit is contained in:
Debanjum Singh Solanky
2024-02-10 23:03:30 +05:30
parent 982ac1859c
commit db2581459f
2 changed files with 168 additions and 97 deletions

View File

@@ -1,10 +1,9 @@
import logging
import re
from pathlib import Path
from typing import List, Tuple
from typing import Dict, List, Tuple
import urllib3
from langchain.text_splitter import MarkdownHeaderTextSplitter
from khoj.database.models import Entry as DbEntry
from khoj.database.models import KhojUser
@@ -80,37 +79,54 @@ class MarkdownToEntries(TextToEntries):
entries: List[str],
entry_to_file_map: List[Tuple[str, Path]],
max_tokens=256,
ancestry: Dict[int, str] = {},
):
if len(TextToEntries.tokenizer(markdown_content)) <= max_tokens:
entry_to_file_map += [(markdown_content, markdown_file)]
entries.extend([markdown_content])
# Prepend the markdown section's heading ancestry
ancestry_string = "\n".join([f"{'#' * key} {ancestry[key]}" for key in sorted(ancestry.keys())])
markdown_content_with_ancestry = f"{ancestry_string}{markdown_content}"
# If content is small or content has no children headings, save it as a single entry
if len(TextToEntries.tokenizer(markdown_content_with_ancestry)) <= max_tokens or not re.search(
rf"^#{{{len(ancestry)+1},}}\s", markdown_content, re.MULTILINE
):
entry_to_file_map += [(markdown_content_with_ancestry, markdown_file)]
entries.extend([markdown_content_with_ancestry])
return entries, entry_to_file_map
headers_to_split_on = [("#", "1"), ("##", "2"), ("###", "3"), ("####", "4"), ("#####", "5"), ("######", "6")]
reversed_headers_to_split_on = list(reversed(headers_to_split_on))
markdown_entries_per_file: List[str] = []
previous_section_metadata, current_section_metadata = None, None
# Split by next heading level present in the entry
next_heading_level = len(ancestry)
sections: List[str] = []
while len(sections) < 2:
next_heading_level += 1
sections = re.split(rf"(\n|^)(?=[#]{{{next_heading_level}}} .+\n?)", markdown_content, re.MULTILINE)
splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False, return_each_line=True)
for section in splitter.split_text(markdown_content):
current_section_metadata = section.metadata.copy()
# Append the section's content to the last entry if the metadata is the same
if previous_section_metadata == current_section_metadata:
markdown_entries_per_file[-1] = f"{markdown_entries_per_file[-1]}\n{section.page_content}"
        # Insert new entry with its heading ancestry, if the section is under a new heading
for section in sections:
# Skip empty sections
if section.strip() == "":
continue
# Extract the section body and (when present) the heading
current_ancestry = ancestry.copy()
first_line = [line for line in section.split("\n") if line.strip() != ""][0]
if re.search(rf"^#{{{next_heading_level}}} ", first_line):
# Extract the section body without the heading
current_section_body = "\n".join(section.split(first_line)[1:])
# Parse the section heading into current section ancestry
current_section_title = first_line[next_heading_level:].strip()
current_ancestry[next_heading_level] = current_section_title
else:
# Drop the current heading from the metadata. It is already in the section content
if section.metadata:
section.metadata.pop(max(section.metadata))
# Prepend the markdown section's heading ancestry
for heading in reversed_headers_to_split_on:
if heading[1] in section.metadata:
section.page_content = f"{heading[0]} {section.metadata[heading[1]]}\n{section.page_content}"
previous_section_metadata = current_section_metadata
markdown_entries_per_file += [section.page_content]
current_section_body = section
# Recurse down children of the current entry
MarkdownToEntries.process_single_markdown_file(
current_section_body,
markdown_file,
entries,
entry_to_file_map,
max_tokens,
current_ancestry,
)
entry_to_file_map += zip(markdown_entries_per_file, [markdown_file] * len(markdown_entries_per_file))
entries.extend(markdown_entries_per_file)
return entries, entry_to_file_map
@staticmethod