60. `asyncio`

asyncio event loop internals, the coroutine runner, I/O selector integration, and Task scheduling.

60. asyncio

The asyncio module is CPython’s standard library framework for asynchronous I/O. It provides an event loop, tasks, futures, transports, streams, synchronization primitives, subprocess integration, timers, and APIs for coordinating many waiting operations in one thread.

For CPython internals, asyncio matters because it exposes how async def, coroutine objects, await, futures, event loops, callbacks, and I/O readiness fit together.

60.1 The Role of asyncio

asyncio is built for concurrent waiting.

It works well when a program spends much of its time waiting for:

network sockets
subprocess pipes
timers
file descriptor readiness
server connections
client responses
coordination between tasks

A minimal example:

import asyncio

async def main():
    await asyncio.sleep(1)
    print("done")

asyncio.run(main())

The program creates a coroutine, starts an event loop, waits for the timer, resumes the coroutine, prints, and shuts down the loop.

The core model:

coroutines describe async work
tasks schedule coroutines
the event loop drives tasks
await suspends execution
I/O readiness or timers resume execution

60.2 async def

An async def function does not execute immediately when called.

async def fetch():
    return 42

coro = fetch()
print(coro)

Calling fetch() creates a coroutine object.

Execution begins only when the coroutine is awaited or scheduled by an event loop.

import asyncio

async def fetch():
    return 42

async def main():
    result = await fetch()
    print(result)

asyncio.run(main())

Conceptually:

call async function
    ↓
create coroutine object
    ↓
schedule or await it
    ↓
run until await point
    ↓
suspend
    ↓
resume later

This is different from a normal function call, which executes immediately.

60.3 Coroutine Objects

A coroutine object is a resumable execution object.

It contains state similar to a generator:

code object
suspended frame
current await target
running flag
locals
instruction position

You can inspect some fields:

async def f():
    await asyncio.sleep(1)

coro = f()

print(coro.cr_code)
print(coro.cr_frame)
print(coro.cr_running)
print(coro.cr_await)

coro.close()

A coroutine can be in one of several states:

State Meaning
Created Created but not started
Running Currently executing
Suspended Paused at await
Closed Finished, cancelled, or closed

The inspect module exposes these states through inspect.getcoroutinestate().

60.4 await

await suspends the current coroutine until an awaitable produces a result.

Example:

async def main():
    result = await some_async_operation()

The awaited object must be awaitable. Common awaitables include:

Awaitable Meaning
Coroutine object Result of calling an async def function
Task Scheduled coroutine
Future Placeholder for a result
Object with __await__ Custom awaitable

At runtime, await is a controlled suspension point.

Conceptually:

current coroutine reaches await
    ↓
returns control to event loop
    ↓
event loop runs other work
    ↓
awaited object completes
    ↓
current coroutine resumes

await does not create an OS thread. It cooperatively yields control to the event loop.

60.5 Event Loop

The event loop is the scheduler.

It manages:

ready callbacks
scheduled timers
I/O readiness
tasks
futures
subprocess events
signal handlers on supported platforms

A simplified event loop iteration looks like:

run ready callbacks
check timers
wait for I/O readiness
mark futures ready
resume tasks
repeat

At the Python level, most programs use:

asyncio.run(main())

This creates and manages the loop for a top-level coroutine.

Lower-level loop access:

import asyncio

async def main():
    loop = asyncio.get_running_loop()
    print(loop)

asyncio.run(main())

Inside a running coroutine, get_running_loop() returns the active event loop.

60.6 asyncio.run()

asyncio.run() is the standard top-level entry point.

import asyncio

async def main():
    await asyncio.sleep(1)

asyncio.run(main())

It performs:

create event loop
set it as current
run top-level coroutine
finalize asynchronous generators
shutdown default executor
close event loop

Use it once at the outermost program boundary.

Bad pattern:

async def inner():
    asyncio.run(other())

You cannot call asyncio.run() from inside an already running event loop. In async code, use await instead.

60.7 Tasks

A task wraps a coroutine and schedules it on the event loop.

import asyncio

async def worker():
    await asyncio.sleep(1)
    return "done"

async def main():
    task = asyncio.create_task(worker())
    result = await task
    print(result)

asyncio.run(main())

Without create_task(), simply creating a coroutine object does not schedule it.

coro = worker()       # created, not running
task = create_task(coro)  # scheduled

