This commit is contained in:
Leon
2025-07-15 22:54:35 +02:00
commit f7eda17284
89 changed files with 18535 additions and 0 deletions

1
backend/app/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""The main application package."""

View File

@@ -0,0 +1 @@
"""Core application modules."""

View File

@@ -0,0 +1,32 @@
from pydantic import AliasChoices, Field
from pydantic_settings import BaseSettings, SettingsConfigDict
"""Configuration settings for the Letterfeed application."""
class Settings(BaseSettings):
"""Application settings, loaded from environment variables or .env file."""
model_config = SettingsConfigDict(
env_file=".env", extra="ignore", env_prefix="LETTERFEED_", frozen=True
)
database_url: str = Field(
"sqlite:////data/letterfeed.db",
validation_alias=AliasChoices("DATABASE_URL", "LETTERFEED_DATABASE_URL"),
)
app_base_url: str = Field(
"http://localhost:8000",
validation_alias=AliasChoices("APP_BASE_URL", "LETTERFEED_APP_BASE_URL"),
)
imap_server: str = ""
imap_username: str = ""
imap_password: str = ""
search_folder: str = "INBOX"
move_to_folder: str | None = None
mark_as_read: bool = False
email_check_interval: int = 15
auto_add_new_senders: bool = False
settings = Settings()

View File

@@ -0,0 +1,25 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from app.core.config import settings
from app.core.logging import get_logger
"""Database connection and session management."""
logger = get_logger(__name__)
engine = create_engine(settings.database_url, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
"""Dependency that provides a database session."""
logger.debug("Creating new database session")
db = SessionLocal()
try:
yield db
finally:
logger.debug("Closing database session")
db.close()

42
backend/app/core/imap.py Normal file
View File

@@ -0,0 +1,42 @@
import imaplib
from app.core.logging import get_logger
"""IMAP utility functions for connecting to mail servers and fetching folders."""
logger = get_logger(__name__)
def _test_imap_connection(server, username, password):
"""Test the IMAP connection with the given credentials."""
logger.info(f"Testing IMAP connection to {server} for user {username}")
try:
mail = imaplib.IMAP4_SSL(server)
mail.login(username, password)
mail.logout()
logger.info("IMAP connection successful")
return True, "Connection successful"
except Exception as e:
logger.error(f"IMAP connection failed: {e}")
return False, str(e)
def get_folders(server, username, password):
"""Fetch a list of IMAP folders from the mail server."""
logger.info(f"Fetching IMAP folders from {server} for user {username}")
try:
mail = imaplib.IMAP4_SSL(server)
mail.login(username, password)
status, folders = mail.list()
mail.logout()
if status == "OK":
folder_list = [
folder.decode().split(' "/" ')[1].strip('"') for folder in folders
]
logger.info(f"Found {len(folder_list)} folders")
return folder_list
logger.warning(f"Failed to list IMAP folders, status: {status}")
return []
except Exception as e:
logger.error(f"Error fetching IMAP folders: {e}")
return []

View File

@@ -0,0 +1,51 @@
import logging
from logging.config import dictConfig
"""Logging configuration for the application."""
def setup_logging():
"""Set up the logging configuration for the application."""
log_config = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"default": {
"()": "logging.Formatter",
"fmt": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
},
},
"handlers": {
"default": {
"formatter": "default",
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
},
},
"loggers": {
"app": {
"handlers": ["default"],
"level": "INFO",
"propagate": True,
},
"uvicorn": {
"handlers": ["default"],
"level": "INFO",
"propagate": False,
},
"uvicorn.error": {
"level": "INFO",
},
"uvicorn.access": {
"handlers": ["default"],
"level": "INFO",
"propagate": False,
},
},
}
dictConfig(log_config)
def get_logger(name: str):
"""Return a logger instance with the given name."""
return logging.getLogger(name)

View File

@@ -0,0 +1,54 @@
from apscheduler.schedulers.background import BackgroundScheduler
from app.core.database import SessionLocal
from app.core.logging import get_logger
from app.crud.settings import get_settings
from app.services.email_processor import process_emails
"""Scheduler for background tasks like email processing."""
logger = get_logger(__name__)
def job():
"""Process emails as a scheduled job."""
logger.info("Scheduler job starting: process_emails")
db = SessionLocal()
try:
process_emails(db)
logger.info("Scheduler job finished: process_emails")
except Exception as e:
logger.error(f"Error in scheduled job process_emails: {e}", exc_info=True)
finally:
db.close()
scheduler = BackgroundScheduler()
def start_scheduler_with_interval():
"""Start the scheduler with an interval based on application settings."""
logger.info("Attempting to start scheduler...")
db = SessionLocal()
try:
settings = get_settings(db)
interval = settings.email_check_interval if settings else 15
logger.info(f"Setting scheduler interval to {interval} minutes")
scheduler.add_job(
job,
"interval",
minutes=interval,
id="email_check_job",
replace_existing=True,
)
if not scheduler.running:
# Run the job immediately once
job()
scheduler.start()
logger.info("Scheduler started.")
else:
logger.info("Scheduler is already running.")
except Exception as e:
logger.error(f"Failed to start scheduler: {e}", exc_info=True)
finally:
db.close()

View File

@@ -0,0 +1 @@
"""CRUD operations for database models."""

View File

@@ -0,0 +1,36 @@
from sqlalchemy.orm import Session
from app.core.logging import get_logger
from app.models.entries import Entry
from app.schemas.entries import EntryCreate
logger = get_logger(__name__)
def get_entries_by_newsletter(
db: Session, newsletter_id: int, skip: int = 0, limit: int = 100
):
"""Retrieve entries for a specific newsletter."""
logger.debug(
f"Querying entries for newsletter_id={newsletter_id}, skip={skip}, limit={limit}"
)
return (
db.query(Entry)
.filter(Entry.newsletter_id == newsletter_id)
.offset(skip)
.limit(limit)
.all()
)
def create_entry(db: Session, entry: EntryCreate, newsletter_id: int):
"""Create a new entry for a newsletter."""
logger.info(
f"Creating new entry for newsletter_id={newsletter_id} with subject '{entry.subject}'"
)
db_entry = Entry(**entry.model_dump(), newsletter_id=newsletter_id)
db.add(db_entry)
db.commit()
db.refresh(db_entry)
logger.info(f"Successfully created entry with id={db_entry.id}")
return db_entry

View File

@@ -0,0 +1,106 @@
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.core.logging import get_logger
from app.models.entries import Entry
from app.models.newsletters import Newsletter, Sender
from app.schemas.newsletters import NewsletterCreate, NewsletterUpdate
logger = get_logger(__name__)
def get_newsletter(db: Session, newsletter_id: int):
"""Retrieve a single newsletter by its ID."""
logger.debug(f"Querying for newsletter with id={newsletter_id}")
result = (
db.query(Newsletter, func.count(Entry.id))
.outerjoin(Entry, Newsletter.id == Entry.newsletter_id)
.filter(Newsletter.id == newsletter_id)
.group_by(Newsletter.id)
.first()
)
if result:
newsletter, count = result
newsletter.entries_count = count
return newsletter
return None
def get_newsletters(db: Session, skip: int = 0, limit: int = 100):
"""Retrieve a list of newsletters."""
logger.debug(f"Querying for newsletters with skip={skip}, limit={limit}")
results = (
db.query(Newsletter, func.count(Entry.id))
.outerjoin(Entry, Newsletter.id == Entry.newsletter_id)
.group_by(Newsletter.id)
.order_by(Newsletter.id)
.offset(skip)
.limit(limit)
.all()
)
newsletters_with_count = []
for newsletter, count in results:
newsletter.entries_count = count
newsletters_with_count.append(newsletter)
return newsletters_with_count
def create_newsletter(db: Session, newsletter: NewsletterCreate):
"""Create a new newsletter."""
logger.info(f"Creating new newsletter with name '{newsletter.name}'")
db_newsletter = Newsletter(name=newsletter.name)
db.add(db_newsletter)
db.commit()
db.refresh(db_newsletter)
for email in newsletter.sender_emails:
db_sender = Sender(email=email, newsletter_id=db_newsletter.id)
db.add(db_sender)
db.commit()
db.refresh(db_newsletter)
logger.info(f"Successfully created newsletter with id={db_newsletter.id}")
db_newsletter.entries_count = 0
return db_newsletter
def update_newsletter(
db: Session, newsletter_id: int, newsletter_update: NewsletterUpdate
):
"""Update an existing newsletter."""
logger.info(f"Updating newsletter with id={newsletter_id}")
db_newsletter = db.query(Newsletter).filter(Newsletter.id == newsletter_id).first()
if not db_newsletter:
return None
db_newsletter.name = newsletter_update.name
# Simple approach: delete existing senders and add new ones
for sender in db_newsletter.senders:
db.delete(sender)
db.commit()
for email in newsletter_update.sender_emails:
db_sender = Sender(email=email, newsletter_id=db_newsletter.id)
db.add(db_sender)
db.commit()
logger.info(f"Successfully updated newsletter with id={db_newsletter.id}")
return get_newsletter(db, newsletter_id)
def delete_newsletter(db: Session, newsletter_id: int):
"""Delete a newsletter by its ID."""
logger.info(f"Deleting newsletter with id={newsletter_id}")
db_newsletter = get_newsletter(db, newsletter_id)
if not db_newsletter:
return None
db.delete(db_newsletter)
db.commit()
logger.info(f"Successfully deleted newsletter with id={newsletter_id}")
return db_newsletter

View File

@@ -0,0 +1,98 @@
from sqlalchemy.orm import Session
from app.core.config import settings as env_settings
from app.core.logging import get_logger
from app.models.settings import Settings as SettingsModel
from app.schemas.settings import Settings as SettingsSchema
from app.schemas.settings import SettingsCreate
logger = get_logger(__name__)
def get_settings(db: Session, with_password: bool = False) -> SettingsSchema:
"""Retrieve application settings, prioritizing environment variables over database."""
logger.debug("Querying for settings")
db_settings = db.query(SettingsModel).first()
if not db_settings:
logger.info(
"No settings found in the database, creating new default settings from environment variables."
)
# Get env settings, but only for fields that exist in the DB model
model_fields = {c.name for c in SettingsModel.__table__.columns}
env_data_for_db = {
k: v for k, v in env_settings.model_dump().items() if k in model_fields
}
db_settings = SettingsModel(**env_data_for_db)
db.add(db_settings)
db.commit()
db.refresh(db_settings)
logger.info("Default settings created from environment variables.")
# Build dictionary from DB model attributes, handling possible None values
db_data = {
"id": db_settings.id,
"imap_server": db_settings.imap_server or "",
"imap_username": db_settings.imap_username or "",
"search_folder": db_settings.search_folder,
"move_to_folder": db_settings.move_to_folder,
"mark_as_read": db_settings.mark_as_read,
"email_check_interval": db_settings.email_check_interval,
"auto_add_new_senders": db_settings.auto_add_new_senders,
}
# Get all environment settings that were explicitly set.
env_data = env_settings.model_dump(exclude_unset=True)
# Merge them. env_data takes precedence.
merged_data = {**db_data, **env_data}
# The fields that came from env are the "locked" ones.
locked_fields = list(env_data.keys())
logger.debug(f"Locked fields from environment variables: {locked_fields}")
# Handle password separately for security
if with_password:
logger.debug("Including IMAP password in settings data")
if "imap_password" in locked_fields:
merged_data["imap_password"] = env_settings.imap_password
else:
merged_data["imap_password"] = db_settings.imap_password
elif "imap_password" in merged_data:
# Ensure password is not in the data if not requested
del merged_data["imap_password"]
# Create the final schema object
settings_schema = SettingsSchema.model_validate(merged_data)
settings_schema.locked_fields = locked_fields
return settings_schema
def create_or_update_settings(db: Session, settings: SettingsCreate):
"""Create or update application settings."""
logger.info("Creating or updating settings")
db_settings = db.query(SettingsModel).first()
if not db_settings:
logger.info("No existing settings found, creating new ones.")
db_settings = SettingsModel()
db.add(db_settings)
update_data = settings.model_dump()
# Do not update fields that are set by environment variables
locked_fields = list(env_settings.model_dump(exclude_unset=True).keys())
logger.debug(f"Fields locked by environment variables: {locked_fields}")
for key, value in update_data.items():
if key not in locked_fields:
setattr(db_settings, key, value)
db.commit()
db.refresh(db_settings)
logger.info("Successfully updated settings.")
# Return the updated settings including locked fields for a complete view
return get_settings(db)

48
backend/app/main.py Normal file
View File

@@ -0,0 +1,48 @@
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.core.database import Base, engine
from app.core.logging import get_logger, setup_logging
from app.core.scheduler import scheduler, start_scheduler_with_interval
from app.routers import feeds, health, imap, newsletters
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Handle application startup and shutdown events."""
setup_logging()
logger = get_logger(__name__)
from app.core.config import settings
logger.info(f"DATABASE_URL used: {settings.database_url}")
logger.info("Starting up Letterfeed backend...")
Base.metadata.create_all(bind=engine)
start_scheduler_with_interval()
yield
if scheduler.running:
logger.info("Shutting down scheduler...")
scheduler.shutdown()
logger.info("...Letterfeed backend shut down.")
app = FastAPI(lifespan=lifespan)
# CORS Middleware
app.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:5173",
"http://localhost:3000",
"http://127.0.0.1:5173",
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(health.router)
app.include_router(imap.router)
app.include_router(newsletters.router)
app.include_router(feeds.router)

