feat: replace Trafilatura with readability and nh3

This commit is contained in:
Leon
2025-08-07 04:44:56 +02:00
parent f6d6743b4d
commit 427a32e951
5 changed files with 176 additions and 166 deletions

View File

@@ -1,10 +1,12 @@
import email
import html
import imaplib
import quopri
from email.header import decode_header, make_header
from email.message import Message
import trafilatura
import nh3
from bs4 import BeautifulSoup
from readability import Document
from sqlalchemy.orm import Session
from app.core.logging import get_logger
@@ -62,31 +64,94 @@ def _fetch_unread_email_ids(mail: imaplib.IMAP4_SSL) -> list[str]:
def _get_email_body(msg: Message) -> str:
"""Extract body from an email message."""
body = ""
"""Extract the HTML body from an email message, falling back to plain text."""
html_body = ""
text_body = ""
for part in msg.walk():
ctype = part.get_content_type()
cdispo = str(part.get("Content-Disposition"))
if "attachment" in cdispo:
continue
if ctype in ["text/plain", "text/html"]:
if ctype == "text/html":
try:
payload = part.get_payload(decode=True)
charset = part.get_content_charset() or "utf-8"
body = payload.decode(charset, "ignore")
html_body = payload.decode(charset, "ignore")
except Exception:
pass
return html.unescape(body)
elif ctype == "text/plain":
try:
payload = part.get_payload(decode=True)
charset = part.get_content_charset() or "utf-8"
text_body = payload.decode(charset, "ignore")
except Exception:
pass
# Prefer HTML body, but fall back to plain text if HTML is empty
return html_body or text_body
def _decode_if_quoted_printable(content: str) -> str:
    """Undo quoted-printable encoding only when *content* still carries it.

    Running a quoted-printable decoder over text that is already decoded is
    destructive: every ``=XX`` sequence that happens to look like a hex escape
    (for example the ``=12`` in ``?id=12ab``) is rewritten. The decoder is
    therefore applied only when the tell-tale ``=3D`` escape (an encoded ``=``)
    is present, which is practically always the case for HTML that is still
    quoted-printable encoded and practically never for decoded HTML.
    """
    if "=3D" not in content:
        return content
    try:
        decoded_bytes = quopri.decodestring(content.encode("utf-8"))
    except ValueError:
        # Covers UnicodeError from encode(); keep the input untouched.
        return content
    return decoded_bytes.decode("utf-8", "ignore")


def _extract_and_clean_html(raw_html_content: str) -> dict[str, str]:
    """Extract the main content of newsletter HTML and sanitize it.

    The input is (conditionally) quoted-printable decoded, reduced to its main
    article content with readability, and then sanitized with nh3 using a
    small allow-list of tags and attributes.

    Args:
        raw_html_content: The email body as a string. May be empty.

    Returns:
        A dict with two keys: ``"title"`` (the document title, or the first
        headline of the extracted content, or ``"Newsletter"``) and ``"body"``
        (the sanitized HTML fragment).
    """
    # An empty document cannot be parsed by readability, so short-circuit.
    if not raw_html_content or not raw_html_content.strip():
        return {"title": "Newsletter", "body": ""}

    clean_html_str = _decode_if_quoted_printable(raw_html_content)

    doc = Document(clean_html_str)
    extracted_body = doc.summary(html_partial=True)

    allowed_tags = {
        "p",
        "strong",
        "em",
        "u",
        "h3",
        "h4",
        "ul",
        "ol",
        "li",
        "a",
        "img",
        "br",
        "div",
        "span",
        "figure",
        "figcaption",
    }
    allowed_attributes = {
        "a": {"href", "title"},
        "img": {"src", "alt", "width", "height"},
        # NOTE(review): inline styles are allowed on every tag; confirm this is
        # acceptable for wherever the sanitized HTML is rendered.
        "*": {"style"},
    }
    cleaned_body = nh3.clean(
        extracted_body, tags=allowed_tags, attributes=allowed_attributes
    )

    title = (doc.title() or "").strip()
    # readability signals a missing <title> with a placeholder string; accept
    # both spellings so the fallback below is actually reachable.
    if not title or title in ("no-title", "[no-title]"):
        # Search the *unsanitized* extract: h1/h2 are not in the allow-list, so
        # they no longer exist in cleaned_body. Only plain text is taken from
        # the headline, so no markup leaks into the title.
        soup = BeautifulSoup(extracted_body, "html.parser")
        first_headline = soup.find(["h1", "h2", "h3"])
        headline_text = first_headline.get_text(strip=True) if first_headline else ""
        title = headline_text or "Newsletter"

    return {"title": title, "body": cleaned_body}
