From 9f1eea4488b7f326fd3a2ee3fb76e09f8d4f4dad Mon Sep 17 00:00:00 2001 From: Leon Date: Wed, 16 Jul 2025 21:33:59 +0200 Subject: [PATCH] refactor: restructure email processing --- backend/app/services/email_processor.py | 248 ++++++++++++++---------- 1 file changed, 141 insertions(+), 107 deletions(-) diff --git a/backend/app/services/email_processor.py b/backend/app/services/email_processor.py index 5ca066a..e5f555d 100644 --- a/backend/app/services/email_processor.py +++ b/backend/app/services/email_processor.py @@ -1,6 +1,7 @@ import email import imaplib from email.header import decode_header, make_header +from email.message import Message import trafilatura from sqlalchemy.orm import Session @@ -9,16 +10,16 @@ from app.core.logging import get_logger from app.crud.entries import create_entry, get_entry_by_message_id from app.crud.newsletters import create_newsletter, get_newsletters from app.crud.settings import get_settings +from app.models.newsletters import Newsletter from app.schemas.entries import EntryCreate from app.schemas.newsletters import NewsletterCreate +from app.schemas.settings import Settings logger = get_logger(__name__) -def process_emails(db: Session): - """Process unread emails, add them as entries, and manage newsletters.""" - logger.info("Starting email processing...") - settings = get_settings(db, with_password=True) +def _is_configured(settings: Settings | None) -> bool: + """Check if IMAP settings are configured.""" if ( not settings or not settings.imap_server @@ -26,125 +27,158 @@ def process_emails(db: Session): or not settings.imap_password ): logger.warning("IMAP settings are not configured. Skipping email processing.") - return + return False + return True - newsletters = get_newsletters(db) - sender_map = {} - for nl in newsletters: - for sender in nl.senders: - sender_map[sender.email] = nl - logger.info(f"Processing emails for {len(newsletters)} newsletters.") +def _connect_to_imap(settings: Settings) -> imaplib.IMAP4_SSL | None: + """Connect to the IMAP server and select the mailbox.""" try: logger.info(f"Connecting to IMAP server: {settings.imap_server}") mail = imaplib.IMAP4_SSL(settings.imap_server) mail.login(settings.imap_username, settings.imap_password) mail.select(settings.search_folder) logger.info(f"Selected mailbox: {settings.search_folder}") + return mail + except Exception as e: + logger.error(f"Failed to connect to IMAP server: {e}", exc_info=True) + return None - status, messages = mail.search(None, "(UNSEEN)") - if status != "OK": - logger.error(f"Failed to search for unseen emails, status: {status}") - return - email_ids = messages[0].split() +def _fetch_unread_email_ids(mail: imaplib.IMAP4_SSL) -> list[str]: + """Fetch IDs of unread emails.""" + status, messages = mail.search(None, "(UNSEEN)") + if status != "OK": + logger.error(f"Failed to search for unseen emails, status: {status}") + return [] + return messages[0].split() + + +def _get_email_body(msg: Message) -> str: + """Extract body from an email message.""" + body, html = "", "" + for part in msg.walk(): + ctype = part.get_content_type() + cdispo = str(part.get("Content-Disposition")) + if "attachment" in cdispo: + continue + if ctype == "text/plain": + try: + payload = part.get_payload(decode=True) + charset = part.get_content_charset() or "utf-8" + body = payload.decode(charset, "ignore") + except Exception: + pass + elif ctype == "text/html": + try: + payload = part.get_payload(decode=True) + charset = part.get_content_charset() or "utf-8" + html = payload.decode(charset, "ignore") + except Exception: + pass + return html or body + + +def _auto_add_newsletter( + db: Session, sender: str, msg: Message, settings: Settings +) -> Newsletter: + """Automatically add a new newsletter.""" + logger.info(f"Auto-adding new newsletter for sender: {sender}") + newsletter_name = email.utils.parseaddr(msg["From"])[0] or sender + new_newsletter_schema = NewsletterCreate( + name=newsletter_name, sender_emails=[sender] + ) + return create_newsletter(db, new_newsletter_schema) + + +def _process_single_email( + num: str, + mail: imaplib.IMAP4_SSL, + db: Session, + sender_map: dict[str, Newsletter], + settings: Settings, +) -> None: + """Process a single email message.""" + status, data = mail.fetch(num, "(RFC822)") + if status != "OK": + logger.warning(f"Failed to fetch email with id={num}") + return + + msg = email.message_from_bytes(data[0][1]) + sender = email.utils.parseaddr(msg["From"])[1] + message_id = msg.get("Message-ID") + + if not message_id: + logger.warning( + f"Email from {sender} with subject '{msg['Subject']}' has no Message-ID, skipping." + ) + return + + if get_entry_by_message_id(db, message_id): + logger.info(f"Email with Message-ID {message_id} already processed, skipping.") + return + + logger.debug(f"Processing email from {sender} with subject '{msg['Subject']}'") + + newsletter = sender_map.get(sender) + if not newsletter and settings.auto_add_new_senders: + newsletter = _auto_add_newsletter(db, sender, msg, settings) + sender_map[sender] = newsletter + + if not newsletter: + return + + subject = str(make_header(decode_header(msg["Subject"]))) + final_body = _get_email_body(msg) + + if newsletter.extract_content: + extracted_body = trafilatura.extract(final_body) + if extracted_body: + final_body = extracted_body + + entry = EntryCreate(subject=subject, body=final_body, message_id=message_id) + create_entry(db, entry, newsletter.id) + logger.info( + f"Created new entry for newsletter '{newsletter.name}' from sender {sender}" + ) + + if settings.mark_as_read: + logger.debug(f"Marking email with id={num} as read") + mail.store(num, "+FLAGS", "\\Seen") + + if settings.move_to_folder: + logger.debug(f"Moving email with id={num} to {settings.move_to_folder}") + mail.copy(num, settings.move_to_folder) + mail.store(num, "+FLAGS", "\\Deleted") + + +def process_emails(db: Session) -> None: + """Process unread emails, add them as entries, and manage newsletters.""" + logger.info("Starting email processing...") + settings = get_settings(db, with_password=True) + if not _is_configured(settings): + return + + newsletters = get_newsletters(db) + sender_map = {sender.email: nl for nl in newsletters for sender in nl.senders} + logger.info(f"Processing emails for {len(newsletters)} newsletters.") + + mail = _connect_to_imap(settings) + if not mail: + return + + try: + email_ids = _fetch_unread_email_ids(mail) logger.info(f"Found {len(email_ids)} unseen emails.") - for num in email_ids: - status, data = mail.fetch(num, "(RFC822)") - if status != "OK": - logger.warning(f"Failed to fetch email with id={num}") - continue - - msg = email.message_from_bytes(data[0][1]) - sender = email.utils.parseaddr(msg["From"])[1] - message_id = msg.get("Message-ID") - - if not message_id: - logger.warning( - f"Email from {sender} with subject '{msg['Subject']}' has no Message-ID, skipping." - ) - continue - - if get_entry_by_message_id(db, message_id): - logger.info( - f"Email with Message-ID {message_id} already processed, skipping." - ) - continue - - logger.debug( - f"Processing email from {sender} with subject '{msg['Subject']}'" - ) - - newsletter = sender_map.get(sender) - if not newsletter and settings.auto_add_new_senders: - logger.info(f"Auto-adding new newsletter for sender: {sender}") - newsletter_name = email.utils.parseaddr(msg["From"])[0] or sender - new_newsletter_schema = NewsletterCreate( - name=newsletter_name, sender_emails=[sender] - ) - newsletter = create_newsletter(db, new_newsletter_schema) - sender_map[sender] = newsletter - - if newsletter: - subject = str(make_header(decode_header(msg["Subject"]))) - - body = "" - html = "" - - for part in msg.walk(): - ctype = part.get_content_type() - cdispo = str(part.get("Content-Disposition")) - - if "attachment" in cdispo: - continue - - if ctype == "text/plain": - try: - payload = part.get_payload(decode=True) - charset = part.get_content_charset() or "utf-8" - body = payload.decode(charset, "ignore") - except Exception: - pass - elif ctype == "text/html": - try: - payload = part.get_payload(decode=True) - charset = part.get_content_charset() or "utf-8" - html = payload.decode(charset, "ignore") - except Exception: - pass - - final_body = html or body - if newsletter.extract_content: - extracted_body = trafilatura.extract(final_body) - if extracted_body: - final_body = extracted_body - - entry = EntryCreate( - subject=subject, body=final_body, message_id=message_id - ) - create_entry(db, entry, newsletter.id) - logger.info( - f"Created new entry for newsletter '{newsletter.name}' from sender {sender}" - ) - - if settings.mark_as_read: - logger.debug(f"Marking email with id={num} as read") - mail.store(num, "+FLAGS", "\\Seen") - - if settings.move_to_folder: - logger.debug( - f"Moving email with id={num} to {settings.move_to_folder}" - ) - mail.copy(num, settings.move_to_folder) - mail.store(num, "+FLAGS", "\\Deleted") + _process_single_email(num, mail, db, sender_map, settings) if settings.move_to_folder: logger.info("Expunging deleted emails") mail.expunge() - mail.logout() - logger.info("Email processing finished successfully.") - except Exception as e: logger.error(f"Error processing emails: {e}", exc_info=True) + finally: + mail.logout() + logger.info("Email processing finished successfully.")