View File

@@ -0,0 +1 @@
"""SQLAlchemy models for the database."""

View File

@@ -0,0 +1,21 @@
import datetime
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import relationship
from app.core.database import Base
class Entry(Base):
"""Represents an entry (e.g., an email) associated with a newsletter."""
__tablename__ = "entries"
id = Column(Integer, primary_key=True, index=True)
newsletter_id = Column(Integer, ForeignKey("newsletters.id"))
subject = Column(String)
body = Column(Text)
received_at = Column(
DateTime(timezone=True), default=datetime.datetime.now(datetime.UTC)
)
newsletter = relationship("Newsletter", back_populates="entries")

View File

@@ -0,0 +1,31 @@
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from app.core.database import Base
class Newsletter(Base):
"""Represents a newsletter, which can have multiple senders and entries."""
__tablename__ = "newsletters"
id = Column(Integer, primary_key=True, index=True)
name = Column(String)
is_active = Column(Boolean, default=True)
senders = relationship(
"Sender", back_populates="newsletter", cascade="all, delete-orphan"
)
entries = relationship(
"Entry", back_populates="newsletter", cascade="all, delete-orphan"
)
class Sender(Base):
"""Represents an email sender associated with a newsletter."""
__tablename__ = "senders"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
newsletter_id = Column(Integer, ForeignKey("newsletters.id"), nullable=False)
newsletter = relationship("Newsletter", back_populates="senders")

