Authorization in FastAPI: RBAC, Permissions & Access Control

After implementing authentication (verifying who users are), the next critical step is authorization (determining what they can do). FastAPI provides powerful tools for implementing sophisticated authorization systems. Let's build a complete authorization system from scratch.
Authentication vs Authorization
Authentication: Who are you?
- Login with username/password
- JWT tokens
- OAuth2 flows
Authorization: What can you do?
- Role-Based Access Control (RBAC)
- Permission checks
- Resource ownership
- Policy-based decisions
What We'll Build
By the end of this guide, you'll have:
- Role-Based Access Control (RBAC) system
- Permission-based authorization
- OAuth2 scopes integration
- Resource-level authorization
- Dynamic permission checking
- Policy-based access control
- Best practices for production
Project Setup
Building on our authentication system, let's add authorization:
# Install dependencies (if not already installed)
poetry add fastapi uvicorn sqlalchemy python-jose[cryptography] passlib[bcrypt]

Database Models with Roles
# models.py
from sqlalchemy import Boolean, Column, Integer, String, DateTime, Table, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
# Declarative base class shared by every ORM model in this module.
Base = declarative_base()

# Many-to-many association tables.
# Both foreign keys together form a composite primary key so the database
# itself rejects duplicate (user, role) and (role, permission) rows.
user_roles = Table(
    'user_roles',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id'), primary_key=True),
    Column('role_id', Integer, ForeignKey('roles.id'), primary_key=True),
)

role_permissions = Table(
    'role_permissions',
    Base.metadata,
    Column('role_id', Integer, ForeignKey('roles.id'), primary_key=True),
    Column('permission_id', Integer, ForeignKey('permissions.id'), primary_key=True),
)
class User(Base):
    """Account that can hold roles and author posts.

    Permissions are never attached to a user directly; they are derived from
    the roles the user holds.
    """

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True, nullable=False)
    username = Column(String, unique=True, index=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    full_name = Column(String)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationships
    roles = relationship("Role", secondary=user_roles, back_populates="users")
    posts = relationship("Post", back_populates="author")

    def has_role(self, role_name: str) -> bool:
        """Return True when one of the user's roles is named ``role_name``."""
        return role_name in (role.name for role in self.roles)

    def has_permission(self, permission_name: str) -> bool:
        """Return True when any of the user's roles grants ``permission_name``."""
        return any(
            granted.name == permission_name
            for role in self.roles
            for granted in role.permissions
        )

    def get_permissions(self) -> set[str]:
        """Return the names of every permission granted through the user's roles."""
        return {
            granted.name
            for role in self.roles
            for granted in role.permissions
        }
class Role(Base):
    """Named bundle of permissions that can be assigned to users."""

    __tablename__ = "roles"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, unique=True, nullable=False, index=True)
    description = Column(String)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationships (both go through an association table)
    users = relationship(
        "User",
        secondary=user_roles,
        back_populates="roles",
    )
    permissions = relationship(
        "Permission",
        secondary=role_permissions,
        back_populates="roles",
    )
class Permission(Base):
    """Single grantable action, identified by a unique name."""

    __tablename__ = "permissions"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, unique=True, nullable=False, index=True)
    description = Column(String)
    # What the permission applies to, e.g. "posts", "users", "comments"
    resource = Column(String)
    # What may be done to it, e.g. "read", "create", "update", "delete"
    action = Column(String)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationships
    roles = relationship(
        "Role",
        secondary=role_permissions,
        back_populates="permissions",
    )
