Back to blog

Python Async Programming: Mastering asyncio

pythonasyncioasyncconcurrencyperformance
Python Async Programming: Mastering asyncio

Python Async Programming: Mastering asyncio

Asynchronous programming is one of the most powerful techniques for building high-performance Python applications. With asyncio and the async/await syntax, you can handle thousands of concurrent operations efficiently—essential for web servers, API clients, and I/O-bound applications. This guide takes you from async fundamentals to production-ready patterns.

What You'll Learn

✅ Async/await syntax fundamentals
✅ Event loops and how they work
✅ Coroutines and Tasks
✅ Concurrent operations with asyncio.gather
✅ Async context managers and iterators
✅ Error handling in async code
✅ Best practices for async programming
✅ When to use async vs threading vs multiprocessing

Prerequisites

Before diving into async programming, you should understand:


1. Understanding Asynchronous Programming

The Problem: I/O-Bound Operations

Traditional synchronous code blocks while waiting for I/O operations:

import time
 
def fetch_data(url: str) -> str:
    """Simulate a network request."""
    print(f"Fetching {url}...")
    time.sleep(2)  # Blocks the entire program!
    return f"Data from {url}"
 
# Sequential execution - takes 6 seconds total
start = time.time()
result1 = fetch_data("https://api.example.com/users")
result2 = fetch_data("https://api.example.com/posts")
result3 = fetch_data("https://api.example.com/comments")
print(f"Total time: {time.time() - start:.2f}s")  # ~6 seconds

The Solution: Async I/O

Async programming lets you start multiple operations and handle them as they complete:

import asyncio
 
async def fetch_data(url: str) -> str:
    """Simulate an async network request."""
    print(f"Fetching {url}...")
    await asyncio.sleep(2)  # Non-blocking!
    return f"Data from {url}"
 
async def main():
    start = asyncio.get_event_loop().time()
 
    # Run all three concurrently
    results = await asyncio.gather(
        fetch_data("https://api.example.com/users"),
        fetch_data("https://api.example.com/posts"),
        fetch_data("https://api.example.com/comments"),
    )
 
    elapsed = asyncio.get_event_loop().time() - start
    print(f"Total time: {elapsed:.2f}s")  # ~2 seconds!
    return results
 
# Run the async function
asyncio.run(main())

Key Insight: Async doesn't make individual operations faster—it lets you do more operations in the same time by not blocking while waiting.


2. Core Concepts

Coroutines

A coroutine is a function defined with async def. It doesn't run immediately when called—it returns a coroutine object:

async def say_hello(name: str) -> str:
    """A simple coroutine."""
    return f"Hello, {name}!"
 
# Calling a coroutine returns a coroutine object, not the result
coro = say_hello("Alice")
print(type(coro))  # <class 'coroutine'>
 
# To get the result, you must await it (inside another coroutine)
# or run it with asyncio.run()
result = asyncio.run(say_hello("Alice"))
print(result)  # "Hello, Alice!"

The await Keyword

await pauses a coroutine until the awaited operation completes:

async def process_data() -> dict:
    """Demonstrate await."""
    print("Starting...")
 
    # await pauses here until fetch_data completes
    data = await fetch_data("https://api.example.com")
 
    print("Processing...")
    # await pauses here until asyncio.sleep completes
    await asyncio.sleep(1)
 
    print("Done!")
    return {"status": "success", "data": data}

Important: You can only use await inside an async def function.

The Event Loop

The event loop is the core of asyncio. It:

  1. Manages all running coroutines
  2. Schedules when coroutines run
  3. Handles I/O events efficiently
import asyncio
 
async def main():
    # Get the running event loop
    loop = asyncio.get_running_loop()
    print(f"Event loop: {loop}")
 
    # The loop manages our coroutine
    await asyncio.sleep(0.1)
 
# asyncio.run() creates and manages the event loop
asyncio.run(main())

Python 3.10+ Best Practice: Use asyncio.run() to manage the event loop. Avoid creating loops manually unless you have a specific reason.


3. Tasks and Concurrent Execution

Creating Tasks

A Task wraps a coroutine and schedules it to run on the event loop:

import asyncio
 
async def download_file(filename: str) -> str:
    """Simulate downloading a file."""
    print(f"Starting download: {filename}")
    await asyncio.sleep(2)  # Simulate network delay
    print(f"Finished download: {filename}")
    return f"Content of {filename}"
 
