Back to blog

Working with Databases in FastAPI: Complete Guide

pythonfastapidatabasesqlalchemypostgresql
Working with Databases in FastAPI: Complete Guide

Databases are the backbone of most APIs. FastAPI provides excellent support for both SQL and NoSQL databases, with SQLAlchemy being the most popular choice for SQL databases. Let's explore how to integrate databases into your FastAPI applications efficiently and effectively.

Database Options for FastAPI

SQL Databases

Popular Choices:

  • PostgreSQL - Most popular, feature-rich, great for production
  • MySQL/MariaDB - Widely used, excellent performance
  • SQLite - Great for development, testing, and small applications
  • Microsoft SQL Server - Enterprise solutions

When to Use SQL:

  • Structured data with relationships
  • ACID compliance required
  • Complex queries and joins
  • Data integrity is critical

NoSQL Databases

Popular Choices:

  • MongoDB - Document database, flexible schema
  • Redis - In-memory key-value store, great for caching
  • Elasticsearch - Search and analytics
  • DynamoDB - AWS managed NoSQL

When to Use NoSQL:

  • Flexible schema requirements
  • Horizontal scalability needed
  • Real-time analytics
  • Document-oriented data

This guide focuses on PostgreSQL with SQLAlchemy, but the patterns apply to other SQL databases too.

Setting Up SQLAlchemy

Installation

# For synchronous operations
pip install sqlalchemy psycopg2-binary
 
# For async operations (recommended)
pip install sqlalchemy asyncpg
 
# For Alembic (migrations)
pip install alembic

Database Connection

# database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
 
# Database URL format:
# postgresql://username:password@host:port/database
DATABASE_URL = "postgresql://user:password@localhost:5432/mydb"
 
# Create engine
engine = create_engine(
    DATABASE_URL,
    pool_pre_ping=True,  # Verify connections before using
    pool_size=5,  # Connection pool size
    max_overflow=10,  # Max connections above pool_size
)
 
# Create session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
 
# Base class for models
Base = declarative_base()

Async Database Connection

# database.py (async version)
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
 
# Async database URL (note: postgresql+asyncpg)
DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"
 
# Create async engine
engine = create_async_engine(
    DATABASE_URL,
    echo=True,  # Log SQL queries (disable in production)
    future=True,
    pool_pre_ping=True,
)
 
# Create async session factory
AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
 
Base = declarative_base()

Database Models

Defining Models

# models.py
from sqlalchemy import Column, Integer, String, Float, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from database import Base
 
class User(Base):
    __tablename__ = "users"
 
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True, nullable=False)
    username = Column(String, unique=True, index=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
 
    # Relationship
    posts = relationship("Post", back_populates="author")
 
 
class Post(Base):
    __tablename__ = "posts"
 
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, nullable=False)
    content = Column(String, nullable=False)
    published = Column(Boolean, default=False)
    views = Column(Integer, default=0)
    author_id = Column(Integer, ForeignKey("users.id"))
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
 
    # Relationship
    author = relationship("User", back_populates="posts")

Database vs Pydantic Models

Important: Keep database models and Pydantic models separate!

# models.py - SQLAlchemy (Database)
from sqlalchemy import Column, Integer, String
from database import Base
 
class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True)
    email = Column(String)
    hashed_password = Column(String)  # Sensitive!
 
 
# schemas.py - Pydantic (API)
from pydantic import BaseModel, EmailStr
 
class UserBase(BaseModel):
    email: EmailStr
 
class UserCreate(UserBase):
    password: str  # Plain password for input
 
class UserOut(UserBase):
    id: int
    # No password in output!
    
    model_config = {"from_attributes": True}
 
class UserInDB(UserBase):
    id: int
    hashed_password: str  # For internal use
    
    model_config = {"from_attributes": True}

Database Dependency

Creating Database Sessions

# database.py
from sqlalchemy.orm import Session
 
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Using in FastAPI

# main.py
from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session
from database import get_db
import models, schemas
 
app = FastAPI()
 
@app.get("/users/", response_model=list[schemas.UserOut])
def get_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = db.query(models.User).offset(skip).limit(limit).all()
    return users

Async Database Dependency

# database.py (async)
from sqlalchemy.ext.asyncio import AsyncSession
 
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

CRUD Operations

Create

from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session
import models, schemas
from database import get_db
 
app = FastAPI()
 