class Post(Base):
    """Piece of content written by a user; a draft until ``is_published`` is set."""

    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, nullable=False)
    content = Column(String, nullable=False)
    author_id = Column(Integer, ForeignKey("users.id"))
    is_published = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(
        DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    # Relationships
    author = relationship("User", back_populates="posts")

# Pydantic Schemas
# schemas.py
from pydantic import BaseModel
from datetime import datetime
from typing import Optional
class PermissionBase(BaseModel):
    """Fields shared by every permission schema."""

    name: str
    description: Optional[str] = None
    resource: str
    action: str


class Permission(PermissionBase):
    """Permission as returned by the API, including database-generated fields."""

    id: int
    created_at: datetime

    # Pydantic v2 configuration; replaces the deprecated nested ``Config`` class.
    # ``from_attributes`` lets the schema be built from an ORM object.
    model_config = {"from_attributes": True}
class RoleBase(BaseModel):
    """Fields shared by every role schema."""

    name: str
    description: Optional[str] = None


class RoleCreate(RoleBase):
    """Payload for creating a role, optionally with permissions attached by id."""

    permission_ids: list[int] = []


class Role(RoleBase):
    """Role as returned by the API, with its permissions expanded."""

    id: int
    created_at: datetime
    permissions: list[Permission] = []

    # Pydantic v2 configuration; replaces the deprecated nested ``Config`` class.
    # ``from_attributes`` lets the schema be built from an ORM object.
    model_config = {"from_attributes": True}
class UserWithRoles(BaseModel):
    """User as returned by admin endpoints, with roles and their permissions.

    The password hash is deliberately not part of this schema.
    """

    id: int
    username: str
    email: str
    # Defaults to None like the other optional fields in this module.
    full_name: Optional[str] = None
    roles: list[Role] = []

    # Pydantic v2 configuration; replaces the deprecated nested ``Config`` class.
    # ``from_attributes`` lets the schema be built from an ORM object.
    model_config = {"from_attributes": True}
class PostBase(BaseModel):
    """Fields shared by post creation and post responses."""

    title: str
    content: str
    is_published: bool = False


class PostCreate(PostBase):
    """Payload for creating a post; adds nothing to the shared fields."""

    pass


class PostUpdate(BaseModel):
    """Partial update payload; a field left as None is not modified."""

    title: Optional[str] = None
    content: Optional[str] = None
    is_published: Optional[bool] = None


class Post(PostBase):
    """Post as returned by the API, including database-generated fields."""

    id: int
    author_id: int
    created_at: datetime
    updated_at: datetime

    # Pydantic v2 configuration; replaces the deprecated nested ``Config`` class.
    # ``from_attributes`` lets the schema be built from an ORM object.
    model_config = {"from_attributes": True}

# Authorization Dependencies
# dependencies.py
from fastapi import Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import Callable
from .models import User
from .auth import get_current_active_user
from database import get_db
def require_role(required_role: str):
    """Build a dependency that only admits users holding ``required_role``.

    The returned coroutine resolves the active user and hands it back when
    the role is present; otherwise it raises HTTP 403.
    """
    async def role_checker(
        current_user: User = Depends(get_current_active_user)
    ) -> User:
        if current_user.has_role(required_role):
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Role '{required_role}' required"
        )

    return role_checker
def require_permission(required_permission: str):
    """Build a dependency that only admits users granted ``required_permission``.

    The returned coroutine resolves the active user and hands it back when
    one of its roles carries the permission; otherwise it raises HTTP 403.
    """
    async def permission_checker(
        current_user: User = Depends(get_current_active_user)
    ) -> User:
        if current_user.has_permission(required_permission):
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Permission '{required_permission}' required"
        )

    return permission_checker
def require_any_role(*roles: str):
    """Build a dependency that admits users holding at least one of ``roles``.

    With no roles given, nobody passes, because no role can match.
    """
    async def role_checker(
        current_user: User = Depends(get_current_active_user)
    ) -> User:
        if any(map(current_user.has_role, roles)):
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"One of these roles required: {', '.join(roles)}"
        )

    return role_checker
def require_all_roles(*roles: str):
    """Build a dependency that admits only users holding every one of ``roles``.

    With no roles given, every active user passes, because nothing is missing.
    """
    async def role_checker(
        current_user: User = Depends(get_current_active_user)
    ) -> User:
        if all(map(current_user.has_role, roles)):
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"All these roles required: {', '.join(roles)}"
        )

    return role_checker
def require_any_permission(*permissions: str):
    """Build a dependency that admits users granted at least one of ``permissions``.

    With no permissions given, nobody passes, because no permission can match.
    """
    async def permission_checker(
        current_user: User = Depends(get_current_active_user)
    ) -> User:
        if any(map(current_user.has_permission, permissions)):
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"One of these permissions required: {', '.join(permissions)}"
        )

    return permission_checker

# Using Authorization in Routes
1. Role-Based Authorization
# routes/admin.py
from fastapi import APIRouter, Depends
from dependencies import require_role, require_any_role
# Router for the admin endpoints: every path below is prefixed with /admin
# and carries the "admin" tag.
router = APIRouter(prefix="/admin", tags=["admin"])
@router.get("/dashboard")
async def admin_dashboard(
    current_user: User = Depends(require_role("admin"))
):
    """Greet the caller; the dependency rejects anyone without the 'admin' role."""
    greeting = {"message": "Welcome to admin dashboard"}
    greeting["user"] = current_user.username
    return greeting
