Add Deeplinks to Improve Context for Document Retrieval (#1206)

## Overview
Show the deep link URI and raw document context to provide deeper, richer
context to Khoj. This should allow it to better combine semantic search
with other new document retrieval tools, such as the line-range-based file
viewer and regex tools added in #1205

## Details
- Attach line-number-based deep links to each indexed document entry.
The document URI follows a URL-fragment-based schema of the form
`file:///path/to/file.txt#line=123`
- Show raw indexed document entries with deep links to the LLM when it uses
the semantic search tool
- Reduce structural changes to raw org-mode entries for easier deep
linking.
This commit is contained in:
Debanjum
2025-07-03 20:05:04 -07:00
committed by GitHub
14 changed files with 291 additions and 76 deletions

View File

@@ -23,6 +23,7 @@ logger = logging.getLogger(__name__)
class Context(PydanticBaseModel):
compiled: str
file: str
uri: str
query: Optional[str] = None

View File

@@ -54,13 +54,13 @@ class MarkdownToEntries(TextToEntries):
def extract_markdown_entries(markdown_files: Dict[str, str], max_tokens=256) -> Tuple[Dict[str, str], List[Entry]]:
"Extract entries by heading from specified Markdown files"
entries: List[str] = []
entry_to_file_map: List[Tuple[str, str]] = []
entry_to_file_map: List[Tuple[str, str, int]] = []
file_to_text_map: Dict[str, str] = dict()
for markdown_file in markdown_files:
try:
markdown_content = markdown_files[markdown_file]
entries, entry_to_file_map = MarkdownToEntries.process_single_markdown_file(
markdown_content, markdown_file, entries, entry_to_file_map, max_tokens
markdown_content, markdown_file, entries, entry_to_file_map, max_tokens, start_line=1
)
file_to_text_map[markdown_file] = markdown_content
except Exception as e:
@@ -68,17 +68,18 @@ class MarkdownToEntries(TextToEntries):
f"Unable to process file: {markdown_file}. This file will not be indexed.\n{e}", exc_info=True
)
return file_to_text_map, MarkdownToEntries.convert_markdown_entries_to_maps(entries, dict(entry_to_file_map))
return file_to_text_map, MarkdownToEntries.convert_markdown_entries_to_maps(entries, entry_to_file_map)
@staticmethod
def process_single_markdown_file(
markdown_content: str,
markdown_file: str,
entries: List[str],
entry_to_file_map: List[Tuple[str, str]],
entry_to_file_map: List[Tuple[str, str, int]],
max_tokens=256,
ancestry: Dict[int, str] = {},
) -> Tuple[List[str], List[Tuple[str, str]]]:
start_line: int = 1,
) -> Tuple[List[str], List[Tuple[str, str, int]]]:
# Prepend the markdown section's heading ancestry
ancestry_string = "\n".join([f"{'#' * key} {ancestry[key]}" for key in sorted(ancestry.keys())])
markdown_content_with_ancestry = f"{ancestry_string}{markdown_content}"
@@ -87,7 +88,9 @@ class MarkdownToEntries(TextToEntries):
if len(TextToEntries.tokenizer(markdown_content_with_ancestry)) <= max_tokens or not re.search(
rf"^#{{{len(ancestry)+1},}}\s", markdown_content, flags=re.MULTILINE
):
entry_to_file_map += [(markdown_content_with_ancestry, markdown_file)]
# Create entry with line number information
entry_with_line_info = (markdown_content_with_ancestry, markdown_file, start_line)
entry_to_file_map += [entry_with_line_info]
entries.extend([markdown_content_with_ancestry])
return entries, entry_to_file_map
@@ -98,22 +101,32 @@ class MarkdownToEntries(TextToEntries):
next_heading_level += 1
sections = re.split(rf"(\n|^)(?=[#]{{{next_heading_level}}} .+\n?)", markdown_content, flags=re.MULTILINE)
# Recurse down each non-empty section after parsing its body, heading and ancestry
current_line_offset = 0
for section in sections:
num_lines_in_section = section.count("\n")
# Skip empty sections
if section.strip() == "":
current_line_offset += num_lines_in_section
continue
section_start_line_in_file = start_line + current_line_offset
# Extract the section body and (when present) the heading
current_ancestry = ancestry.copy()
first_line = [line for line in section.split("\n") if line.strip() != ""][0]
if re.search(rf"^#{{{next_heading_level}}} ", first_line):
# Extract the section body without the heading
current_section_body = "\n".join(section.split(first_line)[1:])
current_section_heading, current_section_body = section.split(first_line, 1)
current_section_body_offset = current_section_heading.count("\n")
# Parse the section heading into current section ancestry
current_section_title = first_line[next_heading_level:].strip()
current_ancestry[next_heading_level] = current_section_title
# Line number should point to the heading itself
recursive_start_line = section_start_line_in_file + current_section_body_offset
else:
current_section_body = section
recursive_start_line = section_start_line_in_file
# Recurse down children of the current entry
MarkdownToEntries.process_single_markdown_file(
@@ -123,23 +136,38 @@ class MarkdownToEntries(TextToEntries):
entry_to_file_map,
max_tokens,
current_ancestry,
start_line=recursive_start_line,
)
current_line_offset += num_lines_in_section
return entries, entry_to_file_map
@staticmethod
def convert_markdown_entries_to_maps(parsed_entries: List[str], entry_to_file_map: Dict[str, str]) -> List[Entry]:
def convert_markdown_entries_to_maps(
parsed_entries: List[str], entry_to_file_map: List[Tuple[str, str, int]]
) -> List[Entry]:
"Convert each Markdown entries into a dictionary"
entries: List[Entry] = []
# Create a mapping from parsed entry to file info
entry_map: Dict[str, Tuple[str, int]] = {}
for entry_info in entry_to_file_map:
entry_content, raw_filename, start_line = entry_info
entry_map[entry_content] = (raw_filename, start_line)
for parsed_entry in parsed_entries:
raw_filename = entry_to_file_map[parsed_entry]
raw_filename, start_line = entry_map[parsed_entry]
calculated_line = start_line if start_line > 0 else 1
# Check if raw_filename is a URL. If so, save it as is. If not, convert it to a Path.
if type(raw_filename) == str and re.search(r"^https?://", raw_filename):
# Escape the URL to avoid issues with special characters
entry_filename = urllib3.util.parse_url(raw_filename).url
uri = entry_filename
else:
entry_filename = raw_filename
# Create URI with line number
uri = f"file://{entry_filename}#line={calculated_line}"
heading = parsed_entry.splitlines()[0] if re.search(r"^#+\s", parsed_entry) else ""
# Append base filename to compiled entry for context to model
@@ -152,6 +180,7 @@ class MarkdownToEntries(TextToEntries):
raw=parsed_entry,
heading=f"{prefix}{heading}",
file=entry_filename,
uri=uri,
)
)