async def main():
    # Create tasks - they start running immediately
    task1 = asyncio.create_task(download_file("file1.txt"))
    task2 = asyncio.create_task(download_file("file2.txt"))
    task3 = asyncio.create_task(download_file("file3.txt"))
 
    # Do other work while downloads are in progress
    print("Downloads started, doing other work...")
    await asyncio.sleep(0.5)
    print("Other work done!")
 
    # Wait for all tasks to complete
    result1 = await task1
    result2 = await task2
    result3 = await task3
 
    return [result1, result2, result3]
 
asyncio.run(main())
# Output:
# Starting download: file1.txt
# Starting download: file2.txt
# Starting download: file3.txt
# Downloads started, doing other work...
# Other work done!
# Finished download: file1.txt
# Finished download: file2.txt
# Finished download: file3.txt

asyncio.gather() - Run Multiple Coroutines

gather() runs multiple coroutines concurrently and returns results in order:

import asyncio
from typing import Any
 
async def fetch_user(user_id: int) -> dict:
    """Fetch a user by ID."""
    await asyncio.sleep(1)
    return {"id": user_id, "name": f"User {user_id}"}
 
async def fetch_posts(user_id: int) -> list:
    """Fetch posts for a user."""
    await asyncio.sleep(1.5)
    return [{"id": 1, "title": "Post 1"}, {"id": 2, "title": "Post 2"}]
 
async def fetch_comments(post_id: int) -> list:
    """Fetch comments for a post."""
    await asyncio.sleep(0.5)
    return [{"id": 1, "text": "Great post!"}]
 
async def get_user_data(user_id: int) -> dict[str, Any]:
    """Fetch all user data concurrently."""
    # All three run at the same time
    user, posts, comments = await asyncio.gather(
        fetch_user(user_id),
        fetch_posts(user_id),
        fetch_comments(1),
    )
 
    return {
        "user": user,
        "posts": posts,
        "comments": comments,
    }
 
# Takes ~1.5 seconds (max of all operations), not 3 seconds
result = asyncio.run(get_user_data(42))

asyncio.wait() - More Control

wait() gives you more control over how to handle completed tasks:

import asyncio
 
async def risky_operation(n: int) -> int:
    """An operation that might fail."""
    await asyncio.sleep(n * 0.5)
    if n == 2:
        raise ValueError(f"Operation {n} failed!")
    return n * 10
 
async def main():
    tasks = [
        asyncio.create_task(risky_operation(1)),
        asyncio.create_task(risky_operation(2)),
        asyncio.create_task(risky_operation(3)),
    ]
 
    # Wait for all to complete (or fail)
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.ALL_COMPLETED
    )
 
    for task in done:
        try:
            result = task.result()
            print(f"Success: {result}")
        except Exception as e:
            print(f"Error: {e}")
 
asyncio.run(main())

asyncio.as_completed() - Process Results as They Arrive

import asyncio
 
async def fetch_url(url: str, delay: float) -> str:
    """Fetch URL with variable delay."""
    await asyncio.sleep(delay)
    return f"Data from {url}"
 
async def main():
    tasks = [
        fetch_url("https://fast.com", 0.5),
        fetch_url("https://medium.com", 1.5),
        fetch_url("https://slow.com", 2.5),
    ]
 
    # Process results as they complete (fastest first)
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Got: {result}")
 
asyncio.run(main())
# Output (order may vary):
# Got: Data from https://fast.com
# Got: Data from https://medium.com
# Got: Data from https://slow.com

4. Async Context Managers and Iterators

Async Context Managers

Use async with for resources that need async setup/cleanup:

import asyncio
from typing import AsyncIterator
 