A task manages:

coroutine execution
result storage
exception storage
cancellation
callbacks
state transitions

Task states include:

State Meaning
Pending Scheduled or waiting
Running Currently being advanced
Done Returned, raised, or cancelled

60.8 Futures

A future is a placeholder for a result that will be available later.

import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    loop.call_soon(fut.set_result, 42)

    result = await fut
    print(result)

asyncio.run(main())

A future can be:

pending
done with result
done with exception
cancelled

Tasks are future-like. A task is a future that drives a coroutine.

Conceptually:

Future
    stores eventual result

Task
    Future + coroutine runner

Library code often uses futures to bridge callback-style APIs into await.

60.9 Cooperative Scheduling

asyncio scheduling is cooperative.

A task runs until it reaches an await point, returns, raises, or is cancelled.

This means CPU-bound code blocks the event loop:

async def bad():
    while True:
        pass

While bad() runs, no other task can proceed.

Good async code yields control at waiting points:

async def good():
    while True:
        await asyncio.sleep(0)

asyncio.sleep(0) gives other tasks a chance to run.

For CPU-bound work, use:

process pools
thread pools for blocking I/O
native libraries
separate services

60.10 Timers

asyncio.sleep() suspends the current coroutine for a duration.

await asyncio.sleep(1.5)

Internally, the loop schedules a timer and resumes the task later.

Conceptually:

task calls sleep
    ↓
future created
    ↓
timer scheduled
    ↓
task suspends
    ↓
timer fires
    ↓
future completes
    ↓
task resumes

The event loop can run other tasks while waiting.

60.11 Running Concurrent Tasks

Use asyncio.gather() to wait for several awaitables.

import asyncio

async def fetch(i):
    await asyncio.sleep(1)
    return i

async def main():
    results = await asyncio.gather(
        fetch(1),
        fetch(2),
        fetch(3),
    )
    print(results)

asyncio.run(main())

All three tasks can wait concurrently.

Expected runtime is about one second, not three, because the sleeps overlap.

gather() preserves input order in its result list.

60.12 TaskGroup

TaskGroup provides structured task management.

import asyncio

async def worker(i):
    await asyncio.sleep(1)
    return i

async def main():
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(worker(1))
        t2 = tg.create_task(worker(2))

    print(t1.result(), t2.result())

asyncio.run(main())

The task group waits for all child tasks before exiting.

If one task fails, the group cancels the others and raises an exception group.

This gives async code a clear ownership structure:

parent task
    owns task group
        owns child tasks

Structured concurrency reduces leaked background tasks.

60.13 Cancellation

Cancellation is cooperative.

import asyncio

async def worker():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("cancelled")
        raise

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.1)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("observed cancellation")

asyncio.run(main())

Calling task.cancel() requests cancellation. The task receives CancelledError at an await point.

Important rule:

catch CancelledError only to clean up, then usually re-raise it

Swallowing cancellation can break timeouts, task groups, and shutdown logic.

60.14 Timeouts

asyncio.timeout() limits how long an operation may run.

import asyncio

async def main():
    try:
        async with asyncio.timeout(1):
            await asyncio.sleep(10)
    except TimeoutError:
        print("timed out")

asyncio.run(main())

Timeouts are implemented through cancellation.

Conceptually:

start protected block
    ↓
schedule timeout
    ↓
if timeout fires, cancel current task
    ↓
convert cancellation to TimeoutError at boundary

Timeout behavior depends on cooperative cancellation. If the coroutine blocks the event loop with CPU work, the timeout cannot fire promptly.

60.15 Shielding

asyncio.shield() protects an awaitable from outer cancellation.

await asyncio.shield(task)

If the awaiting coroutine is cancelled, the shielded task may continue running.

Use this sparingly. Shielding can make shutdown harder and can leave background work alive longer than expected.

Typical use cases:

must-finish cleanup
commit or rollback boundary
protocol close handshake

60.16 Streams

asyncio streams provide high-level TCP I/O.

Server:

import asyncio

async def handle(reader, writer):
    data = await reader.read(100)
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, "127.0.0.1", 8000)

    async with server:
        await server.serve_forever()

asyncio.run(main())

Client:

reader, writer = await asyncio.open_connection("127.0.0.1", 8000)

writer.write(b"hello")
await writer.drain()

data = await reader.read(100)
writer.close()
await writer.wait_closed()