View File

@@ -87,6 +87,7 @@ class OrgToEntries(TextToEntries):
entry_to_file_map: List[Tuple[Orgnode, str]],
max_tokens=256,
ancestry: Dict[int, str] = {},
start_line: int = 1,
) -> Tuple[List[List[Orgnode]], List[Tuple[Orgnode, str]]]:
"""Parse org_content from org_file into OrgNode entries
@@ -104,7 +105,9 @@ class OrgToEntries(TextToEntries):
if len(TextToEntries.tokenizer(org_content_with_ancestry)) <= max_tokens or not re.search(
rf"^\*{{{len(ancestry)+1},}}\s", org_content, re.MULTILINE
):
orgnode_content_with_ancestry = orgnode.makelist(org_content_with_ancestry, org_file)
orgnode_content_with_ancestry = orgnode.makelist(
org_content_with_ancestry, org_file, start_line=start_line, ancestry_lines=len(ancestry)
)
entry_to_file_map += zip(orgnode_content_with_ancestry, [org_file] * len(orgnode_content_with_ancestry))
entries.extend([orgnode_content_with_ancestry])
return entries, entry_to_file_map
@@ -125,24 +128,32 @@ class OrgToEntries(TextToEntries):
return entries, entry_to_file_map
# Recurse down each non-empty section after parsing its body, heading and ancestry
current_line_offset = 0
for section in sections:
num_lines_in_section = section.count("\n")
# Skip empty sections
if section.strip() == "":
current_line_offset += num_lines_in_section
continue
section_start_line_in_file = start_line + current_line_offset
# Extract the section body and (when present) the heading
current_ancestry = ancestry.copy()
first_non_empty_line = [line for line in section.split("\n") if line.strip() != ""][0]
# If first non-empty line is a heading with expected heading level
if re.search(rf"^\*{{{next_heading_level}}}\s", first_non_empty_line):
# Extract the section body without the heading
current_section_body = "\n".join(section.split(first_non_empty_line, 1)[1:])
current_section_heading, current_section_body = section.split(first_non_empty_line, 1)
current_section_body_offset = current_section_heading.count("\n")
# Parse the section heading into current section ancestry
current_section_title = first_non_empty_line[next_heading_level:].strip()
current_ancestry[next_heading_level] = current_section_title
recursive_start_line = section_start_line_in_file + current_section_body_offset
# Else process the section as just body text
else:
current_section_body = section
recursive_start_line = section_start_line_in_file
# Recurse down children of the current entry
OrgToEntries.process_single_org_file(
@@ -152,7 +163,9 @@ class OrgToEntries(TextToEntries):
entry_to_file_map,
max_tokens,
current_ancestry,
start_line=recursive_start_line,
)
current_line_offset += num_lines_in_section
return entries, entry_to_file_map
@@ -207,6 +220,8 @@ class OrgToEntries(TextToEntries):
if parsed_entry.hasBody:
compiled += f"\n {parsed_entry.body}"
uri = parsed_entry.properties.pop("LINE", None)
# Add the sub-entry contents to the entry
entry_compiled += compiled
entry_raw += f"{parsed_entry}"
@@ -220,6 +235,7 @@ class OrgToEntries(TextToEntries):
raw=entry_raw,
heading=entry_heading,
file=entry_to_file_map[parsed_entry],
uri=uri,
)
)

View File

@@ -58,7 +58,7 @@ def makelist_with_filepath(filename):
return makelist(f, filename)
def makelist(file, filename) -> List["Orgnode"]:
def makelist(file, filename, start_line: int = 1, ancestry_lines: int = 0) -> List["Orgnode"]:
"""
Read an org-mode file and return a list of Orgnode objects
created from this file.
@@ -66,7 +66,7 @@ def makelist(file, filename) -> List["Orgnode"]:
ctr = 0
if type(file) == str:
f = file.split("\n")
f = file.splitlines()
else:
f = file
@@ -114,14 +114,23 @@ def makelist(file, filename) -> List["Orgnode"]:
logbook = list()
thisNode.properties = property_map
nodelist.append(thisNode)
property_map = {"LINE": f"file:{normalize_filename(filename)}::{ctr}"}
# Account for ancestry lines that were prepended when calculating line numbers
if ancestry_lines > 0:
calculated_line = start_line + ctr - 1 - ancestry_lines
if calculated_line <= 0:
calculated_line = 1 # Fallback to line 1 if calculation results in invalid line number
else:
calculated_line = start_line + ctr - 1
if calculated_line <= 0:
calculated_line = ctr # Use the original behavior if start_line calculation fails
property_map = {"LINE": f"file://{normalize_filename(filename)}#line={calculated_line}"}
previous_level = level
previous_heading: str = heading
level = heading_search.group(1)
heading = heading_search.group(2)
bodytext = ""
tags = list() # set of all tags in headline
tag_search = re.search(r"(.*?)\s*:([a-zA-Z0-9].*?):$", heading)
tag_search = re.search(r"(.*?)\s+:([a-zA-Z0-9@_].*?):\s*$", heading)
if tag_search:
heading = tag_search.group(1)
parsedtags = tag_search.group(2)
@@ -260,14 +269,6 @@ def makelist(file, filename) -> List["Orgnode"]:
# Prefix filepath/title to ancestors
n.ancestors = [file_title] + n.ancestors
# Set SOURCE property to a file+heading based org-mode link to the entry
if n.level == 0:
n.properties["LINE"] = f"file:{normalize_filename(filename)}::0"
n.properties["SOURCE"] = f"[[file:{normalize_filename(filename)}]]"
else:
escaped_heading = n.heading.replace("[", "\\[").replace("]", "\\]")
n.properties["SOURCE"] = f"[[file:{normalize_filename(filename)}::*{escaped_heading}]]"
return nodelist
@@ -520,10 +521,11 @@ class Orgnode(object):
n = n + "\n"
# Output Property Drawer
n = n + indent + ":PROPERTIES:\n"
for key, value in self._properties.items():
n = n + indent + f":{key}: {value}\n"
n = n + indent + ":END:\n"
if self._properties:
n = n + indent + ":PROPERTIES:\n"
for key, value in self._properties.items():
n = n + indent + f":{key}: {value}\n"
n = n + indent + ":END:\n"
# Output Body
if self.hasBody:

View File

@@ -81,8 +81,35 @@ class TextToEntries(ABC):
chunked_entry_chunks = text_splitter.split_text(entry.compiled)
corpus_id = uuid.uuid4()
line_start = None
last_offset = 0
if entry.uri and entry.uri.startswith("file://"):
if "#line=" in entry.uri:
line_start = int(entry.uri.split("#line=", 1)[-1].split("&", 1)[0])
else:
line_start = 0
# Create heading prefixed entry from each chunk
for chunk_index, compiled_entry_chunk in enumerate(chunked_entry_chunks):
# set line start in uri of chunked entries
entry_uri = entry.uri
if line_start is not None:
# Find the chunk in the raw text to get an accurate line number.
# Search for the unmodified chunk from the last offset.
searchable_chunk = compiled_entry_chunk.strip()
if searchable_chunk:
chunk_start_pos_in_raw = entry.raw.find(searchable_chunk, last_offset)
if chunk_start_pos_in_raw != -1:
# Found the chunk. Calculate its line offset from the start of the raw text.
line_offset_in_raw = entry.raw[:chunk_start_pos_in_raw].count("\n")
new_line_num = line_start + line_offset_in_raw
entry_uri = re.sub(r"#line=\d+", f"#line={new_line_num}", entry.uri)
# Update search position for the next chunk to start after the current one.
last_offset = chunk_start_pos_in_raw + len(searchable_chunk)
else:
# Chunk not found in raw text, likely from a heading. Use original line_start.
entry_uri = re.sub(r"#line=\d+", f"#line={line_start}", entry.uri)
# Prepend heading to all other chunks, the first chunk already has heading from original entry
if chunk_index > 0 and entry.heading:
# Snip heading to avoid crossing max_tokens limit
@@ -99,6 +126,7 @@ class TextToEntries(ABC):
entry.raw = compiled_entry_chunk if raw_is_compiled else TextToEntries.clean_field(entry.raw)
entry.heading = TextToEntries.clean_field(entry.heading)
entry.file = TextToEntries.clean_field(entry.file)
entry_uri = TextToEntries.clean_field(entry_uri)
chunked_entries.append(
Entry(
@@ -107,6 +135,7 @@ class TextToEntries(ABC):
heading=entry.heading,
file=entry.file,
corpus_id=corpus_id,
uri=entry_uri,
)
)
@@ -192,6 +221,7 @@ class TextToEntries(ABC):
file_type=file_type,
hashed_value=entry_hash,
corpus_id=entry.corpus_id,
url=entry.uri,
search_model=model,
file_object=file_object,
)

