Migrating from ThreadPool to AsyncIO in Python

Tags: python, asyncio, concurrency, performance, threading

You've got a Python service that uses ThreadPoolExecutor to handle concurrent I/O. It works — but the thread count keeps climbing, memory is creeping up, and adding more workers doesn't help. You've heard asyncio is "better," but your codebase is full of executor.submit() calls and you have no idea where to start.

This guide walks you through the actual migration: when it's worth doing, how to refactor incrementally, and what to do with the blocking code that can't just be async-ified.

What You'll Learn

✅ When to migrate (and when not to)
✅ The core mental model shift from threads to coroutines
✅ Step-by-step refactoring patterns with before/after code
✅ How to handle blocking libraries that have no async equivalent
✅ Running sync and async code side-by-side during migration
✅ Common pitfalls and how to avoid them

Prerequisites:
Basic Python threading knowledge and familiarity with async/await syntax. If asyncio is completely new to you, read Python Async Programming: Mastering asyncio first.


1. Should You Even Migrate?

Not every ThreadPoolExecutor codebase needs to move to asyncio. Answer these questions first.

Migrate if:

  • You're doing high-volume I/O (HTTP calls, DB queries, file reads) where most time is spent waiting
  • You're hitting OS thread limits or high memory overhead from hundreds of threads
  • You're building on top of an async framework (FastAPI, aiohttp, Starlette) and want to integrate cleanly
  • You have many short-lived concurrent tasks (e.g., fan-out API calls)

Don't migrate if:

  • Your work is CPU-bound — asyncio won't help, and multiprocessing is the right tool
  • Your codebase is heavily synchronous and migration would touch hundreds of files
  • Your team has no async experience and the service isn't under pressure
  • You're using libraries that have no async equivalent and wrapping them adds more complexity than threads

The honest truth: ThreadPoolExecutor is fine for many workloads. Migrate for real reasons, not hype.


2. The Mental Model Shift

Before touching any code, internalize this difference:

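Threads: the OS decides when to switch between them. For I/O, threads interleave while they wait; in standard CPython builds the GIL lets only one thread execute Python bytecode at a time, so threads give little parallelism for pure-Python CPU work. Each thread also reserves its own stack (commonly up to 8 MB of virtual address space on Linux, though far less is usually resident).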

Coroutines: you decide (implicitly) when to yield. The event loop runs one coroutine at a time, switching only at await points. No OS scheduling overhead, minimal memory per coroutine (~a few KB).

The key insight: asyncio's concurrency comes from cooperative yielding at I/O boundaries, not from running code simultaneously. If a coroutine never hits an await, it blocks everything.
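
A small sketch can make this concrete. The worker and run names below are illustrative only; the example records the order in which two coroutines execute, once with an await point in the loop body and once without.

```python
import asyncio


async def worker(name: str, log: list[str], yield_control: bool) -> None:
    for step in range(3):
        log.append(f"{name}{step}")
        if yield_control:
            # An await point: hands control back to the event loop
            await asyncio.sleep(0)


async def run(yield_control: bool) -> list[str]:
    log: list[str] = []
    await asyncio.gather(
        worker("a", log, yield_control),
        worker("b", log, yield_control),
    )
    return log


interleaved = asyncio.run(run(yield_control=True))
sequential = asyncio.run(run(yield_control=False))
print(interleaved)  # ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
print(sequential)   # ['a0', 'a1', 'a2', 'b0', 'b1', 'b2']
```

Without the await, worker "a" runs to completion before "b" gets any time at all, which is exactly what a blocking call inside a coroutine does to every other task.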


3. Identifying What to Migrate

Start by auditing your ThreadPoolExecutor usage. Look for these patterns:

# Pattern 1: submit + result
with ThreadPoolExecutor(max_workers=10) as pool:
    future = pool.submit(fetch_user, user_id)
    result = future.result()
 
# Pattern 2: map over a list
with ThreadPoolExecutor(max_workers=20) as pool:
    results = list(pool.map(process_item, items))
 
# Pattern 3: as_completed for fan-out
with ThreadPoolExecutor(max_workers=50) as pool:
    futures = {pool.submit(call_api, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        data = future.result()

For each usage, ask: is the underlying function doing I/O?

  • requests.get(), urllib.request → yes, migrate to aiohttp/httpx
  • psycopg2 queries → yes, migrate to asyncpg/psycopg3
  • open() + read() for large files → yes, migrate to aiofiles
  • subprocess.run() → yes, migrate to asyncio.create_subprocess_exec()
  • CPU computation, numpy, image processing → no, keep in thread pool

4. Step-by-Step Refactoring

Step 1: Start at the Leaf Functions

Always refactor bottom-up. Convert the innermost I/O functions first, then work outward.

Before — synchronous with requests:

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
 
 
def fetch_user(user_id: int) -> dict:
    response = requests.get(f"https://api.example.com/users/{user_id}")
    response.raise_for_status()
    return response.json()
 
 
def fetch_all_users(user_ids: list[int]) -> list[dict]:
    with ThreadPoolExecutor(max_workers=20) as pool:
        futures = {pool.submit(fetch_user, uid): uid for uid in user_ids}
        results = []
        for future in as_completed(futures):
            results.append(future.result())
    return results

After — async with httpx:

import httpx
import asyncio
from typing import List
 
 
async def fetch_user(client: httpx.AsyncClient, user_id: int) -> dict:
    response = await client.get(f"https://api.example.com/users/{user_id}")
    response.raise_for_status()
    return response.json()
 
 
async def fetch_all_users(user_ids: List[int]) -> List[dict]:
    async with httpx.AsyncClient() as client:
        tasks = [fetch_user(client, uid) for uid in user_ids]
        results = await asyncio.gather(*tasks)
    return list(results)

Notice the changes:

  • requests → httpx.AsyncClient (similar API, async-native)
  • pool.submit() → create coroutine objects, which gather() schedules as tasks
  • as_completed() → asyncio.gather() for collect-all, or asyncio.as_completed() if you need incremental results
  • The client is shared across all calls (connection pooling)
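
One behavioral difference is worth seeing in isolation: asyncio.gather() returns results in input order rather than completion order, and by default it raises the first exception it encounters. The sketch below uses a hypothetical fake_fetch coroutine with simulated latency instead of a real HTTP call, and passes return_exceptions=True so failures come back as values.

```python
import asyncio


async def fake_fetch(user_id: int, delay: float) -> dict:
    # Stand-in for an HTTP call; the delay simulates network latency
    await asyncio.sleep(delay)
    if user_id < 0:
        raise ValueError(f"invalid user id {user_id}")
    return {"id": user_id}


async def main() -> list:
    # The first call finishes last, yet its result still comes first
    return await asyncio.gather(
        fake_fetch(1, 0.03),
        fake_fetch(-1, 0.02),
        fake_fetch(3, 0.01),
        return_exceptions=True,
    )


results = asyncio.run(main())
ids = [r["id"] if isinstance(r, dict) else type(r).__name__ for r in results]
print(ids)  # [1, 'ValueError', 3]
```

If the thread-based code relied on as_completed() ordering or on per-future error handling, decide explicitly which of these behaviors you want before swapping in gather().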

Step 2: Propagate async Upward

Once leaf functions are async, their callers must await them, which means they become async too. Follow the chain upward.

# Before
def process_batch(batch_ids: list[int]) -> dict:
    users = fetch_all_users(batch_ids)  # was sync
    return {"count": len(users), "users": users}
 
 
# After
async def process_batch(batch_ids: list[int]) -> dict:
    users = await fetch_all_users(batch_ids)  # now async
    return {"count": len(users), "users": users}

This propagation is unavoidable — it's the "async infection." Plan for it from the start. If you can't make a caller async (e.g., it's a Django view or a CLI entry point), see Section 6.

Step 3: Replace as_completed patterns

The as_completed pattern from concurrent.futures has a direct async equivalent:

# Before — thread-based as_completed
from concurrent.futures import ThreadPoolExecutor, as_completed
 
def process_urls(urls: list[str]) -> None:
    with ThreadPoolExecutor(max_workers=30) as pool:
        futures = {pool.submit(scrape, url): url for url in urls}
        for future in as_completed(futures):
            url = futures[future]
            try:
                data = future.result()
                print(f"Done: {url}")
            except Exception as e:
                print(f"Failed {url}: {e}")
 
 
# After: asyncio.as_completed (naive attempt that loses the URL mapping)
import asyncio
import httpx
 
async def process_urls(urls: list[str]) -> None:
    async with httpx.AsyncClient() as client:
        coro_map = {scrape(client, url): url for url in urls}
        for next_done in asyncio.as_completed(coro_map.keys()):
            # next_done is a new awaitable, not one of the keys of coro_map,
            # so coro_map[next_done] would raise KeyError. Use a wrapper instead.
            data = await next_done
 
 
# Better pattern — wrap each task with its metadata
async def process_urls(urls: list[str]) -> None:
    async def scrape_with_url(client: httpx.AsyncClient, url: str) -> tuple[str, dict]:
        data = await scrape(client, url)
        return url, data
 
    async with httpx.AsyncClient() as client:
        tasks = [scrape_with_url(client, url) for url in urls]
        for coro in asyncio.as_completed(tasks):
            try:
                url, data = await coro
                print(f"Done: {url}")
            except Exception as e:
                print(f"Failed: {e}")
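
Note that the wrapper above still loses the URL when a call fails, because the exception escapes before the tuple is built. One possible variant, sketched here with a hypothetical fake_scrape coroutine standing in for scrape(client, url), catches the error inside the wrapper so the URL travels with both outcomes.

```python
import asyncio
from typing import Optional


async def fake_scrape(url: str) -> dict:
    # Hypothetical stand-in for scrape(client, url)
    await asyncio.sleep(0.01)
    if "bad" in url:
        raise RuntimeError("connection refused")
    return {"url": url, "status": 200}


async def scrape_with_url(url: str) -> tuple[str, Optional[dict], Optional[Exception]]:
    # Catch inside the wrapper so the URL is still known on failure
    try:
        return url, await fake_scrape(url), None
    except Exception as exc:
        return url, None, exc


async def process_urls(urls: list[str]) -> tuple[list[str], list[str]]:
    done: list[str] = []
    failed: list[str] = []
    tasks = [scrape_with_url(url) for url in urls]
    for next_done in asyncio.as_completed(tasks):
        url, data, error = await next_done
        if error is None:
            done.append(url)
        else:
            failed.append(url)
    return done, failed


done, failed = asyncio.run(
    process_urls(["https://a.example", "https://bad.example", "https://c.example"])
)
print(sorted(done), failed)
```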

Step 4: Handle Rate Limiting and Concurrency Caps

ThreadPoolExecutor(max_workers=N) naturally limits concurrency. With asyncio, gather(*tasks) launches everything at once. For large lists, you need a semaphore:

# Before — max_workers caps concurrency automatically
with ThreadPoolExecutor(max_workers=5) as pool:
    results = list(pool.map(call_rate_limited_api, items))
 
 
# After — use a semaphore
async def call_with_semaphore(
    sem: asyncio.Semaphore,
    client: httpx.AsyncClient,
    item: str
) -> dict:
    async with sem:
        return await call_rate_limited_api(client, item)
 
 
async def process_all(items: list[str]) -> list[dict]:
    sem = asyncio.Semaphore(5)  # max 5 concurrent requests
    async with httpx.AsyncClient() as client:
        tasks = [call_with_semaphore(sem, client, item) for item in items]
        return await asyncio.gather(*tasks)
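
To check that a semaphore really caps concurrency, you can count in-flight calls. This is a self-contained sketch: fake_api_call and the stats dictionary are assumptions for illustration, with asyncio.sleep() standing in for the network round trip.

```python
import asyncio

# Shared counters; safe without locks because coroutines run on one thread
stats = {"in_flight": 0, "peak": 0}


async def fake_api_call(item: int) -> int:
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0.01)  # simulated I/O wait
    stats["in_flight"] -= 1
    return item * 2


async def limited(sem: asyncio.Semaphore, item: int) -> int:
    async with sem:
        return await fake_api_call(item)


async def main() -> list[int]:
    sem = asyncio.Semaphore(5)  # at most 5 calls in flight
    return await asyncio.gather(*(limited(sem, i) for i in range(50)))


results = asyncio.run(main())
print(stats["peak"])  # 5
```

All 50 coroutines are still created up front; the semaphore only limits how many are inside the guarded section at once. For very large inputs you may also want to batch task creation.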

5. Handling Blocking Libraries

This is the hard part. Many Python libraries (psycopg2, boto3, certain ORMs, legacy SDKs) are synchronous, and an async replacement either doesn't exist or isn't practical to adopt right away. You have three options:

Option A: Use loop.run_in_executor() (Quick Wrapping)

import asyncio
import boto3
from functools import partial
 
 
s3 = boto3.client("s3")
 
 
async def upload_to_s3(bucket: str, key: str, data: bytes) -> None:
    loop = asyncio.get_running_loop()
    # run_in_executor runs the blocking call in a worker thread
    await loop.run_in_executor(
        None,  # None = default ThreadPoolExecutor
        partial(s3.put_object, Bucket=bucket, Key=key, Body=data)
    )

This is the pragmatic bridge: your coroutine awaits the blocking call without blocking the event loop. The call still runs in a thread, but it integrates cleanly with async code.

Set a custom executor to control thread count:

import asyncio
from concurrent.futures import ThreadPoolExecutor
 
executor = ThreadPoolExecutor(max_workers=10)
 
async def some_async_function():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, blocking_function, arg1, arg2)
    return result
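
On Python 3.9 and later, asyncio.to_thread() is a shorter way to do the same thing with the default executor, and it accepts keyword arguments directly, so no partial() is needed. A minimal sketch, where blocking_lookup is a hypothetical stand-in for a synchronous SDK call:

```python
import asyncio
import time


def blocking_lookup(key: str, delay: float = 0.05) -> str:
    # Hypothetical blocking call, standing in for a sync SDK method
    time.sleep(delay)
    return key.upper()


async def main() -> list[str]:
    # Each call runs in the default thread pool; the event loop stays free
    return await asyncio.gather(
        asyncio.to_thread(blocking_lookup, "a"),
        asyncio.to_thread(blocking_lookup, "b", delay=0.01),
    )


results = asyncio.run(main())
print(results)  # ['A', 'B']
```

Use run_in_executor() when you need a custom executor with its own thread count; to_thread() always uses the loop's default one.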

Option B: Switch to an Async Library

Sync library | Async replacement
requests | httpx, aiohttp
psycopg2 | asyncpg, psycopg3 (async mode)
pymysql | aiomysql
redis-py (sync) | redis-py (async mode, same package)
boto3 | aiobotocore / aioboto3
standard open() | aiofiles

Option C: Isolate Sync Code in a Worker Process

For CPU-heavy or long-blocking sync work, use ProcessPoolExecutor with run_in_executor. Each worker process has its own interpreter and GIL, so the event loop isn't affected. Arguments and return values must be picklable:

from concurrent.futures import ProcessPoolExecutor
import asyncio
 
process_pool = ProcessPoolExecutor(max_workers=4)
 
async def run_cpu_bound(data: bytes) -> bytes:
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(process_pool, heavy_cpu_function, data)
    return result

6. Bridging Sync and Async During Migration

Migration is rarely a one-day rewrite. You'll have a period where some callers are sync and some are async. Here's how to bridge:

Calling async from sync code

import asyncio
 
 
async def async_main() -> None:
    result = await fetch_all_users([1, 2, 3])
    print(result)
 
 
# Entry point — sync context
if __name__ == "__main__":
    asyncio.run(async_main())

For libraries or utilities that need to expose a sync interface over async internals:

import asyncio
from typing import TypeVar, Coroutine, Any
 
T = TypeVar("T")
 
 
def run_sync(coro: Coroutine[Any, Any, T]) -> T:
    """Run a coroutine synchronously. Only use at sync/async boundaries."""
    return asyncio.run(coro)
 
 
# Use it sparingly — at the true boundary, not buried in business logic
result = run_sync(fetch_all_users([1, 2, 3]))

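Never call asyncio.run() inside an already-running event loop; it raises RuntimeError. This is a common mistake when calling async code from a Jupyter notebook or from within FastAPI endpoints. Use await instead. loop.run_until_complete() does not help here, because it also refuses to run on a loop that is already running. If you're stuck in a nested sync context, hand the coroutine to an event loop running in another thread with asyncio.run_coroutine_threadsafe() (workable, though fragile).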
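
For sync callers that cannot use await and may be called repeatedly, one option is a dedicated event loop in a background thread. The BackgroundLoop class below is a hypothetical helper, not a standard-library API; it relies only on asyncio.run_coroutine_threadsafe(), which is designed for submitting coroutines from another thread.

```python
import asyncio
import threading
from typing import Any, Coroutine, Optional, TypeVar

T = TypeVar("T")


class BackgroundLoop:
    """Hypothetical helper: owns one event loop running in a daemon thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro: Coroutine[Any, Any, T], timeout: Optional[float] = None) -> T:
        # Thread-safe hand-off; blocks the calling thread until the result is ready
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout)

    def close(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def double(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2


bridge = BackgroundLoop()
answer = bridge.run(double(21))
bridge.close()
print(answer)  # 42
```

Because run() blocks its caller, invoking it from a coroutine on another event loop would stall that loop for the duration of the call. Treat it as a boundary tool for genuinely synchronous code.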

Calling sync from async code (without blocking)

import asyncio
from concurrent.futures import ThreadPoolExecutor
 
_executor = ThreadPoolExecutor(max_workers=10)
 
 
async def call_sync_safely(sync_fn, *args):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_executor, sync_fn, *args)
 
 
# Usage
async def handler():
    # Don't do this — blocks the event loop!
    # result = some_sync_library_call()
 
    # Do this instead
    result = await call_sync_safely(some_sync_library_call, arg1, arg2)
    return result

7. Common Pitfalls

Pitfall 1: Blocking the Event Loop

The most common async bug. Any sync I/O or CPU-heavy code in a coroutine blocks everything:

# Bad: blocks the entire event loop
async def bad_handler():
    time.sleep(2)                       # blocks
    response = requests.get(url)        # blocks
    result = json.loads(response.text)  # fine, it's fast CPU work
 
# Good
async def good_handler():
    await asyncio.sleep(2)           # yields
    async with httpx.AsyncClient() as client:
        response = await client.get(url)  # yields
    result = response.json()              # fine

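Enable asyncio debug mode during development, for example with asyncio.run(main(), debug=True) or the PYTHONASYNCIODEBUG=1 environment variable. In debug mode the loop logs a warning for any callback or task step that runs longer than loop.slow_callback_duration (100 ms by default) without yielding.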
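
Here is a small sketch of what debug mode catches. It attaches a list-collecting handler (ListHandler is a helper defined only for this example) to the "asyncio" logger and runs a coroutine that hides a blocking time.sleep().

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects warning-level log messages in memory for inspection."""

    def __init__(self) -> None:
        super().__init__(level=logging.WARNING)
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


handler = ListHandler()
logging.getLogger("asyncio").addHandler(handler)


async def sneaky_blocker() -> None:
    time.sleep(0.2)  # blocking call hidden inside a coroutine


# debug=True turns on slow-callback detection for this run
asyncio.run(sneaky_blocker(), debug=True)

slow_warnings = [m for m in handler.messages if "took" in m]
for message in slow_warnings:
    print(message)
```

In a real service you would normally leave the default logging configuration in place and simply watch the logs for these warnings while exercising the code under load.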

Pitfall 2: Creating a New Event Loop Per Request

# Bad — creates a new event loop every time, expensive and error-prone
def handle_request(data):
    loop = asyncio.new_event_loop()
    result = loop.run_until_complete(process(data))
    loop.close()
    return result
 
# Good — use asyncio.run() at the top level, or make the handler async
async def handle_request(data):
    return await process(data)

Pitfall 3: Not Awaiting Tasks

# Bad: fire-and-forget. Nothing keeps a reference to the task, and a failure
# is only logged when the task object is eventually garbage-collected
async def bad():
    asyncio.create_task(risky_operation())  # not awaited
    return "done"
 
# Good — collect and await, or handle exceptions explicitly
async def good():
    task = asyncio.create_task(risky_operation())
    try:
        await task
    except Exception as e:
        logger.error(f"Task failed: {e}")
    return "done"
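
If you genuinely need background work that outlives the handler, a common approach is to hold a strong reference to each task and attach a done callback that records failures. The spawn helper and the errors list below are illustrative names, not asyncio APIs.

```python
import asyncio

background_tasks: set = set()
errors: list[str] = []


def _on_done(task: asyncio.Task) -> None:
    background_tasks.discard(task)
    if not task.cancelled() and task.exception() is not None:
        errors.append(repr(task.exception()))


def spawn(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    background_tasks.add(task)        # strong reference keeps the task alive
    task.add_done_callback(_on_done)  # surfaces failures instead of losing them
    return task


async def risky_operation() -> None:
    await asyncio.sleep(0.01)
    raise RuntimeError("boom")


async def main() -> str:
    spawn(risky_operation())
    await asyncio.sleep(0.05)  # stand-in for other work the handler does
    return "done"


status = asyncio.run(main())
print(status, errors)  # done ["RuntimeError('boom')"]
```

In production code the callback would typically log through your logger rather than append to a list.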

Pitfall 4: Sharing Non-Thread-Safe Objects Across Coroutines

Coroutines run on a single thread, so objects shared between them are generally safe. But if you're mixing run_in_executor with shared state, watch out:

# Dangerous if db_connection is not thread-safe
async def dangerous():
    loop = asyncio.get_running_loop()
    # db_connection accessed from multiple threads via executor
    result = await loop.run_in_executor(None, db_connection.query, sql)

Use a connection pool designed for concurrent access, or ensure each thread gets its own connection.
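
As one way to give each thread its own connection, the sketch below stores a connection in threading.local(). It uses sqlite3 with in-memory databases purely as a stand-in for a real driver; get_connection and run_query are hypothetical helpers.

```python
import asyncio
import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()
executor = ThreadPoolExecutor(max_workers=4)


def get_connection() -> sqlite3.Connection:
    # Lazily create one connection per worker thread
    conn = getattr(_local, "conn", None)
    if conn is None:
        conn = sqlite3.connect(":memory:")
        _local.conn = conn
    return conn


def run_query(value: int) -> int:
    row = get_connection().execute("SELECT ? + 1", (value,)).fetchone()
    return row[0]


async def main() -> list[int]:
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(executor, run_query, i) for i in range(8)]
    return await asyncio.gather(*futures)


results = asyncio.run(main())
executor.shutdown()
print(results)  # [1, 2, 3, 4, 5, 6, 7, 8]
```

This sketch never closes the per-thread connections; a real service would close them at shutdown or rely on the driver's own pool.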


8. Measuring the Improvement

Validate that migration actually helped before declaring success:

import asyncio
import time
import httpx
 
 
async def benchmark_async(urls: list[str]) -> float:
    start = time.perf_counter()
    async with httpx.AsyncClient() as client:
        await asyncio.gather(*[client.get(url) for url in urls])
    return time.perf_counter() - start
 
 
def benchmark_threads(urls: list[str]) -> float:
    import requests
    from concurrent.futures import ThreadPoolExecutor
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max(1, len(urls))) as pool:  # max_workers must be > 0
        list(pool.map(requests.get, urls))
    return time.perf_counter() - start

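What to expect for I/O-bound work (rough guidance; measure your own workload):

  • Threads: usually fine up to around a hundred concurrent tasks; memory and scheduling overhead grow with the thread count
  • Asyncio: can handle thousands of concurrent tasks on one thread; per-task memory is small, so it grows far more slowly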

The real win isn't raw speed — it's resource efficiency at scale. If you're only doing 10 concurrent requests, the difference is negligible.


9. Migration Checklist

Use this when planning your migration:

Before starting:
✅ Identify all ThreadPoolExecutor / ThreadPool usage with grep
✅ Classify each: I/O-bound (migrate) vs CPU-bound (keep in pool)
✅ Check if async equivalents exist for every library you use
✅ Decide on migration scope (full rewrite vs incremental)

During migration:
✅ Start from leaf I/O functions, propagate async upward
✅ Replace sync HTTP/DB libraries with async equivalents
✅ Add semaphores wherever max_workers was limiting concurrency
✅ Wrap unavoidable blocking calls in run_in_executor
✅ Enable asyncio debug mode during development

After migration:
✅ Benchmark: compare throughput and memory vs before
✅ Test error handling — async exceptions surface differently
✅ Check for fire-and-forget tasks with unhandled exceptions
✅ Review timeout handling: asyncio has asyncio.wait_for(), plus asyncio.timeout() on Python 3.11+
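
For the timeout item, a minimal sketch of asyncio.wait_for() looks like this. slow_query is a hypothetical coroutine that simulates a slow backend; on timeout, wait_for() cancels the inner coroutine and raises asyncio.TimeoutError.

```python
import asyncio


async def slow_query() -> str:
    await asyncio.sleep(1.0)  # simulated slow backend
    return "rows"


async def query_with_timeout(timeout: float) -> str:
    try:
        return await asyncio.wait_for(slow_query(), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed out"


outcome = asyncio.run(query_with_timeout(0.05))
print(outcome)  # timed out
```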


Summary

Migrating from ThreadPoolExecutor to asyncio is a meaningful upgrade for I/O-heavy Python services, but it's not free. The core steps are:

  1. Audit your thread pool usage and classify I/O vs CPU work
  2. Refactor bottom-up — leaf functions first, callers second
  3. Replace sync libraries with async equivalents where possible
  4. Wrap blocking code with run_in_executor where no async library exists
  5. Add semaphores to replace the concurrency cap that max_workers provided
  6. Measure before claiming success

The migration is worth it when you're running hundreds of concurrent I/O operations and starting to see thread exhaustion, high memory, or poor throughput under load. For lighter workloads, ThreadPoolExecutor remains a perfectly good tool.

Further Reading:
Python Async Programming: Mastering asyncio
Async Processing in FastAPI