Streams wrap lower-level transports and protocols into reader and writer objects.

60.17 Backpressure

writer.write() buffers data. It may not send immediately.

writer.write(data)
await writer.drain()

drain() waits until the transport buffer has room.

This is backpressure.

Without drain(), a fast producer can buffer too much data and increase memory usage.

Conceptually:

producer writes faster than socket can send
    ↓
transport buffer grows
    ↓
drain waits for buffer to shrink

Correct async I/O respects backpressure.

60.18 Synchronization Primitives

asyncio provides task-level synchronization primitives.

Primitive Purpose
asyncio.Lock Mutual exclusion between tasks
asyncio.Event One-bit notification
asyncio.Condition Wait and notify
asyncio.Semaphore Limit concurrent access
asyncio.Queue Async producer-consumer channel
asyncio.Barrier Wait for a group of tasks

Example semaphore:

import asyncio

sem = asyncio.Semaphore(10)

async def fetch(url):
    async with sem:
        return await do_request(url)

These primitives coordinate tasks in one event loop. They are not process locks or thread locks.

60.19 Async Queues

asyncio.Queue supports async producer-consumer pipelines.

import asyncio

async def producer(q):
    for i in range(5):
        await q.put(i)
    await q.put(None)

async def consumer(q):
    while True:
        item = await q.get()
        try:
            if item is None:
                return
            print(item)
        finally:
            q.task_done()

async def main():
    q = asyncio.Queue()

    await asyncio.gather(
        producer(q),
        consumer(q),
    )

asyncio.run(main())

Queues provide natural backpressure with maxsize.

q = asyncio.Queue(maxsize=100)

When the queue is full, put() waits.

60.20 Blocking Calls

Blocking calls stop the event loop.

Bad:

async def main():
    time.sleep(5)

Good:

async def main():
    await asyncio.sleep(5)

For blocking functions that cannot be rewritten as async, use a thread executor:

import asyncio
import time

def blocking():
    time.sleep(5)
    return 42

async def main():
    result = await asyncio.to_thread(blocking)
    print(result)

asyncio.run(main())

asyncio.to_thread() runs the function in a thread and returns an awaitable.

This helps with blocking I/O. It does not make CPU-bound Python code parallel in traditional CPython because of the GIL.

60.21 Executors

The event loop can run blocking work in executors.

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_work(x):
    return x * x

async def main():
    loop = asyncio.get_running_loop()

    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_work, 10)

    print(result)

asyncio.run(main())

Executor choices:

Executor Good for
Thread pool Blocking I/O
Process pool CPU-bound Python code
Default executor Convenient blocking I/O offload

This is the bridge between async scheduling and blocking APIs.

60.22 Subprocesses

asyncio can manage subprocesses asynchronously.

import asyncio

async def main():
    proc = await asyncio.create_subprocess_exec(
        "python",
        "-c",
        "print('hello')",
        stdout=asyncio.subprocess.PIPE,
    )

    out, err = await proc.communicate()
    print(out.decode())

asyncio.run(main())

This lets the event loop wait for process output without blocking other tasks.

60.23 Low-Level Callbacks

The event loop supports callback scheduling.

import asyncio

def callback():
    print("soon")

async def main():
    loop = asyncio.get_running_loop()
    loop.call_soon(callback)

    await asyncio.sleep(0)

asyncio.run(main())

Timers:

loop.call_later(1.0, callback)

Thread-safe scheduling from another thread:

loop.call_soon_threadsafe(callback)

Most application code should prefer coroutines and tasks, but callbacks are important for bridging low-level event sources.

60.24 Transports and Protocols

Before streams, asyncio exposed transports and protocols.

A protocol object receives callbacks:

connection_made
data_received
connection_lost

A transport object performs I/O:

write
close
pause_reading
resume_reading

Conceptually:

event loop
    ↓ I/O event
protocol callback
    ↓
transport operation

Streams are built on top of this lower-level abstraction.

Transports and protocols are useful for high-performance protocol implementations or compatibility with older asyncio code.

60.25 Event Loop Implementations

asyncio has platform-specific event loop implementations.

Common mechanisms include:

Platform area Mechanism
Unix sockets Selector-based readiness
Windows I/O Proactor-based APIs
Timers Event loop scheduled callbacks
Subprocesses Platform-specific child watchers or proactor support

