Skip to content

Async patterns with asyncio

Basics: async def and await

An async def function is a coroutine. Calling it returns a coroutine object - it does not run until you await it or schedule it on an event loop:

import asyncio

async def fetch_price(symbol: str) -> float:
    await asyncio.sleep(0.1)   # simulate I/O
    prices = {"AAPL": 182.5, "GOOG": 175.3, "MSFT": 415.1}
    return prices.get(symbol, 0.0)

async def main() -> None:
    price = await fetch_price("AAPL")
    print(f"AAPL: ${price}")

asyncio.run(main())

asyncio.run creates a new event loop, runs the coroutine to completion, and closes the loop. Always use it as the entry point - never call loop.run_until_complete manually in new code.

gather: run tasks in parallel

asyncio.gather schedules multiple coroutines concurrently and collects their return values in order:

import asyncio
import time

async def fetch(symbol: str, delay: float) -> tuple[str, float]:
    await asyncio.sleep(delay)
    prices = {"AAPL": 182.5, "GOOG": 175.3, "MSFT": 415.1, "TSLA": 245.0}
    return symbol, prices.get(symbol, 0.0)

async def main() -> None:
    start = time.perf_counter()

    # All four fetches run concurrently - total time is max(delays), not sum
    results = await asyncio.gather(
        fetch("AAPL", 0.3),
        fetch("GOOG", 0.1),
        fetch("MSFT", 0.2),
        fetch("TSLA", 0.15),
    )

    elapsed = time.perf_counter() - start
    for symbol, price in results:
        print(f"{symbol}: ${price}")
    print(f"Done in {elapsed:.2f}s")   # ~0.3s, not ~0.75s

asyncio.run(main())

If one coroutine raises, gather cancels the others by default. Pass return_exceptions=True to get exceptions as values instead:

import asyncio

async def might_fail(n: int) -> int:
    if n == 2:
        raise ValueError(f"Bad number: {n}")
    return n * 10

async def main() -> None:
    results = await asyncio.gather(
        might_fail(1), might_fail(2), might_fail(3),
        return_exceptions=True,
    )
    for result in results:
        if isinstance(result, Exception):
            print(f"Error: {result}")
        else:
            print(f"OK: {result}")

asyncio.run(main())

TaskGroup (Python 3.11+)

asyncio.TaskGroup is the structured concurrency alternative to gather. All tasks are cancelled if any task raises, and errors are reported together as an ExceptionGroup:

import asyncio

async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.05)
    return {"id": user_id, "name": f"User {user_id}"}

async def main() -> None:
    users: list[dict] = []

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_user(i)) for i in range(1, 6)]

    # all tasks are done here
    for task in tasks:
        users.append(task.result())

    print(users)

asyncio.run(main())

TaskGroup enforces that you do not leak background tasks - the async with block does not exit until every task created inside it has finished or been cancelled.

Timeout with asyncio.timeout

import asyncio

async def slow_operation() -> str:
    await asyncio.sleep(5)
    return "done"

async def main() -> None:
    try:
        async with asyncio.timeout(1.0):
            result = await slow_operation()
    except TimeoutError:
        print("Operation timed out after 1s")

asyncio.run(main())

For Python 3.10 and earlier, use asyncio.wait_for:

import asyncio

async def main() -> None:
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=1.0)
    except asyncio.TimeoutError:
        print("Timed out")

asyncio.run(main())

Cancellation

Tasks can be cancelled from outside. Always clean up in a finally block or by catching asyncio.CancelledError:

import asyncio

async def worker(name: str) -> None:
    try:
        print(f"{name}: starting")
        while True:
            await asyncio.sleep(0.5)
            print(f"{name}: tick")
    except asyncio.CancelledError:
        print(f"{name}: cancelled, cleaning up")
        raise   # always re-raise CancelledError
    finally:
        print(f"{name}: done")

async def main() -> None:
    task = asyncio.create_task(worker("background"))

    await asyncio.sleep(1.8)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        pass

asyncio.run(main())

Async context managers

Implement __aenter__ and __aexit__ (or use @asynccontextmanager) for resources that need async setup and teardown:

import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator

@asynccontextmanager
async def managed_connection(host: str) -> AsyncGenerator[dict, None]:
    print(f"Connecting to {host}")
    conn = {"host": host, "alive": True}   # simulated connection object
    try:
        yield conn
    finally:
        conn["alive"] = False
        print(f"Disconnected from {host}")

async def main() -> None:
    async with managed_connection("db.example.com") as conn:
        print(f"Using connection: {conn}")
        await asyncio.sleep(0.1)   # do some work

asyncio.run(main())

Async generators

Async generators yield values lazily, one at a time, from an async source:

import asyncio
from typing import AsyncGenerator