View File

@@ -0,0 +1,18 @@
from sqlalchemy import Boolean, Column, Integer, String
from app.core.database import Base
class Settings(Base):
"""Represents application settings, including IMAP configuration."""
__tablename__ = "settings"
id = Column(Integer, primary_key=True, index=True)
imap_server = Column(String, index=True, nullable=True, default="")
imap_username = Column(String, nullable=True, default="")
imap_password = Column(String, nullable=True)
search_folder = Column(String, default="INBOX")
move_to_folder = Column(String, nullable=True)
mark_as_read = Column(Boolean, default=False)
email_check_interval = Column(Integer, default=15) # Interval in minutes
auto_add_new_senders = Column(Boolean, default=False)

View File

@@ -0,0 +1 @@
"""API routers for the application."""

View File

@@ -0,0 +1,25 @@
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import Response
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.core.logging import get_logger
from app.services.feed_generator import generate_feed
logger = get_logger(__name__)
router = APIRouter()
@router.get("/feeds/{newsletter_id}")
def get_newsletter_feed(newsletter_id: int, db: Session = Depends(get_db)):
"""Generate an Atom feed for a specific newsletter."""
logger.info(f"Generating feed for newsletter_id={newsletter_id}")
feed = generate_feed(db, newsletter_id)
if not feed:
logger.warning(
f"Newsletter with id={newsletter_id} not found, cannot generate feed."
)
raise HTTPException(status_code=404, detail="Newsletter not found")
logger.info(f"Successfully generated feed for newsletter_id={newsletter_id}")
return Response(content=feed, media_type="application/atom+xml")

View File

@@ -0,0 +1,13 @@
from fastapi import APIRouter
from app.core.logging import get_logger
logger = get_logger(__name__)
router = APIRouter()
@router.get("/health")
def health_check():
"""Perform a health check of the API."""
logger.info("Health check endpoint called")
return {"status": "ok"}

View File

@@ -0,0 +1,73 @@
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.core.imap import _test_imap_connection, get_folders
from app.core.logging import get_logger
from app.crud.settings import create_or_update_settings, get_settings
from app.schemas.settings import Settings, SettingsCreate
logger = get_logger(__name__)
router = APIRouter()
@router.get("/imap/settings", response_model=Settings)
def read_settings(db: Session = Depends(get_db)):
"""Retrieve IMAP settings."""
logger.info("Request to read IMAP settings")
settings = get_settings(db)
if not settings:
logger.warning("IMAP settings not found")
raise HTTPException(status_code=404, detail="IMAP settings not found")
return settings
@router.post("/imap/settings", response_model=Settings)
def update_settings(settings: SettingsCreate, db: Session = Depends(get_db)):
"""Update IMAP settings."""
logger.info("Request to update IMAP settings")
return create_or_update_settings(db=db, settings=settings)
@router.post("/imap/test")
def test_connection(db: Session = Depends(get_db)):
"""Test the IMAP connection with current settings."""
logger.info("Request to test IMAP connection")
settings = get_settings(db, with_password=True)
if not settings:
logger.error("IMAP settings not found, cannot test connection")
raise HTTPException(status_code=404, detail="IMAP settings not found")
is_successful, message = _test_imap_connection(
server=settings.imap_server,
username=settings.imap_username,
password=settings.imap_password,
)
if not is_successful:
logger.warning(f"IMAP connection test failed: {message}")
raise HTTPException(status_code=400, detail=message)
logger.info("IMAP connection test successful")
return {"message": message}
@router.get("/imap/folders", response_model=List[str])
def read_folders(db: Session = Depends(get_db)):
"""Retrieve a list of IMAP folders from the configured server."""
logger.info("Request to fetch IMAP folders")
settings = get_settings(db, with_password=True)
if not settings:
logger.error("IMAP settings not found, cannot fetch folders")
raise HTTPException(status_code=404, detail="IMAP settings not found")
folders = get_folders(
server=settings.imap_server,
username=settings.imap_username,
password=settings.imap_password,
)
logger.info(f"Found {len(folders)} IMAP folders")
return folders