View File

@@ -646,7 +646,7 @@ def generate_chatml_messages_with_context(
if not is_none_or_empty(chat.context):
references = "\n\n".join(
{f"# File: {item.file}\n## {item.compiled}\n" for item in chat.context or [] if isinstance(item, dict)}
{f"# URI: {item.uri}\n## {item.compiled}\n" for item in chat.context or [] if isinstance(item, dict)}
)
message_context += [{"type": "text", "text": f"{prompts.notes_conversation.format(references=references)}"}]

View File

@@ -1263,8 +1263,9 @@ async def search_documents(
compiled_references = [
{
"query": item.additional["query"],
"compiled": item.additional["compiled"],
"compiled": item["entry"],
"file": item.additional["file"],
"uri": item.additional["uri"],
}
for item in search_results
]
@@ -2867,6 +2868,7 @@ async def view_file_content(
{
"query": query,
"file": path,
"uri": path,
"compiled": filtered_text,
}
]
@@ -2878,7 +2880,7 @@ async def view_file_content(
logger.error(error_msg, exc_info=True)
# Return an error result in the expected format
yield [{"query": query, "file": path, "compiled": error_msg}]
yield [{"query": query, "file": path, "uri": path, "compiled": error_msg}]
async def grep_files(
@@ -2982,7 +2984,7 @@ async def grep_files(
max_results,
)
if not line_matches:
yield {"query": query, "file": path_prefix, "compiled": "No matches found."}
yield {"query": query, "file": path_prefix, "uri": path_prefix, "compiled": "No matches found."}
return
# Truncate matched lines list if too long
@@ -2991,7 +2993,7 @@ async def grep_files(
f"... {len(line_matches) - max_results} more results found. Use stricter regex or path to narrow down results."
]
yield {"query": query, "file": path_prefix or "", "compiled": "\n".join(line_matches)}
yield {"query": query, "file": path_prefix, "uri": path_prefix, "compiled": "\n".join(line_matches)}
except Exception as e:
error_msg = f"Error using grep files tool: {str(e)}"
@@ -3000,6 +3002,7 @@ async def grep_files(
{
"query": _generate_query(0, 0, path_prefix or "", regex_pattern, lines_before, lines_after),
"file": path_prefix,
"uri": path_prefix,
"compiled": error_msg,
}
]
@@ -3032,7 +3035,7 @@ async def list_files(
file_objects = await FileObjectAdapters.aget_file_objects_by_path_prefix(user, path)
if not file_objects:
yield {"query": _generate_query(0, path, pattern), "file": path, "compiled": "No files found."}
yield {"query": _generate_query(0, path, pattern), "file": path, "uri": path, "compiled": "No files found."}
return
# Extract file names from file objects
@@ -3047,7 +3050,7 @@ async def list_files(
query = _generate_query(len(files), path, pattern)
if not files:
yield {"query": query, "file": path, "compiled": "No files found."}
yield {"query": query, "file": path, "uri": path, "compiled": "No files found."}
return
# Truncate the list if it's too long
@@ -3057,9 +3060,9 @@ async def list_files(
f"... {len(files) - max_files} more files found. Use glob pattern to narrow down results."
]
yield {"query": query, "file": path, "compiled": "\n- ".join(files)}
yield {"query": query, "file": path, "uri": path, "compiled": "\n- ".join(files)}
except Exception as e:
error_msg = f"Error listing files in {path}: {str(e)}"
logger.error(error_msg, exc_info=True)
yield {"query": query, "file": path, "compiled": error_msg}
yield {"query": query, "file": path, "uri": path, "compiled": error_msg}

View File

@@ -157,6 +157,7 @@ def collate_results(hits, dedupe=True):
"additional": {
"source": hit.file_source,
"file": hit.file_path,
"uri": hit.url,
"compiled": hit.compiled,
"heading": hit.heading,
},
@@ -180,6 +181,7 @@ def deduplicated_search_responses(hits: List[SearchResponse]):
"additional": {
"source": hit.additional["source"],
"file": hit.additional["file"],
"uri": hit.additional["uri"],
"query": hit.additional["query"],
"compiled": hit.additional["compiled"],
"heading": hit.additional["heading"],

View File

@@ -176,6 +176,7 @@ class Entry:
compiled: str
heading: Optional[str]
file: Optional[str]
uri: Optional[str] = None
corpus_id: str
def __init__(
@@ -184,6 +185,7 @@ class Entry:
compiled: str = None,
heading: Optional[str] = None,
file: Optional[str] = None,
uri: Optional[str] = None,
corpus_id: uuid.UUID = None,
):
self.raw = raw
@@ -191,6 +193,14 @@ class Entry:
self.heading = heading
self.file = file
self.corpus_id = str(corpus_id)
if uri:
self.uri = uri
elif file and (file.startswith("http") or file.startswith("file://")):
self.uri = file
elif file:
self.uri = f"file://{file}"
else:
self.uri = None
def to_json(self) -> str:
return json.dumps(self.__dict__, ensure_ascii=False)
@@ -206,4 +216,5 @@ class Entry:
file=dictionary.get("file", None),
heading=dictionary.get("heading", None),
corpus_id=dictionary.get("corpus_id", None),
uri=dictionary.get("uri", None),
)

39
tests/data/markdown/main_readme.md vendored Normal file
View File

@@ -0,0 +1,39 @@
# Main Readme
> Allow natural language search, chat with your documents using transformer based models
This is a test markdown file with multiple, nested child entries.
## Dependencies
- Python3
- [Miniconda](https://docs.conda.io/en/latest/miniconda.html#latest-miniconda-installer-links)
## Installation
```bash
pip install khoj
```
## Run
Load ML model, generate embeddings and expose API to query specified org-mode files
```shell
python3 main.py --input-files ~/Notes/Schedule.org ~/Notes/Incoming.org --verbose
```
## Use
### **Khoj via API**
- Query: `GET` [http://localhost:42110/api/search?q="What is the meaning of life"](http://localhost:42110/api/search?q=%22what%20is%20the%20meaning%20of%20life%22)
- Update Index: `GET` [http://localhost:42110/api/update](http://localhost:42110/api/update)
- [Khoj API Docs](http://localhost:42110/docs)
### *Khoj via Web*
- Open browser to http://localhost:42110
- Enter query in search box
## Acknowledgments
- [MiniLM Model](https://huggingface.co/sentence-transformers/multi-qa-MiniLM-L6-cos-v1) for Asymmetric Text Search. See (SBert Documentation)[https://www.sbert.net/examples/applications/retrieve_rerank/README.html]
- [OpenAI CLIP Model](https://github.com/openai/CLIP) for Image Search. See [SBert Documentation](https://www.sbert.net/examples/applications/image-search/README.html)

View File

@@ -3,7 +3,7 @@
All data is processed locally. User can interface with khoj app via [[./interface/emacs/khoj.el][Emacs]], API or Commandline
** Dependencies
** Dependencies [[id:123-421-121-12]] :TAG1:@TAG1_1:
- Python3
- [[https://docs.conda.io/en/latest/miniconda.html#latest-miniconda-installer-links][Miniconda]]
@@ -22,7 +22,7 @@
#+end_src
** Use
*** *Khoj via Emacs*
*** *Khoj via Emacs* [[https://khoj.dev][link to khoj website]] :@EMACS:CLIENT_1:KHOJ:
- [[https://github.com/khoj-ai/khoj/tree/master/interface/emacs#installation][Install]] [[./interface/emacs/khoj.el][khoj.el]]
- Run ~M-x khoj <user-query>~ or Call ~C-c C-s~

View File

@@ -1,4 +1,5 @@
import os
import re
from pathlib import Path
from khoj.processor.content.markdown.markdown_to_entries import MarkdownToEntries
@@ -248,6 +249,58 @@ def test_get_markdown_files(tmp_path):
assert set(extracted_org_files.keys()) == expected_files
def test_line_number_tracking_in_recursive_split():
"Ensure line numbers in URIs are correct after recursive splitting by checking against the actual file."
# Arrange
markdown_file_path = os.path.abspath("tests/data/markdown/main_readme.md")
with open(markdown_file_path, "r") as f:
markdown_content = f.read()
lines = markdown_content.splitlines()
data = {markdown_file_path: markdown_content}
# Act
# Using a small max_tokens to force recursive splitting
_, entries = MarkdownToEntries.extract_markdown_entries(markdown_files=data, max_tokens=10)
# Assert
assert len(entries) > 0, "No entries were extracted."
for entry in entries:
# Extract file path and line number from the entry URI
# for files uri is expected in format: file:///path/to/file.md#line=5
match = re.search(r"file://(.*?)#line=(\d+)", entry.uri)
filepath_from_uri = match.group(1)
line_number_from_uri = int(match.group(2))
# line_number is 1-based, list index is 0-based
line_in_file = clean(lines[line_number_from_uri - 1])
next_line_in_file = clean(lines[line_number_from_uri]) if line_number_from_uri < len(lines) else ""
# Remove ancestor heading lines inserted during post-processing
first_entry_line = ""
for line in entry.raw.splitlines():
if line.startswith("#"):
first_entry_line = line
else:
break # Stop at the first non-heading line
# Remove heading prefix from entry.compiled as level changed during post-processing
cleaned_first_entry_line = first_entry_line.strip()
# Remove multiple consecutive spaces
cleaned_first_entry_line = clean(cleaned_first_entry_line)
assert entry.uri is not None, f"Entry '{entry}' has a None URI."
assert match is not None, f"URI format is incorrect: {entry.uri}"
assert (
filepath_from_uri == markdown_file_path
), f"File path in URI '{filepath_from_uri}' does not match expected '{markdown_file_path}'"
# Ensure the first non-heading line in the compiled entry matches the line in the file
assert (
cleaned_first_entry_line in line_in_file.strip() or cleaned_first_entry_line in next_line_in_file.strip()
), f"First non-heading line '{cleaned_first_entry_line}' in {entry.raw} does not match line {line_number_from_uri} in file: '{line_in_file}' or next line '{next_line_in_file}'"
# Helper Functions
def create_file(tmp_path: Path, entry=None, filename="test.md"):
markdown_file = tmp_path / filename
@@ -255,3 +308,8 @@ def create_file(tmp_path: Path, entry=None, filename="test.md"):
if entry:
markdown_file.write_text(entry)
return markdown_file
def clean(text):
"Normalize spaces in text for easier comparison."
return re.sub(r"\s+", " ", text)

View File

@@ -147,12 +147,10 @@ body line 1.1
# Extract Entries from specified Org files
extracted_entries = OrgToEntries.extract_org_entries(org_files=data, max_tokens=12)
assert len(extracted_entries) == 2
for entry in extracted_entries[1]:
entry.raw = clean(entry.raw)
# Assert
assert len(extracted_entries[1]) == 1
assert entry.raw == expected_entry
assert extracted_entries[1][-1].raw == expected_entry
def test_parse_org_entry_with_children_as_single_entry_if_small(tmp_path):
@@ -388,8 +386,6 @@ def test_extract_entries_with_different_level_headings(tmp_path):
# Extract Entries from specified Org files
entries = OrgToEntries.extract_org_entries(org_files=data, index_heading_entries=True, max_tokens=3)
assert len(entries) == 2
for entry in entries[1]:
entry.raw = clean(f"{entry.raw}")
# Assert
assert len(entries[1]) == 2
@@ -397,6 +393,60 @@ def test_extract_entries_with_different_level_headings(tmp_path):
assert entries[1][1].raw == "* Heading 2\n"
def test_line_number_tracking_in_recursive_split():
"Ensure line numbers in URIs are correct after recursive splitting by checking against the actual file."
# Arrange
org_file_path = os.path.abspath("tests/data/org/main_readme.org")
with open(org_file_path, "r") as f:
org_content = f.read()
lines = org_content.splitlines()
data = {org_file_path: org_content}
# Act
# Using a small max_tokens to force recursive splitting
_, entries = OrgToEntries.extract_org_entries(org_files=data, max_tokens=10, index_heading_entries=True)
# Assert
assert len(entries) > 0, "No entries were extracted."
for entry in entries:
# Extract file path and line number from the entry URI
# for files uri is expected in format: file:///path/to/file.org#line=5
match = re.search(r"file://(.*?)#line=(\d+)", entry.uri)
if not match:
continue
filepath_from_uri = match.group(1)
line_number_from_uri = int(match.group(2))
# line_number is 1-based, list index is 0-based
line_in_file = clean(lines[line_number_from_uri - 1])
next_line_in_file = clean(lines[line_number_from_uri]) if line_number_from_uri < len(lines) else ""
# Remove ancestor heading lines inserted during post-processing
first_entry_line = ""
for line in entry.raw.splitlines():
if line.startswith("*"):
first_entry_line = line
else:
break # Stop at the first non-heading line
# Remove heading prefix from entry.compiled as level changed during post-processing
cleaned_first_entry_line = first_entry_line.strip()
# Remove multiple consecutive spaces
cleaned_first_entry_line = clean(cleaned_first_entry_line)
assert entry.uri is not None, f"Entry '{entry}' has a None URI."
assert match is not None, f"URI format is incorrect: {entry.uri}"
assert (
filepath_from_uri == org_file_path
), f"File path in URI '{filepath_from_uri}' does not match expected '{org_file_path}'"
# Ensure the first non-heading line in the compiled entry matches the line in the file
assert (
cleaned_first_entry_line in line_in_file.strip() or cleaned_first_entry_line in next_line_in_file.strip()
), f"First non-heading line '{cleaned_first_entry_line}' in {entry.raw} does not match line {line_number_from_uri} in file: '{line_in_file}' or next line '{next_line_in_file}'"
# Helper Functions
def create_file(tmp_path, entry=None, filename="test.org"):
org_file = tmp_path / filename
@@ -406,6 +456,6 @@ def create_file(tmp_path, entry=None, filename="test.org"):
return org_file
def clean(entry):
"Remove properties from entry for easier comparison."
return re.sub(r"\n:PROPERTIES:(.*?):END:", "", entry, flags=re.DOTALL)
def clean(text):
"Normalize spaces in text for easier comparison."
return re.sub(r"\s+", " ", text)

View File

@@ -100,9 +100,8 @@ def test_render_entry_with_property_drawer_and_empty_body(tmp_path):
expected_entry = f"""*** [#A] Heading1 :tag1:
:PROPERTIES:
:LINE: file:{orgfile}::2
:LINE: file://{orgfile}#line=2
:ID: id:111-111-111-1111-1111
:SOURCE: [[file:{orgfile}::*Heading1]]
:END:
"""
@@ -133,37 +132,12 @@ Body Line 2
# Assert
# SOURCE link rendered with Heading
assert f":SOURCE: [[file:{orgfile}::*{entries[0].heading}]]" in f"{entries[0]}"
# ID link rendered with ID
assert f":ID: id:123-456-789-4234-1231" in f"{entries[0]}"
# LINE link rendered with line number
assert f":LINE: file:{orgfile}::2" in f"{entries[0]}"
# ----------------------------------------------------------------------------------------------------
def test_source_link_to_entry_escaped_for_rendering(tmp_path):
"Test SOURCE link renders with square brackets in filename, heading escaped for org-mode rendering"
# Arrange
entry = f"""
*** [#A] Heading[1] :tag1:
:PROPERTIES:
:ID: 123-456-789-4234-1231
:END:
Body Line 1"""
orgfile = create_file(tmp_path, entry, filename="test[1].org")
# Act
entries = orgnode.makelist_with_filepath(orgfile)
# Assert
assert len(entries) == 1
# parsed heading from entry
assert entries[0].heading == "Heading[1]"
# track ancestors of entry
assert entries[0].ancestors == [f"{orgfile}"]
# ensure SOURCE link has square brackets in filename, heading escaped in rendered entries
escaped_orgfile = f"{orgfile}".replace("[1]", "\\[1\\]")
assert f":SOURCE: [[file:{escaped_orgfile}::*Heading\\[1\\]" in f"{entries[0]}"
assert f":LINE: file://{orgfile}#line=2" in f"{entries[0]}"
# LINE link rendered with line number
assert f":LINE: file://{orgfile}#line=7" in f"{entries[1]}"
# ----------------------------------------------------------------------------------------------------