Lib/concurrent/futures (part 3)
Source:
cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/_base.py
This annotation covers the Future lifecycle and executor submit. See lib_concurrent2_detail for ThreadPoolExecutor.__init__, the work queue, and thread management.
Map
| Lines | Symbol | Role |
|---|---|---|
| 1-80 | Executor.submit | Wrap a callable in a Future and enqueue |
| 81-160 | Future.result | Block until done, return result or raise exception |
| 161-240 | Future.add_done_callback | Register a callback for completion |
| 241-360 | as_completed | Yield futures as they finish |
| 361-500 | wait | Block until a set of futures reaches a condition |
Reading
Executor.submit
# CPython: Lib/concurrent/futures/thread.py:180 ThreadPoolExecutor.submit
def submit(self, fn, /, *args, **kwargs):
    with self._shutdown_lock:
        if self._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')
        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)
        self._work_queue.put(w)
        self._adjust_thread_count()
        return f
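The excerpt is ThreadPoolExecutor's override; the base Executor.submit in _base.py only raises NotImplementedError. submit creates a Future, wraps it together with the callable in a _WorkItem, enqueues the item, and returns the Future without waiting for the work to run. It raises RuntimeError if the executor has already been shut down. _adjust_thread_count starts a new worker thread only when no idle worker is available and the pool is still below max_workers.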
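A minimal sketch of that contract from the caller's side, using only the public API. The `gate` event and `blocked_task` function are hypothetical names introduced here to keep the task parked until the main thread has inspected the Future.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor

# Hypothetical gate: holds the task until the main thread releases it.
gate = threading.Event()

def blocked_task():
    gate.wait(timeout=5)
    return "done"

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(blocked_task)

is_future = isinstance(future, Future)
done_before_release = future.done()  # the task is still parked on the gate

gate.set()
value = future.result(timeout=5)
executor.shutdown(wait=True)

# Submitting after shutdown hits the RuntimeError branch shown in the excerpt.
try:
    executor.submit(blocked_task)
    submit_error = None
except RuntimeError as exc:
    submit_error = str(exc)
```

The Future is usable as a handle as soon as submit returns; whether the work has started by then depends on thread scheduling, which is why the sketch gates the task explicitly.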
Future.result
# CPython: Lib/concurrent/futures/_base.py:440 Future.result
def result(self, timeout=None):
    with self._condition:
        if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
            raise CancelledError()
        elif self._state == FINISHED:
            return self.__get_result()
        self._condition.wait(timeout)
        if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
            raise CancelledError()
        elif self._state == FINISHED:
            return self.__get_result()
        else:
            raise TimeoutError()

def __get_result(self):
    if self._exception:
        raise self._exception
    return self._result
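future.result() blocks on a threading.Condition until the future is done or timeout seconds have elapsed; with timeout=None the wait is unbounded. If the callable raised, result() re-raises the same exception object. A cancelled future raises CancelledError, and TimeoutError is raised if the timeout expires while the future is still pending or running.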
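The four outcomes of result() can be exercised without an executor by driving Future objects by hand. This is a sketch for illustration only: constructing Future directly is meant for testing, and the `outcome` helper is a hypothetical name introduced here.

```python
from concurrent.futures import CancelledError, Future
from concurrent.futures import TimeoutError as FuturesTimeoutError

def outcome(future, timeout=None):
    """Hypothetical helper: classify what future.result() does."""
    try:
        return ("result", future.result(timeout=timeout))
    except CancelledError:
        return ("cancelled", None)
    except FuturesTimeoutError:
        return ("timeout", None)
    except Exception as exc:
        return ("raised", exc)

ok = Future()
ok.set_result(42)

error = ValueError("boom")
failed = Future()
failed.set_exception(error)

pending = Future()  # never completed, so result() can only time out

cancelled = Future()
cancelled.cancel()  # a pending future can always be cancelled

outcomes = {
    "ok": outcome(ok),
    "failed": outcome(failed),
    "pending": outcome(pending, timeout=0.01),
    "cancelled": outcome(cancelled),
}
```

The "failed" entry holds the very object passed to set_exception, matching the `raise self._exception` line in `__get_result`.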
Future.add_done_callback
# CPython: Lib/concurrent/futures/_base.py:480 add_done_callback
def add_done_callback(self, fn):
    with self._condition:
        if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
            self._done_callbacks.append(fn)
            return
    fn(self)  # already done: call immediately, outside the lock
Callbacks registered before completion are stored and called, in registration order, when the future finishes or is cancelled; they run in the thread that completed or cancelled the future. Callbacks registered after completion are called synchronously inside add_done_callback, so they run in the caller's thread.
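The difference between early and late registration shows up in which thread runs the callback. The sketch below records thread names; `gate`, `record`, and `task` are hypothetical names, and the gate exists only to guarantee that the first callback is registered before the task finishes.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()  # hypothetical gate keeping the task unfinished
callback_threads = {}

def record(label):
    """Build a callback that stores the name of the thread it runs in."""
    def callback(future):
        callback_threads[label] = threading.current_thread().name
    return callback

def task():
    gate.wait(timeout=5)
    return threading.current_thread().name

with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as executor:
    future = executor.submit(task)
    future.add_done_callback(record("early"))  # registered while still running
    gate.set()
    worker_name = future.result(timeout=5)

# Leaving the with-block joins the worker, so the early callback has run.
future.add_done_callback(record("late"))  # future is done: runs right here
caller_name = threading.current_thread().name
```

Because early callbacks run on the worker thread, a slow callback delays that worker from picking up the next item in the queue.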
as_completed
# CPython: Lib/concurrent/futures/_base.py:220 as_completed
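def as_completed(fs, timeout=None):
    # Abridged: helper inlined and the TimeoutError message elided.
    if timeout is not None:
        end_time = timeout + time.monotonic()
    fs = set(fs)
    with _AcquireFutures(fs):
        # Split finished from pending while holding every future's lock.
        finished = set(f for f in fs
                       if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    try:
        for future in finished:
            yield future
        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.monotonic()
                if wait_timeout < 0:
                    raise TimeoutError(...)
            waiter.event.wait(wait_timeout)
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()
            for future in finished:
                pending.remove(future)
                yield future
    finally:
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)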
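as_completed installs a waiter object into each future's _waiters list. When a future finishes, it appends itself to the waiter's finished_futures and sets the waiter's event. The generator first yields any futures that were already done, then yields the rest in completion order, not submission order. If timeout expires before every future has finished, it raises TimeoutError.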
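To make the ordering deterministic, the sketch below completes hand-built Future objects between calls to next() instead of relying on thread timing. Creating Future directly is intended for testing only, and the names `futures`, `names`, and `stream` are illustrative.

```python
from concurrent.futures import Future, as_completed

# "Submission" order is a, b, c.
futures = {name: Future() for name in ("a", "b", "c")}
names = {future: name for name, future in futures.items()}

futures["b"].set_result("b")  # already finished before as_completed starts

stream = as_completed(futures.values(), timeout=5)
order = []

order.append(names[next(stream)])  # the already-finished future comes first

futures["c"].set_result("c")       # notifies the installed waiter
order.append(names[next(stream)])

futures["a"].set_result("a")
order.append(names[next(stream)])

remaining = list(stream)  # nothing pending, so the generator just finishes
```

With a real executor the same pattern is usually written as `for future in as_completed(fs): ...`, and the order then depends on when each task finishes.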
wait
# CPython: Lib/concurrent/futures/_base.py:180 wait
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    done = set()
    not_done = set()
    waiter = _create_and_install_waiters(fs, return_when)
    ...
    waiter.event.wait(timeout)
    ...
    return DoneAndNotDoneFutures(done, not_done)
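wait(futures, return_when=FIRST_COMPLETED) returns as soon as any future finishes or is cancelled. ALL_COMPLETED (the default) waits for all of them. FIRST_EXCEPTION returns as soon as any future finishes by raising; if none raises, it behaves like ALL_COMPLETED. wait never raises on timeout: it returns a (done, not_done) named tuple, and futures that are still unfinished stay in not_done.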
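The three return_when modes and the timeout behaviour can be sketched with hand-completed futures, so no thread timing is involved. Constructing Future directly is a testing convenience, and the variable names are illustrative.

```python
from concurrent.futures import (
    ALL_COMPLETED,
    FIRST_COMPLETED,
    FIRST_EXCEPTION,
    Future,
    wait,
)

ok, failed, pending = Future(), Future(), Future()
ok.set_result(1)

# One future is already done, so FIRST_COMPLETED returns immediately.
first = wait([ok, pending], return_when=FIRST_COMPLETED)

# ALL_COMPLETED cannot be satisfied; the timeout ends the wait without raising.
timed_out = wait([ok, pending], timeout=0.01, return_when=ALL_COMPLETED)

# ok finished cleanly, so FIRST_EXCEPTION keeps waiting until the timeout.
no_error_yet = wait([ok, pending], timeout=0.01, return_when=FIRST_EXCEPTION)

# Once a future holds an exception, FIRST_EXCEPTION returns right away.
failed.set_exception(ValueError("boom"))
on_error = wait([ok, failed, pending], return_when=FIRST_EXCEPTION)
```

Each result is a named tuple, so it can be unpacked as `done, not_done = wait(...)` or read through the `.done` and `.not_done` attributes.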
gopy notes
ThreadPoolExecutor.submit is module/concurrent.Submit in module/concurrent/module.go. It uses sync.WaitGroup and a buffered channel as the work queue. Future.result calls future.Wait() which blocks on a sync.WaitGroup. as_completed uses a chan *Future.