@app.post("/users/", response_model=schemas.UserOut, status_code=status.HTTP_201_CREATED)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    # Check if user exists
    db_user = db.query(models.User).filter(models.User.email == user.email).first()
    if db_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )
    
    # Hash password (use proper hashing in production!)
    hashed_password = hash_password(user.password)
    
    # Create user
    db_user = models.User(
        email=user.email,
        username=user.username,
        hashed_password=hashed_password
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    
    return db_user

Read

@app.get("/users/{user_id}", response_model=schemas.UserOut)
def get_user(user_id: int, db: Session = Depends(get_db)):
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    return user
 
@app.get("/users/", response_model=list[schemas.UserOut])
def list_users(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db)
):
    users = db.query(models.User).offset(skip).limit(limit).all()
    return users

Update

@app.put("/users/{user_id}", response_model=schemas.UserOut)
def update_user(
    user_id: int,
    user_update: schemas.UserUpdate,
    db: Session = Depends(get_db)
):
    # Get user
    db_user = db.query(models.User).filter(models.User.id == user_id).first()
    if not db_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    
    # Update fields
    update_data = user_update.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(db_user, field, value)
    
    db.commit()
    db.refresh(db_user)
    
    return db_user

Delete

@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: int, db: Session = Depends(get_db)):
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    
    db.delete(user)
    db.commit()
    
    return None

Async CRUD Operations

Async Create

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
 
@app.post("/users/", response_model=schemas.UserOut)
async def create_user(user: schemas.UserCreate, db: AsyncSession = Depends(get_db)):
    # Check if user exists
    result = await db.execute(
        select(models.User).filter(models.User.email == user.email)
    )
    db_user = result.scalar_one_or_none()
    
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    
    # Create user
    db_user = models.User(
        email=user.email,
        username=user.username,
        hashed_password=hash_password(user.password)
    )
    db.add(db_user)
    await db.commit()
    await db.refresh(db_user)
    
    return db_user

Async Read

@app.get("/users/{user_id}", response_model=schemas.UserOut)
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(
        select(models.User).filter(models.User.id == user_id)
    )
    user = result.scalar_one_or_none()
    
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    
    return user
 
@app.get("/users/", response_model=list[schemas.UserOut])
async def list_users(skip: int = 0, limit: int = 100, db: AsyncSession = Depends(get_db)):
    result = await db.execute(
        select(models.User).offset(skip).limit(limit)
    )
    users = result.scalars().all()
    return users

Repository Pattern

Creating a Repository

# repositories/user_repository.py
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
import models, schemas
 
class UserRepository:
    def __init__(self, db: Session):
        self.db = db
    
    def get(self, user_id: int):
        return self.db.query(models.User).filter(models.User.id == user_id).first()
    
    def get_by_email(self, email: str):
        return self.db.query(models.User).filter(models.User.email == email).first()
    
    def list(self, skip: int = 0, limit: int = 100):
        return self.db.query(models.User).offset(skip).limit(limit).all()
    
    def create(self, user: schemas.UserCreate):
        db_user = models.User(**user.model_dump())
        self.db.add(db_user)
        self.db.commit()
        self.db.refresh(db_user)
        return db_user
    
    def update(self, user_id: int, user_update: schemas.UserUpdate):
        db_user = self.get(user_id)
        if not db_user:
            return None
        
        update_data = user_update.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(db_user, field, value)
        
        self.db.commit()
        self.db.refresh(db_user)
        return db_user
    
    def delete(self, user_id: int):
        db_user = self.get(user_id)
        if db_user:
            self.db.delete(db_user)
            self.db.commit()
            return True
        return False

Using Repository in Endpoints

from repositories.user_repository import UserRepository
 
def get_user_repository(db: Session = Depends(get_db)):
    return UserRepository(db)
 