@router.get("/users")
async def list_users(
    current_user: User = Depends(require_any_role("admin", "moderator")),
    db: Session = Depends(get_db)
):
    """List all users; accessible by admin or moderator.

    Only non-sensitive columns are returned. Returning the ORM objects
    directly would expose every column, including ``hashed_password``,
    because this route declares no response model to filter the output.
    """
    return [
        {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "full_name": user.full_name,
            "is_active": user.is_active,
            "created_at": user.created_at,
        }
        for user in db.query(User).all()
    ]
@router.delete("/users/{user_id}")
async def delete_user(
    user_id: int,
    current_user: User = Depends(require_role("admin")),
    db: Session = Depends(get_db)
):
    """Delete the user with ``user_id``; restricted to the 'admin' role.

    Responds with 404 when no such user exists.
    """
    target = db.query(User).filter(User.id == user_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="User not found")

    db.delete(target)
    db.commit()
    return {"message": "User deleted"}

# 2. Permission-Based Authorization
# routes/posts.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from dependencies import require_permission, get_current_active_user
from schemas import Post, PostCreate, PostUpdate
from models import Post as PostModel, User
# Router for the post endpoints: every path below is prefixed with /posts
# and carries the "posts" tag.
router = APIRouter(prefix="/posts", tags=["posts"])
@router.get("/")
async def list_posts(
    db: Session = Depends(get_db),
    current_user: User = Depends(require_permission("posts:read"))
):
    """Return every post; the dependency enforces the 'posts:read' permission."""
    return db.query(PostModel).all()
@router.post("/", response_model=Post)
async def create_post(
    post_data: PostCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_permission("posts:create"))
):
    """Create a post authored by the caller; requires 'posts:create'."""
    column_values = {
        "title": post_data.title,
        "content": post_data.content,
        "is_published": post_data.is_published,
        "author_id": current_user.id,
    }
    new_post = PostModel(**column_values)

    db.add(new_post)
    db.commit()
    # Reload so database-generated fields are populated before returning.
    db.refresh(new_post)
    return new_post
@router.put("/{post_id}", response_model=Post)
async def update_post(
    post_id: int,
    post_data: PostUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    """Apply a partial update to a post.

    Allowed for the post's author or for anyone granted 'posts:update'.
    Responds with 404 for an unknown post and 403 otherwise when the caller
    is not allowed. Fields left as None in the payload are not touched.
    """
    post = db.query(PostModel).filter(PostModel.id == post_id).first()
    if post is None:
        raise HTTPException(status_code=404, detail="Post not found")

    # Ownership is checked first; the permission lookup only runs for non-owners.
    is_owner = post.author_id == current_user.id
    if not (is_owner or current_user.has_permission("posts:update")):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authorized to update this post"
        )

    for field_name in ("title", "content", "is_published"):
        new_value = getattr(post_data, field_name)
        if new_value is not None:
            setattr(post, field_name, new_value)

    db.commit()
    db.refresh(post)
    return post
@router.delete("/{post_id}")
async def delete_post(
    post_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    """Delete a post.

    Allowed for the post's author or for anyone granted 'posts:delete'.
    The existence check runs first, so an unknown post yields 404 regardless
    of the caller's permissions.
    """
    post = db.query(PostModel).filter(PostModel.id == post_id).first()
    if post is None:
        raise HTTPException(status_code=404, detail="Post not found")

    # Ownership is checked first; the permission lookup only runs for non-owners.
    is_owner = post.author_id == current_user.id
    if not (is_owner or current_user.has_permission("posts:delete")):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authorized to delete this post"
        )

    db.delete(post)
    db.commit()
    return {"message": "Post deleted"}

