Use new Text Entry class to track text entries in Intermediate Format

- Context
  - The app maintains all text content in a standard, intermediate format
  - The intermediate format was loaded, passed around as a dictionary
    for easier, faster updates to the intermediate format schema initially
  - The intermediate format is reasonably stable now, given it's usage
    by all 3 text content types currently implemented

- Changes
  - Concretize text entries into `Entries' class instead of using dictionaries
    - Code is updated to load, pass around entries as `Entries' objects
      instead of as dictionaries
    - `text_search' and `text_to_jsonl' methods are annotated with
       type hints for the new `Entries' type
    - Code and Tests referencing entries are updated to use class style
      access patterns instead of the previous dictionary access patterns

  - Move `mark_entries_for_update' method into `TextToJsonl' base class
    - This is a more natural location for the method as it is only
      (to be) used by `text_to_jsonl' classes
    - Avoid circular reference issues on importing `Entries' class
This commit is contained in:
Debanjum Singh Solanky
2022-09-15 23:34:43 +03:00
parent 99754970ab
commit 7e9298f315
15 changed files with 161 additions and 131 deletions

View File

@@ -1,5 +1,4 @@
# Standard Packages
import json
import glob
import re
import logging
@@ -7,9 +6,10 @@ import time
# Internal Packages
from src.processor.text_to_jsonl import TextToJsonl
from src.utils.helpers import get_absolute_path, is_none_or_empty, mark_entries_for_update
from src.utils.helpers import get_absolute_path, is_none_or_empty
from src.utils.constants import empty_escape_sequences
from src.utils.jsonl import dump_jsonl, compress_jsonl_data
from src.utils.rawconfig import Entry
logger = logging.getLogger(__name__)
@@ -40,7 +40,7 @@ class BeancountToJsonl(TextToJsonl):
if not previous_entries:
entries_with_ids = list(enumerate(current_entries))
else:
entries_with_ids = mark_entries_for_update(current_entries, previous_entries, key='compiled', logger=logger)
entries_with_ids = self.mark_entries_for_update(current_entries, previous_entries, key='compiled', logger=logger)
end = time.time()
logger.debug(f"Identify new or updated transaction: {end - start} seconds")
@@ -111,17 +111,17 @@ class BeancountToJsonl(TextToJsonl):
return entries, dict(transaction_to_file_map)
@staticmethod
def convert_transactions_to_maps(entries: list[str], transaction_to_file_map) -> list[dict]:
"Convert each Beancount transaction into a dictionary"
entry_maps = []
for entry in entries:
entry_maps.append({'compiled': entry, 'raw': entry, 'file': f'{transaction_to_file_map[entry]}'})
def convert_transactions_to_maps(parsed_entries: list[str], transaction_to_file_map) -> list[Entry]:
"Convert each parsed Beancount transaction into a Entry"
entries = []
for parsed_entry in parsed_entries:
entries.append(Entry(compiled=parsed_entry, raw=parsed_entry, file=f'{transaction_to_file_map[parsed_entry]}'))
logger.info(f"Converted {len(entries)} transactions to dictionaries")
logger.info(f"Converted {len(parsed_entries)} transactions to dictionaries")
return entry_maps
return entries
@staticmethod
def convert_transaction_maps_to_jsonl(entries: list[dict]) -> str:
"Convert each Beancount transaction dictionary to JSON and collate as JSONL"
return ''.join([f'{json.dumps(entry_dict, ensure_ascii=False)}\n' for entry_dict in entries])
def convert_transaction_maps_to_jsonl(entries: list[Entry]) -> str:
"Convert each Beancount transaction entry to JSON and collate as JSONL"
return ''.join([f'{entry.to_json()}\n' for entry in entries])

View File