@app.get("/users/{user_id}", response_model=schemas.UserOut)
def get_user(
    user_id: int,
    repo: UserRepository = Depends(get_user_repository)
):
    user = repo.get(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user
 
@app.post("/users/", response_model=schemas.UserOut)
def create_user(
    user: schemas.UserCreate,
    repo: UserRepository = Depends(get_user_repository)
):
    # Check if exists
    if repo.get_by_email(user.email):
        raise HTTPException(status_code=400, detail="Email already exists")
    
    return repo.create(user)

Relationships and Joins

One-to-Many Relationship

# Get user with their posts
@app.get("/users/{user_id}/posts", response_model=list[schemas.PostOut])
def get_user_posts(user_id: int, db: Session = Depends(get_db)):
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    
    return user.posts
 
# Alternative: Direct query with join
@app.get("/posts/", response_model=list[schemas.PostWithAuthor])
def list_posts(db: Session = Depends(get_db)):
    posts = db.query(models.Post).join(models.User).all()
    return posts

Eager Loading

from sqlalchemy.orm import joinedload
 
@app.get("/users/{user_id}", response_model=schemas.UserWithPosts)
def get_user_with_posts(user_id: int, db: Session = Depends(get_db)):
    # Load user and posts in one query
    user = (
        db.query(models.User)
        .options(joinedload(models.User.posts))
        .filter(models.User.id == user_id)
        .first()
    )
    
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    
    return user

Database Migrations with Alembic

Initialize Alembic

# Initialize Alembic
alembic init alembic
 
# This creates:
# alembic/
#   env.py
#   script.py.mako
#   versions/
# alembic.ini

Configure Alembic

# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
import models  # Import your models
from database import Base
 
# this is the Alembic Config object
config = context.config
 
# Set sqlalchemy.url from your database.py
config.set_main_option('sqlalchemy.url', 'postgresql://user:password@localhost/mydb')
 
# Set target metadata
target_metadata = Base.metadata
 
# ... rest of the file

Create Migration

# Auto-generate migration from model changes
alembic revision --autogenerate -m "create users table"
 
# Manually create empty migration
alembic revision -m "add custom index"

Apply Migration

# Upgrade to latest
alembic upgrade head
 
# Upgrade to specific revision
alembic upgrade <revision_id>
 
# Downgrade one revision
alembic downgrade -1
 
# View current revision
alembic current
 
# View migration history
alembic history

Example Migration

# alembic/versions/xxx_create_users_table.py
from alembic import op
import sqlalchemy as sa
 
def upgrade():
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), primary_key=True),
        sa.Column('email', sa.String(), nullable=False),
        sa.Column('username', sa.String(), nullable=False),
        sa.Column('hashed_password', sa.String(), nullable=False),
        sa.Column('is_active', sa.Boolean(), default=True),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    
    # Create indexes
    op.create_index('ix_users_email', 'users', ['email'], unique=True)
    op.create_index('ix_users_username', 'users', ['username'], unique=True)
 
def downgrade():
    op.drop_index('ix_users_username')
    op.drop_index('ix_users_email')
    op.drop_table('users')

Transactions

Manual Transaction Control

from sqlalchemy.exc import IntegrityError
 
@app.post("/transfer/")
def transfer_money(transfer: schemas.Transfer, db: Session = Depends(get_db)):
    try:
        # Start transaction (implicitly)
        sender = db.query(models.Account).filter(models.Account.id == transfer.from_id).first()
        receiver = db.query(models.Account).filter(models.Account.id == transfer.to_id).first()
        
        if sender.balance < transfer.amount:
            raise HTTPException(status_code=400, detail="Insufficient funds")
        
        # Update balances
        sender.balance -= transfer.amount
        receiver.balance += transfer.amount
        
        # Create transaction record
        transaction = models.Transaction(
            from_id=transfer.from_id,
            to_id=transfer.to_id,
            amount=transfer.amount
        )
        db.add(transaction)
        
        # Commit all changes
        db.commit()
        
        return {"message": "Transfer successful"}
        
    except IntegrityError:
        db.rollback()
        raise HTTPException(status_code=400, detail="Transaction failed")
    except Exception as e:
        db.rollback()
        raise

Context Manager

from contextlib import contextmanager
 
@contextmanager
def get_db_transaction():
    db = SessionLocal()
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()
 
# Usage
def transfer_money(transfer: schemas.Transfer):
    with get_db_transaction() as db:
        # All database operations
        # Automatically commits on success, rolls back on error
        pass

Query Optimization

Filtering and Ordering

from sqlalchemy import and_, or_, desc
 
@app.get("/posts/", response_model=list[schemas.PostOut])
def list_posts(
    published: bool = None,
    author_id: int = None,
    sort_by: str = "created_at",
    db: Session = Depends(get_db)
):
    query = db.query(models.Post)
    
    # Add filters
    if published is not None:
        query = query.filter(models.Post.published == published)
    
    if author_id is not None:
        query = query.filter(models.Post.author_id == author_id)
    
    # Add ordering
    if sort_by == "views":
        query = query.order_by(desc(models.Post.views))
    else:
        query = query.order_by(desc(models.Post.created_at))
    
    return query.all()