def _auto_add_newsletter(
db: Session, sender: str, msg: Message, settings: Settings
db: Session,
sender: str,
msg: Message,
settings: Settings,
) -> Newsletter:
"""Automatically add a new newsletter."""
logger.info(f"Auto-adding new newsletter for sender: {sender}")
newsletter_name = email.utils.parseaddr(msg["From"])[0] or sender
new_newsletter_schema = NewsletterCreate(
name=newsletter_name, sender_emails=[sender]
name=newsletter_name,
sender_emails=[sender],
)
return create_newsletter(db, new_newsletter_schema)
@@ -129,14 +194,15 @@ def _process_single_email(
return
subject = str(make_header(decode_header(msg["Subject"])))
final_body = _get_email_body(msg)
body = _get_email_body(msg)
if newsletter.extract_content:
extracted_body = trafilatura.extract(final_body)
if extracted_body:
final_body = extracted_body
cleaned_data = _extract_and_clean_html(body)
# The subject from the email itself is often better than what readability extracts
# so we only override the body.
body = cleaned_data["body"]
entry_schema = EntryCreate(subject=subject, body=final_body, message_id=message_id)
entry_schema = EntryCreate(subject=subject, body=body, message_id=message_id)
new_entry = create_entry(db, entry_schema, newsletter.id)
if not new_entry:

View File

@@ -70,7 +70,7 @@ def test_process_emails(mock_imap, db_session: Session):
mock_mail.search.return_value = ("OK", [b"1"])
# Mock email content
mock_msg_bytes = b"From: newsletter@example.com\nSubject: Test Subject\nMessage-ID: <test@test.com>\n\nTest Body"
mock_msg_bytes = b"From: newsletter@example.com\nSubject: Test Subject\nMessage-ID: <test@test.com>\n\n<p>Test Body</p>"
mock_mail.fetch.return_value = ("OK", [(None, mock_msg_bytes)])
process_emails(db_session)
@@ -95,7 +95,7 @@ def test_process_emails(mock_imap, db_session: Session):
entries = get_entries_by_newsletter(db_session, newsletters[0].id)
assert len(entries) == 1
assert entries[0].subject == "Test Subject"
assert entries[0].body == "Test Body"
assert entries[0].body == "<p>Test Body</p>"
@patch("app.core.scheduler.job")

View File

@@ -26,6 +26,7 @@ def _setup_test_email_processing(
msg["From"] = newsletter_create_data.sender_emails[0]
msg["Subject"] = "Test Email"
msg["Message-ID"] = "<test-message-id>"
msg.set_payload("<html><body><p>Original Body</p></body></html>", "utf-8")
mock_mail.fetch.return_value = ("OK", [(b"1 (RFC822)", msg.as_bytes())])
return mock_mail, newsletter, settings
@@ -83,13 +84,17 @@ def test_process_single_email_with_global_move_folder(db_session: Session):
mock_mail.store.assert_any_call("1", "+FLAGS", "\\Deleted")
@patch("app.services.email_processor.trafilatura.extract")
@patch("app.services.email_processor._extract_and_clean_html")
def test_process_single_email_with_content_extraction(
mock_trafilatura, db_session: Session
mock_extract_clean,
db_session: Session,
):
"""Test that trafilatura is called when extract_content is True."""
"""Test that the cleaning function is called when extract_content is True."""
# 1. ARRANGE
mock_trafilatura.return_value = "Extracted Body"
mock_extract_clean.return_value = {
"title": "Extracted Title",
"body": "Extracted Body",
}
settings_data = SettingsCreate(
imap_server="test.com", imap_username="test", imap_password="password"
)
@@ -108,8 +113,10 @@ def test_process_single_email_with_content_extraction(
_process_single_email("1", mock_mail, db_session, sender_map, settings)
# 3. ASSERT
mock_trafilatura.assert_called_once()
mock_extract_clean.assert_called_once()
# Check that create_entry was called with the extracted body
mock_create_entry.assert_called_once()
entry_create_arg = mock_create_entry.call_args[0][1]
assert entry_create_arg.body == "Extracted Body"
# Subject should still come from the email, not the extracted title
assert entry_create_arg.subject == "Test Email"