Asynchronous Programming in FastAPI: Boosting Performance with async/await

Introduction
One of FastAPI's most powerful features is its native support for asynchronous programming using Python's async/await syntax. This capability allows you to build high-performance APIs that can handle thousands of concurrent requests efficiently.
In this comprehensive guide, we'll explore:
- What asynchronous programming is and why it matters
- How async/await works in FastAPI
- When to use async vs sync endpoints
- Best practices and common pitfalls
- Real-world examples with databases and external APIs
What is Asynchronous Programming?
Asynchronous programming allows your application to handle multiple operations concurrently without blocking the execution thread. Instead of waiting for I/O operations (like database queries, HTTP requests, or file reads) to complete, your application can switch to handling other tasks.
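Before getting into FastAPI specifics, here is a minimal, runnable sketch of the idea using plain asyncio. The `fake_io` helper is a stand-in for any I/O call (database query, HTTP request):

```python
# Two simulated I/O waits run concurrently, so total time is roughly
# the longest wait, not the sum of the waits.
import asyncio
import time

async def fake_io(label: str, seconds: float) -> str:
    await asyncio.sleep(seconds)  # stands in for a DB query or HTTP call
    return label

async def main() -> list:
    # gather() runs both coroutines concurrently and preserves order
    return await asyncio.gather(
        fake_io("user", 0.2),
        fake_io("orders", 0.1),
    )

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results)  # ['user', 'orders']
```

The total wall time is close to 0.2s (the longer wait), not 0.3s (the sum), which is exactly the effect the rest of this article exploits.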
Synchronous vs Asynchronous
Synchronous (blocking):

```python
def fetch_user(user_id: int):
    # Wait 2 seconds for database query
    user = database.query(user_id)  # ⏸️ Thread blocks here
    return user

def fetch_orders(user_id: int):
    # Wait 1 second for database query
    orders = database.get_orders(user_id)  # ⏸️ Thread blocks here
    return orders

# Total time: 2s + 1s = 3 seconds
user = fetch_user(1)
orders = fetch_orders(1)
```

Asynchronous (non-blocking):
```python
import asyncio

async def fetch_user(user_id: int):
    # Start database query, don't wait
    user = await database.query(user_id)  # 🚀 Can handle other requests
    return user

async def fetch_orders(user_id: int):
    # Start database query, don't wait
    orders = await database.get_orders(user_id)  # 🚀 Can handle other requests
    return orders

# Total time: max(2s, 1s) = 2 seconds (runs concurrently!)
user, orders = await asyncio.gather(
    fetch_user(1),
    fetch_orders(1),
)
```

FastAPI's Async Support
FastAPI supports both synchronous and asynchronous endpoints, giving you flexibility based on your use case.
Basic Async Endpoint
```python
from fastapi import FastAPI
import asyncio

app = FastAPI()

@app.get("/")
async def root():
    """Async endpoint - can handle concurrent requests efficiently"""
    await asyncio.sleep(1)  # Simulate I/O operation
    return {"message": "Hello World"}
```

Mixed Sync and Async Endpoints
```python
import time

@app.get("/sync-endpoint")
def sync_endpoint():
    """Synchronous endpoint - runs in a thread pool"""
    time.sleep(1)  # Blocking operation
    return {"message": "Sync operation"}

@app.get("/async-endpoint")
async def async_endpoint():
    """Asynchronous endpoint - runs in event loop"""
    await asyncio.sleep(1)  # Non-blocking operation
    return {"message": "Async operation"}
```

How FastAPI handles these:
- Async endpoints (`async def`): run directly in the event loop
- Sync endpoints (`def`): run in a separate thread pool to avoid blocking the loop
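The thread-pool behavior can be illustrated with the standard library alone. This is a sketch of the mechanism, not FastAPI's actual internals (FastAPI delegates to Starlette's thread pool), but the principle is the same: blocking work is pushed to worker threads so the event loop stays free.

```python
# Blocking calls dispatched to a thread pool run concurrently with
# each other and with the event loop.
import asyncio
import time

def blocking_work(seconds: float) -> str:
    time.sleep(seconds)  # would freeze the loop if called directly in async code
    return "done"

async def main() -> list:
    loop = asyncio.get_running_loop()
    # Both blocking calls run in the default executor's worker threads
    return await asyncio.gather(
        loop.run_in_executor(None, blocking_work, 0.2),
        loop.run_in_executor(None, blocking_work, 0.2),
    )

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results)  # ['done', 'done']
```

Both 0.2-second blocking calls finish in roughly 0.2 seconds total, because each occupies a worker thread rather than the event loop.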
When to Use Async vs Sync
Use async def when:
✅ I/O-bound operations:
- Database queries (with async drivers)
- HTTP requests to external APIs
- File operations (with async libraries)
- Cache operations (Redis, Memcached)
- Message queue operations
✅ High concurrency requirements:
- Microservices handling many concurrent requests
- WebSocket connections
- Real-time applications
Use regular def when:
✅ CPU-bound operations:
- Heavy computations
- Data processing
- Image/video manipulation
- Machine learning inference
✅ Sync-only libraries:
- Legacy database drivers (SQLAlchemy sync mode)
- Third-party libraries without async support
Important: Don't call blocking sync libraries inside async functions; the blocking call stalls the entire event loop and defeats the purpose!
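If you must run CPU-bound work from async code, one option is to hand it to a process pool so the event loop stays responsive. A sketch using only the standard library; the naive `fib` function is just a stand-in for heavy computation:

```python
# Offloading CPU-bound work to a process pool from async code.
import asyncio
from concurrent.futures import ProcessPoolExecutor

def fib(n: int) -> int:
    """CPU-bound work: deliberately naive Fibonacci."""
    return n if n < 2 else fib(n - 1) + fib(n - 2)

async def compute(n: int) -> int:
    loop = asyncio.get_running_loop()
    # run_in_executor returns an awaitable; the heavy work happens in a
    # separate process, sidestepping the GIL entirely
    with ProcessPoolExecutor() as pool:
        return await loop.run_in_executor(pool, fib, n)

if __name__ == "__main__":
    print(asyncio.run(compute(20)))  # 6765
```

Inside FastAPI, the simpler choice is usually just to declare the endpoint with plain `def` and let the built-in thread pool handle it; a process pool only pays off when the GIL is the bottleneck.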
Real-World Examples
Example 1: Async Database Operations
Using SQLAlchemy with async support:
```python
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

# Async database setup
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/db"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

app = FastAPI()

# Dependency for database session
async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

# Async endpoint with database query (assumes a User ORM model defined elsewhere)
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    """Fetch user asynchronously"""
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user
```

Example 2: Concurrent External API Calls
```python
import asyncio
import httpx
from fastapi import FastAPI

app = FastAPI()

@app.get("/weather/{city}")
async def get_weather_and_forecast(city: str):
    """Fetch current weather and forecast concurrently"""
    async with httpx.AsyncClient() as client:
        # Run both API calls concurrently
        current_weather, forecast = await asyncio.gather(
            client.get(f"https://api.weather.com/current/{city}"),
            client.get(f"https://api.weather.com/forecast/{city}"),
        )
        return {
            "current": current_weather.json(),
            "forecast": forecast.json(),
        }
```

Performance gain: If each API takes 500ms, concurrent execution takes ~500ms vs 1000ms sequentially.
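One practical wrinkle: by default, the first exception raised inside `asyncio.gather()` propagates immediately, so one failing upstream API takes down the whole response. Passing `return_exceptions=True` lets you inspect each result individually instead. A sketch with stand-in coroutines rather than real HTTP calls:

```python
# Handling partial failure in concurrent calls with return_exceptions=True.
import asyncio

async def ok() -> str:
    await asyncio.sleep(0.05)
    return "ok"

async def boom() -> str:
    await asyncio.sleep(0.05)
    raise RuntimeError("upstream API failed")

async def main() -> list:
    # Exceptions are returned in the results list instead of being raised
    return await asyncio.gather(ok(), boom(), return_exceptions=True)

results = asyncio.run(main())
print(results)  # first item is 'ok', second is the RuntimeError instance
```

In an endpoint you would check each result with `isinstance(result, Exception)` and degrade gracefully, e.g. return the current weather even when the forecast call failed.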
Example 3: Background Tasks with Async
```python
from fastapi import BackgroundTasks
import asyncio

async def send_email_async(email: str, message: str):
    """Simulate sending email asynchronously"""
    await asyncio.sleep(2)  # Simulate email sending
    print(f"Email sent to {email}: {message}")

@app.post("/register")
async def register_user(
    email: str,
    password: str,
    background_tasks: BackgroundTasks,
):
    """Register user and send welcome email in background"""
    # Save user to database (async helper defined elsewhere)
    user = await create_user_in_db(email, password)
    # Add email task to background (won't block response)
    background_tasks.add_task(send_email_async, email, "Welcome!")
    return {"message": "User registered", "user_id": user.id}
```

Example 4: WebSocket with Async
```python
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket connection with async message handling"""
    await websocket.accept()
    try:
        while True:
            # Wait for message from client (non-blocking)
            data = await websocket.receive_text()
            # Process message (could be async database query)
            response = await process_message(data)
            # Send response back to client
            await websocket.send_text(f"Response: {response}")
    except WebSocketDisconnect:
        # Client disconnected; the connection is already closed
        pass

async def process_message(message: str):
    """Process message asynchronously"""
    await asyncio.sleep(0.1)  # Simulate processing
    return message.upper()
```

Common Pitfalls and Solutions
Pitfall 1: Using Blocking Code in Async Functions
❌ Wrong:

```python
@app.get("/users")
async def get_users():
    # time.sleep() blocks the entire event loop!
    import time
    time.sleep(5)  # BAD: Blocks all other requests
    return {"users": []}
```

✅ Correct:

```python
@app.get("/users")
async def get_users():
    # asyncio.sleep() is non-blocking
    await asyncio.sleep(5)  # GOOD: Other requests can be handled
    return {"users": []}
```

Pitfall 2: Forgetting await
❌ Wrong:

```python
@app.get("/user/{user_id}")
async def get_user(user_id: int):
    # Returns a coroutine, not the actual result!
    user = fetch_user_async(user_id)  # Missing await
    return user  # Returns coroutine object, not user data
```

✅ Correct:

```python
@app.get("/user/{user_id}")
async def get_user(user_id: int):
    user = await fetch_user_async(user_id)  # Properly awaits result
    return user
```

Pitfall 3: Mixing Sync and Async Database Drivers
❌ Wrong:

```python
from sqlalchemy import create_engine  # Sync engine

@app.get("/users")
async def get_users():
    # Using sync SQLAlchemy in async function blocks event loop
    users = session.query(User).all()  # Blocking call!
    return users
```

✅ Correct:

```python
from sqlalchemy.ext.asyncio import create_async_engine

@app.get("/users")
async def get_users(db: AsyncSession = Depends(get_db)):
    # Using async SQLAlchemy
    result = await db.execute(select(User))
    users = result.scalars().all()
    return users
```

Pitfall 4: Not Using Async HTTP Clients
❌ Wrong:

```python
import requests  # Sync library

@app.get("/external-data")
async def fetch_external():
    # requests.get() is blocking!
    response = requests.get("https://api.example.com/data")
    return response.json()
```

✅ Correct:

```python
import httpx  # Async HTTP client

@app.get("/external-data")
async def fetch_external():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
        return response.json()
```

Performance Best Practices
1. Use Connection Pooling
```python
from sqlalchemy.ext.asyncio import create_async_engine

# Configure connection pool for optimal performance
engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,        # Maximum connections in pool
    max_overflow=10,     # Additional connections if pool is full
    pool_pre_ping=True,  # Verify connections before use
)
```

2. Set Timeouts for External Calls
```python
import httpx

async def fetch_with_timeout(url: str):
    """Always set timeouts to prevent hanging requests"""
    async with httpx.AsyncClient(timeout=5.0) as client:
        try:
            response = await client.get(url)
            return response.json()
        except httpx.TimeoutException:
            return {"error": "Request timed out"}
```

3. Use asyncio.gather() for Concurrent Operations
```python
@app.get("/dashboard/{user_id}")
async def get_dashboard(user_id: int, db: AsyncSession = Depends(get_db)):
    """Fetch multiple resources concurrently"""
    # Note: a single AsyncSession cannot run two queries at the same time,
    # so the database queries run sequentially inside one task while the
    # independent stats call runs alongside them.
    async def fetch_db_data():
        user_result = await db.execute(select(User).where(User.id == user_id))
        orders_result = await db.execute(select(Order).where(Order.user_id == user_id))
        return user_result.scalar_one(), orders_result.scalars().all()

    # Wait for both tasks concurrently
    (user, orders), stats = await asyncio.gather(
        fetch_db_data(),
        fetch_user_stats(user_id),
    )
    return {
        "user": user,
        "orders": orders,
        "stats": stats,
    }
```

4. Use Semaphores to Limit Concurrency
```python
# Global semaphore to limit concurrent external API calls
api_semaphore = asyncio.Semaphore(10)  # Max 10 concurrent calls

async def fetch_external_data(url: str):
    """Limit concurrent external API calls"""
    async with api_semaphore:
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            return response.json()
```

Testing Async Endpoints
Use pytest with pytest-asyncio:
```python
import pytest
from httpx import ASGITransport, AsyncClient
from main import app

@pytest.mark.asyncio
async def test_async_endpoint():
    """Test async endpoint"""
    # Recent httpx versions use ASGITransport instead of the app= shortcut
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        response = await client.get("/users/1")
        assert response.status_code == 200
        assert "user" in response.json()
```

Monitoring Async Performance
Track key metrics:
```python
import time
from fastapi import Request

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Measure request processing time"""
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    return response
```

Conclusion
Asynchronous programming in FastAPI unlocks significant performance improvements for I/O-bound applications. Key takeaways:
- Use `async def` for I/O operations (database, HTTP, files)
- Use regular `def` for CPU-bound operations or sync-only libraries
- Always `await` async functions and use async-compatible libraries
- Run concurrent operations with `asyncio.gather()` for maximum performance
- Set timeouts and use connection pooling for production reliability
- Test thoroughly with `pytest-asyncio`
By following these patterns and avoiding common pitfalls, you'll build FastAPI applications that scale efficiently and handle thousands of concurrent users with ease.