Pagination

from fastapi import Query
 
@app.get("/posts/", response_model=schemas.PaginatedPosts)
def list_posts(
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100),
    db: Session = Depends(get_db)
):
    # Calculate offset
    skip = (page - 1) * page_size
    
    # Get total count
    total = db.query(models.Post).count()
    
    # Get paginated results
    posts = (
        db.query(models.Post)
        .offset(skip)
        .limit(page_size)
        .all()
    )
    
    return {
        "posts": posts,
        "total": total,
        "page": page,
        "page_size": page_size,
        "total_pages": (total + page_size - 1) // page_size
    }

Selective Loading

# Only load specific columns
@app.get("/users/emails")
def list_user_emails(db: Session = Depends(get_db)):
    # Returns list of tuples: [(email,), (email,), ...]
    emails = db.query(models.User.email).all()
    return [email[0] for email in emails]
 
# Load as dictionary
from sqlalchemy.orm import load_only
 
@app.get("/users/")
def list_users_minimal(db: Session = Depends(get_db)):
    users = (
        db.query(models.User)
        .options(load_only(models.User.id, models.User.username))
        .all()
    )
    return users

Connection Pooling

Pool Configuration

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
 
engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=5,  # Number of persistent connections
    max_overflow=10,  # Extra connections when pool is exhausted
    pool_timeout=30,  # Wait time for connection (seconds)
    pool_recycle=3600,  # Recycle connections after 1 hour
    pool_pre_ping=True,  # Verify connection before using
)

Connection Pool Events

from sqlalchemy import event
 
@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    print("New connection created")
 
@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    print("Connection retrieved from pool")
 
@event.listens_for(engine, "checkin")
def receive_checkin(dbapi_conn, connection_record):
    print("Connection returned to pool")

Real-World Example: Blog API

# Complete example with all concepts
from fastapi import FastAPI, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import desc
from typing import List
import models, schemas
from database import get_db, engine
 
# Create tables
models.Base.metadata.create_all(bind=engine)
 
app = FastAPI(title="Blog API")
 
# User Endpoints
@app.post("/users/", response_model=schemas.UserOut, status_code=status.HTTP_201_CREATED)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    # Check if user exists
    if db.query(models.User).filter(models.User.email == user.email).first():
        raise HTTPException(status_code=400, detail="Email already registered")
    
    # Create user
    db_user = models.User(
        email=user.email,
        username=user.username,
        hashed_password=hash_password(user.password)
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user
 
@app.get("/users/{user_id}", response_model=schemas.UserWithPosts)
def get_user(user_id: int, db: Session = Depends(get_db)):
    user = (
        db.query(models.User)
        .options(joinedload(models.User.posts))
        .filter(models.User.id == user_id)
        .first()
    )
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user
 
# Post Endpoints
@app.post("/posts/", response_model=schemas.PostOut)
def create_post(post: schemas.PostCreate, db: Session = Depends(get_db)):
    # Verify author exists
    author = db.query(models.User).filter(models.User.id == post.author_id).first()
    if not author:
        raise HTTPException(status_code=404, detail="Author not found")
    
    db_post = models.Post(**post.model_dump())
    db.add(db_post)
    db.commit()
    db.refresh(db_post)
    return db_post
 
@app.get("/posts/", response_model=schemas.PaginatedPosts)
def list_posts(
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100),
    published: bool = None,
    author_id: int = None,
    db: Session = Depends(get_db)
):
    query = db.query(models.Post).options(joinedload(models.Post.author))
    
    # Filters
    if published is not None:
        query = query.filter(models.Post.published == published)
    if author_id is not None:
        query = query.filter(models.Post.author_id == author_id)
    
    # Pagination
    total = query.count()
    posts = (
        query
        .order_by(desc(models.Post.created_at))
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )
    
    return {
        "posts": posts,
        "total": total,
        "page": page,
        "page_size": page_size,
    }
 
@app.get("/posts/{post_id}", response_model=schemas.PostWithAuthor)
def get_post(post_id: int, db: Session = Depends(get_db)):
    post = (
        db.query(models.Post)
        .options(joinedload(models.Post.author))
        .filter(models.Post.id == post_id)
        .first()
    )
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    
    # Increment views
    post.views += 1
    db.commit()
    
    return post
 