@@ -1,5 +1,4 @@
# Standard Packages
import json
import glob
import re
import logging
@@ -7,9 +6,10 @@ import time
# Internal Packages
from src.processor.text_to_jsonl import TextToJsonl
from src.utils.helpers import get_absolute_path, is_none_or_empty, mark_entries_for_update
from src.utils.helpers import get_absolute_path, is_none_or_empty
from src.utils.constants import empty_escape_sequences
from src.utils.jsonl import dump_jsonl, compress_jsonl_data
from src.utils.rawconfig import Entry
logger = logging.getLogger(__name__)
@@ -40,7 +40,7 @@ class MarkdownToJsonl(TextToJsonl):
if not previous_entries:
entries_with_ids = list(enumerate(current_entries))
else:
entries_with_ids = mark_entries_for_update(current_entries, previous_entries, key='compiled', logger=logger)
entries_with_ids = self.mark_entries_for_update(current_entries, previous_entries, key='compiled', logger=logger)
end = time.time()
logger.debug(f"Identify new or updated entries: {end - start} seconds")
@@ -110,17 +110,17 @@ class MarkdownToJsonl(TextToJsonl):
return entries, dict(entry_to_file_map)
@staticmethod
def convert_markdown_entries_to_maps(entries: list[str], entry_to_file_map) -> list[dict]:
def convert_markdown_entries_to_maps(parsed_entries: list[str], entry_to_file_map) -> list[Entry]:
"Convert each Markdown entries into a dictionary"
entry_maps = []
for entry in entries:
entry_maps.append({'compiled': entry, 'raw': entry, 'file': f'{entry_to_file_map[entry]}'})
entries = []
for parsed_entry in parsed_entries:
entries.append(Entry(compiled=parsed_entry, raw=parsed_entry, file=f'{entry_to_file_map[parsed_entry]}'))
logger.info(f"Converted {len(entries)} markdown entries to dictionaries")
logger.info(f"Converted {len(parsed_entries)} markdown entries to dictionaries")
return entry_maps
return entries
@staticmethod
def convert_markdown_maps_to_jsonl(entries):
"Convert each Markdown entries to JSON and collate as JSONL"
return ''.join([f'{json.dumps(entry_dict, ensure_ascii=False)}\n' for entry_dict in entries])
def convert_markdown_maps_to_jsonl(entries: list[Entry]):
"Convert each Markdown entry to JSON and collate as JSONL"
return ''.join([f'{entry.to_json()}\n' for entry in entries])

View File

@@ -1,5 +1,4 @@
# Standard Packages
import json
import glob
import logging
import time
@@ -8,8 +7,9 @@ from typing import Iterable
# Internal Packages
from src.processor.org_mode import orgnode
from src.processor.text_to_jsonl import TextToJsonl
from src.utils.helpers import get_absolute_path, is_none_or_empty, mark_entries_for_update
from src.utils.helpers import get_absolute_path, is_none_or_empty
from src.utils.jsonl import dump_jsonl, compress_jsonl_data
from src.utils.rawconfig import Entry
from src.utils import state
@@ -18,7 +18,7 @@ logger = logging.getLogger(__name__)
class OrgToJsonl(TextToJsonl):
# Define Functions
def process(self, previous_entries=None):
def process(self, previous_entries: list[Entry]=None):
# Extract required fields from config
org_files, org_file_filter, output_file = self.config.input_files, self.config.input_filter, self.config.compressed_jsonl
index_heading_entries = self.config.index_heading_entries
@@ -47,7 +47,7 @@ class OrgToJsonl(TextToJsonl):
if not previous_entries:
entries_with_ids = list(enumerate(current_entries))
else:
entries_with_ids = mark_entries_for_update(current_entries, previous_entries, key='compiled', logger=logger)
entries_with_ids = self.mark_entries_for_update(current_entries, previous_entries, key='compiled', logger=logger)
# Process Each Entry from All Notes Files
start = time.time()
@@ -104,51 +104,48 @@ class OrgToJsonl(TextToJsonl):
return entries, dict(entry_to_file_map)
@staticmethod
def convert_org_nodes_to_entries(entries: list[orgnode.Orgnode], entry_to_file_map, index_heading_entries=False) -> list[dict]:
"Convert Org-Mode entries into list of dictionary"
entry_maps = []
for entry in entries:
entry_dict = dict()
if not entry.hasBody and not index_heading_entries:
def convert_org_nodes_to_entries(parsed_entries: list[orgnode.Orgnode], entry_to_file_map, index_heading_entries=False) -> list[Entry]:
"Convert Org-Mode nodes into list of Entry objects"
entries: list[Entry] = []
for parsed_entry in parsed_entries:
if not parsed_entry.hasBody and not index_heading_entries:
# Ignore title notes i.e notes with just headings and empty body
continue
entry_dict["compiled"] = f'{entry.heading}.'
compiled = f'{parsed_entry.heading}.'
if state.verbose > 2:
logger.debug(f"Title: {entry.heading}")
logger.debug(f"Title: {parsed_entry.heading}")
if entry.tags:
tags_str = " ".join(entry.tags)
entry_dict["compiled"] += f'\t {tags_str}.'
if parsed_entry.tags:
tags_str = " ".join(parsed_entry.tags)
compiled += f'\t {tags_str}.'
if state.verbose > 2:
logger.debug(f"Tags: {tags_str}")
if entry.closed:
entry_dict["compiled"] += f'\n Closed on {entry.closed.strftime("%Y-%m-%d")}.'
if parsed_entry.closed:
compiled += f'\n Closed on {parsed_entry.closed.strftime("%Y-%m-%d")}.'
if state.verbose > 2:
logger.debug(f'Closed: {entry.closed.strftime("%Y-%m-%d")}')
logger.debug(f'Closed: {parsed_entry.closed.strftime("%Y-%m-%d")}')
if entry.scheduled:
entry_dict["compiled"] += f'\n Scheduled for {entry.scheduled.strftime("%Y-%m-%d")}.'
if parsed_entry.scheduled:
compiled += f'\n Scheduled for {parsed_entry.scheduled.strftime("%Y-%m-%d")}.'
if state.verbose > 2:
logger.debug(f'Scheduled: {entry.scheduled.strftime("%Y-%m-%d")}')
logger.debug(f'Scheduled: {parsed_entry.scheduled.strftime("%Y-%m-%d")}')
if entry.hasBody:
entry_dict["compiled"] += f'\n {entry.body}'
if parsed_entry.hasBody:
compiled += f'\n {parsed_entry.body}'
if state.verbose > 2:
logger.debug(f"Body: {entry.body}")
logger.debug(f"Body: {parsed_entry.body}")
if entry_dict:
entry_dict["raw"] = f'{entry}'
entry_dict["file"] = f'{entry_to_file_map[entry]}'
if compiled:
entries += [Entry(
compiled=compiled,
raw=f'{parsed_entry}',
file=f'{entry_to_file_map[parsed_entry]}')]
# Convert Dictionary to JSON and Append to JSONL string
entry_maps.append(entry_dict)
return entry_maps
return entries
@staticmethod
def convert_org_entries_to_jsonl(entries: Iterable[dict]) -> str:
def convert_org_entries_to_jsonl(entries: Iterable[Entry]) -> str:
"Convert each Org-Mode entry to JSON and collate as JSONL"
return ''.join([f'{json.dumps(entry_dict, ensure_ascii=False)}\n' for entry_dict in entries])
return ''.join([f'{entry_dict.to_json()}\n' for entry_dict in entries])

