Use Base TextToJsonl class to standardize <text>_to_jsonl processors

- Start standardizing implementation of the `text_to_jsonl` processors
  - The `text_to_jsonl` scripts already shared a common structure
  - This change starts to codify that implicit structure

- Benefits
  - Ease adding more `text_to_jsonl` processors (see the sketch after this list)
  - Allow merging shared functionality
  - Help with type hinting

- Drawbacks
  - Lower agility to change. But this was already an implicit issue, as
    the `text_to_jsonl` processors got more deeply wired into the app
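
As an illustration of the first benefit, here is a minimal sketch of what adding a new processor could look like on top of the base class. `PlaintextToJsonl` is hypothetical; only `TextToJsonl` and its `process()` contract (see `src/processor/text_to_jsonl.py` below) are part of this commit:

```python
# Hypothetical example processor; not part of this commit
from src.processor.text_to_jsonl import TextToJsonl


class PlaintextToJsonl(TextToJsonl):
    def process(self, previous_entries=None) -> list[tuple[int, dict]]:
        # Read and split input files from self.config, mirroring the shared
        # structure of the beancount, markdown and org processors below
        current_entries = [{'compiled': 'example', 'raw': 'example', 'file': 'notes.txt'}]
        # First run: assign fresh ids. Incremental runs would instead call
        # mark_entries_for_update(current_entries, previous_entries, ...)
        return list(enumerate(current_entries))
```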
Debanjum Singh Solanky
2022-09-14 10:53:43 +03:00
parent c16ae9e344
commit 02d944030f
12 changed files with 364 additions and 345 deletions

View File

@@ -1,5 +1,3 @@
-#!/usr/bin/env python3
-
 # Standard Packages
 import json
 import glob
@@ -8,121 +6,122 @@ import logging
 import time
 
 # Internal Packages
+from src.processor.text_to_jsonl import TextToJsonl
 from src.utils.helpers import get_absolute_path, is_none_or_empty, mark_entries_for_update
 from src.utils.constants import empty_escape_sequences
 from src.utils.jsonl import dump_jsonl, compress_jsonl_data
-from src.utils.rawconfig import TextContentConfig
 
 logger = logging.getLogger(__name__)
 
-# Define Functions
-def beancount_to_jsonl(config: TextContentConfig, previous_entries=None):
-    # Extract required fields from config
-    beancount_files, beancount_file_filter, output_file = config.input_files, config.input_filter, config.compressed_jsonl
+class BeancountToJsonl(TextToJsonl):
+    # Define Functions
+    def process(self, previous_entries=None):
+        # Extract required fields from config
+        beancount_files, beancount_file_filter, output_file = self.config.input_files, self.config.input_filter, self.config.compressed_jsonl
 
-    # Input Validation
-    if is_none_or_empty(beancount_files) and is_none_or_empty(beancount_file_filter):
-        print("At least one of beancount-files or beancount-file-filter is required to be specified")
-        exit(1)
+        # Input Validation
+        if is_none_or_empty(beancount_files) and is_none_or_empty(beancount_file_filter):
+            print("At least one of beancount-files or beancount-file-filter is required to be specified")
+            exit(1)
 
-    # Get Beancount Files to Process
-    beancount_files = get_beancount_files(beancount_files, beancount_file_filter)
+        # Get Beancount Files to Process
+        beancount_files = BeancountToJsonl.get_beancount_files(beancount_files, beancount_file_filter)
 
-    # Extract Entries from specified Beancount files
-    start = time.time()
-    current_entries = convert_transactions_to_maps(*extract_beancount_transactions(beancount_files))
-    end = time.time()
-    logger.debug(f"Parse transactions from Beancount files into dictionaries: {end - start} seconds")
+        # Extract Entries from specified Beancount files
+        start = time.time()
+        current_entries = BeancountToJsonl.convert_transactions_to_maps(*BeancountToJsonl.extract_beancount_transactions(beancount_files))
+        end = time.time()
+        logger.debug(f"Parse transactions from Beancount files into dictionaries: {end - start} seconds")
 
-    # Identify, mark and merge any new entries with previous entries
-    start = time.time()
-    if not previous_entries:
-        entries_with_ids = list(enumerate(current_entries))
-    else:
-        entries_with_ids = mark_entries_for_update(current_entries, previous_entries, key='compiled', logger=logger)
-    end = time.time()
-    logger.debug(f"Identify new or updated transaction: {end - start} seconds")
+        # Identify, mark and merge any new entries with previous entries
+        start = time.time()
+        if not previous_entries:
+            entries_with_ids = list(enumerate(current_entries))
+        else:
+            entries_with_ids = mark_entries_for_update(current_entries, previous_entries, key='compiled', logger=logger)
+        end = time.time()
+        logger.debug(f"Identify new or updated transaction: {end - start} seconds")
 
-    # Process Each Entry from All Notes Files
-    start = time.time()
-    entries = list(map(lambda entry: entry[1], entries_with_ids))
-    jsonl_data = convert_transaction_maps_to_jsonl(entries)
+        # Process Each Entry from All Notes Files
+        start = time.time()
+        entries = list(map(lambda entry: entry[1], entries_with_ids))
+        jsonl_data = BeancountToJsonl.convert_transaction_maps_to_jsonl(entries)
 
-    # Compress JSONL formatted Data
-    if output_file.suffix == ".gz":
-        compress_jsonl_data(jsonl_data, output_file)
-    elif output_file.suffix == ".jsonl":
-        dump_jsonl(jsonl_data, output_file)
-    end = time.time()
-    logger.debug(f"Write transactions to JSONL file: {end - start} seconds")
+        # Compress JSONL formatted Data
+        if output_file.suffix == ".gz":
+            compress_jsonl_data(jsonl_data, output_file)
+        elif output_file.suffix == ".jsonl":
+            dump_jsonl(jsonl_data, output_file)
+        end = time.time()
+        logger.debug(f"Write transactions to JSONL file: {end - start} seconds")
 
-    return entries_with_ids
+        return entries_with_ids
 
-def get_beancount_files(beancount_files=None, beancount_file_filters=None):
-    "Get Beancount files to process"
-    absolute_beancount_files, filtered_beancount_files = set(), set()
-    if beancount_files:
-        absolute_beancount_files = {get_absolute_path(beancount_file)
-                                    for beancount_file
-                                    in beancount_files}
-    if beancount_file_filters:
-        filtered_beancount_files = {
-            filtered_file
-            for beancount_file_filter in beancount_file_filters
-            for filtered_file in glob.glob(get_absolute_path(beancount_file_filter))
-        }
+    @staticmethod
+    def get_beancount_files(beancount_files=None, beancount_file_filters=None):
+        "Get Beancount files to process"
+        absolute_beancount_files, filtered_beancount_files = set(), set()
+        if beancount_files:
+            absolute_beancount_files = {get_absolute_path(beancount_file)
+                                        for beancount_file
+                                        in beancount_files}
+        if beancount_file_filters:
+            filtered_beancount_files = {
+                filtered_file
+                for beancount_file_filter in beancount_file_filters
+                for filtered_file in glob.glob(get_absolute_path(beancount_file_filter))
+            }
 
-    all_beancount_files = sorted(absolute_beancount_files | filtered_beancount_files)
+        all_beancount_files = sorted(absolute_beancount_files | filtered_beancount_files)
 
-    files_with_non_beancount_extensions = {
-        beancount_file
-        for beancount_file
-        in all_beancount_files
-        if not beancount_file.endswith(".bean") and not beancount_file.endswith(".beancount")
-    }
-    if any(files_with_non_beancount_extensions):
-        print(f"[Warning] There maybe non beancount files in the input set: {files_with_non_beancount_extensions}")
+        files_with_non_beancount_extensions = {
+            beancount_file
+            for beancount_file
+            in all_beancount_files
+            if not beancount_file.endswith(".bean") and not beancount_file.endswith(".beancount")
+        }
+        if any(files_with_non_beancount_extensions):
+            print(f"[Warning] There maybe non beancount files in the input set: {files_with_non_beancount_extensions}")
 
-    logger.info(f'Processing files: {all_beancount_files}')
+        logger.info(f'Processing files: {all_beancount_files}')
 
-    return all_beancount_files
+        return all_beancount_files
 
-def extract_beancount_transactions(beancount_files):
-    "Extract entries from specified Beancount files"
+    @staticmethod
+    def extract_beancount_transactions(beancount_files):
+        "Extract entries from specified Beancount files"
 
-    # Initialize Regex for extracting Beancount Entries
-    transaction_regex = r'^\n?\d{4}-\d{2}-\d{2} [\*|\!] '
-    empty_newline = f'^[\n\r\t\ ]*$'
+        # Initialize Regex for extracting Beancount Entries
+        transaction_regex = r'^\n?\d{4}-\d{2}-\d{2} [\*|\!] '
+        empty_newline = f'^[\n\r\t\ ]*$'
 
-    entries = []
-    transaction_to_file_map = []
-    for beancount_file in beancount_files:
-        with open(beancount_file) as f:
-            ledger_content = f.read()
-            transactions_per_file = [entry.strip(empty_escape_sequences)
-                                     for entry
-                                     in re.split(empty_newline, ledger_content, flags=re.MULTILINE)
-                                     if re.match(transaction_regex, entry)]
-            transaction_to_file_map += zip(transactions_per_file, [beancount_file]*len(transactions_per_file))
-            entries.extend(transactions_per_file)
-    return entries, dict(transaction_to_file_map)
+        entries = []
+        transaction_to_file_map = []
+        for beancount_file in beancount_files:
+            with open(beancount_file) as f:
+                ledger_content = f.read()
+                transactions_per_file = [entry.strip(empty_escape_sequences)
+                                         for entry
+                                         in re.split(empty_newline, ledger_content, flags=re.MULTILINE)
+                                         if re.match(transaction_regex, entry)]
+                transaction_to_file_map += zip(transactions_per_file, [beancount_file]*len(transactions_per_file))
+                entries.extend(transactions_per_file)
+        return entries, dict(transaction_to_file_map)
 
-def convert_transactions_to_maps(entries: list[str], transaction_to_file_map) -> list[dict]:
-    "Convert each Beancount transaction into a dictionary"
-    entry_maps = []
-    for entry in entries:
-        entry_maps.append({'compiled': entry, 'raw': entry, 'file': f'{transaction_to_file_map[entry]}'})
+    @staticmethod
+    def convert_transactions_to_maps(entries: list[str], transaction_to_file_map) -> list[dict]:
+        "Convert each Beancount transaction into a dictionary"
+        entry_maps = []
+        for entry in entries:
+            entry_maps.append({'compiled': entry, 'raw': entry, 'file': f'{transaction_to_file_map[entry]}'})
 
-    logger.info(f"Converted {len(entries)} transactions to dictionaries")
+        logger.info(f"Converted {len(entries)} transactions to dictionaries")
 
-    return entry_maps
+        return entry_maps
 
-def convert_transaction_maps_to_jsonl(entries: list[dict]) -> str:
-    "Convert each Beancount transaction dictionary to JSON and collate as JSONL"
-    return ''.join([f'{json.dumps(entry_dict, ensure_ascii=False)}\n' for entry_dict in entries])
+    @staticmethod
+    def convert_transaction_maps_to_jsonl(entries: list[dict]) -> str:
+        "Convert each Beancount transaction dictionary to JSON and collate as JSONL"
+        return ''.join([f'{json.dumps(entry_dict, ensure_ascii=False)}\n' for entry_dict in entries])

View File

@@ -1,5 +1,3 @@
-#!/usr/bin/env python3
-
 # Standard Packages
 import json
 import glob
@@ -8,120 +6,121 @@ import logging
 import time
 
 # Internal Packages
+from src.processor.text_to_jsonl import TextToJsonl
 from src.utils.helpers import get_absolute_path, is_none_or_empty, mark_entries_for_update
 from src.utils.constants import empty_escape_sequences
 from src.utils.jsonl import dump_jsonl, compress_jsonl_data
-from src.utils.rawconfig import TextContentConfig
 
 logger = logging.getLogger(__name__)
 
-# Define Functions
-def markdown_to_jsonl(config: TextContentConfig, previous_entries=None):
-    # Extract required fields from config
-    markdown_files, markdown_file_filter, output_file = config.input_files, config.input_filter, config.compressed_jsonl
+class MarkdownToJsonl(TextToJsonl):
+    # Define Functions
+    def process(self, previous_entries=None):
+        # Extract required fields from config
+        markdown_files, markdown_file_filter, output_file = self.config.input_files, self.config.input_filter, self.config.compressed_jsonl
 
-    # Input Validation
-    if is_none_or_empty(markdown_files) and is_none_or_empty(markdown_file_filter):
-        print("At least one of markdown-files or markdown-file-filter is required to be specified")
-        exit(1)
+        # Input Validation
+        if is_none_or_empty(markdown_files) and is_none_or_empty(markdown_file_filter):
+            print("At least one of markdown-files or markdown-file-filter is required to be specified")
+            exit(1)
 
-    # Get Markdown Files to Process
-    markdown_files = get_markdown_files(markdown_files, markdown_file_filter)
+        # Get Markdown Files to Process
+        markdown_files = MarkdownToJsonl.get_markdown_files(markdown_files, markdown_file_filter)
 
-    # Extract Entries from specified Markdown files
-    start = time.time()
-    current_entries = convert_markdown_entries_to_maps(*extract_markdown_entries(markdown_files))
-    end = time.time()
-    logger.debug(f"Parse entries from Markdown files into dictionaries: {end - start} seconds")
+        # Extract Entries from specified Markdown files
+        start = time.time()
+        current_entries = MarkdownToJsonl.convert_markdown_entries_to_maps(*MarkdownToJsonl.extract_markdown_entries(markdown_files))
+        end = time.time()
+        logger.debug(f"Parse entries from Markdown files into dictionaries: {end - start} seconds")
 
-    # Identify, mark and merge any new entries with previous entries
-    start = time.time()
-    if not previous_entries:
-        entries_with_ids = list(enumerate(current_entries))
-    else:
-        entries_with_ids = mark_entries_for_update(current_entries, previous_entries, key='compiled', logger=logger)
-    end = time.time()
-    logger.debug(f"Identify new or updated entries: {end - start} seconds")
+        # Identify, mark and merge any new entries with previous entries
+        start = time.time()
+        if not previous_entries:
+            entries_with_ids = list(enumerate(current_entries))
+        else:
+            entries_with_ids = mark_entries_for_update(current_entries, previous_entries, key='compiled', logger=logger)
+        end = time.time()
+        logger.debug(f"Identify new or updated entries: {end - start} seconds")
 
-    # Process Each Entry from All Notes Files
-    start = time.time()
-    entries = list(map(lambda entry: entry[1], entries_with_ids))
-    jsonl_data = convert_markdown_maps_to_jsonl(entries)
+        # Process Each Entry from All Notes Files
+        start = time.time()
+        entries = list(map(lambda entry: entry[1], entries_with_ids))
+        jsonl_data = MarkdownToJsonl.convert_markdown_maps_to_jsonl(entries)
 
-    # Compress JSONL formatted Data
-    if output_file.suffix == ".gz":
-        compress_jsonl_data(jsonl_data, output_file)
-    elif output_file.suffix == ".jsonl":
-        dump_jsonl(jsonl_data, output_file)
-    end = time.time()
-    logger.debug(f"Write markdown entries to JSONL file: {end - start} seconds")
+        # Compress JSONL formatted Data
+        if output_file.suffix == ".gz":
+            compress_jsonl_data(jsonl_data, output_file)
+        elif output_file.suffix == ".jsonl":
+            dump_jsonl(jsonl_data, output_file)
+        end = time.time()
+        logger.debug(f"Write markdown entries to JSONL file: {end - start} seconds")
 
-    return entries_with_ids
+        return entries_with_ids
 
-def get_markdown_files(markdown_files=None, markdown_file_filters=None):
-    "Get Markdown files to process"
-    absolute_markdown_files, filtered_markdown_files = set(), set()
-    if markdown_files:
-        absolute_markdown_files = {get_absolute_path(markdown_file) for markdown_file in markdown_files}
-    if markdown_file_filters:
-        filtered_markdown_files = {
-            filtered_file
-            for markdown_file_filter in markdown_file_filters
-            for filtered_file in glob.glob(get_absolute_path(markdown_file_filter))
-        }
+    @staticmethod
+    def get_markdown_files(markdown_files=None, markdown_file_filters=None):
+        "Get Markdown files to process"
+        absolute_markdown_files, filtered_markdown_files = set(), set()
+        if markdown_files:
+            absolute_markdown_files = {get_absolute_path(markdown_file) for markdown_file in markdown_files}
+        if markdown_file_filters:
+            filtered_markdown_files = {
+                filtered_file
+                for markdown_file_filter in markdown_file_filters
+                for filtered_file in glob.glob(get_absolute_path(markdown_file_filter))
+            }
 
-    all_markdown_files = sorted(absolute_markdown_files | filtered_markdown_files)
+        all_markdown_files = sorted(absolute_markdown_files | filtered_markdown_files)
 
-    files_with_non_markdown_extensions = {
-        md_file
-        for md_file
-        in all_markdown_files
-        if not md_file.endswith(".md") and not md_file.endswith('.markdown')
-    }
+        files_with_non_markdown_extensions = {
+            md_file
+            for md_file
+            in all_markdown_files
+            if not md_file.endswith(".md") and not md_file.endswith('.markdown')
+        }
 
-    if any(files_with_non_markdown_extensions):
-        logger.warn(f"[Warning] There maybe non markdown-mode files in the input set: {files_with_non_markdown_extensions}")
+        if any(files_with_non_markdown_extensions):
+            logger.warn(f"[Warning] There maybe non markdown-mode files in the input set: {files_with_non_markdown_extensions}")
 
-    logger.info(f'Processing files: {all_markdown_files}')
+        logger.info(f'Processing files: {all_markdown_files}')
 
-    return all_markdown_files
+        return all_markdown_files
 
-def extract_markdown_entries(markdown_files):
-    "Extract entries by heading from specified Markdown files"
+    @staticmethod
+    def extract_markdown_entries(markdown_files):
+        "Extract entries by heading from specified Markdown files"
 
-    # Regex to extract Markdown Entries by Heading
-    markdown_heading_regex = r'^#'
+        # Regex to extract Markdown Entries by Heading
+        markdown_heading_regex = r'^#'
 
-    entries = []
-    entry_to_file_map = []
-    for markdown_file in markdown_files:
-        with open(markdown_file) as f:
-            markdown_content = f.read()
-            markdown_entries_per_file = [f'#{entry.strip(empty_escape_sequences)}'
-                                         for entry
-                                         in re.split(markdown_heading_regex, markdown_content, flags=re.MULTILINE)
-                                         if entry.strip(empty_escape_sequences) != '']
-            entry_to_file_map += zip(markdown_entries_per_file, [markdown_file]*len(markdown_entries_per_file))
-            entries.extend(markdown_entries_per_file)
+        entries = []
+        entry_to_file_map = []
+        for markdown_file in markdown_files:
+            with open(markdown_file) as f:
+                markdown_content = f.read()
+                markdown_entries_per_file = [f'#{entry.strip(empty_escape_sequences)}'
+                                             for entry
+                                             in re.split(markdown_heading_regex, markdown_content, flags=re.MULTILINE)
+                                             if entry.strip(empty_escape_sequences) != '']
+                entry_to_file_map += zip(markdown_entries_per_file, [markdown_file]*len(markdown_entries_per_file))
+                entries.extend(markdown_entries_per_file)
 
-    return entries, dict(entry_to_file_map)
+        return entries, dict(entry_to_file_map)
 
-def convert_markdown_entries_to_maps(entries: list[str], entry_to_file_map) -> list[dict]:
-    "Convert each Markdown entries into a dictionary"
-    entry_maps = []
-    for entry in entries:
-        entry_maps.append({'compiled': entry, 'raw': entry, 'file': f'{entry_to_file_map[entry]}'})
+    @staticmethod
+    def convert_markdown_entries_to_maps(entries: list[str], entry_to_file_map) -> list[dict]:
+        "Convert each Markdown entries into a dictionary"
+        entry_maps = []
+        for entry in entries:
+            entry_maps.append({'compiled': entry, 'raw': entry, 'file': f'{entry_to_file_map[entry]}'})
 
-    logger.info(f"Converted {len(entries)} markdown entries to dictionaries")
+        logger.info(f"Converted {len(entries)} markdown entries to dictionaries")
 
-    return entry_maps
+        return entry_maps
 
-def convert_markdown_maps_to_jsonl(entries):
-    "Convert each Markdown entries to JSON and collate as JSONL"
-    return ''.join([f'{json.dumps(entry_dict, ensure_ascii=False)}\n' for entry_dict in entries])
+    @staticmethod
+    def convert_markdown_maps_to_jsonl(entries):
+        "Convert each Markdown entries to JSON and collate as JSONL"
+        return ''.join([f'{json.dumps(entry_dict, ensure_ascii=False)}\n' for entry_dict in entries])

View File

@@ -1,5 +1,3 @@
-#!/usr/bin/env python3
-
 # Standard Packages
 import json
 import glob
@@ -9,147 +7,148 @@ from typing import Iterable
 
 # Internal Packages
 from src.processor.org_mode import orgnode
+from src.processor.text_to_jsonl import TextToJsonl
 from src.utils.helpers import get_absolute_path, is_none_or_empty, mark_entries_for_update
 from src.utils.jsonl import dump_jsonl, compress_jsonl_data
 from src.utils import state
-from src.utils.rawconfig import TextContentConfig
 
 logger = logging.getLogger(__name__)
 
-# Define Functions
-def org_to_jsonl(config: TextContentConfig, previous_entries=None):
-    # Extract required fields from config
-    org_files, org_file_filter, output_file = config.input_files, config.input_filter, config.compressed_jsonl
-    index_heading_entries = config.index_heading_entries
+class OrgToJsonl(TextToJsonl):
+    # Define Functions
+    def process(self, previous_entries=None):
+        # Extract required fields from config
+        org_files, org_file_filter, output_file = self.config.input_files, self.config.input_filter, self.config.compressed_jsonl
+        index_heading_entries = self.config.index_heading_entries
 
-    # Input Validation
-    if is_none_or_empty(org_files) and is_none_or_empty(org_file_filter):
-        print("At least one of org-files or org-file-filter is required to be specified")
-        exit(1)
+        # Input Validation
+        if is_none_or_empty(org_files) and is_none_or_empty(org_file_filter):
+            print("At least one of org-files or org-file-filter is required to be specified")
+            exit(1)
 
-    # Get Org Files to Process
-    start = time.time()
-    org_files = get_org_files(org_files, org_file_filter)
+        # Get Org Files to Process
+        start = time.time()
+        org_files = OrgToJsonl.get_org_files(org_files, org_file_filter)
 
-    # Extract Entries from specified Org files
-    start = time.time()
-    entry_nodes, file_to_entries = extract_org_entries(org_files)
-    end = time.time()
-    logger.debug(f"Parse entries from org files into OrgNode objects: {end - start} seconds")
+        # Extract Entries from specified Org files
+        start = time.time()
+        entry_nodes, file_to_entries = self.extract_org_entries(org_files)
+        end = time.time()
+        logger.debug(f"Parse entries from org files into OrgNode objects: {end - start} seconds")
 
-    start = time.time()
-    current_entries = convert_org_nodes_to_entries(entry_nodes, file_to_entries, index_heading_entries)
-    end = time.time()
-    logger.debug(f"Convert OrgNodes into entry dictionaries: {end - start} seconds")
+        start = time.time()
+        current_entries = self.convert_org_nodes_to_entries(entry_nodes, file_to_entries, index_heading_entries)
+        end = time.time()
+        logger.debug(f"Convert OrgNodes into entry dictionaries: {end - start} seconds")
 
-    # Identify, mark and merge any new entries with previous entries
-    if not previous_entries:
-        entries_with_ids = list(enumerate(current_entries))
-    else:
-        entries_with_ids = mark_entries_for_update(current_entries, previous_entries, key='compiled', logger=logger)
+        # Identify, mark and merge any new entries with previous entries
+        if not previous_entries:
+            entries_with_ids = list(enumerate(current_entries))
+        else:
+            entries_with_ids = mark_entries_for_update(current_entries, previous_entries, key='compiled', logger=logger)
 
-    # Process Each Entry from All Notes Files
-    start = time.time()
-    entries = map(lambda entry: entry[1], entries_with_ids)
-    jsonl_data = convert_org_entries_to_jsonl(entries)
+        # Process Each Entry from All Notes Files
+        start = time.time()
+        entries = map(lambda entry: entry[1], entries_with_ids)
+        jsonl_data = self.convert_org_entries_to_jsonl(entries)
 
-    # Compress JSONL formatted Data
-    if output_file.suffix == ".gz":
-        compress_jsonl_data(jsonl_data, output_file)
-    elif output_file.suffix == ".jsonl":
-        dump_jsonl(jsonl_data, output_file)
-    end = time.time()
-    logger.debug(f"Write org entries to JSONL file: {end - start} seconds")
+        # Compress JSONL formatted Data
+        if output_file.suffix == ".gz":
+            compress_jsonl_data(jsonl_data, output_file)
+        elif output_file.suffix == ".jsonl":
+            dump_jsonl(jsonl_data, output_file)
+        end = time.time()
+        logger.debug(f"Write org entries to JSONL file: {end - start} seconds")
 
-    return entries_with_ids
+        return entries_with_ids
 
-def get_org_files(org_files=None, org_file_filters=None):
-    "Get Org files to process"
-    absolute_org_files, filtered_org_files = set(), set()
-    if org_files:
-        absolute_org_files = {
-            get_absolute_path(org_file)
-            for org_file
-            in org_files
-        }
-    if org_file_filters:
-        filtered_org_files = {
-            filtered_file
-            for org_file_filter in org_file_filters
-            for filtered_file in glob.glob(get_absolute_path(org_file_filter))
-        }
+    @staticmethod
+    def get_org_files(org_files=None, org_file_filters=None):
+        "Get Org files to process"
+        absolute_org_files, filtered_org_files = set(), set()
+        if org_files:
+            absolute_org_files = {
+                get_absolute_path(org_file)
+                for org_file
+                in org_files
+            }
+        if org_file_filters:
+            filtered_org_files = {
+                filtered_file
+                for org_file_filter in org_file_filters
+                for filtered_file in glob.glob(get_absolute_path(org_file_filter))
+            }
 
-    all_org_files = sorted(absolute_org_files | filtered_org_files)
+        all_org_files = sorted(absolute_org_files | filtered_org_files)
 
-    files_with_non_org_extensions = {org_file for org_file in all_org_files if not org_file.endswith(".org")}
-    if any(files_with_non_org_extensions):
-        logger.warn(f"There maybe non org-mode files in the input set: {files_with_non_org_extensions}")
+        files_with_non_org_extensions = {org_file for org_file in all_org_files if not org_file.endswith(".org")}
+        if any(files_with_non_org_extensions):
+            logger.warn(f"There maybe non org-mode files in the input set: {files_with_non_org_extensions}")
 
-    logger.info(f'Processing files: {all_org_files}')
+        logger.info(f'Processing files: {all_org_files}')
 
-    return all_org_files
+        return all_org_files
 
-def extract_org_entries(org_files):
-    "Extract entries from specified Org files"
-    entries = []
-    entry_to_file_map = []
-    for org_file in org_files:
-        org_file_entries = orgnode.makelist(str(org_file))
-        entry_to_file_map += zip(org_file_entries, [org_file]*len(org_file_entries))
-        entries.extend(org_file_entries)
-
-    return entries, dict(entry_to_file_map)
+    @staticmethod
+    def extract_org_entries(org_files):
+        "Extract entries from specified Org files"
+        entries = []
+        entry_to_file_map = []
+        for org_file in org_files:
+            org_file_entries = orgnode.makelist(str(org_file))
+            entry_to_file_map += zip(org_file_entries, [org_file]*len(org_file_entries))
+            entries.extend(org_file_entries)
+
+        return entries, dict(entry_to_file_map)
 
-def convert_org_nodes_to_entries(entries: list[orgnode.Orgnode], entry_to_file_map, index_heading_entries=False) -> list[dict]:
-    "Convert Org-Mode entries into list of dictionary"
-    entry_maps = []
-    for entry in entries:
-        entry_dict = dict()
+    @staticmethod
+    def convert_org_nodes_to_entries(entries: list[orgnode.Orgnode], entry_to_file_map, index_heading_entries=False) -> list[dict]:
+        "Convert Org-Mode entries into list of dictionary"
+        entry_maps = []
+        for entry in entries:
+            entry_dict = dict()
 
-        if not entry.hasBody and not index_heading_entries:
-            # Ignore title notes i.e notes with just headings and empty body
-            continue
+            if not entry.hasBody and not index_heading_entries:
+                # Ignore title notes i.e notes with just headings and empty body
+                continue
 
-        entry_dict["compiled"] = f'{entry.heading}.'
-        if state.verbose > 2:
-            logger.debug(f"Title: {entry.heading}")
+            entry_dict["compiled"] = f'{entry.heading}.'
+            if state.verbose > 2:
+                logger.debug(f"Title: {entry.heading}")
 
-        if entry.tags:
-            tags_str = " ".join(entry.tags)
-            entry_dict["compiled"] += f'\t {tags_str}.'
-            if state.verbose > 2:
-                logger.debug(f"Tags: {tags_str}")
+            if entry.tags:
+                tags_str = " ".join(entry.tags)
+                entry_dict["compiled"] += f'\t {tags_str}.'
+                if state.verbose > 2:
+                    logger.debug(f"Tags: {tags_str}")
 
-        if entry.closed:
-            entry_dict["compiled"] += f'\n Closed on {entry.closed.strftime("%Y-%m-%d")}.'
-            if state.verbose > 2:
-                logger.debug(f'Closed: {entry.closed.strftime("%Y-%m-%d")}')
+            if entry.closed:
+                entry_dict["compiled"] += f'\n Closed on {entry.closed.strftime("%Y-%m-%d")}.'
+                if state.verbose > 2:
+                    logger.debug(f'Closed: {entry.closed.strftime("%Y-%m-%d")}')
 
-        if entry.scheduled:
-            entry_dict["compiled"] += f'\n Scheduled for {entry.scheduled.strftime("%Y-%m-%d")}.'
-            if state.verbose > 2:
-                logger.debug(f'Scheduled: {entry.scheduled.strftime("%Y-%m-%d")}')
+            if entry.scheduled:
+                entry_dict["compiled"] += f'\n Scheduled for {entry.scheduled.strftime("%Y-%m-%d")}.'
+                if state.verbose > 2:
+                    logger.debug(f'Scheduled: {entry.scheduled.strftime("%Y-%m-%d")}')
 
-        if entry.hasBody:
-            entry_dict["compiled"] += f'\n {entry.body}'
-            if state.verbose > 2:
-                logger.debug(f"Body: {entry.body}")
+            if entry.hasBody:
+                entry_dict["compiled"] += f'\n {entry.body}'
+                if state.verbose > 2:
+                    logger.debug(f"Body: {entry.body}")
 
-        if entry_dict:
-            entry_dict["raw"] = f'{entry}'
-            entry_dict["file"] = f'{entry_to_file_map[entry]}'
+            if entry_dict:
+                entry_dict["raw"] = f'{entry}'
+                entry_dict["file"] = f'{entry_to_file_map[entry]}'
 
-            # Convert Dictionary to JSON and Append to JSONL string
-            entry_maps.append(entry_dict)
+                # Convert Dictionary to JSON and Append to JSONL string
+                entry_maps.append(entry_dict)
 
-    return entry_maps
+        return entry_maps
 
-def convert_org_entries_to_jsonl(entries: Iterable[dict]) -> str:
-    "Convert each Org-Mode entry to JSON and collate as JSONL"
-    return ''.join([f'{json.dumps(entry_dict, ensure_ascii=False)}\n' for entry_dict in entries])
+    @staticmethod
+    def convert_org_entries_to_jsonl(entries: Iterable[dict]) -> str:
+        "Convert each Org-Mode entry to JSON and collate as JSONL"
+        return ''.join([f'{json.dumps(entry_dict, ensure_ascii=False)}\n' for entry_dict in entries])

View File

@@ -0,0 +1,14 @@
+# Standard Packages
+from abc import ABC, abstractmethod
+from typing import Iterable
+
+# Internal Packages
+from src.utils.rawconfig import TextContentConfig
+
+
+class TextToJsonl(ABC):
+    def __init__(self, config: TextContentConfig):
+        self.config = config
+
+    @abstractmethod
+    def process(self, previous_entries: Iterable[tuple[int, dict]]=None) -> list[tuple[int, dict]]: ...
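
For reference, a hypothetical caller could drive any of these processors through the shared interface. The `TextContentConfig` field names are taken from the processors above; the module path and concrete values are assumptions:

```python
# Hypothetical usage sketch; paths and values are illustrative
from pathlib import Path

from src.processor.org_mode.org_to_jsonl import OrgToJsonl  # assumed module path
from src.utils.rawconfig import TextContentConfig

config = TextContentConfig(
    input_files=None,
    input_filter='~/notes/*.org',             # glob of org files to index
    compressed_jsonl=Path('notes.jsonl.gz'),  # .gz suffix selects compressed output
    index_heading_entries=False,              # skip entries with empty bodies
)

org_processor = OrgToJsonl(config)

# Full index on the first run; pass the previous entries afterwards so only
# new or updated entries are marked for update
entries_with_ids = org_processor.process(previous_entries=None)
entries_with_ids = org_processor.process(previous_entries=entries_with_ids)
```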