@app.put("/posts/{post_id}", response_model=schemas.PostOut)
def update_post(
    post_id: int,
    post_update: schemas.PostUpdate,
    db: Session = Depends(get_db)
):
    db_post = db.query(models.Post).filter(models.Post.id == post_id).first()
    if not db_post:
        raise HTTPException(status_code=404, detail="Post not found")
    
    update_data = post_update.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(db_post, field, value)
    
    db.commit()
    db.refresh(db_post)
    return db_post
 
@app.delete("/posts/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_post(post_id: int, db: Session = Depends(get_db)):
    post = db.query(models.Post).filter(models.Post.id == post_id).first()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    
    db.delete(post)
    db.commit()

Best Practices

1. Separate Models and Schemas

# ✅ Good - Clear separation
# models.py - Database
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    hashed_password = Column(String)
 
# schemas.py - API
class UserOut(BaseModel):
    id: int
    # No password!

2. Use Environment Variables

# ✅ Good - Configuration
import os
from pydantic_settings import BaseSettings
 
class Settings(BaseSettings):
    database_url: str
    database_pool_size: int = 5
    
    class Config:
        env_file = ".env"
 
settings = Settings()
engine = create_engine(settings.database_url)

3. Always Use Indexes

# ✅ Good - Indexed columns
class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)  # Frequently queried
    username = Column(String, unique=True, index=True)

4. Use Transactions for Multiple Operations

# ✅ Good - Atomic operations
@app.post("/complex-operation/")
def complex_operation(db: Session = Depends(get_db)):
    try:
        # Multiple database operations
        user = models.User(...)
        db.add(user)
        
        profile = models.Profile(user_id=user.id)
        db.add(profile)
        
        db.commit()  # Commit all or nothing
    except Exception:
        db.rollback()
        raise

5. Eager Load Relationships When Needed

# ✅ Good - Avoid N+1 queries
users = (
    db.query(models.User)
    .options(joinedload(models.User.posts))
    .all()
)

Common Pitfalls

1. N+1 Query Problem

# ❌ Bad - N+1 queries
users = db.query(models.User).all()
for user in users:
    print(user.posts)  # Separate query for each user!
 
# ✅ Good - Single query with join
users = db.query(models.User).options(joinedload(models.User.posts)).all()

2. Forgetting to Close Sessions

# ❌ Bad - Session leak
def get_users():
    db = SessionLocal()
    users = db.query(models.User).all()
    return users  # Session not closed!
 
# ✅ Good - Always close
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

3. Not Using Connection Pooling

# ❌ Bad - New connection every time
engine = create_engine(DATABASE_URL, poolclass=NullPool)
 
# ✅ Good - Use connection pool
engine = create_engine(
    DATABASE_URL,
    pool_size=5,
    max_overflow=10
)

Performance Tips

1. Use Async When Possible

Async operations don't block the event loop, allowing better concurrency.

2. Batch Operations

# ✅ Batch insert
users = [models.User(email=f"user{i}@example.com") for i in range(100)]
db.bulk_save_objects(users)
db.commit()

3. Use select_from for Complex Queries

from sqlalchemy import select
 
# Complex query with multiple joins
stmt = (
    select(models.User, models.Post)
    .select_from(models.User)
    .join(models.Post)
    .where(models.Post.published == True)
)

4. Add Database Indexes

# Add composite index for common queries
Index('ix_posts_author_published', 
      models.Post.author_id, 
      models.Post.published)

Conclusion

Database integration is crucial for building production-ready FastAPI applications. Key takeaways:

  • Use SQLAlchemy for robust SQL database integration
  • Separate concerns - Database models vs API schemas
  • Leverage async for better performance
  • Use migrations with Alembic for schema changes
  • Implement repository pattern for clean code
  • Optimize queries to avoid N+1 problems
  • Configure connection pooling properly
  • Use transactions for data integrity

By following these patterns and best practices, you'll build scalable, maintainable APIs that handle data efficiently.

Start building data-driven APIs with FastAPI today! 🚀

Resources

📬 Subscribe to Newsletter

Get the latest blog posts delivered to your inbox every week. No spam, unsubscribe anytime.

We respect your privacy. Unsubscribe at any time.

💬 Comments

Sign in to leave a comment

We'll never post without your permission.