mirror of
https://github.com/khoaliber/LetterFeed.git
synced 2026-03-09 13:25:18 +00:00
feat: use message-id header to only process message once
This commit is contained in:
@@ -23,6 +23,12 @@ def get_entries_by_newsletter(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def get_entry_by_message_id(db: Session, message_id: str):
|
||||||
|
"""Retrieve an entry by its message_id."""
|
||||||
|
logger.debug(f"Querying for entry with message_id={message_id}")
|
||||||
|
return db.query(Entry).filter(Entry.message_id == message_id).first()
|
||||||
|
|
||||||
|
|
||||||
def create_entry(db: Session, entry: EntryCreate, newsletter_id: int):
|
def create_entry(db: Session, entry: EntryCreate, newsletter_id: int):
|
||||||
"""Create a new entry for a newsletter."""
|
"""Create a new entry for a newsletter."""
|
||||||
logger.info(
|
logger.info(
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ from app.core.database import Base
|
|||||||
|
|
||||||
class Entry(Base):
|
class Entry(Base):
|
||||||
"""Represents an entry (e.g., an email) associated with a newsletter."""
|
"""Represents an entry (e.g., an email) associated with a newsletter."""
|
||||||
|
|
||||||
__tablename__ = "entries"
|
__tablename__ = "entries"
|
||||||
|
|
||||||
id = Column(Integer, primary_key=True, index=True)
|
id = Column(Integer, primary_key=True, index=True)
|
||||||
@@ -17,5 +18,6 @@ class Entry(Base):
|
|||||||
received_at = Column(
|
received_at = Column(
|
||||||
DateTime(timezone=True), default=datetime.datetime.now(datetime.UTC)
|
DateTime(timezone=True), default=datetime.datetime.now(datetime.UTC)
|
||||||
)
|
)
|
||||||
|
message_id = Column(String, unique=True, index=True, nullable=False)
|
||||||
|
|
||||||
newsletter = relationship("Newsletter", back_populates="entries")
|
newsletter = relationship("Newsletter", back_populates="entries")
|
||||||
|
|||||||
@@ -5,17 +5,21 @@ from pydantic import BaseModel, ConfigDict
|
|||||||
|
|
||||||
class EntryBase(BaseModel):
|
class EntryBase(BaseModel):
|
||||||
"""Base schema for an entry."""
|
"""Base schema for an entry."""
|
||||||
|
|
||||||
subject: str
|
subject: str
|
||||||
body: str
|
body: str
|
||||||
|
message_id: str
|
||||||
|
|
||||||
|
|
||||||
class EntryCreate(EntryBase):
|
class EntryCreate(EntryBase):
|
||||||
"""Schema for creating a new entry."""
|
"""Schema for creating a new entry."""
|
||||||
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class Entry(EntryBase):
|
class Entry(EntryBase):
|
||||||
"""Schema for retrieving an entry with its ID and newsletter ID."""
|
"""Schema for retrieving an entry with its ID and newsletter ID."""
|
||||||
|
|
||||||
id: int
|
id: int
|
||||||
newsletter_id: int
|
newsletter_id: int
|
||||||
received_at: datetime.datetime
|
received_at: datetime.datetime
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import imaplib
|
|||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
from app.core.logging import get_logger
|
from app.core.logging import get_logger
|
||||||
from app.crud.entries import create_entry
|
from app.crud.entries import create_entry, get_entry_by_message_id
|
||||||
from app.crud.newsletters import create_newsletter, get_newsletters
|
from app.crud.newsletters import create_newsletter, get_newsletters
|
||||||
from app.crud.settings import get_settings
|
from app.crud.settings import get_settings
|
||||||
from app.schemas.entries import EntryCreate
|
from app.schemas.entries import EntryCreate
|
||||||
@@ -56,6 +56,20 @@ def process_emails(db: Session):
|
|||||||
|
|
||||||
msg = email.message_from_bytes(data[0][1])
|
msg = email.message_from_bytes(data[0][1])
|
||||||
sender = email.utils.parseaddr(msg["From"])[1]
|
sender = email.utils.parseaddr(msg["From"])[1]
|
||||||
|
message_id = msg.get("Message-ID")
|
||||||
|
|
||||||
|
if not message_id:
|
||||||
|
logger.warning(
|
||||||
|
f"Email from {sender} with subject '{msg['Subject']}' has no Message-ID, skipping."
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if get_entry_by_message_id(db, message_id):
|
||||||
|
logger.info(
|
||||||
|
f"Email with Message-ID {message_id} already processed, skipping."
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Processing email from {sender} with subject '{msg['Subject']}'"
|
f"Processing email from {sender} with subject '{msg['Subject']}'"
|
||||||
)
|
)
|
||||||
@@ -100,7 +114,9 @@ def process_emails(db: Session):
|
|||||||
|
|
||||||
final_body = html or body
|
final_body = html or body
|
||||||
|
|
||||||
entry = EntryCreate(subject=subject, body=final_body)
|
entry = EntryCreate(
|
||||||
|
subject=subject, body=final_body, message_id=message_id
|
||||||
|
)
|
||||||
create_entry(db, entry, newsletter.id)
|
create_entry(db, entry, newsletter.id)
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Created new entry for newsletter '{newsletter.name}' from sender {sender}"
|
f"Created new entry for newsletter '{newsletter.name}' from sender {sender}"
|
||||||
|
|||||||
@@ -70,7 +70,7 @@ def test_process_emails(mock_imap, db_session: Session):
|
|||||||
mock_mail.search.return_value = ("OK", [b"1"])
|
mock_mail.search.return_value = ("OK", [b"1"])
|
||||||
|
|
||||||
# Mock email content
|
# Mock email content
|
||||||
mock_msg_bytes = b"From: newsletter@example.com\nSubject: Test Subject\n\nTest Body"
|
mock_msg_bytes = b"From: newsletter@example.com\nSubject: Test Subject\nMessage-ID: <test@test.com>\n\nTest Body"
|
||||||
mock_mail.fetch.return_value = ("OK", [(None, mock_msg_bytes)])
|
mock_mail.fetch.return_value = ("OK", [(None, mock_msg_bytes)])
|
||||||
|
|
||||||
process_emails(db_session)
|
process_emails(db_session)
|
||||||
@@ -151,7 +151,7 @@ def test_process_emails_auto_add_sender(mock_imap, db_session: Session):
|
|||||||
mock_mail = MagicMock()
|
mock_mail = MagicMock()
|
||||||
mock_imap.return_value = mock_mail
|
mock_imap.return_value = mock_mail
|
||||||
mock_mail.search.return_value = ("OK", [b"1"])
|
mock_mail.search.return_value = ("OK", [b"1"])
|
||||||
mock_msg_bytes = b"From: New Sender <new@example.com>\nSubject: New Email\n\nHello"
|
mock_msg_bytes = b"From: New Sender <new@example.com>\nSubject: New Email\nMessage-ID: <new@new.com>\n\nHello"
|
||||||
mock_mail.fetch.return_value = ("OK", [(None, mock_msg_bytes)])
|
mock_mail.fetch.return_value = ("OK", [(None, mock_msg_bytes)])
|
||||||
|
|
||||||
process_emails(db_session)
|
process_emails(db_session)
|
||||||
@@ -192,10 +192,55 @@ def test_process_emails_no_move_or_read(mock_imap, db_session: Session):
|
|||||||
mock_mail = MagicMock()
|
mock_mail = MagicMock()
|
||||||
mock_imap.return_value = mock_mail
|
mock_imap.return_value = mock_mail
|
||||||
mock_mail.search.return_value = ("OK", [b"1"])
|
mock_mail.search.return_value = ("OK", [b"1"])
|
||||||
mock_msg_bytes = b"From: newsletter@example.com\nSubject: Test Subject\n\nTest Body"
|
mock_msg_bytes = b"From: newsletter@example.com\nSubject: Test Subject\nMessage-ID: <test@test.com>\n\nTest Body"
|
||||||
mock_mail.fetch.return_value = ("OK", [(None, mock_msg_bytes)])
|
mock_mail.fetch.return_value = ("OK", [(None, mock_msg_bytes)])
|
||||||
|
|
||||||
process_emails(db_session)
|
process_emails(db_session)
|
||||||
|
|
||||||
mock_mail.store.assert_not_called()
|
mock_mail.store.assert_not_called()
|
||||||
mock_mail.copy.assert_not_called()
|
mock_mail.copy.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
@patch("app.services.email_processor.imaplib.IMAP4_SSL")
|
||||||
|
def test_process_emails_avoids_duplicates(mock_imap, db_session: Session):
|
||||||
|
"""Test that process_emails avoids processing duplicate emails."""
|
||||||
|
settings_data = SettingsCreate(
|
||||||
|
imap_server="imap.test.com",
|
||||||
|
imap_username="test@test.com",
|
||||||
|
imap_password="password",
|
||||||
|
)
|
||||||
|
create_or_update_settings(db_session, settings_data)
|
||||||
|
newsletter_data = NewsletterCreate(
|
||||||
|
name="Test Newsletter", sender_emails=["newsletter@example.com"]
|
||||||
|
)
|
||||||
|
newsletter = create_newsletter(db_session, newsletter_data)
|
||||||
|
|
||||||
|
# Create an entry that already exists
|
||||||
|
from app.crud.entries import create_entry
|
||||||
|
from app.schemas.entries import EntryCreate
|
||||||
|
|
||||||
|
create_entry(
|
||||||
|
db_session,
|
||||||
|
EntryCreate(
|
||||||
|
subject="Existing Subject",
|
||||||
|
body="Existing Body",
|
||||||
|
message_id="<existing@message.com>",
|
||||||
|
),
|
||||||
|
newsletter.id,
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_mail = MagicMock()
|
||||||
|
mock_imap.return_value = mock_mail
|
||||||
|
mock_mail.search.return_value = ("OK", [b"1"])
|
||||||
|
# This email has the same Message-ID as the one we just created
|
||||||
|
mock_msg_bytes = b"From: newsletter@example.com\nSubject: Test Subject\nMessage-ID: <existing@message.com>\n\nTest Body"
|
||||||
|
mock_mail.fetch.return_value = ("OK", [(None, mock_msg_bytes)])
|
||||||
|
|
||||||
|
process_emails(db_session)
|
||||||
|
|
||||||
|
# Verify that no new entry was created
|
||||||
|
from app.crud.entries import get_entries_by_newsletter
|
||||||
|
|
||||||
|
entries = get_entries_by_newsletter(db_session, newsletter.id)
|
||||||
|
assert len(entries) == 1
|
||||||
|
assert entries[0].subject == "Existing Subject"
|
||||||
|
|||||||
@@ -148,7 +148,11 @@ def test_create_entry(db_session: Session):
|
|||||||
name="Test Newsletter 5", sender_emails=[unique_email]
|
name="Test Newsletter 5", sender_emails=[unique_email]
|
||||||
)
|
)
|
||||||
newsletter = create_newsletter(db_session, newsletter_data)
|
newsletter = create_newsletter(db_session, newsletter_data)
|
||||||
entry_data = EntryCreate(subject="Test Subject", body="Test Body")
|
entry_data = EntryCreate(
|
||||||
|
subject="Test Subject",
|
||||||
|
body="Test Body",
|
||||||
|
message_id=f"<{uuid.uuid4()}@test.com>",
|
||||||
|
)
|
||||||
entry = create_entry(db_session, entry_data, newsletter.id)
|
entry = create_entry(db_session, entry_data, newsletter.id)
|
||||||
assert entry.subject == "Test Subject"
|
assert entry.subject == "Test Subject"
|
||||||
assert entry.newsletter_id == newsletter.id
|
assert entry.newsletter_id == newsletter.id
|
||||||
@@ -162,10 +166,18 @@ def test_get_entries_by_newsletter(db_session: Session):
|
|||||||
)
|
)
|
||||||
newsletter = create_newsletter(db_session, newsletter_data)
|
newsletter = create_newsletter(db_session, newsletter_data)
|
||||||
create_entry(
|
create_entry(
|
||||||
db_session, EntryCreate(subject="Entry 1", body="Body 1"), newsletter.id
|
db_session,
|
||||||
|
EntryCreate(
|
||||||
|
subject="Entry 1", body="Body 1", message_id=f"<{uuid.uuid4()}@test.com>"
|
||||||
|
),
|
||||||
|
newsletter.id,
|
||||||
)
|
)
|
||||||
create_entry(
|
create_entry(
|
||||||
db_session, EntryCreate(subject="Entry 2", body="Body 2"), newsletter.id
|
db_session,
|
||||||
|
EntryCreate(
|
||||||
|
subject="Entry 2", body="Body 2", message_id=f"<{uuid.uuid4()}@test.com>"
|
||||||
|
),
|
||||||
|
newsletter.id,
|
||||||
)
|
)
|
||||||
entries = get_entries_by_newsletter(db_session, newsletter.id)
|
entries = get_entries_by_newsletter(db_session, newsletter.id)
|
||||||
assert len(entries) == 2
|
assert len(entries) == 2
|
||||||
@@ -183,9 +195,14 @@ def test_update_newsletter(db_session: Session):
|
|||||||
from app.schemas.newsletters import NewsletterUpdate
|
from app.schemas.newsletters import NewsletterUpdate
|
||||||
|
|
||||||
updated_email = f"updated_sender_{uuid.uuid4()}@test.com"
|
updated_email = f"updated_sender_{uuid.uuid4()}@test.com"
|
||||||
updated_newsletter_data = NewsletterUpdate(name="Updated Newsletter", sender_emails=[updated_email])
|
updated_newsletter_data = NewsletterUpdate(
|
||||||
|
name="Updated Newsletter", sender_emails=[updated_email]
|
||||||
|
)
|
||||||
from app.crud.newsletters import update_newsletter
|
from app.crud.newsletters import update_newsletter
|
||||||
updated_newsletter = update_newsletter(db_session, newsletter.id, updated_newsletter_data)
|
|
||||||
|
updated_newsletter = update_newsletter(
|
||||||
|
db_session, newsletter.id, updated_newsletter_data
|
||||||
|
)
|
||||||
|
|
||||||
assert updated_newsletter.name == "Updated Newsletter"
|
assert updated_newsletter.name == "Updated Newsletter"
|
||||||
assert len(updated_newsletter.senders) == 1
|
assert len(updated_newsletter.senders) == 1
|
||||||
@@ -201,6 +218,7 @@ def test_delete_newsletter(db_session: Session):
|
|||||||
newsletter = create_newsletter(db_session, newsletter_data)
|
newsletter = create_newsletter(db_session, newsletter_data)
|
||||||
|
|
||||||
from app.crud.newsletters import delete_newsletter
|
from app.crud.newsletters import delete_newsletter
|
||||||
|
|
||||||
deleted_newsletter = delete_newsletter(db_session, newsletter.id)
|
deleted_newsletter = delete_newsletter(db_session, newsletter.id)
|
||||||
|
|
||||||
assert deleted_newsletter.id == newsletter.id
|
assert deleted_newsletter.id == newsletter.id
|
||||||
@@ -208,4 +226,5 @@ def test_delete_newsletter(db_session: Session):
|
|||||||
|
|
||||||
# Verify it's actually deleted
|
# Verify it's actually deleted
|
||||||
from app.crud.newsletters import get_newsletter
|
from app.crud.newsletters import get_newsletter
|
||||||
|
|
||||||
assert get_newsletter(db_session, newsletter.id) is None
|
assert get_newsletter(db_session, newsletter.id) is None
|
||||||
|
|||||||
@@ -164,9 +164,17 @@ def test_get_newsletter_feed(client: TestClient):
|
|||||||
newsletter_id = create_response.json()["id"]
|
newsletter_id = create_response.json()["id"]
|
||||||
|
|
||||||
# Add some entries to the newsletter
|
# Add some entries to the newsletter
|
||||||
entry_data_1 = {"subject": "Test Entry 1", "body": "<p>Content 1</p>"}
|
entry_data_1 = {
|
||||||
|
"subject": "Test Entry 1",
|
||||||
|
"body": "<p>Content 1</p>",
|
||||||
|
"message_id": f"<entry1_{uuid.uuid4()}@test.com>",
|
||||||
|
}
|
||||||
client.post(f"/newsletters/{newsletter_id}/entries", json=entry_data_1)
|
client.post(f"/newsletters/{newsletter_id}/entries", json=entry_data_1)
|
||||||
entry_data_2 = {"subject": "Test Entry 2", "body": "<p>Content 2</p>"}
|
entry_data_2 = {
|
||||||
|
"subject": "Test Entry 2",
|
||||||
|
"body": "<p>Content 2</p>",
|
||||||
|
"message_id": f"<entry2_{uuid.uuid4()}@test.com>",
|
||||||
|
}
|
||||||
client.post(f"/newsletters/{newsletter_id}/entries", json=entry_data_2)
|
client.post(f"/newsletters/{newsletter_id}/entries", json=entry_data_2)
|
||||||
|
|
||||||
response = client.get(f"/feeds/{newsletter_id}")
|
response = client.get(f"/feeds/{newsletter_id}")
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
import uuid
|
||||||
|
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
from app.crud.entries import create_entry
|
from app.crud.entries import create_entry
|
||||||
@@ -17,12 +19,16 @@ def test_generate_feed(db_session: Session):
|
|||||||
|
|
||||||
# Create entries for the newsletter
|
# Create entries for the newsletter
|
||||||
entry1_data = EntryCreate(
|
entry1_data = EntryCreate(
|
||||||
subject="First Entry", body="<p>This is the first entry.</p>"
|
subject="First Entry",
|
||||||
|
body="<p>This is the first entry.</p>",
|
||||||
|
message_id=f"<{uuid.uuid4()}@test.com>",
|
||||||
)
|
)
|
||||||
create_entry(db_session, entry1_data, newsletter.id)
|
create_entry(db_session, entry1_data, newsletter.id)
|
||||||
|
|
||||||
entry2_data = EntryCreate(
|
entry2_data = EntryCreate(
|
||||||
subject="Second Entry", body="<p>This is the second entry.</p>"
|
subject="Second Entry",
|
||||||
|
body="<p>This is the second entry.</p>",
|
||||||
|
message_id=f"<{uuid.uuid4()}@test.com>",
|
||||||
)
|
)
|
||||||
create_entry(db_session, entry2_data, newsletter.id)
|
create_entry(db_session, entry2_data, newsletter.id)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user