async def paginate_api(endpoint: str, pages: int = 3) -> AsyncGenerator[list[dict], None]:
    for page in range(1, pages + 1):
        await asyncio.sleep(0.05)   # simulate network call
        yield [{"page": page, "item": i} for i in range(5)]

async def main() -> None:
    total = 0
    async for batch in paginate_api("/items", pages=4):
        total += len(batch)
        print(f"Received {len(batch)} items, total so far: {total}")

asyncio.run(main())

Use async for to consume them. Use aiostream or manual buffering if you need to fan them out.

Running sync code in an executor

Blocking I/O or CPU-bound work inside a coroutine blocks the event loop. Offload it with loop.run_in_executor:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def blocking_read(path: str) -> str:
    time.sleep(0.2)   # simulate slow disk
    return f"contents of {path}"

def cpu_intensive(n: int) -> int:
    return sum(i * i for i in range(n))

async def main() -> None:
    loop = asyncio.get_running_loop()

    # thread pool for I/O-bound blocking calls
    with ThreadPoolExecutor(max_workers=4) as thread_pool:
        results = await asyncio.gather(
            loop.run_in_executor(thread_pool, blocking_read, "file1.txt"),
            loop.run_in_executor(thread_pool, blocking_read, "file2.txt"),
            loop.run_in_executor(thread_pool, blocking_read, "file3.txt"),
        )
    print(results)

    # process pool for CPU-bound work (bypasses the GIL)
    with ProcessPoolExecutor(max_workers=4) as proc_pool:
        heavy_results = await asyncio.gather(
            loop.run_in_executor(proc_pool, cpu_intensive, 1_000_000),
            loop.run_in_executor(proc_pool, cpu_intensive, 2_000_000),
        )
    print(heavy_results)

asyncio.run(main())

Real-world: parallel API calls

Fetch data from multiple GitHub endpoints concurrently, with per-request timeouts and error handling:

import asyncio
import httpx
import os
from typing import Any

GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN", "")

async def github_get(client: httpx.AsyncClient, path: str) -> dict[str, Any]:
    async with asyncio.timeout(10.0):
        response = await client.get(path)
        response.raise_for_status()
        return response.json()

async def fetch_repo_stats(repos: list[str]) -> None:
    async with httpx.AsyncClient(
        base_url="https://api.github.com",
        headers={
            "Authorization": f"Bearer {GITHUB_TOKEN}",
            "Accept": "application/vnd.github+json",
        },
    ) as client:
        tasks = [github_get(client, f"/repos/{repo}") for repo in repos]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for repo, result in zip(repos, results):
            if isinstance(result, Exception):
                print(f"{repo}: ERROR - {result}")
            else:
                print(f"{repo}: {result['stargazers_count']} stars, {result['open_issues_count']} open issues")

repos = [
    "python/cpython",
    "pallets/flask",
    "tiangolo/fastapi",
    "django/django",
]

asyncio.run(fetch_repo_stats(repos))

Real-world: async database connection pool

Using asyncpg for PostgreSQL (the pattern works the same with aiosqlite, aiomysql, etc.):

bunpy add asyncpg
import asyncio
import asyncpg

async def main() -> None:
    pool = await asyncpg.create_pool(
        dsn="postgresql://user:pass@localhost/mydb",
        min_size=2,
        max_size=10,
    )

    async def fetch_user(user_id: int) -> asyncpg.Record | None:
        async with pool.acquire() as conn:
            return await conn.fetchrow("SELECT id, username FROM users WHERE id = $1", user_id)

    # fetch 20 users concurrently, sharing the connection pool
    results = await asyncio.gather(*[fetch_user(i) for i in range(1, 21)])
    for row in results:
        if row:
            print(row["username"])

    await pool.close()

asyncio.run(main())

Semaphore: limit concurrency

When making many outbound requests, a semaphore prevents opening thousands of connections at once:

import asyncio
import httpx

async def fetch_with_limit(
    client: httpx.AsyncClient,
    sem: asyncio.Semaphore,
    url: str,
) -> int:
    async with sem:
        response = await client.get(url, timeout=10.0)
        return response.status_code

async def main() -> None:
    urls = [f"https://httpbin.org/delay/0?n={i}" for i in range(20)]
    sem = asyncio.Semaphore(5)   # at most 5 concurrent requests

    async with httpx.AsyncClient() as client:
        tasks = [fetch_with_limit(client, sem, url) for url in urls]
        statuses = await asyncio.gather(*tasks)

    print(statuses)

asyncio.run(main())

Run the examples

bunpy parallel_api.py
bunpy async_queue.py

asyncio’s model is single-threaded cooperative multitasking: only one coroutine runs at a time, but any coroutine can yield control at an await point. That means shared state is safe within a single event loop - but blocking for more than a few milliseconds without await stalls everything else.