Concurrency with concurrent.futures
Overview
concurrent.futures is in the standard library - no install needed. It gives you two executors:
- ThreadPoolExecutor - for I/O-bound work (network calls, disk reads, database queries). Threads share memory and the GIL, so they do not speed up CPU-bound Python code.
- ProcessPoolExecutor - for CPU-bound work (image processing, number crunching, compression). Separate processes bypass the GIL but have higher startup cost and cannot share memory.
Both expose the same Executor interface (submit, map, shutdown), so switching between them is usually a one-line change - see the sketch below.
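A minimal sketch of that interchangeability - run_jobs and simulate_work are made-up names for illustration:
from concurrent.futures import Executor, ThreadPoolExecutor, ProcessPoolExecutor

def simulate_work(n: int) -> int:
    return n * n  # stand-in for real I/O- or CPU-bound work

def run_jobs(executor: Executor, jobs: list[int]) -> list[int]:
    # identical code drives either pool
    with executor:
        return list(executor.map(simulate_work, jobs))

results = run_jobs(ThreadPoolExecutor(max_workers=4), [1, 2, 3])
# results = run_jobs(ProcessPoolExecutor(max_workers=4), [1, 2, 3])  # needs a __main__ guard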
ThreadPoolExecutor basics
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def fetch_price(symbol: str) -> tuple[str, float]:
time.sleep(0.2) # simulate network I/O
prices = {"AAPL": 182.5, "GOOG": 175.3, "MSFT": 415.1, "TSLA": 245.0}
return symbol, prices.get(symbol, 0.0)
symbols = ["AAPL", "GOOG", "MSFT", "TSLA"]
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(fetch_price, symbols))
elapsed = time.perf_counter() - start
for symbol, price in results:
print(f"{symbol}: ${price}")
print(f"Done in {elapsed:.2f}s")  # ~0.2s, not ~0.8s
executor.map preserves input order: results come back in the order the inputs were submitted, even when later calls finish first. The iterator simply blocks until the next in-order result is ready.
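A quick way to see the ordering guarantee - delayed_echo is a made-up helper whose later inputs finish first:
from concurrent.futures import ThreadPoolExecutor
import time

def delayed_echo(n: int) -> int:
    time.sleep(0.5 - n * 0.1)  # input 3 finishes first, input 0 last
    return n

with ThreadPoolExecutor(max_workers=4) as executor:
    print(list(executor.map(delayed_echo, [0, 1, 2, 3])))  # [0, 1, 2, 3], not finish order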
ProcessPoolExecutor basics
from concurrent.futures import ProcessPoolExecutor
import time
def count_primes(limit: int) -> int:
"""Return the count of primes below limit (CPU-bound)."""
sieve = bytearray([1]) * limit
sieve[0] = sieve[1] = 0
for i in range(2, int(limit ** 0.5) + 1):
if sieve[i]:
sieve[i * i::i] = bytearray(len(sieve[i * i::i]))
return sum(sieve)
if __name__ == "__main__":  # required: spawn-based platforms re-import this module in each worker
    limits = [1_000_000, 2_000_000, 3_000_000, 4_000_000]
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as executor:
        counts = list(executor.map(count_primes, limits))
    elapsed = time.perf_counter() - start
    for limit, count in zip(limits, counts):
        print(f"Primes below {limit:,}: {count:,}")
    print(f"Done in {elapsed:.2f}s")
Because ProcessPoolExecutor uses multiprocessing under the hood, all arguments and return values must be picklable. Lambdas and local functions are not - define workers at module level. The __main__ guard above matters for the same reason: on Windows and macOS each worker process re-imports the module, and unguarded pool creation would recurse. For many small tasks, pass chunksize= to executor.map to batch items and cut inter-process overhead.
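To see the picklability rule bite, submit a lambda - the pickling error lands on the future. A minimal sketch:
from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        future = executor.submit(lambda x: x * 2, 21)
        future.result()  # raises PicklingError: Can't pickle <function <lambda> ...>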
as_completed: process results as they arrive
executor.submit schedules a single call and immediately returns a Future; as_completed then yields those futures in the order they finish, not the order they were submitted. Use it when you want to display progress or handle errors as soon as they happen (httpx is third-party: bunpy add httpx):
from concurrent.futures import ThreadPoolExecutor, as_completed
import httpx
def check_url(url: str) -> tuple[str, int]:
try:
response = httpx.get(url, timeout=5.0, follow_redirects=True)
return url, response.status_code
    except Exception:
        return url, -1  # sentinel status for any failure
urls = [
"https://python.org",
"https://pypi.org",
"https://docs.python.org",
"https://github.com",
"https://httpbin.org",
"https://this-does-not-exist.invalid",
]
with ThreadPoolExecutor(max_workers=6) as executor:
future_to_url = {executor.submit(check_url, url): url for url in urls}
for future in as_completed(future_to_url):
url, status = future.result()
status_str = str(status) if status > 0 else "ERROR"
        print(f"{status_str:>5} {url}")
wait: block until a subset finishes
wait gives finer control than as_completed: via return_when you can block until the first result (FIRST_COMPLETED), the first exception (FIRST_EXCEPTION), or all results (ALL_COMPLETED):
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
import time
def task(name: str, delay: float) -> str:
time.sleep(delay)
return f"{name} finished"
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {
executor.submit(task, "alpha", 1.0),
executor.submit(task, "beta", 0.3),
executor.submit(task, "gamma", 0.7),
executor.submit(task, "delta", 0.1),
}
done, pending = wait(futures, return_when=FIRST_COMPLETED)
print(f"First finished: {done.pop().result()}")
print(f"Still running: {len(pending)}")
done, _ = wait(pending, return_when=ALL_COMPLETED)
for f in done:
        print(f.result())
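The third option, FIRST_EXCEPTION, returns as soon as any future raises (or when all complete if none do). A minimal sketch with a made-up may_fail helper:
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_EXCEPTION
import time

def may_fail(n: int) -> int:
    time.sleep(n * 0.1)
    if n == 3:
        raise ValueError("bad input: 3")
    return n

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(may_fail, n) for n in range(6)]
    # returns once may_fail(3) raises; still-running tasks stay in pending
    done, pending = wait(futures, return_when=FIRST_EXCEPTION)
    failed = [f for f in done if f.exception() is not None]
    print(f"{len(failed)} failed, {len(pending)} still pending")
Cancel pending futures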
Submit work, then cancel tasks that have not started yet - useful for timeouts or early-exit patterns:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def slow_task(n: int) -> int:
time.sleep(2)
return n * n
with ThreadPoolExecutor(max_workers=2) as executor:
futures = [executor.submit(slow_task, i) for i in range(10)]
    # cancel all but the first two (which are already running)
    for i, future in enumerate(futures[2:], start=2):
        cancelled = future.cancel()
        print(f"Future {i} cancelled: {cancelled}")
for future in as_completed(futures[:2]):
        print(f"Result: {future.result()}")
cancel() returns True only if the task has not started. Running tasks cannot be cancelled - that is a fundamental limit of threads.
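Since Python 3.9 you can also cancel everything still queued in one call with shutdown(cancel_futures=True) - a minimal sketch:
from concurrent.futures import ThreadPoolExecutor
import time

def slow_task(n: int) -> int:
    time.sleep(2)
    return n * n

executor = ThreadPoolExecutor(max_workers=2)
futures = [executor.submit(slow_task, i) for i in range(10)]
# waits for the two running tasks, cancels the eight still queued (Python 3.9+)
executor.shutdown(wait=True, cancel_futures=True)
print(sum(f.cancelled() for f in futures), "futures were cancelled")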
map with timeout
executor.map accepts a timeout parameter. The clock starts at the original call to map, not per result: if the next in-order result is not available before the deadline, TimeoutError is raised. Tasks already running in worker threads are not interrupted:
from concurrent.futures import ThreadPoolExecutor, TimeoutError  # alias of the builtin since 3.11; explicit import works everywhere
import time
def fetch(n: int) -> int:
time.sleep(n * 0.1)
return n * 10
with ThreadPoolExecutor(max_workers=4) as executor:
try:
for result in executor.map(fetch, [1, 2, 3, 15, 4], timeout=1.0):
print(result)
except TimeoutError:
        print("One of the tasks took too long.")
Real-world: parallel file processing
Read and summarize a directory of log files concurrently:
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
def analyze_log(path: Path) -> dict:
error_count = 0
warn_count = 0
lines = 0
with path.open() as f:
for line in f:
lines += 1
if "ERROR" in line:
error_count += 1
elif "WARN" in line:
warn_count += 1
return {
"file": path.name,
"lines": lines,
"errors": error_count,
"warnings": warn_count,
}
def summarize_logs(log_dir: str, workers: int = 8) -> list[dict]:
paths = list(Path(log_dir).glob("*.log"))
results = []
with ThreadPoolExecutor(max_workers=workers) as executor:
future_to_path = {executor.submit(analyze_log, p): p for p in paths}
for future in as_completed(future_to_path):
path = future_to_path[future]
try:
result = future.result()
results.append(result)
print(f"Processed {result['file']}: {result['errors']} errors")
except Exception as exc:
print(f"Failed to process {path.name}: {exc}")
return sorted(results, key=lambda r: r["errors"], reverse=True)
# usage
# results = summarize_logs("/var/log/myapp")
# for r in results[:5]:
#     print(r)
Real-world: parallel API calls with a thread pool
Pull data from a REST API for a list of IDs, capping how many requests are in flight at once:
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Semaphore
import httpx
import time
MAX_IN_FLIGHT = 10  # at most 10 simultaneous requests
sem = Semaphore(MAX_IN_FLIGHT)
def fetch_item(item_id: int) -> dict:
with sem:
response = httpx.get(
f"https://jsonplaceholder.typicode.com/posts/{item_id}",
timeout=10.0,
)
response.raise_for_status()
return response.json()
def fetch_all(ids: list[int]) -> list[dict]:
results: list[dict] = []
with ThreadPoolExecutor(max_workers=20) as executor:
future_to_id = {executor.submit(fetch_item, id_): id_ for id_ in ids}
for future in as_completed(future_to_id):
id_ = future_to_id[future]
try:
data = future.result()
results.append(data)
except Exception as exc:
print(f"Item {id_} failed: {exc}")
return results
start = time.perf_counter()
items = fetch_all(list(range(1, 51)))
elapsed = time.perf_counter() - start
print(f"Fetched {len(items)} items in {elapsed:.2f}s")
Real-world: image resizing with processes
Image processing is CPU-bound - use ProcessPoolExecutor to spread the work across every core:
bunpy add pillow
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from PIL import Image
def resize_image(src: str, dest: str, width: int, height: int) -> str:
img = Image.open(src)
img_resized = img.resize((width, height), Image.LANCZOS)
img_resized.save(dest, optimize=True, quality=85)
return dest
def resize_all(input_dir: str, output_dir: str, size: tuple[int, int] = (800, 600)) -> None:
src_dir = Path(input_dir)
out_dir = Path(output_dir)
out_dir.mkdir(parents=True, exist_ok=True)
images = list(src_dir.glob("*.jpg")) + list(src_dir.glob("*.png"))
if not images:
print("No images found.")
return
jobs = [
(str(p), str(out_dir / p.name), size[0], size[1])
for p in images
]
with ProcessPoolExecutor() as executor:
futures = {
executor.submit(resize_image, src, dest, w, h): src
for src, dest, w, h in jobs
}
completed = 0
for future in as_completed(futures):
src = futures[future]
try:
dest = future.result()
completed += 1
print(f"[{completed}/{len(jobs)}] {Path(src).name} -> {dest}")
except Exception as exc:
print(f"Failed {src}: {exc}")
# resize_all("photos/", "photos_resized/", size=(1280, 720))
Exception handling
Exceptions raised in a worker are stored on the future and re-raised when you call .result(). If you prefer to inspect rather than catch, future.exception() returns the stored exception (or None) without raising:
from concurrent.futures import ThreadPoolExecutor
def divide(a: int, b: int) -> float:
if b == 0:
raise ZeroDivisionError("cannot divide by zero")
return a / b
with ThreadPoolExecutor(max_workers=2) as executor:
futures = [
executor.submit(divide, 10, 2),
executor.submit(divide, 5, 0),
executor.submit(divide, 8, 4),
]
for i, future in enumerate(futures):
try:
print(f"Result {i}: {future.result()}")
except ZeroDivisionError as exc:
            print(f"Result {i}: error - {exc}")
Choosing a worker count
A rough starting point:
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
cpu_count = os.cpu_count() or 4
# I/O-bound: more threads than CPUs is fine because threads spend most time waiting
io_workers = min(32, cpu_count * 4)  # ThreadPoolExecutor's own default is min(32, cpu_count + 4)
# CPU-bound: one process per core; more just adds context-switch overhead
cpu_workers = cpu_count
with ThreadPoolExecutor(max_workers=io_workers) as executor:
pass
with ProcessPoolExecutor(max_workers=cpu_workers) as executor:
    pass
Run the examples
bunpy parallel_api.py
bunpy resize_images.py
bunpy log_analyzer.py
concurrent.futures is the right tool when you want straightforward parallelism without the complexity of raw threading or multiprocessing. For async I/O (aiohttp, httpx's async client), prefer asyncio with asyncio.gather - threads add overhead that async avoids entirely when the whole stack is async-native.
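For comparison, a minimal async-native sketch of the earlier URL checker, using httpx.AsyncClient with asyncio.gather (check_all is a made-up name):
import asyncio
import httpx

async def check_all(urls: list[str]) -> list[tuple[str, int]]:
    async with httpx.AsyncClient(timeout=5.0, follow_redirects=True) as client:
        # return_exceptions=True keeps one failure from cancelling the rest
        responses = await asyncio.gather(
            *(client.get(url) for url in urls), return_exceptions=True
        )
    return [
        (url, r.status_code if isinstance(r, httpx.Response) else -1)
        for url, r in zip(urls, responses)
    ]

# statuses = asyncio.run(check_all(["https://python.org", "https://pypi.org"]))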