class AsyncDatabaseConnection:
    """Async context manager for database connections."""
 
    def __init__(self, host: str):
        self.host = host
        self.connected = False
 
    async def __aenter__(self) -> "AsyncDatabaseConnection":
        """Async setup - connect to database."""
        print(f"Connecting to {self.host}...")
        await asyncio.sleep(0.5)  # Simulate connection time
        self.connected = True
        print("Connected!")
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async cleanup - disconnect."""
        print("Disconnecting...")
        await asyncio.sleep(0.1)
        self.connected = False
        print("Disconnected!")
 
    async def query(self, sql: str) -> list:
        """Execute a query."""
        if not self.connected:
            raise RuntimeError("Not connected!")
        await asyncio.sleep(0.2)
        return [{"id": 1, "name": "Result"}]
 
async def main():
    async with AsyncDatabaseConnection("localhost:5432") as db:
        results = await db.query("SELECT * FROM users")
        print(f"Results: {results}")
    # Connection automatically closed here
 
asyncio.run(main())

Using contextlib for Async Context Managers

from contextlib import asynccontextmanager
import asyncio
 
@asynccontextmanager
async def async_timer(name: str):
    """Time an async operation."""
    start = asyncio.get_event_loop().time()
    print(f"[{name}] Starting...")
    try:
        yield
    finally:
        elapsed = asyncio.get_event_loop().time() - start
        print(f"[{name}] Completed in {elapsed:.2f}s")
 
async def main():
    async with async_timer("fetch"):
        await asyncio.sleep(1.5)
        print("Fetching data...")
 
asyncio.run(main())
# Output:
# [fetch] Starting...
# Fetching data...
# [fetch] Completed in 1.50s

Async Iterators

Use async for to iterate over async data streams:

import asyncio
from typing import AsyncIterator
 
class AsyncNumberStream:
    """Async iterator that yields numbers with delays."""
 
    def __init__(self, count: int):
        self.count = count
        self.current = 0
 
    def __aiter__(self) -> "AsyncNumberStream":
        return self
 
    async def __anext__(self) -> int:
        if self.current >= self.count:
            raise StopAsyncIteration
 
        await asyncio.sleep(0.5)  # Simulate async data fetch
        value = self.current
        self.current += 1
        return value
 
async def main():
    async for num in AsyncNumberStream(5):
        print(f"Got number: {num}")
 
asyncio.run(main())

Async Generators (Python 3.6+)

import asyncio
from typing import AsyncGenerator
 
async def fetch_pages(total: int) -> AsyncGenerator[dict, None]:
    """Async generator that fetches pages."""
    for page in range(1, total + 1):
        # Simulate fetching each page
        await asyncio.sleep(0.5)
        yield {"page": page, "items": [f"item_{page}_{i}" for i in range(3)]}
 
async def main():
    async for page_data in fetch_pages(3):
        print(f"Processing page {page_data['page']}: {page_data['items']}")
 
asyncio.run(main())

5. Error Handling in Async Code

Basic Exception Handling

import asyncio
 
async def might_fail(should_fail: bool) -> str:
    """A coroutine that might raise an exception."""
    await asyncio.sleep(0.5)
    if should_fail:
        raise ValueError("Something went wrong!")
    return "Success!"
 
async def main():
    # Standard try/except works with await
    try:
        result = await might_fail(True)
        print(result)
    except ValueError as e:
        print(f"Caught error: {e}")
 
asyncio.run(main())

Handling Errors in gather()

import asyncio
 
async def fetch_data(url: str, should_fail: bool = False) -> dict:
    """Fetch data, possibly failing."""
    await asyncio.sleep(0.5)
    if should_fail:
        raise ConnectionError(f"Failed to connect to {url}")
    return {"url": url, "status": "ok"}
 
async def main():
    # With return_exceptions=True, exceptions are returned as results
    results = await asyncio.gather(
        fetch_data("https://api1.com"),
        fetch_data("https://api2.com", should_fail=True),
        fetch_data("https://api3.com"),
        return_exceptions=True  # Don't raise, return exceptions
    )
 
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")
 
asyncio.run(main())
# Output:
# Task 0 succeeded: {'url': 'https://api1.com', 'status': 'ok'}
# Task 1 failed: Failed to connect to https://api2.com
# Task 2 succeeded: {'url': 'https://api3.com', 'status': 'ok'}

Task Cancellation

import asyncio
 
async def long_running_task(name: str) -> str:
    """A task that can be cancelled."""
    try:
        print(f"{name}: Starting...")
        await asyncio.sleep(10)
        print(f"{name}: Completed!")
        return f"{name} result"
    except asyncio.CancelledError:
        print(f"{name}: Cancelled! Cleaning up...")
        # Perform cleanup here
        raise  # Re-raise to propagate cancellation
 
async def main():
    task = asyncio.create_task(long_running_task("MyTask"))
 
    # Let it run for a bit
    await asyncio.sleep(1)
 
    # Cancel the task
    task.cancel()
 
    try:
        result = await task
    except asyncio.CancelledError:
        print("Task was cancelled")
 
asyncio.run(main())
# Output:
# MyTask: Starting...
# MyTask: Cancelled! Cleaning up...
# Task was cancelled

Timeouts

import asyncio
 
async def slow_operation() -> str:
    """An operation that takes too long."""
    await asyncio.sleep(10)
    return "Done!"
 
async def main():
    # Using asyncio.timeout (Python 3.11+)
    try:
        async with asyncio.timeout(2):
            result = await slow_operation()
            print(result)
    except asyncio.TimeoutError:
        print("Operation timed out!")
 
    # Using wait_for (all Python versions)
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out!")
 
asyncio.run(main())

6. Synchronization Primitives

asyncio.Lock

Prevent race conditions when multiple coroutines access shared resources:

import asyncio
 
class BankAccount:
    """A thread-safe bank account using async lock."""
 
    def __init__(self, balance: float = 0):
        self.balance = balance
        self._lock = asyncio.Lock()
 
    async def deposit(self, amount: float) -> float:
        async with self._lock:
            # Simulate processing time
            await asyncio.sleep(0.1)
            self.balance += amount
            return self.balance
 
    async def withdraw(self, amount: float) -> float:
        async with self._lock:
            await asyncio.sleep(0.1)
            if amount > self.balance:
                raise ValueError("Insufficient funds")
            self.balance -= amount
            return self.balance
 
async def main():
    account = BankAccount(100)
 
    # Multiple concurrent operations
    tasks = [
        account.deposit(50),
        account.withdraw(30),
        account.deposit(20),
        account.withdraw(40),
    ]
 
    results = await asyncio.gather(*tasks)
    print(f"Final balance: {account.balance}")
 
asyncio.run(main())

asyncio.Semaphore

Limit the number of concurrent operations:

import asyncio
 
async def fetch_with_limit(
    url: str,
    semaphore: asyncio.Semaphore
) -> str:
    """Fetch URL with concurrency limit."""
    async with semaphore:  # Only N can run at once
        print(f"Fetching {url}...")
        await asyncio.sleep(1)  # Simulate network request
        print(f"Done: {url}")
        return f"Data from {url}"
 
async def main():
    # Limit to 3 concurrent requests
    semaphore = asyncio.Semaphore(3)
 
    urls = [f"https://api.example.com/{i}" for i in range(10)]
 
    tasks = [fetch_with_limit(url, semaphore) for url in urls]
    results = await asyncio.gather(*tasks)
 
    print(f"Fetched {len(results)} URLs")
 
asyncio.run(main())

asyncio.Event

Coordinate coroutines with signals:

import asyncio
 
async def waiter(event: asyncio.Event, name: str):
    """Wait for the event to be set."""
    print(f"{name}: Waiting for signal...")
    await event.wait()
    print(f"{name}: Got signal! Proceeding...")
 
async def setter(event: asyncio.Event):
    """Set the event after a delay."""
    print("Setter: Preparing...")
    await asyncio.sleep(2)
    print("Setter: Sending signal!")
    event.set()
 
async def main():
    event = asyncio.Event()
 
    await asyncio.gather(
        waiter(event, "Worker1"),
        waiter(event, "Worker2"),
        waiter(event, "Worker3"),
        setter(event),
    )
 
asyncio.run(main())

asyncio.Queue

Producer-consumer pattern:

import asyncio
import random
 
async def producer(queue: asyncio.Queue, name: str):
    """Produce items and put them in the queue."""
    for i in range(5):
        item = f"{name}-item-{i}"
        await asyncio.sleep(random.uniform(0.1, 0.5))
        await queue.put(item)
        print(f"Produced: {item}")
 
async def consumer(queue: asyncio.Queue, name: str):
    """Consume items from the queue."""
    while True:
        item = await queue.get()
        print(f"{name} processing: {item}")
        await asyncio.sleep(0.3)  # Simulate processing
        queue.task_done()
        print(f"{name} done with: {item}")
 
async def main():
    queue: asyncio.Queue = asyncio.Queue(maxsize=5)
 
    # Create producers and consumers
    producers = [
        asyncio.create_task(producer(queue, f"Producer{i}"))
        for i in range(2)
    ]
 
    consumers = [
        asyncio.create_task(consumer(queue, f"Consumer{i}"))
        for i in range(3)
    ]
 
    # Wait for all producers to finish
    await asyncio.gather(*producers)
 
    # Wait for queue to be fully processed
    await queue.join()
 
    # Cancel consumers (they're infinite loops)
    for c in consumers:
        c.cancel()
 
asyncio.run(main())

7. Real-World Example: Async HTTP Client

Here's a practical example using aiohttp (install with pip install aiohttp):

import asyncio
import aiohttp
from typing import Any
 
async def fetch_json(
    session: aiohttp.ClientSession,
    url: str
) -> dict[str, Any]:
    """Fetch JSON from a URL."""
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.json()
 
async def fetch_all_users(user_ids: list[int]) -> list[dict]:
    """Fetch multiple users concurrently."""
    base_url = "https://jsonplaceholder.typicode.com/users"
 
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_json(session, f"{base_url}/{uid}")
            for uid in user_ids
        ]
 
        # Fetch all users concurrently
        users = await asyncio.gather(*tasks, return_exceptions=True)
 
        # Filter out errors
        return [u for u in users if isinstance(u, dict)]
 
async def main():
    user_ids = [1, 2, 3, 4, 5]
    users = await fetch_all_users(user_ids)
 
    for user in users:
        print(f"User: {user['name']} ({user['email']})")
 
if __name__ == "__main__":
    asyncio.run(main())

Rate-Limited API Client

import asyncio
import aiohttp
from typing import Any
 
class RateLimitedClient:
    """HTTP client with rate limiting."""
 
    def __init__(self, requests_per_second: float = 10):
        self.semaphore = asyncio.Semaphore(int(requests_per_second))
        self.delay = 1.0 / requests_per_second
 
    async def fetch(
        self,
        session: aiohttp.ClientSession,
        url: str
    ) -> dict[str, Any]:
        """Fetch URL with rate limiting."""
        async with self.semaphore:
            async with session.get(url) as response:
                data = await response.json()
                await asyncio.sleep(self.delay)  # Rate limit
                return data
 
    async def fetch_many(self, urls: list[str]) -> list[dict]:
        """Fetch multiple URLs with rate limiting."""
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url) for url in urls]
            return await asyncio.gather(*tasks, return_exceptions=True)
 
async def main():
    client = RateLimitedClient(requests_per_second=5)
 
    urls = [
        f"https://jsonplaceholder.typicode.com/posts/{i}"
        for i in range(1, 21)
    ]
 
    results = await client.fetch_many(urls)
    print(f"Fetched {len(results)} posts")
 
asyncio.run(main())

8. Async vs Threading vs Multiprocessing

When to Use Each

ApproachBest ForGIL AffectedOverhead
asyncioI/O-bound (network, files)Yes, but doesn't matterLow
threadingI/O-bound, C extensionsYesMedium
multiprocessingCPU-bound (calculations)No (separate processes)High

Comparison Example

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
 
# I/O-bound task (async is best)
async def async_io_task():
    await asyncio.sleep(1)
    return "async done"
 
def sync_io_task():
    time.sleep(1)
    return "sync done"
 
# CPU-bound task (multiprocessing is best)
def cpu_bound_task(n: int) -> int:
    """Calculate sum of squares (CPU-intensive)."""
    return sum(i * i for i in range(n))
 
async def main():
    # Async I/O - best for network/file operations
    start = time.time()
    tasks = [async_io_task() for _ in range(10)]
    await asyncio.gather(*tasks)
    print(f"Async I/O: {time.time() - start:.2f}s")  # ~1 second
 
    # Threading - good for I/O, GIL limits CPU
    start = time.time()
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(sync_io_task) for _ in range(10)]
        [f.result() for f in futures]
    print(f"Threading: {time.time() - start:.2f}s")  # ~1 second
 
    # Run CPU-bound in executor (don't block event loop)
    loop = asyncio.get_running_loop()
 
    # Thread pool for I/O
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, sync_io_task)
 
    # Process pool for CPU
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_task, 1000000)
 
asyncio.run(main())

Running Blocking Code in Async

import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
 
def blocking_operation(duration: float) -> str:
    """A blocking function (can't be made async)."""
    time.sleep(duration)
    return f"Slept for {duration}s"
 
async def main():
    loop = asyncio.get_running_loop()
 
    # Run blocking code in a thread pool
    with ThreadPoolExecutor() as executor:
        # This won't block the event loop
        result = await loop.run_in_executor(
            executor,
            blocking_operation,
            2.0
        )
        print(result)
 
asyncio.run(main())

9. Best Practices

✅ Do's

import asyncio
from typing import Any
 
# ✅ Use asyncio.run() for the entry point
async def main():
    pass
 
asyncio.run(main())
 
# ✅ Use async context managers for resources
async with aiohttp.ClientSession() as session:
    pass
 
# ✅ Use gather() for concurrent operations
results = await asyncio.gather(task1(), task2(), task3())
 
# ✅ Handle exceptions properly
try:
    result = await risky_operation()
except Exception as e:
    logger.error(f"Operation failed: {e}")
 
# ✅ Use timeouts for network operations
async with asyncio.timeout(30):
    data = await fetch_data()
 
# ✅ Use semaphores for rate limiting
semaphore = asyncio.Semaphore(10)
async with semaphore:
    await api_call()
 
# ✅ Cancel tasks properly
task = asyncio.create_task(long_operation())
try:
    result = await asyncio.wait_for(task, timeout=5)
except asyncio.TimeoutError:
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

❌ Don'ts

# ❌ Don't use time.sleep() in async code
async def bad():
    time.sleep(1)  # Blocks the entire event loop!
 
# ✅ Use asyncio.sleep() instead
async def good():
    await asyncio.sleep(1)
 
# ❌ Don't forget to await coroutines
async def bad():
    fetch_data()  # Returns coroutine, doesn't run it!
 
# ✅ Always await
async def good():
    await fetch_data()
 
# ❌ Don't create event loops manually (usually)
loop = asyncio.new_event_loop()  # Avoid this
 
# ✅ Use asyncio.run() instead
asyncio.run(main())
 
# ❌ Don't mix sync and async without executor
def sync_function():
    asyncio.run(async_function())  # Creates new loop each time!
 
# ✅ Use run_in_executor for blocking code
async def good():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, blocking_function)