View File

@@ -1,9 +1,14 @@
# Standard Packages
from abc import ABC, abstractmethod
from typing import Iterable
import hashlib
import time
import logging
# Internal Packages
from src.utils.rawconfig import TextContentConfig
from src.utils.rawconfig import Entry, TextContentConfig
logger = logging.getLogger(__name__)
class TextToJsonl(ABC):
@@ -11,4 +16,39 @@ class TextToJsonl(ABC):
self.config = config
@abstractmethod
def process(self, previous_entries: Iterable[tuple[int, dict]]=None) -> list[tuple[int, dict]]: ...
def process(self, previous_entries: list[Entry]=None) -> list[tuple[int, Entry]]: ...
def mark_entries_for_update(self, current_entries: list[Entry], previous_entries: list[Entry], key='compiled', logger=None) -> list[tuple[int, Entry]]:
# Hash all current and previous entries to identify new entries
start = time.time()
current_entry_hashes = list(map(lambda e: hashlib.md5(bytes(getattr(e, key), encoding='utf-8')).hexdigest(), current_entries))
previous_entry_hashes = list(map(lambda e: hashlib.md5(bytes(getattr(e, key), encoding='utf-8')).hexdigest(), previous_entries))
end = time.time()
logger.debug(f"Hash previous, current entries: {end - start} seconds")
start = time.time()
hash_to_current_entries = dict(zip(current_entry_hashes, current_entries))
hash_to_previous_entries = dict(zip(previous_entry_hashes, previous_entries))
# All entries that did not exist in the previous set are to be added
new_entry_hashes = set(current_entry_hashes) - set(previous_entry_hashes)
# All entries that exist in both current and previous sets are kept
existing_entry_hashes = set(current_entry_hashes) & set(previous_entry_hashes)
# Mark new entries with -1 id to flag for later embeddings generation
new_entries = [
(-1, hash_to_current_entries[entry_hash])
for entry_hash in new_entry_hashes
]
# Set id of existing entries to their previous ids to reuse their existing encoded embeddings
existing_entries = [
(previous_entry_hashes.index(entry_hash), hash_to_previous_entries[entry_hash])
for entry_hash in existing_entry_hashes
]
existing_entries_sorted = sorted(existing_entries, key=lambda e: e[0])
entries_with_ids = existing_entries_sorted + new_entries
end = time.time()
logger.debug(f"Identify, Mark, Combine new, existing entries: {end - start} seconds")
return entries_with_ids