View File

@@ -0,0 +1,72 @@
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.core.logging import get_logger
from app.crud.newsletters import (
create_newsletter,
delete_newsletter,
get_newsletter,
get_newsletters,
update_newsletter,
)
from app.schemas.newsletters import Newsletter, NewsletterCreate, NewsletterUpdate
logger = get_logger(__name__)
router = APIRouter()
@router.post("/newsletters", response_model=Newsletter)
def create_new_newsletter(newsletter: NewsletterCreate, db: Session = Depends(get_db)):
"""Create a new newsletter."""
logger.info(
f"Request to create new newsletter for senders {newsletter.sender_emails}"
)
return create_newsletter(db=db, newsletter=newsletter)
@router.get("/newsletters", response_model=List[Newsletter])
def read_newsletters(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
"""Retrieve a list of newsletters."""
logger.info(f"Request to read newsletters with skip={skip}, limit={limit}")
newsletters = get_newsletters(db, skip=skip, limit=limit)
return newsletters
@router.get("/newsletters/{newsletter_id}", response_model=Newsletter)
def read_newsletter(newsletter_id: int, db: Session = Depends(get_db)):
"""Retrieve a single newsletter by its ID."""
logger.info(f"Request to read newsletter with id={newsletter_id}")
db_newsletter = get_newsletter(db, newsletter_id=newsletter_id)
if db_newsletter is None:
logger.warning(f"Newsletter with id={newsletter_id} not found")
raise HTTPException(status_code=404, detail="Newsletter not found")
return db_newsletter
@router.put("/newsletters/{newsletter_id}", response_model=Newsletter)
def update_existing_newsletter(
newsletter_id: int, newsletter: NewsletterUpdate, db: Session = Depends(get_db)
):
"""Update an existing newsletter."""
logger.info(f"Request to update newsletter with id={newsletter_id}")
db_newsletter = update_newsletter(
db, newsletter_id=newsletter_id, newsletter_update=newsletter
)
if db_newsletter is None:
logger.warning(f"Newsletter with id={newsletter_id} not found, cannot update")
raise HTTPException(status_code=404, detail="Newsletter not found")
return db_newsletter
@router.delete("/newsletters/{newsletter_id}", response_model=Newsletter)
def delete_existing_newsletter(newsletter_id: int, db: Session = Depends(get_db)):
"""Delete a newsletter by its ID."""
logger.info(f"Request to delete newsletter with id={newsletter_id}")
db_newsletter = delete_newsletter(db, newsletter_id=newsletter_id)
if db_newsletter is None:
logger.warning(f"Newsletter with id={newsletter_id} not found, cannot delete")
raise HTTPException(status_code=404, detail="Newsletter not found")
return db_newsletter

View File

@@ -0,0 +1 @@
"""Pydantic schemas for data validation and serialization."""

View File

@@ -0,0 +1,23 @@
import datetime
from pydantic import BaseModel, ConfigDict
class EntryBase(BaseModel):
"""Base schema for an entry."""
subject: str
body: str
class EntryCreate(EntryBase):
"""Schema for creating a new entry."""
pass
class Entry(EntryBase):
"""Schema for retrieving an entry with its ID and newsletter ID."""
id: int
newsletter_id: int
received_at: datetime.datetime
model_config = ConfigDict(from_attributes=True)

View File

@@ -0,0 +1,46 @@
from typing import List
from pydantic import BaseModel, ConfigDict
class SenderBase(BaseModel):
"""Base schema for a sender."""
email: str
class SenderCreate(SenderBase):
"""Schema for creating a new sender."""
pass
class Sender(SenderBase):
"""Schema for retrieving a sender with its ID and newsletter ID."""
id: int
newsletter_id: int
model_config = ConfigDict(from_attributes=True)
class NewsletterBase(BaseModel):
"""Base schema for a newsletter."""
name: str
class NewsletterCreate(NewsletterBase):
"""Schema for creating a new newsletter."""
sender_emails: List[str]
class NewsletterUpdate(NewsletterBase):
"""Schema for updating an existing newsletter."""
sender_emails: List[str]
class Newsletter(NewsletterBase):
"""Schema for retrieving a newsletter with its ID, active status, senders, and entries count."""
id: int
is_active: bool
senders: List[Sender] = []
entries_count: int
model_config = ConfigDict(from_attributes=True)

View File

@@ -0,0 +1,28 @@
from typing import List
from pydantic import BaseModel, ConfigDict, Field
class SettingsBase(BaseModel):
"""Base schema for application settings."""
imap_server: str
imap_username: str
search_folder: str = "INBOX"
move_to_folder: str | None = None
mark_as_read: bool = False
email_check_interval: int = 15
auto_add_new_senders: bool = False
class SettingsCreate(SettingsBase):
"""Schema for creating or updating settings, including the IMAP password."""
imap_password: str
class Settings(SettingsBase):
"""Schema for retrieving settings, with password excluded by default."""
id: int
imap_password: str | None = Field(None, exclude=True)
locked_fields: List[str] = []
model_config = ConfigDict(from_attributes=True)

View File

@@ -0,0 +1 @@
"""Application services for business logic."""

View File

@@ -0,0 +1,128 @@
import email
import imaplib
from sqlalchemy.orm import Session
from app.core.logging import get_logger
from app.crud.entries import create_entry
from app.crud.newsletters import create_newsletter, get_newsletters
from app.crud.settings import get_settings
from app.schemas.entries import EntryCreate
from app.schemas.newsletters import NewsletterCreate
logger = get_logger(__name__)
def process_emails(db: Session):
"""Process unread emails, add them as entries, and manage newsletters."""
logger.info("Starting email processing...")
settings = get_settings(db, with_password=True)
if (
not settings
or not settings.imap_server
or not settings.imap_username
or not settings.imap_password
):
logger.warning("IMAP settings are not configured. Skipping email processing.")
return
newsletters = get_newsletters(db)
sender_map = {}
for nl in newsletters:
for sender in nl.senders:
sender_map[sender.email] = nl
logger.info(f"Processing emails for {len(newsletters)} newsletters.")
try:
logger.info(f"Connecting to IMAP server: {settings.imap_server}")
mail = imaplib.IMAP4_SSL(settings.imap_server)
mail.login(settings.imap_username, settings.imap_password)
mail.select(settings.search_folder)
logger.info(f"Selected mailbox: {settings.search_folder}")
status, messages = mail.search(None, "(UNSEEN)")
if status != "OK":
logger.error(f"Failed to search for unseen emails, status: {status}")
return
email_ids = messages[0].split()
logger.info(f"Found {len(email_ids)} unseen emails.")
for num in email_ids:
status, data = mail.fetch(num, "(RFC822)")
if status != "OK":
logger.warning(f"Failed to fetch email with id={num}")
continue
msg = email.message_from_bytes(data[0][1])
sender = email.utils.parseaddr(msg["From"])[1]
logger.debug(
f"Processing email from {sender} with subject '{msg['Subject']}'"
)
newsletter = sender_map.get(sender)
if not newsletter and settings.auto_add_new_senders:
logger.info(f"Auto-adding new newsletter for sender: {sender}")
newsletter_name = email.utils.parseaddr(msg["From"])[0] or sender
new_newsletter_schema = NewsletterCreate(
name=newsletter_name, sender_emails=[sender]
)
newsletter = create_newsletter(db, new_newsletter_schema)
sender_map[sender] = newsletter
if newsletter:
subject = msg["Subject"]
body = ""
html = ""
for part in msg.walk():
ctype = part.get_content_type()
cdispo = str(part.get("Content-Disposition"))
if "attachment" in cdispo:
continue
if ctype == "text/plain":
try:
payload = part.get_payload(decode=True)
charset = part.get_content_charset() or "utf-8"
body = payload.decode(charset, "ignore")
except Exception:
pass
elif ctype == "text/html":
try:
payload = part.get_payload(decode=True)
charset = part.get_content_charset() or "utf-8"
html = payload.decode(charset, "ignore")
except Exception:
pass
final_body = html or body
entry = EntryCreate(subject=subject, body=final_body)
create_entry(db, entry, newsletter.id)
logger.info(
f"Created new entry for newsletter '{newsletter.name}' from sender {sender}"
)
if settings.mark_as_read:
logger.debug(f"Marking email with id={num} as read")
mail.store(num, "+FLAGS", "\\Seen")
if settings.move_to_folder:
logger.debug(
f"Moving email with id={num} to {settings.move_to_folder}"
)
mail.copy(num, settings.move_to_folder)
mail.store(num, "+FLAGS", "\\Deleted")
if settings.move_to_folder:
logger.info("Expunging deleted emails")
mail.expunge()
mail.logout()
logger.info("Email processing finished successfully.")
except Exception as e:
logger.error(f"Error processing emails: {e}", exc_info=True)

View File

@@ -0,0 +1,38 @@
from dateutil import tz
from feedgen.feed import FeedGenerator
from sqlalchemy.orm import Session
from app.core.config import settings
from app.crud.entries import get_entries_by_newsletter
from app.crud.newsletters import get_newsletter
def generate_feed(db: Session, newsletter_id: int):
"""Generate an Atom feed for a given newsletter."""
newsletter = get_newsletter(db, newsletter_id)
if not newsletter:
return None
entries = get_entries_by_newsletter(db, newsletter_id)
feed_url = f"{settings.app_base_url}/feeds/{newsletter_id}"
fg = FeedGenerator()
fg.id(f"urn:letterfeed:newsletter:{newsletter.id}")
fg.title(newsletter.name)
fg.link(href=feed_url, rel="self")
sender_emails = ", ".join([s.email for s in newsletter.senders])
fg.description(f"A feed of newsletters from {sender_emails}")
for entry in entries:
fe = fg.add_entry()
fe.id(f"urn:letterfeed:entry:{entry.id}")
fe.title(entry.subject)
fe.content(entry.body, type="html")
if entry.received_at.tzinfo is None:
timezone_aware_received_at = entry.received_at.replace(tzinfo=tz.tzutc())
fe.published(timezone_aware_received_at)
else:
fe.published(entry.received_at)
return fg.atom_str(pretty=True)

View File

@@ -0,0 +1 @@
"""Unit tests for the application."""

View File

@@ -0,0 +1,65 @@
import os
os.environ["LETTERFEED_DATABASE_URL"] = "sqlite:///./test.db"
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import sessionmaker
from app.core.database import Base, engine, get_db
from app.main import app
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
@pytest.fixture(scope="function", autouse=True)
def setup_and_teardown_db():
"""Set up and tear down the database for each test.
This fixture is automatically used for all tests. It creates all tables
before a test and drops them afterwards. This ensures a clean database for
every test.
"""
Base.metadata.create_all(bind=engine)
yield
Base.metadata.drop_all(bind=engine)
@pytest.fixture(name="db_session")
def db_session_fixture():
"""Yield a database session for a test."""
db = TestingSessionLocal()
try:
yield db
finally:
db.close()
@pytest.fixture(name="client")
def client_fixture(db_session):
"""Yield a TestClient for a test."""
def override_get_db():
yield db_session
app.dependency_overrides[get_db] = override_get_db
with TestClient(app) as client:
yield client
app.dependency_overrides.clear()
@pytest.fixture(scope="session", autouse=True)
def cleanup_test_db(request):
"""Clean up the test database after the test session.
This fixture is automatically used once per test session. It registers a
finalizer to remove the test database file after all tests have run.
"""
def remove_test_db():
# The path is relative to the backend directory where pytest is run
db_file = "test.db"
if os.path.exists(db_file):
os.remove(db_file)
request.addfinalizer(remove_test_db)

View File

@@ -0,0 +1,201 @@
from unittest.mock import ANY, MagicMock, patch
from sqlalchemy.orm import Session
from app.core.imap import _test_imap_connection, get_folders
from app.crud.newsletters import create_newsletter
from app.crud.settings import create_or_update_settings
from app.schemas.newsletters import NewsletterCreate
from app.schemas.settings import SettingsCreate
from app.services.email_processor import process_emails
@patch("app.core.imap.imaplib.IMAP4_SSL")
def test_test_imap_connection_success(mock_imap):
"""Test IMAP connection success."""
mock_imap.return_value.login.return_value = (None, None)
mock_imap.return_value.logout.return_value = (None, None)
success, message = _test_imap_connection("imap.test.com", "user", "pass")
assert success
assert message == "Connection successful"
@patch("app.core.imap.imaplib.IMAP4_SSL")
def test_test_imap_connection_failure(mock_imap):
"""Test IMAP connection failure."""
mock_imap.return_value.login.side_effect = Exception("Auth failed")
success, message = _test_imap_connection("imap.test.com", "user", "pass")
assert not success
assert message == "Auth failed"
@patch("app.core.imap.imaplib.IMAP4_SSL")
def test_get_folders(mock_imap):
"""Test fetching IMAP folders."""
mock_imap.return_value.login.return_value = (None, None)
mock_imap.return_value.logout.return_value = (None, None)
mock_imap.return_value.list.return_value = (
"OK",
[b'(NOCONNECT NOSELECT) "/" "INBOX"', b'(NOCONNECT NOSELECT) "/" "Processed"'],
)
folders = get_folders("imap.test.com", "user", "pass")
assert folders == ["INBOX", "Processed"]
@patch("app.services.email_processor.imaplib.IMAP4_SSL")
def test_process_emails(mock_imap, db_session: Session):
"""Test processing emails."""
# Setup settings
settings_data = SettingsCreate(
imap_server="imap.test.com",
imap_username="test@test.com",
imap_password="password",
search_folder="INBOX",
move_to_folder="Processed",
mark_as_read=True,
)
create_or_update_settings(db_session, settings_data)
# Setup newsletter
newsletter_data = NewsletterCreate(
name="Test Newsletter", sender_emails=["newsletter@example.com"]
)
create_newsletter(db_session, newsletter_data)
# Mock IMAP connection and email fetching
mock_mail = MagicMock()
mock_imap.return_value = mock_mail
mock_mail.login.return_value = (None, None)
mock_mail.select.return_value = (None, None)
mock_mail.search.return_value = ("OK", [b"1"])
# Mock email content
mock_msg_bytes = b"From: newsletter@example.com\nSubject: Test Subject\n\nTest Body"
mock_mail.fetch.return_value = ("OK", [(None, mock_msg_bytes)])
process_emails(db_session)
# Assertions
mock_mail.login.assert_called_once_with("test@test.com", "password")
mock_mail.select.assert_called_once_with("INBOX")
mock_mail.search.assert_called_once_with(None, "(UNSEEN)")
mock_mail.fetch.assert_called_once_with(b"1", "(RFC822)")
mock_mail.store.assert_any_call(b"1", "+FLAGS", "\\Seen")
mock_mail.copy.assert_called_once_with(b"1", "Processed")
mock_mail.store.assert_any_call(b"1", "+FLAGS", "\\Deleted")
mock_mail.expunge.assert_called_once()
mock_mail.logout.assert_called_once()
# Verify entry in DB
from app.crud.entries import get_entries_by_newsletter
from app.crud.newsletters import get_newsletters
newsletters = get_newsletters(db_session)
assert len(newsletters) == 1
entries = get_entries_by_newsletter(db_session, newsletters[0].id)
assert len(entries) == 1
assert entries[0].subject == "Test Subject"
assert entries[0].body == "Test Body"
@patch("app.core.scheduler.job")
@patch("app.core.scheduler.SessionLocal")
@patch("app.core.scheduler.scheduler")
def test_start_scheduler_with_interval(
mock_scheduler, mock_session_local, mock_job, db_session: Session
):
"""Test starting the scheduler with an interval."""
mock_session_local.return_value = db_session
mock_scheduler.running = False
settings_data = SettingsCreate(
imap_server="imap.test.com",
imap_username="test@test.com",
imap_password="password",
email_check_interval=30,
)
create_or_update_settings(db_session, settings_data)
from app.core.scheduler import start_scheduler_with_interval
start_scheduler_with_interval()
mock_scheduler.add_job.assert_called_with(
ANY, "interval", minutes=30, id="email_check_job", replace_existing=True
)
mock_scheduler.start.assert_called_once()
mock_job.assert_called_once()
@patch("app.core.scheduler.SessionLocal")
@patch("app.core.scheduler.process_emails")
def test_scheduler_job(mock_process_emails, mock_session_local, db_session: Session):
"""Test the scheduler job."""
mock_session_local.return_value = db_session
from app.core.scheduler import job
job()
mock_process_emails.assert_called_once_with(db_session)
@patch("app.services.email_processor.imaplib.IMAP4_SSL")
def test_process_emails_auto_add_sender(mock_imap, db_session: Session):
"""Test processing emails with auto add sender enabled."""
settings_data = SettingsCreate(
imap_server="imap.test.com",
imap_username="test@test.com",
imap_password="password",
auto_add_new_senders=True,
)
create_or_update_settings(db_session, settings_data)
mock_mail = MagicMock()
mock_imap.return_value = mock_mail
mock_mail.search.return_value = ("OK", [b"1"])
mock_msg_bytes = b"From: New Sender <new@example.com>\nSubject: New Email\n\nHello"
mock_mail.fetch.return_value = ("OK", [(None, mock_msg_bytes)])
process_emails(db_session)
from app.crud.newsletters import get_newsletters
newsletters = get_newsletters(db_session)
assert len(newsletters) == 1
assert len(newsletters[0].senders) == 1
assert newsletters[0].senders[0].email == "new@example.com"
assert newsletters[0].name == "New Sender"
@patch("app.services.email_processor.imaplib.IMAP4_SSL")
def test_process_emails_no_settings(mock_imap, db_session: Session):
"""Test processing emails with no settings in the database."""
# No settings in the DB
process_emails(db_session)
mock_imap.assert_not_called()
@patch("app.services.email_processor.imaplib.IMAP4_SSL")
def test_process_emails_no_move_or_read(mock_imap, db_session: Session):
"""Test processing emails with no move or read."""
settings_data = SettingsCreate(
imap_server="imap.test.com",
imap_username="test@test.com",
imap_password="password",
mark_as_read=False,
move_to_folder=None,
)
create_or_update_settings(db_session, settings_data)
create_newsletter(
db_session,
NewsletterCreate(name="Test", sender_emails=["newsletter@example.com"]),
)
mock_mail = MagicMock()
mock_imap.return_value = mock_mail
mock_mail.search.return_value = ("OK", [b"1"])
mock_msg_bytes = b"From: newsletter@example.com\nSubject: Test Subject\n\nTest Body"
mock_mail.fetch.return_value = ("OK", [(None, mock_msg_bytes)])
process_emails(db_session)
mock_mail.store.assert_not_called()
mock_mail.copy.assert_not_called()

View File

@@ -0,0 +1,172 @@
import uuid
from unittest.mock import patch
from sqlalchemy.orm import Session
from app.crud.entries import create_entry, get_entries_by_newsletter
from app.crud.newsletters import create_newsletter, get_newsletter, get_newsletters
from app.crud.settings import create_or_update_settings, get_settings
from app.schemas.entries import EntryCreate
from app.schemas.newsletters import NewsletterCreate
from app.schemas.settings import SettingsCreate
def test_create_or_update_settings(db_session: Session):
"""Test creating or updating settings."""
settings_data = SettingsCreate(
imap_server="imap.test.com",
imap_username="test@test.com",
imap_password="password",
search_folder="INBOX",
move_to_folder="Archive",
mark_as_read=True,
)
settings = create_or_update_settings(db_session, settings_data)
assert settings.imap_server == "imap.test.com"
assert settings.mark_as_read
updated_settings_data = SettingsCreate(
imap_server="imap.updated.com",
imap_username="updated@test.com",
imap_password="new_password",
search_folder="Inbox",
move_to_folder=None,
mark_as_read=False,
)
updated_settings = create_or_update_settings(db_session, updated_settings_data)
assert updated_settings.imap_server == "imap.updated.com"
assert not updated_settings.mark_as_read
assert updated_settings.move_to_folder is None
def test_get_settings(db_session: Session):
"""Test getting settings."""
settings_data = SettingsCreate(
imap_server="imap.get.com",
imap_username="get@test.com",
imap_password="get_password",
search_folder="INBOX",
move_to_folder=None,
mark_as_read=False,
)
create_or_update_settings(db_session, settings_data)
settings = get_settings(db_session)
assert settings.imap_server == "imap.get.com"
def test_get_settings_with_env_override(db_session: Session):
"""Test getting settings with environment variable override."""
# 1. Create initial settings in the database
db_settings_data = SettingsCreate(
imap_server="db.imap.com",
imap_username="db_user",
imap_password="db_pass",
email_check_interval=15,
)
create_or_update_settings(db_session, db_settings_data)
# 2. Patch the env_settings to simulate environment variables
with patch("app.crud.settings.env_settings") as mock_env_settings:
mock_env_settings.model_dump.return_value = {
"imap_server": "env.imap.com",
"imap_username": "env_user",
"imap_password": "env_pass",
"email_check_interval": 30,
}
mock_env_settings.imap_password = "env_pass"
# 3. Call get_settings and assert the override
settings = get_settings(db_session, with_password=True)
assert settings.imap_server == "env.imap.com"
assert settings.imap_username == "env_user"
assert settings.imap_password == "env_pass"
assert settings.email_check_interval == 30
assert "imap_server" in settings.locked_fields
assert "imap_username" in settings.locked_fields
# 4. Call create_or_update_settings and assert that locked fields are not updated
update_data = SettingsCreate(
imap_server="new.imap.com",
imap_username="new_user",
imap_password="new_pass",
email_check_interval=45,
)
updated_settings = create_or_update_settings(db_session, update_data)
assert updated_settings.imap_server == "env.imap.com" # Should not change
assert updated_settings.imap_username == "env_user" # Should not change
assert updated_settings.email_check_interval == 30 # Should not change
def test_create_newsletter(db_session: Session):
"""Test creating a newsletter."""
unique_email = f"sender_{uuid.uuid4()}@test.com"
newsletter_data = NewsletterCreate(
name="Test Newsletter 1", sender_emails=[unique_email]
)
newsletter = create_newsletter(db_session, newsletter_data)
assert newsletter.name == "Test Newsletter 1"
assert newsletter.is_active
assert len(newsletter.senders) == 1
assert newsletter.senders[0].email == unique_email
def test_get_newsletter(db_session: Session):
"""Test getting a single newsletter."""
unique_email = f"sender_{uuid.uuid4()}@test.com"
newsletter_data = NewsletterCreate(
name="Test Newsletter 2", sender_emails=[unique_email]
)
created_newsletter = create_newsletter(db_session, newsletter_data)
newsletter = get_newsletter(db_session, created_newsletter.id)
assert newsletter.name == "Test Newsletter 2"
assert len(newsletter.senders) == 1
assert newsletter.senders[0].email == unique_email
def test_get_newsletters(db_session: Session):
"""Test getting multiple newsletters."""
create_newsletter(
db_session,
NewsletterCreate(
name="Test Newsletter 3", sender_emails=[f"sender_{uuid.uuid4()}@test.com"]
),
)
create_newsletter(
db_session,
NewsletterCreate(
name="Test Newsletter 4", sender_emails=[f"sender_{uuid.uuid4()}@test.com"]
),
)
newsletters = get_newsletters(db_session)
assert len(newsletters) >= 2
def test_create_entry(db_session: Session):
"""Test creating a newsletter entry."""
unique_email = f"sender_{uuid.uuid4()}@test.com"
newsletter_data = NewsletterCreate(
name="Test Newsletter 5", sender_emails=[unique_email]
)
newsletter = create_newsletter(db_session, newsletter_data)
entry_data = EntryCreate(subject="Test Subject", body="Test Body")
entry = create_entry(db_session, entry_data, newsletter.id)
assert entry.subject == "Test Subject"
assert entry.newsletter_id == newsletter.id
def test_get_entries_by_newsletter(db_session: Session):
"""Test getting entries for a newsletter."""
unique_email = f"sender_{uuid.uuid4()}@test.com"
newsletter_data = NewsletterCreate(
name="Test Newsletter 6", sender_emails=[unique_email]
)
newsletter = create_newsletter(db_session, newsletter_data)
create_entry(
db_session, EntryCreate(subject="Entry 1", body="Body 1"), newsletter.id
)
create_entry(
db_session, EntryCreate(subject="Entry 2", body="Body 2"), newsletter.id
)
entries = get_entries_by_newsletter(db_session, newsletter.id)
assert len(entries) == 2
assert entries[0].subject == "Entry 1"

View File

@@ -0,0 +1,147 @@
import uuid
from unittest.mock import patch
from fastapi.testclient import TestClient
def test_health_check(client: TestClient):
"""Test the health check endpoint."""
response = client.get("/health")
assert response.status_code == 200
assert response.json() == {"status": "ok"}
@patch("app.core.imap.imaplib.IMAP4_SSL")
def test_update_imap_settings(mock_imap, client: TestClient):
"""Test updating IMAP settings."""
mock_imap.return_value.login.return_value = (None, None)
mock_imap.return_value.logout.return_value = (None, None)
settings_data = {
"imap_server": "imap.example.com",
"imap_username": "test@example.com",
"imap_password": "password",
"search_folder": "INBOX",
"move_to_folder": "Processed",
"mark_as_read": True,
}
response = client.post("/imap/settings", json=settings_data)
assert response.status_code == 200
assert response.json()["imap_server"] == "imap.example.com"
assert response.json()["imap_username"] == "test@example.com"
assert response.json()["search_folder"] == "INBOX"
assert response.json()["move_to_folder"] == "Processed"
assert response.json()["mark_as_read"]
@patch("app.core.imap.imaplib.IMAP4_SSL")
def test_get_imap_settings(mock_imap, client: TestClient):
"""Test getting IMAP settings."""
mock_imap.return_value.login.return_value = (None, None)
mock_imap.return_value.logout.return_value = (None, None)
settings_data = {
"imap_server": "imap.example.com",
"imap_username": "test@example.com",
"imap_password": "password",
"search_folder": "INBOX",
"move_to_folder": "Processed",
"mark_as_read": True,
}
client.post("/imap/settings", json=settings_data)
response = client.get("/imap/settings")
assert response.status_code == 200
assert response.json()["imap_server"] == "imap.example.com"
assert response.json()["imap_username"] == "test@example.com"
@patch("app.core.imap.imaplib.IMAP4_SSL")
def test_test_imap_connection(mock_imap, client: TestClient):
"""Test the IMAP connection."""
mock_imap.return_value.login.return_value = (None, None)
mock_imap.return_value.logout.return_value = (None, None)
settings_data = {
"imap_server": "imap.example.com",
"imap_username": "test@example.com",
"imap_password": "password",
"search_folder": "INBOX",
"move_to_folder": "Processed",
"mark_as_read": True,
}
client.post("/imap/settings", json=settings_data)
response = client.post("/imap/test")
assert response.status_code == 200
assert response.json() == {"message": "Connection successful"}
@patch("app.core.imap.imaplib.IMAP4_SSL")
def test_get_imap_folders(mock_imap, client: TestClient):
"""Test getting IMAP folders."""
mock_imap.return_value.login.return_value = (None, None)
mock_imap.return_value.logout.return_value = (None, None)
mock_imap.return_value.list.return_value = (
"OK",
[b'(NOCONNECT NOSELECT) "/" "INBOX"', b'(NOCONNECT NOSELECT) "/" "Processed"'],
)
settings_data = {
"imap_server": "imap.example.com",
"imap_username": "test@example.com",
"imap_password": "password",
"search_folder": "INBOX",
"move_to_folder": "Processed",
"mark_as_read": True,
}
client.post("/imap/settings", json=settings_data)
response = client.get("/imap/folders")
assert response.status_code == 200
assert response.json() == ["INBOX", "Processed"]
def test_create_newsletter(client: TestClient):
"""Test creating a newsletter."""
unique_email = f"newsletter_{uuid.uuid4()}@example.com"
newsletter_data = {"name": "Example Newsletter", "sender_emails": [unique_email]}
response = client.post("/newsletters", json=newsletter_data)
assert response.status_code == 200
assert response.json()["name"] == "Example Newsletter"
assert response.json()["is_active"]
assert len(response.json()["senders"]) == 1
assert response.json()["senders"][0]["email"] == unique_email
def test_get_newsletters(client: TestClient):
"""Test getting all newsletters."""
unique_email = f"newsletter_{uuid.uuid4()}@example.com"
newsletter_data = {"name": "Another Newsletter", "sender_emails": [unique_email]}
client.post("/newsletters", json=newsletter_data)
response = client.get("/newsletters")
assert response.status_code == 200
assert len(response.json()) >= 1
assert any(
unique_email in [s["email"] for s in nl["senders"]] for nl in response.json()
)
def test_get_single_newsletter(client: TestClient):
"""Test getting a single newsletter."""
unique_email = f"newsletter_{uuid.uuid4()}@example.com"
newsletter_data = {"name": "Third Newsletter", "sender_emails": [unique_email]}
create_response = client.post("/newsletters/", json=newsletter_data)
newsletter_id = create_response.json()["id"]
response = client.get(f"/newsletters/{newsletter_id}")
assert response.status_code == 200
assert response.json()["senders"][0]["email"] == unique_email
def test_get_nonexistent_newsletter(client: TestClient):
"""Test getting a nonexistent newsletter."""
response = client.get("/newsletters/999")
assert response.status_code == 404
assert response.json() == {"detail": "Newsletter not found"}

View File

@@ -0,0 +1,48 @@
from sqlalchemy.orm import Session
from app.crud.entries import create_entry
from app.crud.newsletters import create_newsletter
from app.schemas.entries import EntryCreate
from app.schemas.newsletters import NewsletterCreate
from app.services.feed_generator import generate_feed
def test_generate_feed(db_session: Session):
"""Test the feed generation for a newsletter with entries."""
# Create a newsletter
newsletter_data = NewsletterCreate(
name="Feed Test Newsletter", sender_emails=["feed@example.com"]
)
newsletter = create_newsletter(db_session, newsletter_data)
# Create entries for the newsletter
entry1_data = EntryCreate(
subject="First Entry", body="<p>This is the first entry.</p>"
)
create_entry(db_session, entry1_data, newsletter.id)
entry2_data = EntryCreate(
subject="Second Entry", body="<p>This is the second entry.</p>"
)
create_entry(db_session, entry2_data, newsletter.id)
# Generate the feed
feed_xml = generate_feed(db_session, newsletter.id)
assert feed_xml is not None
# Parse the feed XML to verify content (simplified check)
# In a real scenario, you'd use an XML parser to validate structure and content more thoroughly
assert f"<title>{newsletter.name}</title>" in feed_xml.decode()
assert f"<id>urn:letterfeed:newsletter:{newsletter.id}</id>" in feed_xml.decode()
assert "<title>First Entry</title>" in feed_xml.decode()
assert "<title>Second Entry</title>" in feed_xml.decode()
assert (
'<content type="html">&lt;p&gt;This is the first entry.&lt;/p&gt;</content>'
in feed_xml.decode()
)
def test_generate_feed_nonexistent_newsletter(db_session: Session):
"""Test feed generation for a non-existent newsletter."""
feed_xml = generate_feed(db_session, 999) # Non-existent newsletter ID
assert feed_xml is None