Authentication in FastAPI: Complete Guide to JWT & OAuth2

Authentication is crucial for most APIs. FastAPI provides excellent built-in support for OAuth2 and JWT (JSON Web Tokens), making it easy to implement secure authentication. Let's build a complete authentication system from scratch.
What We'll Build
By the end of this guide, you'll have:
- User registration with password hashing
- Login with JWT token generation
- Protected routes that require authentication
- Token refresh mechanism
- Password reset functionality
- Best practices for production security
Setting Up Dependencies
First, install the required packages:
# Using pip
pip install fastapi uvicorn python-jose[cryptography] passlib[bcrypt] python-multipart
# Using Poetry
poetry add fastapi uvicorn python-jose[cryptography] passlib[bcrypt] python-multipartDependencies explained:
python-jose: JWT token creation and validationpasslib: Password hashing with bcryptpython-multipart: Form data parsing (for OAuth2 password flow)
Project Structure
app/
├── main.py # FastAPI app
├── auth/
│ ├── __init__.py
│ ├── models.py # User models
│ ├── schemas.py # Pydantic schemas
│ ├── security.py # Password & token handling
│ ├── dependencies.py # Auth dependencies
│ └── routes.py # Auth endpoints
└── config.py # ConfigurationConfiguration
# config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# Security
SECRET_KEY: str = "your-secret-key-change-this-in-production"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
REFRESH_TOKEN_EXPIRE_DAYS: int = 7
# Database
DATABASE_URL: str = "sqlite:///./app.db"
class Config:
env_file = ".env"
settings = Settings()Create a .env file:
SECRET_KEY=your-very-secret-key-min-32-characters-long
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
DATABASE_URL=sqlite:///./app.dbImportant: Generate a secure secret key:
# Generate a secure random secret key
openssl rand -hex 32Database Models
# auth/models.py
from sqlalchemy import Boolean, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
username = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
full_name = Column(String)
is_active = Column(Boolean, default=True)
is_superuser = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<User {self.username}>"Pydantic Schemas
# auth/schemas.py
from pydantic import BaseModel, EmailStr, Field
from datetime import datetime
from typing import Optional
# User schemas
class UserBase(BaseModel):
email: EmailStr
username: str = Field(..., min_length=3, max_length=50)
full_name: Optional[str] = None
class UserCreate(UserBase):
password: str = Field(..., min_length=8, max_length=100)
class UserUpdate(BaseModel):
email: Optional[EmailStr] = None
full_name: Optional[str] = None
password: Optional[str] = None
class UserInDB(UserBase):
id: int
is_active: bool
is_superuser: bool
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class User(UserInDB):
pass
# Token schemas
class Token(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
class TokenData(BaseModel):
username: Optional[str] = None
scopes: list[str] = []
class RefreshTokenRequest(BaseModel):
refresh_token: strPassword Hashing & Token Creation
# auth/security.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from config import settings
# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hash a password."""
return pwd_context.hash(password)
def create_access_token(
data: dict,
expires_delta: Optional[timedelta] = None
) -> str:
"""Create a JWT access token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode.update({"exp": expire, "type": "access"})
encoded_jwt = jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.ALGORITHM
)
return encoded_jwt
def create_refresh_token(data: dict) -> str:
"""Create a JWT refresh token."""
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
to_encode.update({"exp": expire, "type": "refresh"})
encoded_jwt = jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.ALGORITHM
)
return encoded_jwt
def verify_token(token: str, token_type: str = "access") -> Optional[dict]:
"""Verify and decode a JWT token."""
try:
payload = jwt.decode(
token,
settings.SECRET_KEY,
algorithms=[settings.ALGORITHM]
)
# Check token type
if payload.get("type") != token_type:
return None
return payload
except JWTError:
return NoneAuthentication Dependencies
# auth/dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from jose import JWTError
from typing import Optional
from .security import verify_token
from .models import User
from .schemas import TokenData
from database import get_db
# OAuth2 scheme for token extraction
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="auth/login",
scopes={
"user": "Regular user access",
"admin": "Admin access"
}
)
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
) -> User:
"""Get the current authenticated user from JWT token."""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Verify token
payload = verify_token(token, token_type="access")
if payload is None:
raise credentials_exception
username: str = payload.get("sub")
if username is None:
raise credentials_exception
# Get user from database
user = db.query(User).filter(User.username == username).first()
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user)
) -> User:
"""Ensure the current user is active."""
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
return current_user
async def get_current_superuser(
current_user: User = Depends(get_current_active_user)
) -> User:
"""Ensure the current user is a superuser."""
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_userAuthentication Routes
# auth/routes.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from datetime import timedelta
from .schemas import User, UserCreate, Token, RefreshTokenRequest
from .models import User as UserModel
from .security import (
verify_password,
get_password_hash,
create_access_token,
create_refresh_token,
verify_token
)
from .dependencies import get_current_active_user
from database import get_db
from config import settings
router = APIRouter(prefix="/auth", tags=["authentication"])
@router.post("/register", response_model=User, status_code=status.HTTP_201_CREATED)
async def register(user_data: UserCreate, db: Session = Depends(get_db)):
"""Register a new user."""
# Check if user already exists
if db.query(UserModel).filter(UserModel.email == user_data.email).first():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
if db.query(UserModel).filter(UserModel.username == user_data.username).first():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already taken"
)
# Create new user
user = UserModel(
email=user_data.email,
username=user_data.username,
full_name=user_data.full_name,
hashed_password=get_password_hash(user_data.password)
)
db.add(user)
db.commit()
db.refresh(user)
return user
@router.post("/login", response_model=Token)
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
):
"""Login with username and password, returns JWT tokens."""
# Find user
user = db.query(UserModel).filter(
UserModel.username == form_data.username
).first()
# Verify credentials
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Check if user is active
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
# Create tokens
access_token = create_access_token(data={"sub": user.username})
refresh_token = create_refresh_token(data={"sub": user.username})
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}
@router.post("/refresh", response_model=Token)
async def refresh_token(
refresh_data: RefreshTokenRequest,
db: Session = Depends(get_db)
):
"""Refresh access token using refresh token."""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Verify refresh token
payload = verify_token(refresh_data.refresh_token, token_type="refresh")
if payload is None:
raise credentials_exception
username: str = payload.get("sub")
if username is None:
raise credentials_exception
# Verify user still exists and is active
user = db.query(UserModel).filter(UserModel.username == username).first()
if not user or not user.is_active:
raise credentials_exception
# Create new tokens
access_token = create_access_token(data={"sub": user.username})
new_refresh_token = create_refresh_token(data={"sub": user.username})
return {
"access_token": access_token,
"refresh_token": new_refresh_token,
"token_type": "bearer"
}
@router.get("/me", response_model=User)
async def get_me(current_user: UserModel = Depends(get_current_active_user)):
"""Get current user information."""
return current_user
@router.put("/me", response_model=User)
async def update_me(
user_update: UserUpdate,
current_user: UserModel = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Update current user information."""
if user_update.email:
# Check if email is already taken by another user
existing = db.query(UserModel).filter(
UserModel.email == user_update.email,
UserModel.id != current_user.id
).first()
if existing:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
current_user.email = user_update.email
if user_update.full_name:
current_user.full_name = user_update.full_name
if user_update.password:
current_user.hashed_password = get_password_hash(user_update.password)
db.commit()
db.refresh(current_user)
return current_userDatabase Setup
# database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from config import settings
engine = create_engine(
settings.DATABASE_URL,
connect_args={"check_same_thread": False} if "sqlite" in settings.DATABASE_URL else {}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db():
"""Database session dependency."""
db = SessionLocal()
try:
yield db
finally:
db.close()
def init_db():
"""Initialize database tables."""
from auth.models import Base
Base.metadata.create_all(bind=engine)Main Application
# main.py
from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
from auth.routes import router as auth_router
from auth.dependencies import get_current_active_user, get_current_superuser
from auth.schemas import User
from database import init_db
app = FastAPI(
title="FastAPI Authentication",
description="Complete authentication system with JWT",
version="1.0.0"
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"], # Frontend URL
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize database
@app.on_event("startup")
async def startup_event():
init_db()
# Include routers
app.include_router(auth_router)
# Public endpoint
@app.get("/")
async def root():
return {"message": "Welcome to FastAPI Authentication API"}
# Protected endpoint - requires authentication
@app.get("/protected")
async def protected_route(current_user: User = Depends(get_current_active_user)):
return {
"message": "This is a protected route",
"user": current_user.username
}
# Admin-only endpoint
@app.get("/admin")
async def admin_route(current_user: User = Depends(get_current_superuser)):
return {
"message": "This is an admin-only route",
"user": current_user.username
}Testing the API
1. Register a New User
curl -X POST "http://localhost:8000/auth/register" \
-H "Content-Type: application/json" \
-d '{
"email": "user@example.com",
"username": "johndoe",
"full_name": "John Doe",
"password": "secretpassword123"
}'2. Login
curl -X POST "http://localhost:8000/auth/login" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "username=johndoe&password=secretpassword123"Response:
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "bearer"
}3. Access Protected Route
curl -X GET "http://localhost:8000/protected" \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN"4. Get Current User
curl -X GET "http://localhost:8000/auth/me" \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN"5. Refresh Token
curl -X POST "http://localhost:8000/auth/refresh" \
-H "Content-Type: application/json" \
-d '{
"refresh_token": "YOUR_REFRESH_TOKEN"
}'Frontend Integration Example
React with Axios
// api/auth.js
import axios from 'axios';
const API_URL = 'http://localhost:8000';
// Create axios instance
const api = axios.create({
baseURL: API_URL,
});
// Add token to requests
api.interceptors.request.use((config) => {
const token = localStorage.getItem('access_token');
if (token) {
config.headers.Authorization = `Bearer ${token}`;
}
return config;
});
// Handle token refresh on 401
api.interceptors.response.use(
(response) => response,
async (error) => {
const originalRequest = error.config;
if (error.response.status === 401 && !originalRequest._retry) {
originalRequest._retry = true;
const refreshToken = localStorage.getItem('refresh_token');
if (refreshToken) {
try {
const response = await axios.post(`${API_URL}/auth/refresh`, {
refresh_token: refreshToken,
});
const { access_token, refresh_token } = response.data;
localStorage.setItem('access_token', access_token);
localStorage.setItem('refresh_token', refresh_token);
originalRequest.headers.Authorization = `Bearer ${access_token}`;
return api(originalRequest);
} catch (err) {
// Refresh failed, redirect to login
localStorage.clear();
window.location.href = '/login';
}
}
}
return Promise.reject(error);
}
);
export const authAPI = {
register: (data) => api.post('/auth/register', data),
login: (username, password) =>
api.post('/auth/login',
new URLSearchParams({ username, password }),
{ headers: { 'Content-Type': 'application/x-www-form-urlencoded' }}
),
getMe: () => api.get('/auth/me'),
updateMe: (data) => api.put('/auth/me', data),
};
export default api;Login Component
// components/Login.jsx
import { useState } from 'react';
import { authAPI } from '../api/auth';
function Login() {
const [username, setUsername] = useState('');
const [password, setPassword] = useState('');
const [error, setError] = useState('');
const handleLogin = async (e) => {
e.preventDefault();
setError('');
try {
const response = await authAPI.login(username, password);
const { access_token, refresh_token } = response.data;
localStorage.setItem('access_token', access_token);
localStorage.setItem('refresh_token', refresh_token);
// Redirect to dashboard
window.location.href = '/dashboard';
} catch (err) {
setError(err.response?.data?.detail || 'Login failed');
}
};
return (
<form onSubmit={handleLogin}>
<input
type="text"
placeholder="Username"
value={username}
onChange={(e) => setUsername(e.target.value)}
required
/>
<input
type="password"
placeholder="Password"
value={password}
onChange={(e) => setPassword(e.target.value)}
required
/>
{error && <p className="error">{error}</p>}
<button type="submit">Login</button>
</form>
);
}
export default Login;Security Best Practices
1. Strong Secret Keys
# Generate a secure secret key
import secrets
secret_key = secrets.token_urlsafe(32)
print(secret_key) # Use this in your .env file2. Environment Variables
Never hardcode secrets:
# ❌ Bad
SECRET_KEY = "my-secret-key"
# ✅ Good
SECRET_KEY = os.getenv("SECRET_KEY")3. Password Requirements
# schemas.py
from pydantic import validator
import re
class UserCreate(BaseModel):
password: str = Field(..., min_length=8)
@validator('password')
def password_strength(cls, v):
if not re.search(r'[A-Z]', v):
raise ValueError('Password must contain uppercase letter')
if not re.search(r'[a-z]', v):
raise ValueError('Password must contain lowercase letter')
if not re.search(r'\d', v):
raise ValueError('Password must contain digit')
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', v):
raise ValueError('Password must contain special character')
return v4. Rate Limiting
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@router.post("/login")
@limiter.limit("5/minute") # 5 attempts per minute
async def login(request: Request, ...):
...5. Token Expiration
# Short-lived access tokens
ACCESS_TOKEN_EXPIRE_MINUTES = 15 # 15 minutes
# Longer-lived refresh tokens
REFRESH_TOKEN_EXPIRE_DAYS = 7 # 7 days6. HTTPS Only in Production
# In production
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
if not settings.DEBUG:
app.add_middleware(HTTPSRedirectMiddleware)7. Store Tokens Securely
// ❌ Bad - localStorage vulnerable to XSS
localStorage.setItem('token', token);
// ✅ Better - httpOnly cookies (set from backend)
// Backend sets cookie:
response.set_cookie(
key="access_token",
value=access_token,
httponly=True,
secure=True,
samesite="lax"
)Advanced Features
Email Verification
import secrets
from datetime import datetime, timedelta
class User(Base):
# ... existing fields
email_verified = Column(Boolean, default=False)
verification_token = Column(String, nullable=True)
verification_token_expires = Column(DateTime, nullable=True)
async def send_verification_email(user: User, db: Session):
"""Generate and send verification email."""
token = secrets.token_urlsafe(32)
expires = datetime.utcnow() + timedelta(hours=24)
user.verification_token = token
user.verification_token_expires = expires
db.commit()
# Send email with verification link
verification_url = f"http://yourapp.com/verify?token={token}"
# ... send email logic
@router.get("/verify")
async def verify_email(token: str, db: Session = Depends(get_db)):
"""Verify user email."""
user = db.query(User).filter(
User.verification_token == token,
User.verification_token_expires > datetime.utcnow()
).first()
if not user:
raise HTTPException(status_code=400, detail="Invalid or expired token")
user.email_verified = True
user.verification_token = None
user.verification_token_expires = None
db.commit()
return {"message": "Email verified successfully"}Password Reset
@router.post("/forgot-password")
async def forgot_password(email: EmailStr, db: Session = Depends(get_db)):
"""Send password reset email."""
user = db.query(User).filter(User.email == email).first()
if user:
# Generate reset token
reset_token = secrets.token_urlsafe(32)
# Store token and expiry in database
# Send email with reset link
pass
# Always return success to prevent email enumeration
return {"message": "If email exists, reset link has been sent"}
@router.post("/reset-password")
async def reset_password(
token: str,
new_password: str,
db: Session = Depends(get_db)
):
"""Reset password with token."""
# Verify token and update password
passTesting Authentication
# tests/test_auth.py
import pytest
from fastapi.testclient import TestClient
from main import app
client = TestClient(app)
def test_register():
response = client.post("/auth/register", json={
"email": "test@example.com",
"username": "testuser",
"password": "Test123!@#",
"full_name": "Test User"
})
assert response.status_code == 201
data = response.json()
assert data["email"] == "test@example.com"
assert "hashed_password" not in data
def test_login():
response = client.post("/auth/login", data={
"username": "testuser",
"password": "Test123!@#"
})
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert "refresh_token" in data
assert data["token_type"] == "bearer"
def test_protected_route_without_token():
response = client.get("/protected")
assert response.status_code == 401
def test_protected_route_with_token():
# Login first
login_response = client.post("/auth/login", data={
"username": "testuser",
"password": "Test123!@#"
})
token = login_response.json()["access_token"]
# Access protected route
response = client.get(
"/protected",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 200
assert "message" in response.json()
def test_refresh_token():
# Login and get refresh token
login_response = client.post("/auth/login", data={
"username": "testuser",
"password": "Test123!@#"
})
refresh_token = login_response.json()["refresh_token"]
# Refresh
response = client.post("/auth/refresh", json={
"refresh_token": refresh_token
})
assert response.status_code == 200
data = response.json()
assert "access_token" in dataConclusion
You now have a complete, production-ready authentication system in FastAPI:
✅ User registration with password hashing
✅ JWT-based authentication with access and refresh tokens
✅ OAuth2 password flow for standard compliance
✅ Protected routes with dependency injection
✅ Token refresh mechanism
✅ Security best practices implemented
✅ Frontend integration examples
✅ Testing setup
Key Takeaways
- Never store plain passwords - always hash with bcrypt
- Use short-lived access tokens - 15-30 minutes
- Implement refresh tokens - for seamless user experience
- Validate tokens properly - check expiry and type
- Secure your secret keys - use environment variables
- Rate limit login attempts - prevent brute force attacks
- Use HTTPS in production - encrypt data in transit
Next Steps
- Add social OAuth (Google, GitHub)
- Implement 2FA (Two-Factor Authentication)
- Add role-based access control (RBAC)
- Set up email verification
- Add password reset functionality
- Implement session management
- Add login attempt tracking
Your FastAPI app is now secure and ready for production! 🔒
📬 Subscribe to Newsletter
Get the latest blog posts delivered to your inbox every week. No spam, unsubscribe anytime.
We respect your privacy. Unsubscribe at any time.
💬 Comments
Sign in to leave a comment
We'll never post without your permission.