December 13: Python Concurrent Execution Modules
Now we get to talk about a completely trivial topic, barely worth mentioning: Concurrent execution, including `threading`, `multiprocessing`, `concurrent`, `subprocess`, `sched` and `queue`.
Now, you might say, "wait a second, it is not December 13th!", and … you would be right. Life got in the way and I had to spend a week figuring some things out, and pushing out a blog post about concurrency in Python did not quite fit in with that. Now all the things are figured out, and I'm continuing the Traversal of the Python Standard Library Documentation.
Highlights
- Oh god, `Popen` has so many arguments.
- Python ships a "general purpose event scheduler".
threading
This module builds on the low-level `_thread` module to provide useful interfaces for interacting with threads. The straightforward helper functions include `enumerate()` (returning all `Thread` objects currently alive), `current_thread()` and `main_thread()`, and `get_ident()` and `get_native_id()` to get magical identifiers. Interestingly, with `setprofile()` and `settrace()` you can, in theory, build your own debugger.
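A minimal sketch of those helpers in action (the `collect_info` wrapper and its thread name are mine, not from the docs):

```python
import threading

def collect_info():
    """Record thread identifiers from inside a worker thread."""
    info = {}

    def worker():
        # current_thread() gives the Thread object for the running thread
        info["name"] = threading.current_thread().name
        # get_ident() returns the same identifier that is stored as .ident
        info["ident_matches"] = threading.get_ident() == threading.current_thread().ident

    t = threading.Thread(target=worker, name="info-worker")
    t.start()
    t.join()
    # enumerate() lists all live threads; the main thread is always among them
    info["main_alive"] = threading.main_thread() in threading.enumerate()
    return info

print(collect_info())
```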
Thread-local data
When you want to keep some data local to a thread, you can use `threading.local()` – or `contextvars`, which we covered earlier.
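A quick sketch of `threading.local()`: each thread sets the same attribute, but only ever sees its own value (the helper and thread names are made up for the example):

```python
import threading

# Each thread sees its own copy of attributes on this object.
local_data = threading.local()

def run_with_local(value, results):
    local_data.value = value
    # Only this thread's assignment is visible here.
    results[threading.current_thread().name] = local_data.value

results = {}
threads = [
    threading.Thread(target=run_with_local, args=(i, results), name=f"t{i}")
    for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)
```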
Threads
`Thread` objects execute their activity either by being passed a callable on instantiation (plus `args` and `kwargs`), or by having an overridden `run()` method. You kick them off with the `start()` method, after which `is_alive()` returns `True` until the thread is done. When you call another thread's `join()` method, the calling thread will block until the joined thread is done. Uncaught exceptions are handled via `threading.excepthook()`.
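The whole lifecycle in one short sketch (the sleep duration is arbitrary, just long enough to observe the running state):

```python
import threading
import time

def slow_task(flag):
    time.sleep(0.2)
    flag.append(True)

done = []
t = threading.Thread(target=slow_task, args=(done,))
t.start()
assert t.is_alive()   # still running
t.join()              # block until the thread finishes
assert not t.is_alive()
print(done)
```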
Threads can be flagged as daemon threads – when only those remain, they are stopped abruptly and the program exits. That's a fairly dangerous behaviour, since files and locks and the likes may not be released properly.
Locks
`Lock` objects are the lowest level of synchronization primitives available in Python. `Lock` is, interestingly, a factory function that returns a different implementation based on what is available on the current platform. It has only three methods: `acquire()` acquires a lock – you can set `blocking` to `False` to just attempt to acquire the lock and return `False` if it is unavailable, whereas with `blocking=True` you will wait until the lock is available and acquire it immediately. Deadlocks lie down this road. The `timeout` argument can limit the blocking time while waiting for the lock to become available. Release it with `release()` and use `locked()` to check its status.
Locks support the context manager protocol, that is, you can use them in `with` blocks.
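A sketch of the usual pattern: a `with` block protecting a shared counter, plus a non-blocking acquire attempt:

```python
import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        # The with block acquires on entry and releases on exit,
        # even if the body raises.
        with lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)

# Non-blocking attempt: returns True/False instead of waiting.
assert lock.acquire(blocking=False)
lock.release()
```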
RLocks
`RLock` objects, aka reentrant locks, support multiple `acquire()` calls, as long as they come from the same thread. The lock is only marked as available once every `acquire()` call has been matched by a `release()` call.
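The classic use case is a method that takes the lock and then calls another method that takes the same lock – with a plain `Lock` this would deadlock, with an `RLock` it just works (function names are mine):

```python
import threading

rlock = threading.RLock()

def outer():
    with rlock:          # first acquire
        return inner()   # same thread acquires again below

def inner():
    with rlock:          # would deadlock with a plain Lock
        return "nested ok"

print(outer())
```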
Conditions
`Condition` objects are terribly documented and seem like a glorious footgun. They wrap a lock, and give you convenience methods to build producer/consumer chains. The `wait()` method releases the wrapped lock, then blocks until another thread calls `notify()` or `notify_all()`, and re-acquires the lock before returning. You can also wait conditionally with `wait_for()`, which takes a predicate callable.
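A minimal producer/consumer sketch – `wait_for()` checks the predicate before sleeping, so it is safe even if the producer gets there first:

```python
import threading

items = []
cond = threading.Condition()

def consumer(results):
    with cond:
        # Releases the lock, sleeps until notified, and re-checks
        # the predicate before returning.
        cond.wait_for(lambda: items)
        results.append(items.pop())

def producer():
    with cond:
        items.append("payload")
        cond.notify()

results = []
c = threading.Thread(target=consumer, args=(results,))
c.start()
producer()
c.join()
print(results)
```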
Semaphores
Semaphores are way more fun than locks, but not as footgunny as `Condition` objects. They start out with a value (1 by default, but feel free to get creative!), then increase the value when you call `release()` and decrease it when you `acquire()`, and according to the Python documentation, "the counter can never go below zero":
```python
>>> s = Semaphore(value=0.5)
>>> s.acquire()
True
>>> s.__dict__
{'_cond': <Condition(<...>, 0)>, '_value': -0.5}
```
Oops. Sorry.
Events
`Event` objects are a simple cross-thread communication API. They are a simple flag: they get `set()` or `clear()`, are `is_set()` or not, and can be `wait()`-ed for.
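The whole API fits in a few lines:

```python
import threading

ready = threading.Event()
log = []

def waiter():
    ready.wait()          # blocks until another thread calls set()
    log.append("go")

t = threading.Thread(target=waiter)
t.start()
assert not ready.is_set()
ready.set()               # flips the flag and wakes all waiters
t.join()
print(log, ready.is_set())
```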
Timers
A `Timer` is a subclass of `Thread` that will execute its `run()` method only after the given interval (in seconds) has passed since calling `start()`.
Barriers
A `Barrier` is a neat synchronization mechanism: you initialize it with an (integer, this time!) count of threads. Only when that number of threads have called its `wait()` method will it release all of them at once. You can use `n_waiting` to see how many threads are already waiting at the barrier.
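A sketch with three parties: nobody gets past `wait()` until everyone has arrived, so all "arrived" log entries precede all "released" ones:

```python
import threading

barrier = threading.Barrier(3)
order = []

def party(name):
    order.append(f"{name} arrived")
    barrier.wait()            # blocks until 3 threads are waiting
    order.append(f"{name} released")

threads = [threading.Thread(target=party, args=(f"t{i}",)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(order)
```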
multiprocessing
`multiprocessing` is like `threading`, in that it spawns processes and has a similar API, but it has a different (you might say: wider, though there would be a lot of arguing) range of application. `multiprocessing` side-steps the GIL, and is thus well-suited for CPU-bound concurrency. You'll probably want to stick with `threading` for I/O- or network-bound tasks, though you'd be well-advised to figure out what your bottleneck is before committing to either module.
Process
`Process` follows the same API as `Thread`: instantiate with a callable or override `run()`, then kick off with `start()`. Await with `join()`. You can run `set_start_method()` to select either spawn (default on Windows and macOS), fork (only available on Unix, and default there), or forkserver (only on some Unixes). In addition to the `Thread` API, `Process` provides `terminate()` and `kill()` methods (which risk data corruption and will orphan child processes of the killed process).
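A minimal `Process` sketch mirroring the `Thread` API (the `__main__` guard matters: under the spawn start method the module gets re-imported in the child):

```python
import multiprocessing as mp

def work():
    # Runs in a separate process with its own interpreter and GIL.
    sum(i * i for i in range(100_000))

def run_process():
    p = mp.Process(target=work)
    p.start()            # same API as threading.Thread
    p.join()             # wait for the child to finish
    return p.exitcode    # 0 means a clean exit

if __name__ == "__main__":
    print(run_process())
```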
If you need more than one of these in one program (may somebody have mercy on your soul), you can instead run `ctx = get_context()`. Context objects behave just like the whole `multiprocessing` module, so you can use `ctx.Process` and so on. If you develop a library, you should probably do this to inherit the user's configuration of choice.
Data exchange
There are two ways to exchange data in multiprocessing land: queues and pipes. You pass them to the process on startup.
`Queue` is nearly the same as `queue.Queue`. There's also `SimpleQueue` and `JoinableQueue` (where you have to call `task_done()` for every task removed from the queue, and can call `join()`).
The `Pipe` constructor returns two connection objects, which each have `send()` and `recv()` methods (you can tell the API's age by the missing vowels). You can also check whether data is available to read with `poll()`.
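A round-trip sketch over a pipe, with the child echoing back an uppercased message:

```python
import multiprocessing as mp

def child(conn):
    # recv() blocks until the parent sends something.
    msg = conn.recv()
    conn.send(msg.upper())
    conn.close()

def run_pipe():
    parent_conn, child_conn = mp.Pipe()   # two connected endpoints
    p = mp.Process(target=child, args=(child_conn,))
    p.start()
    parent_conn.send("ping")
    reply = parent_conn.recv()
    p.join()
    return reply

if __name__ == "__main__":
    print(run_pipe())
```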
Synchronization
`multiprocessing` contains all the synchronization primitives that are also contained in `threading`, so you can e.g. use `multiprocessing.Semaphore`.
State
Sharing state is much harder and evil-er than exchanging data, but there are ways. You can use shared memory with `Value` and `Array`, which work exactly as you'd expect them to (as much as anything does in concurrency land).
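A sketch of both: a shared integer (`"i"` typecode) and a shared array of doubles (`"d"`), mutated in a child process and read back in the parent:

```python
import multiprocessing as mp

def bump(counter, arr):
    # Value wraps a ctypes object; get_lock() guards the update.
    with counter.get_lock():
        counter.value += 1
    for i in range(len(arr)):
        arr[i] = arr[i] * 2

def run_shared():
    counter = mp.Value("i", 0)            # a shared C int
    arr = mp.Array("d", [1.0, 2.0, 3.0])  # a shared array of doubles
    p = mp.Process(target=bump, args=(counter, arr))
    p.start()
    p.join()
    return counter.value, list(arr)

if __name__ == "__main__":
    print(run_shared())
```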
Or you can use a `Manager`. Managers support all locking primitives, plus `Value` and `Array`, plus `list` and `dict` – extremely useful! You can extend their functionality to support more data types with the `register()` method.
Managers can also be run on remote machines. This, of course, is a security risk, so you can provide auth keys to connections, which will be used to generate and check HMACs before unpickling the data.
Pools
`Pool` objects can be used to manage groups of worker processes. You have to call `close()` and `terminate()` manually on those (or use the pool as a context manager)! You cannot rely on the CPython garbage collection. A pool can interact with its workers in a variety of ways: `apply()` blocks until the result is there, while `apply_async()` returns immediately. `map()`, `imap()`, `map_async()` and `starmap()` all chop up a given iterable before distributing it to the pool.
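The most common shape is `map()` over a pool used as a context manager (which calls `terminate()` on exit for you):

```python
import multiprocessing as mp

def square(n):
    return n * n

def run_pool():
    # The context manager handles terminate() on exit.
    with mp.Pool(processes=2) as pool:
        # map() chops the iterable into chunks and farms them out.
        return pool.map(square, range(5))

if __name__ == "__main__":
    print(run_pool())
```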
Other
There is a `sharedctypes` submodule. I assume you will know it when you need it. There is also a `dummy` module, which provides a `multiprocessing`-shaped wrapper for the `threading` module. The end of the documentation comes with best practices and admonishments, which you should read and heed when using `multiprocessing`.
concurrent.futures
`concurrent.futures` is a high-level interface for executing callables asynchronously, and can be used with threads (`ThreadPoolExecutor`) or processes (`ProcessPoolExecutor`). Both implement the same interface: `submit()` schedules the callable, `map()` applies the callable to an iterable in chunks, and `shutdown()` signals that you want to stop – though you can avoid that one by using the executor as a context manager.
Future
When you schedule a callable with `submit()`, you receive a `Future` instance. You can check its status with `running()` and `done()`, abort with `cancel()`, and retrieve the result with `result()` (which will wait if it's not done yet).
You can also use module functions: use `wait()` to block until a future object is done.
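A sketch showing `submit()`, `map()`, `wait()` and `result()` together, with the executor as a context manager:

```python
from concurrent.futures import ThreadPoolExecutor, wait

def double(n):
    return n * 2

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(double, 21)        # returns a Future immediately
    mapped = list(executor.map(double, [1, 2, 3]))
    wait([future])                              # block until the future is done
    assert future.done() and not future.running()
    answer = future.result()                    # the callable's return value

print(answer, mapped)
```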
subprocess
With `subprocess`, you spawn new external processes and connect to their input/output/error pipes (plus their return codes). For simple cases, you will want to use `run()`, and for more complex cases, you can use the underlying `Popen` interface.
run
`run()` takes a ton of arguments that change its behaviour remarkably. You can pass a timeout to automatically kill the process, you can pass stdin content and modify how the three streams are handled (for example, you can set any of them to `DEVNULL` or any existing file descriptor, plus you can redirect stderr to `STDOUT`). With `check=True`, an exception will be raised on a non-zero exit code, with `shell=True` the command will be executed by the shell directly (security caveats below!), and you can change the default shell `/bin/sh` with the `executable` argument. You can also pass an `env` mapping. The return value is a `CompletedProcess` object, which has `stdout`, `stderr` and a `returncode`.
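A sketch of the common case (using `sys.executable` so the example stays portable rather than assuming a particular shell command):

```python
import subprocess
import sys

# Run a small Python one-liner and capture its streams.
proc = subprocess.run(
    [sys.executable, "-c", "print('hello')"],
    capture_output=True,   # wires stdout/stderr to PIPE
    text=True,             # decode bytes to str
    check=True,            # raise CalledProcessError on non-zero exit
    timeout=30,
)
print(proc.returncode, proc.stdout.strip())
```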
Popen
`Popen` takes an unholy amount of arguments. You can change groups and users, pass specific file descriptors, change the working directory, set a umask, and pass a magical `STARTUPINFO` object to modify process creation flags. With the resulting `Popen` object, you can `poll()` or `wait()` if you just want it to end successfully, or `terminate()` and `kill()` if you're really impatient. If you want to interact, send data to stdin with `communicate()` or send a signal with `send_signal()`.
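A sketch of the interactive case: `communicate()` sends input, reads output, and waits for the process to exit, all in one call:

```python
import subprocess
import sys

# A child that uppercases whatever arrives on its stdin.
p = subprocess.Popen(
    [sys.executable, "-c", "import sys; print(sys.stdin.read().upper())"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    text=True,
)
out, _ = p.communicate(input="ping")   # send, read, and wait
print(p.returncode, out.strip())
```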
Security
System shells are dangerous. Do use caution. Do also use `shlex.quote()` to properly escape whitespace and control characters.
sched
`sched` provides "a general purpose event scheduler". Huh. What does that mean? A scheduler is instantiated with two functions: the timefunc (a callable that returns a numerical time, regardless of its unit, defaulting to `time.monotonic`) and the delayfunc (a callable delaying for a given time, defaulting to `time.sleep`). You can then `enter()` callables in the scheduler, and finally `run()` them. The scheduler will shuffle them around according to their priority and their delay, then execute them in order and wait in between. The read-only `queue` attribute shows you the events still to be run.
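A small sketch: two events entered out of order, executed in delay order by `run()` (the delays are arbitrary):

```python
import sched
import time

s = sched.scheduler(time.monotonic, time.sleep)
log = []

# Entered "second" first, but its delay is longer, so it runs later.
s.enter(0.02, 1, log.append, argument=("second",))
s.enter(0.01, 1, log.append, argument=("first",))
s.run()      # executes events in time/priority order, sleeping in between
print(log)
```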
queue
The `queue` module provides thread-safe multiple-producer, multiple-consumer queues. There are a couple of different queue classes, which differ in their retrieval order, but share the same interface: `Queue` is a FIFO queue, `LifoQueue` is exactly that, and `PriorityQueue` sorts its entries and retrieves the lowest/smallest items first.
Interface
Queues can be queried as to their size with `empty()`, `qsize()` and `full()`. You can `put()` items into the queue, and if the queue is limited in size, you can pass `block=False` to raise an exception instead of waiting, or wait for a specific `timeout`. Retrieve items with `get()`, and pass `block=False` to avoid hanging on an empty queue. There's a `task_done()` method to signal that you have processed the previously retrieved item. Finally, `join()` blocks until all tasks have been marked as done.
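A sketch using a `PriorityQueue`: items go in out of order and come out smallest-first, with `task_done()`/`join()` closing the loop:

```python
import queue

q = queue.PriorityQueue()
for item in [(3, "low"), (1, "high"), (2, "mid")]:
    q.put(item)

drained = []
while not q.empty():
    drained.append(q.get())
    q.task_done()        # mark the retrieved item as processed
q.join()                 # returns immediately: everything is done
print(drained)
```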