December 13: Python Concurrent Execution Modules

Now we get to talk about a completely trivial topic, barely worth mentioning: Concurrent execution, including threading, multiprocessing, concurrent, subprocess, sched and queue.

Now, you might say, "wait a second, it is not December 13th!", and … you would be right. Life got in the way and I had to spend a week figuring some things out, and pushing out a blog post about concurrency in Python did not quite fit in with that. Now all the things are figured out, and I'm continuing the Traversal of the Python Standard Library Documentation.

Highlights

  • Oh god Popen has so many arguments.
  • Python ships a "general purpose event scheduler"

threading

This module builds on the low-level _thread module to provide useful interfaces for interacting with threads. The straightforward helper methods include enumerate() (returning all Thread objects currently alive), current_thread() and main_thread(), and get_ident() and get_native_id to get magical identifiers. Interestingly, with setprofile and settrace you can build, in theory, your own debugger.

Thread-local data

When you want to keep some data local to a thread, you can use threading.local() – or contextvars, which we covered earlier.

Threads

Thread objects execute their activity either by being passed a callable on instantiation (plus args and kwargs), or by having an overridden run() method. You kick them off with the start() method, after which is_alive() returns True until the thread is done. When you call another thread's join() method, the calling thread will block until the called thread is done. Exceptions are handled via threading.excepthook().

Threads can be flagged as daemon threads – when only those remain, they are stopped abruptly and the program exits. That's a fairly dangerous behaviour, since files and locks and the likes may not be released properly.

Locks

Lock objects are the lowest level of synchronization primitives available in Python. It is, interestingly, a factory function that returns a different implementation based on what is available on the current platform. It has only three methods: acquire() acquires a lock – you can set blocking to False to just attempt to acquire the lock and return False if it is unavailable, whereas with blocking=True you will wait until the lock is available and acquire it immediately. Deadlocks down this road. The timeout argument can limit the block time while waiting for the lock to be available. Release it with release() and use locked() to check its status.

Locks support the context manager protocol, that is, you can use them in with blocks.

Rlocks

RLocks, aka reentrant locks, support multiple acquire() calls, as long as they come from the same thread. It is only marked as available once every acquire() call has been matched by a release() call.

Conditions

Condition objects are terribly documented and seem like a glorious footgun. They wrap a lock, and give you convenience methods to build producer/consumer chains. The wait method releases the wrapped lock, and then blocks until another thread calls notify or notify_all and releases the lock afterwards. You can also wait conditionally with wait_for, which passes a predicate callable.

Semaphores

Semaphores are way more fun than locks, but not as footgunny as Conditions. They start out with a value (1 by default, but feel free to get creative!), then increase the value when you call release() and decrease it when you acquire(), and according to the Python documentation, "the counter can never go below zero":

>>> s = Semaphore(value=0.5)
>>> s.acquire()
True
>>> s.__dict__
{'_cond': <Condition(<...>, 0)>, '_value': -0.5}

Oops. Sorry.

Events

Event objects are a simple cross-thread communication API. They are a simple flag: They get set() or clear(), are is_set() or not, and can be wait()-ed for.

Timers

A Timer is a subclass of Thread that will execute its run() method only after the given interval (in seconds) has passed after calling start().

Barriers

A Barrier is a neat synchronization mechanism: You initialize it with an (integer! this time) count of threads. Only when that number of threads have called its wait() method, it will release all of them at once. You can use n_waiting to see how many threads are already waiting for the barrier.

multiprocessing

multiprocessing is like threading, in that it spawns processes and has a similar API, but it has a different (you might say: wider, though there would be a lot of arguing) range of application. multiprocessing side-steps the GIL, and is thus well-suited for CPU-bound concurrency. You'll probably want to stick with threading for I/O or network bound tasks, though you'd be well-advised to figure out what's your bottleneck with either module.

Process

Process follows the same API as Thread: instantiate with a callable or override run(), then kick off with start(). Await with join(). You can run set_start_method() to select either spawn (default on Windows and MacOS), fork (only available on Unix, and default there), or forkserver (only on some Unixes). In addition to the Thread API, Process provides a terminate() and kill() method (which risks data corruption and will orphan child processes of the killed process).

If you need more than one of these in one program (may somebody have mercy on your soul), you can instead run ctx = get_context(). Context objects behave just like the whole multiprocessing module, so you can use ctx.Process and so on. If you develop a library, you should probably do this to inherit the user's configuration of choice.

Data exchange