# Resource Ownership Authorization
# dependencies.py (continued)
from typing import Type, Callable
from sqlalchemy.orm import Session
def require_resource_owner(
    model: type,
    owner_field: str = "author_id"
):
    """
    Dependency to ensure user owns the resource.

    Args:
        model: SQLAlchemy model class; it must expose an ``id`` column.
            Annotated as plain ``type`` because ``Base`` is not among the
            names this module imports.
        owner_field: Name of the field containing owner's user_id

    Returns:
        A dependency that loads the resource and returns it when the current
        user owns it.

    Raises (from the returned dependency):
        HTTPException 404 when the resource does not exist, 403 when it
        belongs to someone else.

    Note:
        The dependency takes the id from a parameter named ``resource_id``.
        A route using it must name its path parameter ``{resource_id}``;
        otherwise FastAPI expects ``resource_id`` as a query parameter.
    """
    async def ownership_checker(
        resource_id: int,
        current_user: User = Depends(get_current_active_user),
        db: Session = Depends(get_db)
    ):
        resource = db.query(model).filter(model.id == resource_id).first()
        if not resource:
            raise HTTPException(status_code=404, detail="Resource not found")

        owner_id = getattr(resource, owner_field)
        if owner_id != current_user.id:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't own this resource"
            )
        return resource

    return ownership_checker
# Usage in routes
@router.put("/posts/{resource_id}")
async def update_my_post(
    resource_id: int,
    post_data: PostUpdate,
    post: Post = Depends(require_resource_owner(PostModel)),
    db: Session = Depends(get_db)
):
    """Update own post only.

    The path parameter is named ``resource_id`` because the ownership
    dependency declares a parameter with that name. With a differently named
    path parameter, FastAPI would look for ``resource_id`` in the query
    string and reject the request as incomplete.
    """
    # Update post...
    pass

# OAuth2 Scopes Integration
# security.py
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from fastapi import Security, HTTPException, status
from pydantic import ValidationError
# Scope name -> human-readable description, advertised by the OAuth2 scheme.
_OAUTH2_SCOPES = {
    "users:read": "Read user information",
    "users:write": "Modify user information",
    "posts:read": "Read posts",
    "posts:write": "Create and update posts",
    "posts:delete": "Delete posts",
    "admin": "Admin access to all resources",
}

# OAuth2 password-flow scheme; tokens are obtained from the login endpoint.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="auth/login",
    scopes=_OAUTH2_SCOPES,
)
async def get_current_user_with_scopes(
    security_scopes: SecurityScopes,
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> User:
    """Get current user and validate OAuth2 scopes.

    Args:
        security_scopes: Scopes required by the route and its dependencies.
        token: Bearer token taken from the request.
        db: Database session used to load the user.

    Returns:
        The active user the token was issued for.

    Raises:
        HTTPException: 401 when the token is invalid, has no subject, or
            names an unknown user; 400 when the user is inactive; 403 when a
            required scope is missing from the token.
    """
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )

    # Verify token
    payload = verify_token(token, token_type="access")
    if payload is None:
        raise credentials_exception

    username = payload.get("sub")
    if not username:
        # A token without a subject cannot identify anyone.
        raise credentials_exception

    # Normalise the claim: a missing or null claim means "no scopes", and a
    # space-separated string is split so the membership test below compares
    # whole scope names instead of substrings.
    token_scopes = payload.get("scopes") or []
    if isinstance(token_scopes, str):
        token_scopes = token_scopes.split()

    # Get user
    user = db.query(User).filter(User.username == username).first()
    if user is None:
        raise credentials_exception

    # Deactivated accounts must not pass scope-protected routes.
    # NOTE(review): 400 follows the FastAPI security tutorial's inactive-user
    # response - align with get_current_active_user in auth.py if it differs.
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user",
        )

    # Check scopes
    if any(scope not in token_scopes for scope in security_scopes.scopes):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions",
            headers={"WWW-Authenticate": authenticate_value},
        )
    return user
# Usage in routes
@router.get("/users/me")
async def read_users_me(
    current_user: User = Security(get_current_user_with_scopes, scopes=["users:read"])
):
    """Read own user info - requires 'users:read' scope.

    Only non-sensitive columns are returned. Returning the ORM object
    directly would expose every column, including ``hashed_password``,
    because this route declares no response model to filter the output.
    """
    return {
        "id": current_user.id,
        "username": current_user.username,
        "email": current_user.email,
        "full_name": current_user.full_name,
        "is_active": current_user.is_active,
        "created_at": current_user.created_at,
    }