10. Common Pitfalls

Pitfall 1: Forgetting to await

# ❌ Wrong - coroutine never runs
async def fetch_data():
    result = some_async_function()  # Missing await!
    return result  # Returns coroutine object, not result
 
# ✅ Correct
async def fetch_data():
    result = await some_async_function()
    return result

Pitfall 2: Blocking the Event Loop

# ❌ Wrong - blocks all other coroutines
async def bad_example():
    import requests
    response = requests.get("https://api.example.com")  # Blocking!
    return response.json()
 
# ✅ Correct - use async HTTP client
async def good_example():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com") as response:
            return await response.json()

Pitfall 3: Creating Tasks Without Awaiting

# ❌ Wrong - task might not complete before program ends
async def bad():
    asyncio.create_task(background_work())
    # Function returns immediately, task might be cancelled
 
# ✅ Correct - keep reference and await
async def good():
    task = asyncio.create_task(background_work())
    # ... do other stuff ...
    await task  # Ensure task completes

Pitfall 4: Not Handling Cancellation

# ❌ Wrong - resources may leak
async def bad_cleanup():
    resource = await acquire_resource()
    await long_operation()  # If cancelled, resource leaks!
 
# ✅ Correct - handle cancellation
async def good_cleanup():
    resource = await acquire_resource()
    try:
        await long_operation()
    except asyncio.CancelledError:
        await release_resource(resource)
        raise
    finally:
        await release_resource(resource)

Summary

In this guide, you learned:

✅ Async fundamentals: coroutines, await, event loop
✅ Creating and managing Tasks
✅ Concurrent execution with gather(), wait(), as_completed()
✅ Async context managers and iterators
✅ Error handling and timeouts
✅ Synchronization primitives (Lock, Semaphore, Event, Queue)
✅ When to use async vs threading vs multiprocessing
✅ Best practices and common pitfalls

Async programming is essential for building high-performance Python applications, especially web servers and API clients. With FastAPI, async is the default and enables handling thousands of concurrent requests efficiently.

Next Steps

Now that you understand async programming, explore related topics:

More Python Deep Dives:

Apply Async in Web Development:

Back to the Roadmap:


Part of the Python Learning Roadmap series

📬 Subscribe to Newsletter

Get the latest blog posts delivered to your inbox every week. No spam, unsubscribe anytime.

We respect your privacy. Unsubscribe at any time.

💬 Comments

Sign in to leave a comment

We'll never post without your permission.