Authorization in FastAPI: RBAC, Permissions & Access Control

After implementing authentication (verifying who users are), the next step is authorization (determining what they can do). FastAPI's dependency injection system, together with its Security and SecurityScopes helpers, lets you express authorization rules as small, reusable dependencies. Let's build an authorization system on top of an existing authentication layer.

Authentication vs Authorization

Authentication: Who are you?

  • Login with username/password
  • JWT tokens
  • OAuth2 flows

Authorization: What can you do?

  • Role-Based Access Control (RBAC)
  • Permission checks
  • Resource ownership
  • Policy-based decisions
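
The distinction shows up directly in HTTP status codes: a request that cannot be authenticated gets 401, while an authenticated request that lacks the necessary rights gets 403. Here is a minimal, framework-free sketch of that split. The token table, usernames, and the status_for helper are made up for illustration and are not part of the application built below.

```python
from typing import Optional

# Hypothetical in-memory data, for illustration only.
TOKENS = {"token-alice": "alice", "token-bob": "bob"}
PERMISSIONS = {
    "alice": {"posts:read", "posts:delete"},
    "bob": {"posts:read"},
}


def authenticate(token: Optional[str]) -> Optional[str]:
    """Authentication: map a credential to an identity (or None)."""
    return TOKENS.get(token) if token else None


def authorize(username: str, permission: str) -> bool:
    """Authorization: decide whether a known identity may perform an action."""
    return permission in PERMISSIONS.get(username, set())


def status_for(token: Optional[str], permission: str) -> int:
    """Return the HTTP status a request with this token would receive."""
    username = authenticate(token)
    if username is None:
        return 401  # identity unknown: Unauthorized
    if not authorize(username, permission):
        return 403  # identity known, action not allowed: Forbidden
    return 200
```

The same ordering appears in the dependencies later in this guide: the current user is resolved first, and only then are roles or permissions checked.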

What We'll Build

By the end of this guide, you'll have:

  • Role-Based Access Control (RBAC) system
  • Permission-based authorization
  • OAuth2 scopes integration
  • Resource-level authorization
  • Dynamic permission checking
  • Policy-based access control
  • Best practices for production

Project Setup

Building on our authentication system, let's add authorization:

# Install dependencies (if not already installed)
# Quote the extras so shells such as zsh don't treat the brackets as glob patterns
poetry add fastapi uvicorn sqlalchemy "python-jose[cryptography]" "passlib[bcrypt]"

Database Models with Roles

# models.py
from sqlalchemy import Boolean, Column, Integer, String, DateTime, Table, ForeignKey
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4;
# the sqlalchemy.ext.declarative import path is deprecated.
from sqlalchemy.orm import relationship, declarative_base
from datetime import datetime
 
Base = declarative_base()
 
# Many-to-many relationship tables.
# The composite primary keys prevent duplicate (user, role) and (role, permission) rows.
user_roles = Table(
    'user_roles',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id'), primary_key=True),
    Column('role_id', Integer, ForeignKey('roles.id'), primary_key=True)
)
 
role_permissions = Table(
    'role_permissions',
    Base.metadata,
    Column('role_id', Integer, ForeignKey('roles.id'), primary_key=True),
    Column('permission_id', Integer, ForeignKey('permissions.id'), primary_key=True)
)
 
class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True, nullable=False)
    username = Column(String, unique=True, index=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    full_name = Column(String)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    
    # Relationships
    roles = relationship("Role", secondary=user_roles, back_populates="users")
    posts = relationship("Post", back_populates="author")
    
    def has_role(self, role_name: str) -> bool:
        """Check if user has a specific role."""
        return any(role.name == role_name for role in self.roles)
    
    def has_permission(self, permission_name: str) -> bool:
        """Check if user has a specific permission."""
        for role in self.roles:
            if any(perm.name == permission_name for perm in role.permissions):
                return True
        return False
    
    def get_permissions(self) -> set[str]:
        """Get all permissions for user."""
        permissions = set()
        for role in self.roles:
            for permission in role.permissions:
                permissions.add(permission.name)
        return permissions
 
class Role(Base):
    __tablename__ = "roles"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, unique=True, nullable=False, index=True)
    description = Column(String)
    created_at = Column(DateTime, default=datetime.utcnow)
    
    # Relationships
    users = relationship("User", secondary=user_roles, back_populates="roles")
    permissions = relationship("Permission", secondary=role_permissions, back_populates="roles")
 
class Permission(Base):
    __tablename__ = "permissions"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, unique=True, nullable=False, index=True)
    description = Column(String)
    resource = Column(String)  # e.g., "posts", "users", "comments"
    action = Column(String)     # e.g., "read", "create", "update", "delete"
    created_at = Column(DateTime, default=datetime.utcnow)
    
    # Relationships
    roles = relationship("Role", secondary=role_permissions, back_populates="permissions")
 
class Post(Base):
    __tablename__ = "posts"
    
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, nullable=False)
    content = Column(String, nullable=False)
    author_id = Column(Integer, ForeignKey("users.id"))
    is_published = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    # Relationships
    author = relationship("User", back_populates="posts")
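
To make the user → roles → permissions resolution concrete, here is a small sketch that mirrors has_permission and get_permissions with plain dataclasses instead of SQLAlchemy models. The classes and sample data are stand-ins for illustration only; they are not the ORM models above.

```python
from dataclasses import dataclass, field


@dataclass
class Permission:
    name: str


@dataclass
class Role:
    name: str
    permissions: list[Permission] = field(default_factory=list)


@dataclass
class User:
    username: str
    roles: list[Role] = field(default_factory=list)

    def get_permissions(self) -> set[str]:
        """Union of the permission names granted by every role."""
        return {perm.name for role in self.roles for perm in role.permissions}

    def has_permission(self, name: str) -> bool:
        return name in self.get_permissions()


read = Permission("posts:read")
create = Permission("posts:create")
update = Permission("posts:update")

author = Role("author", [read, create])
editor = Role("editor", [read, update])

# A user with two roles gets the union of both permission sets.
alice = User("alice", [author, editor])
```

With the real models, the same traversal goes through the roles and permissions relationships, which SQLAlchemy loads lazily by default, so the first check on a freshly loaded user may issue additional queries.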

Pydantic Schemas

# schemas.py
from pydantic import BaseModel
from datetime import datetime
from typing import Optional
 
class PermissionBase(BaseModel):
    name: str
    description: Optional[str] = None
    resource: str
    action: str
 
class Permission(PermissionBase):
    id: int
    created_at: datetime
    
    class Config:
        from_attributes = True
 
class RoleBase(BaseModel):
    name: str
    description: Optional[str] = None
 
class RoleCreate(RoleBase):
    permission_ids: list[int] = []
 
class Role(RoleBase):
    id: int
    created_at: datetime
    permissions: list[Permission] = []
    
    class Config:
        from_attributes = True
 
class UserWithRoles(BaseModel):
    id: int
    username: str
    email: str
    full_name: Optional[str] = None
    roles: list[Role] = []
    
    class Config:
        from_attributes = True
 
class PostBase(BaseModel):
    title: str
    content: str
    is_published: bool = False
 
class PostCreate(PostBase):
    pass
 
class PostUpdate(BaseModel):
    title: Optional[str] = None
    content: Optional[str] = None
    is_published: Optional[bool] = None
 
class Post(PostBase):
    id: int
    author_id: int
    created_at: datetime
    updated_at: datetime
    
    class Config:
        from_attributes = True

Authorization Dependencies

# dependencies.py
from fastapi import Depends, HTTPException, status
from sqlalchemy.orm import Session
 
# Absolute imports, matching the other modules in this guide.
# get_current_active_user comes from the authentication module built earlier.
from models import User
from auth import get_current_active_user
from database import get_db
 
def require_role(required_role: str):
    """Dependency to require a specific role."""
    async def role_checker(
        current_user: User = Depends(get_current_active_user)
    ) -> User:
        if not current_user.has_role(required_role):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Role '{required_role}' required"
            )
        return current_user
    return role_checker
 
def require_permission(required_permission: str):
    """Dependency to require a specific permission."""
    async def permission_checker(
        current_user: User = Depends(get_current_active_user)
    ) -> User:
        if not current_user.has_permission(required_permission):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Permission '{required_permission}' required"
            )
        return current_user
    return permission_checker
 
def require_any_role(*roles: str):
    """Dependency to require any of the specified roles."""
    async def role_checker(
        current_user: User = Depends(get_current_active_user)
    ) -> User:
        if not any(current_user.has_role(role) for role in roles):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"One of these roles required: {', '.join(roles)}"
            )
        return current_user
    return role_checker
 
def require_all_roles(*roles: str):
    """Dependency to require all specified roles."""
    async def role_checker(
        current_user: User = Depends(get_current_active_user)
    ) -> User:
        if not all(current_user.has_role(role) for role in roles):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"All these roles required: {', '.join(roles)}"
            )
        return current_user
    return role_checker
 
def require_any_permission(*permissions: str):
    """Dependency to require any of the specified permissions."""
    async def permission_checker(
        current_user: User = Depends(get_current_active_user)
    ) -> User:
        if not any(current_user.has_permission(perm) for perm in permissions):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"One of these permissions required: {', '.join(permissions)}"
            )
        return current_user
    return permission_checker
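
Each of these helpers is a factory: calling it returns a new checker function that remembers the required roles or permissions through a closure. The framework-free sketch below shows the pattern in isolation. Forbidden stands in for HTTPException with status 403, and the checker takes a set of role names directly rather than a User resolved by Depends.

```python
class Forbidden(Exception):
    """Stand-in for HTTPException(status_code=403) in this sketch."""


def require_any_role(*roles: str):
    """Return a checker bound to the given role names."""
    def role_checker(user_roles: set[str]) -> set[str]:
        # `roles` is captured from the enclosing call.
        if not any(role in user_roles for role in roles):
            raise Forbidden(f"One of these roles required: {', '.join(roles)}")
        return user_roles
    return role_checker


# Build each checker once and reuse it wherever the same rule applies.
staff_only = require_any_role("admin", "moderator")
admin_only = require_any_role("admin")
```

Because every call returns a new function object, it can be convenient to create a checker such as admin_only once at module level and pass that same object to Depends in several routes.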

Using Authorization in Routes

1. Role-Based Authorization

# routes/admin.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
 
from database import get_db
from dependencies import require_role, require_any_role
from models import User
from schemas import UserWithRoles
 
router = APIRouter(prefix="/admin", tags=["admin"])
 
@router.get("/dashboard")
async def admin_dashboard(
    current_user: User = Depends(require_role("admin"))
):
    """Only accessible by users with 'admin' role."""
    return {
        "message": "Welcome to admin dashboard",
        "user": current_user.username
    }
 
@router.get("/users", response_model=list[UserWithRoles])
async def list_users(
    current_user: User = Depends(require_any_role("admin", "moderator")),
    db: Session = Depends(get_db)
):
    """Accessible by admin or moderator."""
    # response_model keeps hashed_password and other internal columns out of the response
    users = db.query(User).all()
    return users
 
@router.delete("/users/{user_id}")
async def delete_user(
    user_id: int,
    current_user: User = Depends(require_role("admin")),
    db: Session = Depends(get_db)
):
    """Only admins can delete users."""
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    
    db.delete(user)
    db.commit()
    return {"message": "User deleted"}

2. Permission-Based Authorization

# routes/posts.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
 
from auth import get_current_active_user
from database import get_db
from dependencies import require_permission
from schemas import Post, PostCreate, PostUpdate
from models import Post as PostModel, User
 
router = APIRouter(prefix="/posts", tags=["posts"])
 
@router.get("/", response_model=list[Post])
async def list_posts(
    db: Session = Depends(get_db),
    current_user: User = Depends(require_permission("posts:read"))
):
    """List all posts - requires 'posts:read' permission."""
    posts = db.query(PostModel).all()
    return posts
 
@router.post("/", response_model=Post)
async def create_post(
    post_data: PostCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_permission("posts:create"))
):
    """Create a new post - requires 'posts:create' permission."""
    post = PostModel(
        title=post_data.title,
        content=post_data.content,
        is_published=post_data.is_published,
        author_id=current_user.id
    )
    db.add(post)
    db.commit()
    db.refresh(post)
    return post
 
@router.put("/{post_id}", response_model=Post)
async def update_post(
    post_id: int,
    post_data: PostUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    """Update a post - requires ownership or 'posts:update' permission."""
    post = db.query(PostModel).filter(PostModel.id == post_id).first()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    
    # Check authorization: owner or has permission
    if post.author_id != current_user.id and not current_user.has_permission("posts:update"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authorized to update this post"
        )
    
    # Update fields
    if post_data.title is not None:
        post.title = post_data.title
    if post_data.content is not None:
        post.content = post_data.content
    if post_data.is_published is not None:
        post.is_published = post_data.is_published
    
    db.commit()
    db.refresh(post)
    return post
 
@router.delete("/{post_id}")
async def delete_post(
    post_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    """Delete a post - requires ownership or 'posts:delete' permission."""
    post = db.query(PostModel).filter(PostModel.id == post_id).first()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    
    # Check authorization: owner or has permission
    if post.author_id != current_user.id and not current_user.has_permission("posts:delete"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authorized to delete this post"
        )
    
    db.delete(post)
    db.commit()
    return {"message": "Post deleted"}
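
The update and delete routes repeat the same "owner or permission" check. One way to keep that rule in a single place is a small helper, sketched here without FastAPI. Forbidden stands in for HTTPException with status 403, the dataclasses stand in for the ORM models, and ensure_owner_or_permission is a hypothetical name that does not appear in the code above.

```python
from dataclasses import dataclass, field


class Forbidden(Exception):
    """Stand-in for HTTPException(status_code=403) in this sketch."""


@dataclass
class User:
    id: int
    permissions: set[str] = field(default_factory=set)

    def has_permission(self, name: str) -> bool:
        return name in self.permissions


@dataclass
class Post:
    id: int
    author_id: int


def ensure_owner_or_permission(user: User, post: Post, permission: str) -> None:
    """Raise Forbidden unless the user owns the post or holds the permission."""
    if post.author_id != user.id and not user.has_permission(permission):
        # "posts:delete" -> "delete", used only for the error message
        action = permission.split(":", 1)[-1]
        raise Forbidden(f"Not authorized to {action} this post")
```

In a route, the call would sit right after the 404 check, for example ensure_owner_or_permission(current_user, post, "posts:delete").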

Resource Ownership Authorization

# dependencies.py (continued)
from typing import Type
 
# Base is the declarative base defined in models.py
from models import Base
 
def require_resource_owner(
    model: Type[Base],
    owner_field: str = "author_id"
):
    """
    Dependency to ensure user owns the resource.
    
    Args:
        model: SQLAlchemy model class
        owner_field: Name of the field containing owner's user_id
    """
    async def ownership_checker(
        resource_id: int,
        current_user: User = Depends(get_current_active_user),
        db: Session = Depends(get_db)
    ):
        resource = db.query(model).filter(model.id == resource_id).first()
        if not resource:
            raise HTTPException(status_code=404, detail="Resource not found")
        
        owner_id = getattr(resource, owner_field)
        if owner_id != current_user.id:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't own this resource"
            )
        
        return resource
    
    return ownership_checker
 
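# Usage in routes (e.g. routes/posts.py).
# The path parameter must be named resource_id to match the checker's argument;
# with {post_id} in the path, FastAPI would treat resource_id as a required query parameter.
@router.put("/posts/{resource_id}")
async def update_my_post(
    post_data: PostUpdate,
    post: PostModel = Depends(require_resource_owner(PostModel)),
    db: Session = Depends(get_db)
):
    """Update own post only."""
    # Update post...
    pass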
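
The owner_field argument is what makes the checker reusable across models whose owner column has a different name. The sketch below isolates that getattr lookup with two stand-in dataclasses. Comment and its user_id field are hypothetical, and NotFound and Forbidden stand in for the 404 and 403 HTTPException responses.

```python
from dataclasses import dataclass
from typing import Any, Optional


class NotFound(Exception):
    """Stand-in for HTTPException(status_code=404)."""


class Forbidden(Exception):
    """Stand-in for HTTPException(status_code=403)."""


@dataclass
class Post:
    id: int
    author_id: int


@dataclass
class Comment:  # hypothetical model with a differently named owner column
    id: int
    user_id: int


def check_owner(resource: Optional[Any], user_id: int, owner_field: str = "author_id") -> Any:
    """Return the resource if it exists and belongs to user_id."""
    if resource is None:
        raise NotFound("Resource not found")
    if getattr(resource, owner_field) != user_id:
        raise Forbidden("You don't own this resource")
    return resource
```

Checking existence before ownership matches the order used in ownership_checker above.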

OAuth2 Scopes Integration

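# security.py
from fastapi import Depends, HTTPException, Security, status
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from sqlalchemy.orm import Session
 
# verify_token is assumed to come from the authentication module built earlier
from auth import verify_token
from database import get_db
from models import User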
 
# Define OAuth2 scheme with scopes
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="auth/login",
    scopes={
        "users:read": "Read user information",
        "users:write": "Modify user information",
        "posts:read": "Read posts",
        "posts:write": "Create and update posts",
        "posts:delete": "Delete posts",
        "admin": "Admin access to all resources",
    }
)
 
async def get_current_user_with_scopes(
    security_scopes: SecurityScopes,
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> User:
    """Get current user and validate OAuth2 scopes."""
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    
    # Verify token
    payload = verify_token(token, token_type="access")
    if payload is None:
        raise credentials_exception
    
    username = payload.get("sub")
    if username is None:
        raise credentials_exception
    token_scopes = payload.get("scopes", [])
    
    # Get user
    user = db.query(User).filter(User.username == username).first()
    if user is None:
        raise credentials_exception
    
    # Check scopes
    for scope in security_scopes.scopes:
        if scope not in token_scopes:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    
    return user
 
# Usage in routes
@router.get("/users/me")
async def read_users_me(
    current_user: User = Security(get_current_user_with_scopes, scopes=["users:read"])
):
    """Read own user info - requires 'users:read' scope."""
    return current_user
 
@router.put("/users/me")
async def update_users_me(
    user_update: UserUpdate,
    current_user: User = Security(get_current_user_with_scopes, scopes=["users:write"]),
    db: Session = Depends(get_db)
):
    """Update own user info - requires 'users:write' scope."""
    # Update user...
    pass
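
The scope check itself is plain set logic: every scope a route requires must be present in the token. The sketch below isolates that comparison and the WWW-Authenticate value built above. The helper names are hypothetical, and grant_scopes shows one possible login-time policy (issue only scopes the user actually holds), which is an assumption rather than something defined in this guide.

```python
def missing_scopes(required: list[str], token_scopes: list[str]) -> list[str]:
    """Return the required scopes that the token does not carry."""
    granted = set(token_scopes)
    return [scope for scope in required if scope not in granted]


def www_authenticate_value(required: list[str]) -> str:
    """Build the header value the same way the dependency above does."""
    if required:
        scope_str = " ".join(required)
        return f'Bearer scope="{scope_str}"'
    return "Bearer"


def grant_scopes(requested: list[str], user_permissions: set[str]) -> list[str]:
    """Assumed login policy: never put a scope in the token that the user lacks."""
    return sorted(set(requested) & user_permissions)
```

A request is allowed only when missing_scopes returns an empty list, which matches the loop over security_scopes.scopes in get_current_user_with_scopes.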

Dynamic Authorization Policies

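# policies.py
from typing import Protocol, runtime_checkable
 
from fastapi import Depends, HTTPException, status
from sqlalchemy.orm import Session
 
from auth import get_current_active_user
from database import get_db
from models import User, Post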
 
@runtime_checkable
class Policy(Protocol):
    """Base protocol for authorization policies."""
    def authorize(self, user: User, resource=None) -> bool:
        """Check if user is authorized."""
        ...
 
class PostPolicy:
    """Authorization policy for posts."""
    
    @staticmethod
    def can_read(user: User, post: Post) -> bool:
        """Check if user can read a post."""
        # Published posts are public
        if post.is_published:
            return True
        # Owners can read their own drafts
        if post.author_id == user.id:
            return True
        # Admins can read everything
        if user.has_role("admin"):
            return True
        return False
    
    @staticmethod
    def can_update(user: User, post: Post) -> bool:
        """Check if user can update a post."""
        # Owners can update their posts
        if post.author_id == user.id:
            return True
        # Editors can update any post
        if user.has_permission("posts:update"):
            return True
        return False
    
    @staticmethod
    def can_delete(user: User, post: Post) -> bool:
        """Check if user can delete a post."""
        # Owners can delete their posts
        if post.author_id == user.id:
            return True
        # Admins can delete any post
        if user.has_role("admin"):
            return True
        return False
    
    @staticmethod
    def can_publish(user: User, post: Post) -> bool:
        """Check if user can publish a post."""
        # Owners can publish their posts
        if post.author_id == user.id:
            return True
        # Publishers can publish any post
        if user.has_permission("posts:publish"):
            return True
        return False
 
# Dependency for policy checking
def authorize_post_action(action: str):
    """Dependency to check post authorization using policy."""
    async def policy_checker(
        post_id: int,
        current_user: User = Depends(get_current_active_user),
        db: Session = Depends(get_db)
    ):
        post = db.query(Post).filter(Post.id == post_id).first()
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")
        
        policy_method = getattr(PostPolicy, f"can_{action}", None)
        if not policy_method or not policy_method(current_user, post):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Not authorized to {action} this post"
            )
        
        return post
    
    return policy_checker
 
# Usage in routes
@router.put("/posts/{post_id}")
async def update_post(
    post_id: int,
    post_data: PostUpdate,
    post: Post = Depends(authorize_post_action("update")),
    db: Session = Depends(get_db)
):
    """Update post with policy-based authorization."""
    # Update logic...
    pass
 
@router.post("/posts/{post_id}/publish")
async def publish_post(
    post_id: int,
    post: Post = Depends(authorize_post_action("publish")),
    db: Session = Depends(get_db)
):
    """Publish post - checked by policy."""
    post.is_published = True
    db.commit()
    return {"message": "Post published"}
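
In authorize_post_action, a misspelled action name only surfaces at request time as a 403, because getattr falls back to None. A possible refinement is to resolve the policy method when the dependency is created, so typos fail immediately. The sketch below shows that lookup in isolation. resolve_policy is a hypothetical helper, and the simplified PostPolicy takes ids instead of model instances.

```python
class PostPolicy:
    """Simplified policy for this sketch: only the owner may act."""

    @staticmethod
    def can_update(user_id: int, author_id: int) -> bool:
        return user_id == author_id

    @staticmethod
    def can_publish(user_id: int, author_id: int) -> bool:
        return user_id == author_id


def resolve_policy(policy: type, action: str):
    """Look up can_<action> on the policy class, failing fast on unknown actions."""
    method = getattr(policy, f"can_{action}", None)
    if not callable(method):
        raise ValueError(f"{policy.__name__} has no rule for action '{action}'")
    return method


can_update = resolve_policy(PostPolicy, "update")
```

Calling resolve_policy at the top of authorize_post_action, before policy_checker is defined, would turn a typo such as "pubilsh" into an error at import time instead of a silent denial.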

Seeding Roles and Permissions

# seed.py
from sqlalchemy.orm import Session
from models import Role, Permission, User, user_roles
from database import SessionLocal, engine, Base
 
def seed_permissions(db: Session):
    """Create initial permissions."""
    permissions = [
        # User permissions
        Permission(name="users:read", description="Read users", resource="users", action="read"),
        Permission(name="users:create", description="Create users", resource="users", action="create"),
        Permission(name="users:update", description="Update users", resource="users", action="update"),
        Permission(name="users:delete", description="Delete users", resource="users", action="delete"),
        
        # Post permissions
        Permission(name="posts:read", description="Read posts", resource="posts", action="read"),
        Permission(name="posts:create", description="Create posts", resource="posts", action="create"),
        Permission(name="posts:update", description="Update any post", resource="posts", action="update"),
        Permission(name="posts:delete", description="Delete any post", resource="posts", action="delete"),
        Permission(name="posts:publish", description="Publish posts", resource="posts", action="publish"),
    ]
    
    for perm in permissions:
        existing = db.query(Permission).filter(Permission.name == perm.name).first()
        if not existing:
            db.add(perm)
    
    db.commit()
 
def seed_roles(db: Session):
    """Create initial roles with permissions."""
    # Get permissions
    all_permissions = {p.name: p for p in db.query(Permission).all()}
    
    # Define roles
    roles_config = {
        "user": {
            "description": "Regular user",
            "permissions": ["posts:read", "posts:create"]
        },
        "author": {
            "description": "Content author",
            "permissions": ["posts:read", "posts:create"]
        },
        "editor": {
            "description": "Content editor",
            "permissions": ["posts:read", "posts:create", "posts:update", "posts:publish"]
        },
        "moderator": {
            "description": "Content moderator",
            "permissions": ["users:read", "posts:read", "posts:update", "posts:delete"]
        },
        "admin": {
            "description": "Administrator with full access",
            "permissions": list(all_permissions.keys())
        }
    }
    
    for role_name, config in roles_config.items():
        role = db.query(Role).filter(Role.name == role_name).first()
        if not role:
            role = Role(name=role_name, description=config["description"])
            db.add(role)
            db.flush()
        
        # Add permissions to role
        role.permissions = [all_permissions[p] for p in config["permissions"] if p in all_permissions]
    
    db.commit()
 
def seed_database():
    """Seed the database with initial data."""
    Base.metadata.create_all(bind=engine)
    
    db = SessionLocal()
    try:
        print("Seeding permissions...")
        seed_permissions(db)
        
        print("Seeding roles...")
        seed_roles(db)
        
        print("Database seeded successfully!")
    finally:
        db.close()
 
if __name__ == "__main__":
    seed_database()

Run the seed script:

python seed.py
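
Note that seed_roles silently skips any permission name in roles_config that does not exist in the database (the "if p in all_permissions" filter), so a typo produces a role with fewer permissions than intended. A small pre-check, sketched here with plain dictionaries, can report such names before seeding. find_unknown_permissions is a hypothetical helper, and the sample data contains a deliberate typo.

```python
def find_unknown_permissions(roles_config: dict, known: set) -> dict:
    """Map each role to the permission names it references that are not known."""
    unknown = {}
    for role_name, config in roles_config.items():
        missing = [name for name in config["permissions"] if name not in known]
        if missing:
            unknown[role_name] = missing
    return unknown


known_permissions = {"posts:read", "posts:create", "posts:publish"}

sample_roles_config = {
    "author": {
        "description": "Content author",
        "permissions": ["posts:read", "posts:create"],
    },
    "editor": {
        "description": "Content editor",
        # deliberate typo: "posts:publsh"
        "permissions": ["posts:read", "posts:publsh"],
    },
}

problems = find_unknown_permissions(sample_roles_config, known_permissions)
```

In seed_roles, the known set would be all_permissions.keys(), and a non-empty result could be raised as an error instead of being ignored.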

Admin Routes for Role Management

# routes/admin.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
 
from dependencies import require_role
from schemas import Role, RoleCreate, UserWithRoles
from models import Role as RoleModel, User, Permission
 
router = APIRouter(prefix="/admin", tags=["admin"])
 
@router.post("/roles", response_model=Role)
async def create_role(
    role_data: RoleCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_role("admin"))
):
    """Create a new role with permissions."""
    # Check if role exists
    existing = db.query(RoleModel).filter(RoleModel.name == role_data.name).first()
    if existing:
        raise HTTPException(status_code=400, detail="Role already exists")
    
    # Create role
    role = RoleModel(name=role_data.name, description=role_data.description)
    
    # Add permissions
    if role_data.permission_ids:
        permissions = db.query(Permission).filter(
            Permission.id.in_(role_data.permission_ids)
        ).all()
        role.permissions = permissions
    
    db.add(role)
    db.commit()
    db.refresh(role)
    return role
 
@router.post("/users/{user_id}/roles/{role_id}")
async def assign_role_to_user(
    user_id: int,
    role_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_role("admin"))
):
    """Assign a role to a user."""
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    
    role = db.query(RoleModel).filter(RoleModel.id == role_id).first()
    if not role:
        raise HTTPException(status_code=404, detail="Role not found")
    
    if role not in user.roles:
        user.roles.append(role)
        db.commit()
    
    return {"message": f"Role '{role.name}' assigned to user '{user.username}'"}
 
@router.delete("/users/{user_id}/roles/{role_id}")
async def remove_role_from_user(
    user_id: int,
    role_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_role("admin"))
):
    """Remove a role from a user."""
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    
    role = db.query(RoleModel).filter(RoleModel.id == role_id).first()
    if not role:
        raise HTTPException(status_code=404, detail="Role not found")
    
    if role in user.roles:
        user.roles.remove(role)
        db.commit()
    
    return {"message": f"Role '{role.name}' removed from user '{user.username}'"}
 
@router.get("/users/{user_id}", response_model=UserWithRoles)
async def get_user_with_roles(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_role("admin"))
):
    """Get user with their roles and permissions."""
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

Testing Authorization

# tests/test_authorization.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
 
from main import app
from database import Base, get_db
from models import User, Role, Permission
 
# Test database
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
 
def override_get_db():
    db = TestingSessionLocal()
    try:
        yield db
    finally:
        db.close()
 
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
 
@pytest.fixture
def setup_database():
    Base.metadata.create_all(bind=engine)
    yield
    Base.metadata.drop_all(bind=engine)
 
@pytest.fixture
def setup_roles_and_users(setup_database):
    db = TestingSessionLocal()
    
    # Create permissions
    read_perm = Permission(name="posts:read", resource="posts", action="read")
    create_perm = Permission(name="posts:create", resource="posts", action="create")
    update_perm = Permission(name="posts:update", resource="posts", action="update")
    delete_perm = Permission(name="posts:delete", resource="posts", action="delete")
    
    db.add_all([read_perm, create_perm, update_perm, delete_perm])
    db.commit()
    
    # Create roles
    user_role = Role(name="user", permissions=[read_perm, create_perm])
    admin_role = Role(name="admin", permissions=[read_perm, create_perm, update_perm, delete_perm])
    
    db.add_all([user_role, admin_role])
    db.commit()
    
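    # Create users. The stored hash must match the password used in the login
    # calls below; this assumes the login endpoint verifies with passlib's bcrypt.
    from passlib.context import CryptContext
    hashed = CryptContext(schemes=["bcrypt"], deprecated="auto").hash("password")
    regular_user = User(username="user", email="user@test.com", hashed_password=hashed, roles=[user_role])
    admin_user = User(username="admin", email="admin@test.com", hashed_password=hashed, roles=[admin_role])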
    
    db.add_all([regular_user, admin_user])
    db.commit()
    
    db.close()
 
def test_role_based_access(setup_roles_and_users):
    # Login as regular user
    response = client.post("/auth/login", data={"username": "user", "password": "password"})
    user_token = response.json()["access_token"]
    
    # Try to access admin route
    response = client.get(
        "/admin/dashboard",
        headers={"Authorization": f"Bearer {user_token}"}
    )
    assert response.status_code == 403
    
    # Login as admin
    response = client.post("/auth/login", data={"username": "admin", "password": "password"})
    admin_token = response.json()["access_token"]
    
    # Access admin route
    response = client.get(
        "/admin/dashboard",
        headers={"Authorization": f"Bearer {admin_token}"}
    )
    assert response.status_code == 200
 
def test_permission_based_access(setup_roles_and_users):
    # Login as user
    response = client.post("/auth/login", data={"username": "user", "password": "password"})
    user_token = response.json()["access_token"]
    
    # Can read posts
    response = client.get(
        "/posts",
        headers={"Authorization": f"Bearer {user_token}"}
    )
    assert response.status_code == 200
    
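    # Cannot delete another user's post (no 'posts:delete' permission).
    # The post has to exist and belong to someone else: a missing post
    # returns 404, and the owner is always allowed to delete.
    from models import Post
    db = TestingSessionLocal()
    admin = db.query(User).filter(User.username == "admin").first()
    post = Post(title="Admin post", content="Body", author_id=admin.id)
    db.add(post)
    db.commit()
    post_id = post.id
    db.close()
    
    response = client.delete(
        f"/posts/{post_id}",
        headers={"Authorization": f"Bearer {user_token}"}
    )
    assert response.status_code == 403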
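
Endpoint tests like these are easier to keep complete when the expected access rules are written down as a table and checked row by row. The sketch below does that without FastAPI or a database. ROLE_PERMISSIONS mirrors the roles created in the fixture above, while EXPECTED_ACCESS, is_allowed, and check_access_matrix are hypothetical names for illustration.

```python
# Mirrors the roles created in setup_roles_and_users.
ROLE_PERMISSIONS = {
    "user": {"posts:read", "posts:create"},
    "admin": {"posts:read", "posts:create", "posts:update", "posts:delete"},
}

# (role, permission, expected result)
EXPECTED_ACCESS = [
    ("user", "posts:read", True),
    ("user", "posts:create", True),
    ("user", "posts:update", False),
    ("user", "posts:delete", False),
    ("admin", "posts:update", True),
    ("admin", "posts:delete", True),
]


def is_allowed(role: str, permission: str) -> bool:
    """Unknown roles get no permissions."""
    return permission in ROLE_PERMISSIONS.get(role, set())


def check_access_matrix() -> list:
    """Return the rows whose actual result differs from the expectation."""
    return [row for row in EXPECTED_ACCESS if is_allowed(row[0], row[1]) != row[2]]
```

With pytest, the same table could feed pytest.mark.parametrize so that each row becomes its own test case against the real endpoints.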

Best Practices

1. Principle of Least Privilege

# ❌ Bad: Give admin to everyone
user.roles = [admin_role]
 
# ✅ Good: Give only necessary permissions
user.roles = [user_role]

2. Separate Roles and Permissions

# Roles are collections of permissions
# Permissions are atomic actions
 
# Good hierarchy:
# Role: "Editor" -> Permissions: ["posts:read", "posts:update", "posts:publish"]
# Role: "Author" -> Permissions: ["posts:read", "posts:create"]

3. Resource-Level Authorization

# Check both permission AND ownership
def can_update_post(user: User, post: Post) -> bool:
    # Owner can always update
    if post.author_id == user.id:
        return True
    
    # Or has general update permission
    if user.has_permission("posts:update"):
        return True
    
    return False

4. Use Descriptive Permission Names

# ✅ Good: Clear and specific
"posts:create"
"posts:update"
"posts:delete"
"users:manage"
 
# ❌ Bad: Vague
"edit"
"manage"
"access"

5. Cache Permission Checks

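from functools import lru_cache
 
from database import SessionLocal
from models import User
 
@lru_cache(maxsize=128)
def get_user_permissions(user_id: int) -> frozenset[str]:
    """Cache user permissions to avoid repeated DB queries.
 
    Entries never expire on their own: call get_user_permissions.cache_clear()
    whenever roles or permissions change, or a revoked permission stays active.
    """
    db = SessionLocal()
    try:
        user = db.query(User).filter(User.id == user_id).first()
        # frozenset: callers cannot mutate the shared cached value
        return frozenset(user.get_permissions()) if user else frozenset()
    finally:
        db.close()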
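
A cache built with lru_cache keeps entries until they are evicted or cleared, which is risky for authorization data: a revoked role can stay effective. One alternative is a small time-limited cache with per-user invalidation, sketched below using only the standard library. PermissionCache and its loader parameter are hypothetical; in the application, the loader would run the database query shown above.

```python
import time
from typing import Callable


class PermissionCache:
    """Cache permission sets per user for a limited time."""

    def __init__(
        self,
        loader: Callable[[int], set],
        ttl_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._loader = loader
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries = {}  # user_id -> (stored_at, frozenset of names)

    def get(self, user_id: int) -> frozenset:
        now = self._clock()
        entry = self._entries.get(user_id)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]  # still fresh
        permissions = frozenset(self._loader(user_id))
        self._entries[user_id] = (now, permissions)
        return permissions

    def invalidate(self, user_id: int) -> None:
        """Drop one user's entry, e.g. after a role is assigned or removed."""
        self._entries.pop(user_id, None)
```

The admin routes that assign or remove roles would be natural places to call invalidate for the affected user.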

Conclusion

You now have a complete authorization system in FastAPI:

  • Role-Based Access Control (RBAC) with flexible role assignments
  • Permission-based authorization for fine-grained control
  • OAuth2 scopes for API access control
  • Resource ownership verification
  • Policy-based authorization for complex business logic
  • Dynamic permission checking with caching
  • Admin tools for role and permission management
  • Testing authorization flows

Key Takeaways

  1. Authentication ≠ Authorization - They serve different purposes
  2. Use RBAC for simplicity - Group permissions into roles
  3. Check ownership - Not just permissions, but also resource ownership
  4. Implement policies - For complex authorization logic
  5. Cache permissions - Improve performance
  6. Test thoroughly - Ensure security rules work correctly
  7. Audit access - Log authorization decisions
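
The last takeaway is not implemented in this guide, so here is one minimal way it could look with the standard logging module. The logger name "authz.audit", the message format, and audit_decision are assumptions chosen for illustration.

```python
import logging

audit_logger = logging.getLogger("authz.audit")


def audit_decision(username: str, action: str, resource: str, allowed: bool) -> bool:
    """Log one authorization decision and pass the result through."""
    # Denials are logged at WARNING so they stand out from routine grants.
    level = logging.INFO if allowed else logging.WARNING
    audit_logger.log(
        level,
        "authz user=%s action=%s resource=%s allowed=%s",
        username, action, resource, allowed,
    )
    return allowed
```

Because the function returns the decision unchanged, it can wrap an existing check, for example audit_decision(user.username, "delete", "posts/1", PostPolicy.can_delete(user, post)).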

Next Steps

  • Add audit logging for authorization events
  • Implement hierarchical roles (role inheritance)
  • Add time-based access control
  • Implement IP-based restrictions
  • Add attribute-based access control (ABAC)
  • Create authorization middleware
  • Build admin UI for role management

Your FastAPI application now has enterprise-grade authorization! 🔐