There are two ways to exchange data in multiprocessing land: queues and pipes. You pass them to the process on startup. Queue is nearly the same as queue.Queue. There's also SimpleQueue and JoinableQueue (where you have to call task_done() for every task removed from the queue, and can call join()).

The Pipe constructor returns two connection objects, which each have send() and recv() methods (see the age by the missing vowels). You can also peek at the queue state with poll().

Synchronization

multiprocessing contains all the synchronization primitives that are also contained in threading, so you can eg use multiprocessing.Semaphore.

State

Sharing state is much harder and evil-er than exchanging data, but there are ways. You can use shared memory with Value and Array, which work exactly as you'd expect them to (as much as anything does in concurrency land).

Or you can use a Manager. Managers support all locking primitives, plus Value and Array, plus list and dict – extremely useful! You can extend their functionality to support more data types with the register() method. Managers can also be run on remote machines. This, of course, is a security risk, so you can provide auth keys to connections, which will be used to generate and check hmacs before unpickling the data.

Pools

Pool objects can be used to manage groups of processes. You have to call close() and terminate() manually on those! You cannot rely on the Cpython garbage collection. It can interact with its workers in a variety of ways. apply and apply_async block until results are there. map, imap, map_async and starmap all chop up a given iterable before distributing it to the pool.

other

There is a sharedctypes module. I assume you will know it when you need it. There is also a dummy module, which provides a multiprocessing-shaped wrapper for the threading module. The end of the documentation comes with best practices and admonishments, which you should read and heed when using multiprocessing.

concurrent.futures

concurrent.futures is a high-level interface for executing callables asynchronously, and can be used with threads (ThreadPoolExecutor) or processes (ProcessPoolExecutor). Both implement the same interface: submit schedules the callable, map applies the iterable in chunks to the callable, and shutdown signals that you want to stop – though you can avoid that one by using the executor as a context manager.

Future

When you run an executable with submit, you receive a Future instance. You can check its status with running() and done(), abort with cancel(), and retrieve the result with result() (which will wait if its not done yet). You can also use module functions: Use wait() to block until a future object is done.

subprocess

With subprocess, you spawn new external processes, and connect to their input/output/error pipes (plus their return codes). For simple cases, you will want to use run(), and for more complex cases, you can use the underlying Popen interface.

run

run() takes a ton of arguments that change its behaviour remarkably. You can pass a timeout to automatically kill the process, you can pass STDIN content and modify how the three streams are handled (for example, you can set any of them to DEVNULL or any existing file descriptor, plus you can set STDERR to STDEOUT). With check=True, an exception will be raised on non-zero exit code, with shell=True it will be excuted by the shell directly (security caveats below!) and you can change the default shell /bin/sh with the executable argument. You can also pass env variables. The return value is a CompletedProcess object, which has stdout, stderr and a returncode.

Popen

Popen takes an unholy amount of arguments. You can change groups and users, pass specific file descriptors, change the working directory, set a umask, and pass a magical STARTUPINFO object to modify process creation flags. With the resulting Popen object, you can poll() or wait() if you just want it to end successfully, or terminate() and kill() if you're really impatient. If you want to interact, send data to STDIN with communicate() or send a signal with send_signal().

Security

System shells are dangerous. Do use caution. Do also use shlex.quote() to properly escape whitespace and control characters.

sched

sched provides "a general purpose event scheduler". Huh. What does that mean? A scheduler is instantiated with two functions: the timefunc (a callable that returns a numerical time, regardless of its unit, defaulting to time.monotonic) and the delayfunc (a callable delaying for a given time, defaulting to time.sleep). You can then enter() callables in the scheduler, and finally run() them. The scheduler will shuffle them around according to their priority and their delay, then execute them in order and wait in between. The read-only queue attribute shows you events to be run.

queue

The queue module provides thread-safe multiple-producer multiple-consumer queues. There are a couple of different queue classes, which differ in their retrieval order, but share the same interface: Queue is a FIFO queue, LifoQueue is exactly that, and PriorityQueue sorts its entries and retrieves lowest/smallest items first.

Interface

Queues can be queried as to their size with empty(), qsize() and full(). You can put() items into the queue, and if the queue is limited in size, you can pass block=False to raise an exception instead of waiting, or wait for a specific timeout. Retrieve items with get(), and pass block=False to avoid hanging on an empty queue. There's a task_done() to signal that you have processed the previously retrieved item. Finally, join() blocks until all tasks have been marked as done.