@router.put("/users/me")
async def update_users_me(
    user_update: UserUpdate,
    current_user: User = Security(
        get_current_user_with_scopes,
        scopes=["users:write"],
    ),
    db: Session = Depends(get_db)
):
    """Placeholder for updating the caller's own profile.

    Access requires the 'users:write' scope; the update itself is left
    unimplemented here.
    """
    # Update user...
    pass

# Dynamic Authorization Policies
# policies.py
from typing import Protocol, runtime_checkable
from models import User, Post
@runtime_checkable
class Policy(Protocol):
    """Structural interface for authorization policies.

    Any object with a matching ``authorize`` method satisfies it; being
    runtime-checkable, it can also be used with ``isinstance``.
    """

    def authorize(self, user: User, resource=None) -> bool:
        """Return whether ``user`` may act, optionally on ``resource``."""
        ...
class PostPolicy:
    """Authorization rules for posts, one static predicate per action.

    Every predicate checks ownership before consulting roles or permissions,
    so the role/permission lookup only runs for non-owners.
    """

    @staticmethod
    def can_read(user: User, post: Post) -> bool:
        """Published posts are readable by all; drafts only by their author or an admin."""
        return bool(
            post.is_published
            or post.author_id == user.id
            or user.has_role("admin")
        )

    @staticmethod
    def can_update(user: User, post: Post) -> bool:
        """The author may update, as may anyone granted 'posts:update'."""
        return bool(
            post.author_id == user.id
            or user.has_permission("posts:update")
        )

    @staticmethod
    def can_delete(user: User, post: Post) -> bool:
        """The author may delete, as may anyone holding the 'admin' role."""
        return bool(
            post.author_id == user.id
            or user.has_role("admin")
        )

    @staticmethod
    def can_publish(user: User, post: Post) -> bool:
        """The author may publish, as may anyone granted 'posts:publish'."""
        return bool(
            post.author_id == user.id
            or user.has_permission("posts:publish")
        )
# Dependency for policy checking
def authorize_post_action(action: str):
    """Build a dependency that authorizes ``action`` on a post via PostPolicy.

    The dependency loads the post named by the ``post_id`` path parameter,
    looks up ``PostPolicy.can_<action>`` and returns the post when the policy
    allows it. An unknown post yields 404; a denied or unknown action
    yields 403.
    """
    async def policy_checker(
        post_id: int,
        current_user: User = Depends(get_current_active_user),
        db: Session = Depends(get_db)
    ):
        post = db.query(Post).filter(Post.id == post_id).first()
        if post is None:
            raise HTTPException(status_code=404, detail="Post not found")

        rule = getattr(PostPolicy, f"can_{action}", None)
        # A missing rule is treated exactly like a denial.
        if rule and rule(current_user, post):
            return post

        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Not authorized to {action} this post"
        )

    return policy_checker
# Usage in routes
@router.put("/posts/{post_id}")
async def update_post(
    post_id: int,
    post_data: PostUpdate,
    post: Post = Depends(authorize_post_action("update")),
    db: Session = Depends(get_db)
):
    """Placeholder for a policy-guarded update.

    The dependency has already loaded the post and applied the 'update'
    policy; the update itself is left unimplemented here.
    """
    # Update logic...
    pass
@router.post("/posts/{post_id}/publish")
async def publish_post(
    post_id: int,
    post: Post = Depends(authorize_post_action("publish")),
    db: Session = Depends(get_db)
):
    """Mark the post as published once the 'publish' policy has allowed it."""
    # The dependency returns the post it already loaded and authorized.
    post.is_published = True
    db.commit()

    confirmation = {"message": "Post published"}
    return confirmation

# Seeding Roles and Permissions
# seed.py
from sqlalchemy.orm import Session
from models import Role, Permission, User, user_roles
from database import SessionLocal, engine, Base
def seed_permissions(db: Session):
    """Create initial permissions.

    Safe to run repeatedly: permissions whose name already exists are
    skipped. Existing names are loaded with one query instead of one lookup
    per permission.
    """
    permissions = [
        # User permissions
        Permission(name="users:read", description="Read users", resource="users", action="read"),
        Permission(name="users:create", description="Create users", resource="users", action="create"),
        Permission(name="users:update", description="Update users", resource="users", action="update"),
        Permission(name="users:delete", description="Delete users", resource="users", action="delete"),
        # Post permissions
        Permission(name="posts:read", description="Read posts", resource="posts", action="read"),
        Permission(name="posts:create", description="Create posts", resource="posts", action="create"),
        Permission(name="posts:update", description="Update any post", resource="posts", action="update"),
        Permission(name="posts:delete", description="Delete any post", resource="posts", action="delete"),
        Permission(name="posts:publish", description="Publish posts", resource="posts", action="publish"),
    ]

    # Each result row holds just the name column, hence the one-element unpack.
    existing_names = {name for (name,) in db.query(Permission.name).all()}

    db.add_all([perm for perm in permissions if perm.name not in existing_names])
    db.commit()
