Standardize structure of text to entries to match other entry processors

Add process_single_plaintext_file func etc with similar signatures as
org_to_entries and markdown_to_entries processors

The standardization makes modifications, abstractions easier to create
This commit is contained in:
Debanjum Singh Solanky
2024-04-08 23:49:34 +05:30
parent 079f409238
commit 8291b898ca
3 changed files with 84 additions and 28 deletions

View File

@@ -1,7 +1,9 @@
import logging
import re
from pathlib import Path
from typing import List, Tuple
from typing import Dict, List, Tuple
import urllib3
from bs4 import BeautifulSoup
from khoj.database.models import Entry as DbEntry
@@ -28,21 +30,8 @@ class PlaintextToEntries(TextToEntries):
else:
deletion_file_names = None
with timer("Scrub plaintext files and extract text", logger):
for file in files:
try:
plaintext_content = files[file]
if file.endswith(("html", "htm", "xml")):
plaintext_content = PlaintextToEntries.extract_html_content(
plaintext_content, file.split(".")[-1]
)
files[file] = plaintext_content
except Exception as e:
logger.warning(f"Unable to read file: {file} as plaintext. Skipping file.")
logger.warning(e, exc_info=True)
# Extract Entries from specified plaintext files
with timer("Parse entries from specified Plaintext files", logger):
with timer("Extract entries from specified Plaintext files", logger):
current_entries = PlaintextToEntries.extract_plaintext_entries(files)
# Split entries by max tokens supported by model
@@ -74,16 +63,57 @@ class PlaintextToEntries(TextToEntries):
return soup.get_text(strip=True, separator="\n")
@staticmethod
def extract_plaintext_entries(entry_to_file_map: dict[str, str]) -> List[Entry]:
"Convert each plaintext entries into a dictionary"
entries = []
for file, entry in entry_to_file_map.items():
def extract_plaintext_entries(text_files: Dict[str, str]) -> List[Entry]:
entries: List[str] = []
entry_to_file_map: List[Tuple[str, str]] = []
for text_file in text_files:
try:
text_content = text_files[text_file]
entries, entry_to_file_map = PlaintextToEntries.process_single_plaintext_file(
text_content, text_file, entries, entry_to_file_map
)
except Exception as e:
logger.warning(f"Unable to read file: {text_file} as plaintext. Skipping file.")
logger.warning(e, exc_info=True)
# Extract Entries from specified plaintext files
return PlaintextToEntries.convert_text_files_to_entries(entries, dict(entry_to_file_map))
@staticmethod
def process_single_plaintext_file(
text_content: str,
text_file: str,
entries: List[str],
entry_to_file_map: List[Tuple[str, str]],
) -> Tuple[List[str], List[Tuple[str, str]]]:
if text_file.endswith(("html", "htm", "xml")):
text_content = PlaintextToEntries.extract_html_content(text_content, text_file.split(".")[-1])
entry_to_file_map += [(text_content, text_file)]
entries.extend([text_content])
return entries, entry_to_file_map
@staticmethod
def convert_text_files_to_entries(parsed_entries: List[str], entry_to_file_map: dict[str, str]) -> List[Entry]:
"Convert each plaintext file into an entry"
entries: List[Entry] = []
for parsed_entry in parsed_entries:
raw_filename = entry_to_file_map[parsed_entry]
# Check if raw_filename is a URL. If so, save it as is. If not, convert it to a Path.
if type(raw_filename) == str and re.search(r"^https?://", raw_filename):
# Escape the URL to avoid issues with special characters
entry_filename = urllib3.util.parse_url(raw_filename).url
else:
entry_filename = raw_filename
# Append base filename to compiled entry for context to model
entries.append(
Entry(
raw=entry,
file=file,
compiled=f"{Path(file).stem}\n{entry}",
heading=Path(file).stem,
raw=parsed_entry,
file=f"{entry_filename}",
compiled=f"{entry_filename}\n{parsed_entry}",
heading=entry_filename,
)
)
logger.debug(f"Converted {len(parsed_entries)} plaintext files to entries")
return entries