Third-party event loops such as uvloop can replace the default loop in some environments.

The event loop abstraction lets most asyncio programs avoid direct OS readiness APIs.

60.26 Debug Mode

asyncio has debug support.

Enable with:

import asyncio

asyncio.run(main(), debug=True)

or environment variables and loop settings.

Debug mode can help detect:

slow callbacks
forgotten awaits
pending task destruction
wrong-thread loop access
resource warnings

It adds overhead, so it is primarily for development and testing.

60.27 Async Context Managers

Async context managers use async with.

class Connection:
    async def __aenter__(self):
        await self.open()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()

Usage:

async with Connection() as conn:
    await conn.query("select 1")

This pattern is essential when cleanup itself requires awaiting, such as closing network connections gracefully.

60.28 Async Iterators

Async iterators use async for.

class Counter:
    def __init__(self, n):
        self.i = 0
        self.n = n

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.i >= self.n:
            raise StopAsyncIteration

        self.i += 1
        await asyncio.sleep(0.1)
        return self.i

Usage:

async for item in Counter(3):
    print(item)

This supports asynchronous streams of values, such as network messages or paginated API results.

60.29 Async Generators

An async generator is declared with async def and yield.

async def ticker(n):
    for i in range(n):
        await asyncio.sleep(1)
        yield i

Usage:

async for value in ticker(3):
    print(value)

Async generators combine coroutine suspension with streamed output.

They have cleanup semantics because they may hold resources across yields. asyncio.run() finalizes remaining async generators during shutdown.

60.30 Context Variables

asyncio integrates with contextvars.

Context variables allow task-local context.

import asyncio
import contextvars

request_id = contextvars.ContextVar("request_id")

async def worker():
    print(request_id.get())

async def main():
    request_id.set("abc")
    await worker()

asyncio.run(main())

Context is preserved across awaits and task scheduling in controlled ways.

This is important for:

request IDs
tracing
logging context
auth context
locale or tenant state

Unlike thread-local storage, context variables work with asynchronous task switching.

60.31 Common Failure Modes

Common asyncio bugs include:

Mistake Consequence
Calling coroutine without awaiting Coroutine never runs
Blocking with time.sleep() Event loop stalls
CPU-bound loop in coroutine Other tasks starve
Forgetting writer.drain() Memory growth
Swallowing CancelledError Broken cancellation
Creating orphan tasks Leaked background work
Calling asyncio.run() inside loop Runtime error
Sharing non-async locks Deadlocks or blocking
Ignoring task exceptions Lost failures
Using async for CPU parallelism No real CPU parallelism

Most errors come from treating async code like threaded code. asyncio is cooperative scheduling, not preemptive scheduling.

60.32 Relationship to CPython Internals

asyncio connects to several CPython subsystems:

CPython area Connection
Compiler async def, await, async for, async with syntax
Code objects Coroutine flags and async generator flags
Object model Coroutine, task, future, async iterator objects
Frames Suspended coroutine frames
Exceptions Cancellation and exception propagation
Context variables Task-local context propagation
Event loop Scheduling and callback execution
Selectors OS readiness polling
Subprocess support Async process integration
Thread pools Blocking work offload

At the language level, async and await define suspension semantics. At the library level, asyncio provides the scheduler and I/O integration needed to make those semantics useful.

60.33 Why asyncio Matters

asyncio matters because modern Python programs often handle many concurrent I/O operations:

HTTP clients
web servers
database clients
message brokers
chat systems
crawlers
proxies
stream processors
automation systems

For these workloads, creating one OS thread per operation can be expensive. asyncio lets one event loop manage many waiting operations with explicit suspension points.

The core tradeoff:

Strength Cost
High concurrency for I/O-bound workloads Requires async-compatible APIs
Low per-task overhead Blocking code stalls everything
Structured cancellation and timeouts Cancellation must be handled carefully
Clear scheduling points CPU-bound work needs another strategy

60.34 Chapter Summary

The asyncio module is CPython’s standard asynchronous I/O framework. It uses an event loop to drive coroutine objects, tasks, futures, timers, sockets, subprocesses, and callbacks.

For CPython internals, asyncio is important because it connects language-level coroutine machinery to runtime scheduling. It shows how suspended frames, awaitables, tasks, cancellation, event loops, context variables, and OS I/O readiness cooperate to provide concurrent execution in one thread.