def seed_roles(db: Session):
    """Create the built-in roles and (re)assign their permissions.

    Missing roles are created. Every listed role, new or existing, has its
    permission list replaced with the configured one. Permission names that
    are not present in the database are skipped.
    """
    permission_by_name = {perm.name: perm for perm in db.query(Permission).all()}

    # (role name, description, permission names)
    role_specs = [
        ("user", "Regular user", ["posts:read", "posts:create"]),
        ("author", "Content author", ["posts:read", "posts:create"]),
        (
            "editor",
            "Content editor",
            ["posts:read", "posts:create", "posts:update", "posts:publish"],
        ),
        (
            "moderator",
            "Content moderator",
            ["users:read", "posts:read", "posts:update", "posts:delete"],
        ),
        # Admin receives every permission currently stored.
        ("admin", "Administrator with full access", list(permission_by_name)),
    ]

    for role_name, description, permission_names in role_specs:
        role = db.query(Role).filter(Role.name == role_name).first()
        if role is None:
            role = Role(name=role_name, description=description)
            db.add(role)
            db.flush()

        role.permissions = [
            permission_by_name[name]
            for name in permission_names
            if name in permission_by_name
        ]

    db.commit()
def seed_database():
    """Create the tables, then seed permissions followed by roles.

    Roles are seeded second because they look up the stored permissions.
    The session is closed even if a step fails.
    """
    Base.metadata.create_all(bind=engine)

    steps = (
        ("Seeding permissions...", seed_permissions),
        ("Seeding roles...", seed_roles),
    )

    db = SessionLocal()
    try:
        for banner, run_step in steps:
            print(banner)
            run_step(db)
        print("Database seeded successfully!")
    finally:
        db.close()
# Seed only when executed as a script, not when imported.
if __name__ == "__main__":
    seed_database()

# Run the seed script:
python seed.py

Admin Routes for Role Management
# routes/admin.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from dependencies import require_role
from schemas import Role, RoleCreate, UserWithRoles
from models import Role as RoleModel, User, Permission
# Router for the role-management endpoints: every path below is prefixed
# with /admin and carries the "admin" tag.
router = APIRouter(prefix="/admin", tags=["admin"])
@router.post("/roles", response_model=Role)
async def create_role(
    role_data: RoleCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_role("admin"))
):
    """Create a new role with permissions.

    Raises:
        HTTPException: 400 when a role with the same name already exists, or
            when ``permission_ids`` contains ids that match no permission.
            Dropping unknown ids silently would create a role with fewer
            permissions than the caller asked for.
    """
    # Check if role exists
    existing = db.query(RoleModel).filter(RoleModel.name == role_data.name).first()
    if existing:
        raise HTTPException(status_code=400, detail="Role already exists")

    # Resolve the requested permissions and reject ids that do not exist
    requested_ids = set(role_data.permission_ids)
    permissions = []
    if requested_ids:
        permissions = db.query(Permission).filter(
            Permission.id.in_(sorted(requested_ids))
        ).all()
        missing_ids = sorted(requested_ids - {perm.id for perm in permissions})
        if missing_ids:
            raise HTTPException(
                status_code=400,
                detail=f"Unknown permission ids: {missing_ids}"
            )

    # Create role
    role = RoleModel(name=role_data.name, description=role_data.description)
    if permissions:
        role.permissions = permissions

    db.add(role)
    db.commit()
    db.refresh(role)
    return role
@router.post("/users/{user_id}/roles/{role_id}")
async def assign_role_to_user(
    user_id: int,
    role_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_role("admin"))
):
    """Grant a role to a user; restricted to the 'admin' role.

    Responds with 404 when the user or the role is unknown. Assigning a role
    the user already holds changes nothing and still reports success.
    """
    target_user = db.query(User).filter(User.id == user_id).first()
    if target_user is None:
        raise HTTPException(status_code=404, detail="User not found")

    target_role = db.query(RoleModel).filter(RoleModel.id == role_id).first()
    if target_role is None:
        raise HTTPException(status_code=404, detail="Role not found")

    already_assigned = target_role in target_user.roles
    if not already_assigned:
        target_user.roles.append(target_role)
        db.commit()

    return {"message": f"Role '{target_role.name}' assigned to user '{target_user.username}'"}
@router.delete("/users/{user_id}/roles/{role_id}")
async def remove_role_from_user(
    user_id: int,
    role_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_role("admin"))
):
    """Revoke a role from a user; restricted to the 'admin' role.

    Responds with 404 when the user or the role is unknown. Removing a role
    the user does not hold changes nothing and still reports success.
    """
    target_user = db.query(User).filter(User.id == user_id).first()
    if target_user is None:
        raise HTTPException(status_code=404, detail="User not found")

    target_role = db.query(RoleModel).filter(RoleModel.id == role_id).first()
    if target_role is None:
        raise HTTPException(status_code=404, detail="Role not found")

    currently_assigned = target_role in target_user.roles
    if currently_assigned:
        target_user.roles.remove(target_role)
        db.commit()

    return {"message": f"Role '{target_role.name}' removed from user '{target_user.username}'"}
@router.get("/users/{user_id}", response_model=UserWithRoles)
async def get_user_with_roles(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_role("admin"))
):
    """Return one user with roles and permissions; restricted to the 'admin' role.

    The response is shaped by ``UserWithRoles``. Responds with 404 when the
    user does not exist.
    """
    found = db.query(User).filter(User.id == user_id).first()
    if found is None:
        raise HTTPException(status_code=404, detail="User not found")
    return found

# Testing Authorization
# tests/test_authorization.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from main import app
from database import Base, get_db
from models import User, Role, Permission
# Test database: a local SQLite file kept separate from the application database.
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


def override_get_db():
    """Yield a session bound to the test database and always close it afterwards."""
    session = TestingSessionLocal()
    try:
        yield session
    finally:
        session.close()


# Route every get_db dependency in the app to the test database.
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
@pytest.fixture
def setup_database():
    """Create every table before the test and drop them all afterwards."""
    metadata = Base.metadata
    metadata.create_all(bind=engine)
    yield
    metadata.drop_all(bind=engine)
@pytest.fixture
def setup_roles_and_users(setup_database):
    """Seed four post permissions, a 'user' and an 'admin' role, and one user of each.

    The session is closed in a ``finally`` block so a failure while seeding
    does not leave it open.
    """
    db = TestingSessionLocal()
    try:
        # Create permissions
        read_perm = Permission(name="posts:read", resource="posts", action="read")
        create_perm = Permission(name="posts:create", resource="posts", action="create")
        update_perm = Permission(name="posts:update", resource="posts", action="update")
        delete_perm = Permission(name="posts:delete", resource="posts", action="delete")
        db.add_all([read_perm, create_perm, update_perm, delete_perm])
        db.commit()

        # Create roles
        user_role = Role(name="user", permissions=[read_perm, create_perm])
        admin_role = Role(name="admin", permissions=[read_perm, create_perm, update_perm, delete_perm])
        db.add_all([user_role, admin_role])
        db.commit()

        # Create users
        # NOTE(review): "hashed" is a placeholder, not a hash of "password".
        # The tests log in with "password", so store a real hash produced by
        # the application's hashing helper - confirm against auth.py.
        regular_user = User(username="user", email="user@test.com", hashed_password="hashed", roles=[user_role])
        admin_user = User(username="admin", email="admin@test.com", hashed_password="hashed", roles=[admin_role])
        db.add_all([regular_user, admin_user])
        db.commit()
    finally:
        db.close()
def test_role_based_access(setup_roles_and_users):
    """The admin dashboard rejects a regular user (403) and admits an admin (200)."""

    def dashboard_status(username):
        # Log in, then call the admin route with the issued token.
        login = client.post("/auth/login", data={"username": username, "password": "password"})
        token = login.json()["access_token"]
        reply = client.get(
            "/admin/dashboard",
            headers={"Authorization": f"Bearer {token}"}
        )
        return reply.status_code

    # Regular user is refused
    assert dashboard_status("user") == 403
    # Admin is let in
    assert dashboard_status("admin") == 200
def test_permission_based_access(setup_roles_and_users):
    """A regular user can list posts but cannot delete someone else's post.

    The delete route looks the post up before checking permissions and
    answers 404 for an unknown id. A post owned by the admin is therefore
    created first, so the request reaches the permission check and the 403
    is meaningful.
    """
    from models import Post  # local import: only this test needs the Post model

    # Seed a post that the regular user does not own
    db = TestingSessionLocal()
    try:
        admin = db.query(User).filter(User.username == "admin").first()
        foreign_post = Post(title="Admin post", content="Owned by admin", author_id=admin.id)
        db.add(foreign_post)
        db.commit()
        post_id = foreign_post.id
    finally:
        db.close()

    # Login as user
    response = client.post("/auth/login", data={"username": "user", "password": "password"})
    user_token = response.json()["access_token"]

    # Can read posts
    response = client.get(
        "/posts",
        headers={"Authorization": f"Bearer {user_token}"}
    )
    assert response.status_code == 200

    # Cannot delete a post they do not own (no 'posts:delete' permission)
    response = client.delete(
        f"/posts/{post_id}",
        headers={"Authorization": f"Bearer {user_token}"}
    )
    assert response.status_code == 403

# Best Practices
1. Principle of Least Privilege
# ❌ Bad: Give admin to everyone
user.roles = [admin_role]
# ✅ Good: Give only necessary permissions
user.roles = [user_role]

2. Separate Roles and Permissions
# Roles are collections of permissions
# Permissions are atomic actions
# Good hierarchy:
# Role: "Editor" -> Permissions: ["posts:read", "posts:update", "posts:publish"]
# Role: "Author" -> Permissions: ["posts:read", "posts:create"]

3. Resource-Level Authorization
# Allow the update when the user owns the post OR holds the general permission
def can_update_post(user: User, post: Post) -> bool:
    """Return True for the post's author or for anyone granted 'posts:update'."""
    # Ownership is checked first; the permission lookup only runs for non-owners.
    is_owner = post.author_id == user.id
    return bool(is_owner or user.has_permission("posts:update"))

# 4. Use Descriptive Permission Names
# ✅ Good: Clear and specific
"posts:create"
"posts:update"
"posts:delete"
"users:manage"
# ❌ Bad: Vague
"edit"
"manage"
"access"

5. Cache Permission Checks
from functools import lru_cache
@lru_cache(maxsize=128)
def get_user_permissions(user_id: int) -> set[str]:
    """Cache user permissions to avoid repeated DB queries.

    Args:
        user_id: Primary key of the user to look up.

    Returns:
        The names of the user's permissions, or an empty set when the user
        does not exist.

    Note:
        Results stay cached until evicted, so role or permission changes are
        not seen until ``get_user_permissions.cache_clear()`` is called.
        The returned set is the cached object itself; callers must not
        mutate it.
    """
    db = SessionLocal()
    try:
        user = db.query(User).filter(User.id == user_id).first()
        return user.get_permissions() if user else set()
    finally:
        # Close the session even when the query or permission walk raises.
        db.close()

# Conclusion
You now have a complete authorization system in FastAPI:
✅ Role-Based Access Control (RBAC) with flexible role assignments
✅ Permission-based authorization for fine-grained control
✅ OAuth2 scopes for API access control
✅ Resource ownership verification
✅ Policy-based authorization for complex business logic
✅ Dynamic permission checking with caching
✅ Admin tools for role and permission management
✅ Testing authorization flows
Key Takeaways
- Authentication ≠ Authorization - They serve different purposes
- Use RBAC for simplicity - Group permissions into roles
- Check ownership - Not just permissions, but also resource ownership
- Implement policies - For complex authorization logic
- Cache permissions - Improve performance
- Test thoroughly - Ensure security rules work correctly
- Audit access - Log authorization decisions
Next Steps
- Add audit logging for authorization events
- Implement hierarchical roles (role inheritance)
- Add time-based access control
- Implement IP-based restrictions
- Add attribute-based access control (ABAC)
- Create authorization middleware
- Build admin UI for role management
Your FastAPI application now has enterprise-grade authorization! 🔐
📬 Subscribe to Newsletter
Get the latest blog posts delivered to your inbox every week. No spam, unsubscribe anytime.
We respect your privacy. Unsubscribe at any time.
💬 Comments
Sign in to leave a comment
We'll never post without your permission.