Processing the output of a subprocess with Python in real time
It is very easy to run a subprocess in Python using its subprocess module. A problem arises when you need to process the output of a long-running process as soon as it is emitted: the output is usually buffered. It gets more complicated if you want to monitor several subprocesses in parallel. On Linux and other Unixes (probably including macOS), the solution is a mixture of pseudo-ttys and the select() system call. I always forget the exact recipe, so here it is for posterity.
Note — I will assume Python 3 on Linux in this post. A complete example is provided at the bottom.
Ensuring line buffering
Usually, when a program writes to its standard output (stdout) and this stdout is a pipe, the output is buffered. So when using the following code, you may have to wait for a rather large buffer to fill before you get someprog’s output:
proc = subprocess.Popen(["someprog"], stdout=subprocess.PIPE)
while proc.poll() is None:
    output = proc.stdout.read(1000)
    ...
Instead, you can pretend that the program runs on a terminal. In this case, the program will usually enable line buffering, meaning that any newline character flushes the buffer. This is achieved with pseudo-ttys:
out_r, out_w = pty.openpty()
proc = subprocess.Popen(["someprog"], stdout=out_w)
os.close(out_w)  # if we do not write to process, close this.
while True:
    try:
        output = os.read(out_r, 1000)
    except OSError as e:
        if e.errno != errno.EIO: raise
        output = b""
    if not output: break
    ...
If the application does not do any custom buffering, you should get each line as soon as it is finished. The try-except block is needed because an EIO error is raised when you can no longer read from the pseudo-tty. In this case, output is set to the empty byte string, simulating os.read()’s behaviour on reaching the end of the file (EOF). Therefore, you want to break out of the loop when you get the empty string.
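To see the effect, here is a self-contained sketch (not from the original post; it assumes a Unix with ptys and uses sys.executable to start a small Python child): the child prints one line and then keeps running, yet thanks to line buffering on the pseudo-tty the first line is readable right away.

```python
import os
import pty
import select
import subprocess
import sys

# A small child that prints one line, then keeps running for a
# while before printing a second line and exiting.
child = [sys.executable, "-c",
         "import time; print('first line'); time.sleep(2); print('second line')"]

out_r, out_w = pty.openpty()
proc = subprocess.Popen(child, stdout=out_w)
os.close(out_w)  # the parent does not write to the child

# With line buffering, the first line becomes readable long before
# the child exits.  select() waits (here at most 10 s) for data.
rlist, _, _ = select.select([out_r], [], [], 10.0)
first = os.read(out_r, 1000) if rlist else b""
print(first.decode())

proc.wait()
os.close(out_r)
```

With a plain pipe instead of the pty, the same child would typically deliver both lines at once when it exits.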
Reading lines
The output still arrives as arbitrary chunks of bytes from somewhere in the middle of the program’s stdout. You probably prefer having it in the form of lines. For this, you need to buffer unfinished lines and thus keep state. Let’s make a class for that, which will come in handy later:
class OutStream:
    def __init__(self, fileno):
        self._fileno = fileno
        self._buffer = b""

    def read_lines(self):
        try:
            output = os.read(self._fileno, 1000)
        except OSError as e:
            if e.errno != errno.EIO: raise
            output = b""
        lines = output.split(b"\n")
        lines[0] = self._buffer + lines[0]  # prepend previous non-finished line.
        if output:
            self._buffer = lines[-1]
            finished_lines = lines[:-1]
            readable = True
        else:
            self._buffer = b""
            if len(lines) == 1 and not lines[0]:
                # We did not have buffer left, so no output at all.
                lines = []
            finished_lines = lines
            readable = False
            os.close(self._fileno)
        finished_lines = [line.rstrip(b"\r").decode()
                          for line in finished_lines]
        return finished_lines, readable
Note that we call decode() on all finished lines in order to get Python strings. If you want bytes, just skip this part.
We also close the file descriptor when the child process’s output is finished, given that we do not need it any longer. You might choose to handle this differently if you need the file descriptor afterwards for some reason.
The second return value indicates if we should keep reading (true), or if we are finished (false). So the main loop looks like this now:
f = OutStream(out_r)
while True:
    lines, readable = f.read_lines()
    ...
    if not readable: break
Every time read_lines() returns, we may get zero, one, or many lines of output. The final newlines are stripped, so re-add them if needed. When readable is false, though, the last line did not have a newline. You could improve the read_lines() method to add the correct newline characters where needed if that is important.
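Alternatively, since only the final chunk (the one where readable becomes false) can end without a newline, the caller can re-add them itself. A small sketch (the helper name with_newlines() is mine, not from the post):

```python
def with_newlines(lines, readable):
    """Re-add the "\\n" characters stripped by read_lines().

    Only the final line of the whole stream, returned together with
    readable == False, may legitimately lack a trailing newline.
    """
    restored = [line + "\n" for line in lines]
    if not readable and restored:
        restored[-1] = restored[-1][:-1]  # stream ended without "\n"
    return restored


print(with_newlines(["a", "b"], True))   # mid-stream chunk
print(with_newlines(["last"], False))    # final chunk, ended at EOF
```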
Handling multiple streams
This is all very nice, but what if you want to capture the output of
more than one process? Or, to make the example simple, you want to
treat stdout and stderr differently? Here, the select() system call
comes into play. You pass a list of file descriptors and it returns a
list of those that are ready to be read from. It can also do the same
for descriptors that should be written to; details can be found in
Python’s documentation.
All that is required from an object passed to Python’s select() is that it has a method fileno() that returns the file descriptor as an integer. That can easily be added to OutStream:
class OutStream:
    ...
    def fileno(self):
        return self._fileno
Then we can read from all streams:
out_r, out_w = pty.openpty()
err_r, err_w = pty.openpty()
proc = subprocess.Popen(["someprog"], stdout=out_w, stderr=err_w)
os.close(out_w)  # if we do not write to process, close these.
os.close(err_w)
fds = {OutStream(out_r), OutStream(err_r)}
while fds:
    rlist, _, _ = select.select(fds, [], [])
    for f in rlist:
        lines, readable = f.read_lines()
        ...
        if not readable:
            fds.remove(f)
Now it also becomes clear why it is nice to wrap out_r in an OutStream: we can attach additional data or methods to different OutStream objects. This could be done by using different subclasses for different cases, callback functions, or just tags to keep track of which stream is which.
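The tag approach can be sketched like this (a self-contained variation on the code above; the `name` attribute is my addition, and the child is a small Python script started via sys.executable rather than someprog):

```python
import errno
import os
import pty
import select
import subprocess
import sys


class OutStream:
    """The class from above, extended with a `name` tag."""

    def __init__(self, fileno, name):
        self._fileno = fileno
        self._buffer = b""
        self.name = name  # tag telling the streams apart

    def fileno(self):
        return self._fileno

    def read_lines(self):
        try:
            output = os.read(self._fileno, 1000)
        except OSError as e:
            if e.errno != errno.EIO: raise
            output = b""
        lines = output.split(b"\n")
        lines[0] = self._buffer + lines[0]  # prepend previous non-finished line.
        if output:
            self._buffer = lines[-1]
            finished_lines = lines[:-1]
            readable = True
        else:
            self._buffer = b""
            if len(lines) == 1 and not lines[0]:
                lines = []
            finished_lines = lines
            readable = False
            os.close(self._fileno)
        return ([line.rstrip(b"\r").decode() for line in finished_lines],
                readable)


# A child that writes one line to stdout and one to stderr.
child = [sys.executable, "-c",
         "import sys; print('to stdout'); print('to stderr', file=sys.stderr)"]

out_r, out_w = pty.openpty()
err_r, err_w = pty.openpty()
proc = subprocess.Popen(child, stdout=out_w, stderr=err_w)
os.close(out_w)
os.close(err_w)

collected = []
fds = {OutStream(out_r, "stdout"), OutStream(err_r, "stderr")}
while fds:
    rlist, _, _ = select.select(fds, [], [])
    for f in rlist:
        lines, readable = f.read_lines()
        for line in lines:
            collected.append((f.name, line))  # the tag identifies the stream
        if not readable:
            fds.remove(f)
proc.wait()
print(collected)
```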
Handling signals
Some care is required if signals should be handled and a Python version ≤ 3.4 is used. Python follows the default actions for signals, most of which terminate the program. That case needs no special action. If the program should continue running, though, you should take note that system calls are interrupted by signals. For older Python versions, that means that select() may throw an InterruptedError. Newer versions restart the system call afterwards and the following workaround is not needed; see PEP 475 for details. This implementation will keep working on Python ≥ 3.5, even if it is unneeded:
signal.signal(signal.SIGINT, lambda s,f: print("received SIGINT"))
while fds:
    while True:
        try:
            rlist, _, _ = select.select(fds, [], [])
            break
        except InterruptedError:
            continue
    ...
Complete example
Finally, let’s put it all together. A proof of concept that executes someprog and re-prints its output line by line. SIGINT is handled by printing “received SIGINT”.
#!/usr/bin/env python3
#
# Written 2017, 2019, 2023 by Tobias Brink
#
# To the extent possible under law, the author(s) have dedicated
# all copyright and related and neighboring rights to this software
# to the public domain worldwide. This software is distributed
# without any warranty.
#
# You should have received a copy of the CC0 Public Domain
# Dedication along with this software. If not, see
# <http://creativecommons.org/publicdomain/zero/1.0/>.

import errno
import os
import pty
import select
import signal
import subprocess

# Set signal handler for SIGINT.
signal.signal(signal.SIGINT, lambda s,f: print("received SIGINT"))


class OutStream:
    def __init__(self, fileno):
        self._fileno = fileno
        self._buffer = b""

    def read_lines(self):
        try:
            output = os.read(self._fileno, 1000)
        except OSError as e:
            if e.errno != errno.EIO: raise
            output = b""
        lines = output.split(b"\n")
        lines[0] = self._buffer + lines[0]  # prepend previous non-finished line.
        if output:
            self._buffer = lines[-1]
            finished_lines = lines[:-1]
            readable = True
        else:
            self._buffer = b""
            if len(lines) == 1 and not lines[0]:
                # We did not have buffer left, so no output at all.
                lines = []
            finished_lines = lines
            readable = False
            os.close(self._fileno)
        finished_lines = [line.rstrip(b"\r").decode()
                          for line in finished_lines]
        return finished_lines, readable

    def fileno(self):
        return self._fileno


# Start the subprocess.
out_r, out_w = pty.openpty()
err_r, err_w = pty.openpty()
proc = subprocess.Popen(["someprog"], stdout=out_w, stderr=err_w)
os.close(out_w)  # if we do not write to process, close these.
os.close(err_w)

fds = {OutStream(out_r), OutStream(err_r)}
while fds:
    # Call select(), anticipating interruption by signals.
    while True:
        try:
            rlist, _, _ = select.select(fds, [], [])
            break
        except InterruptedError:
            continue
    # Handle all file descriptors that are ready.
    for f in rlist:
        lines, readable = f.read_lines()
        # Example: Just print every line. Add your real code here.
        for line in lines:
            print(line)
        if not readable:
            # This OutStream is finished.
            fds.remove(f)
Update 2019-10-07: When to .decode()
The original version of this code tried to decode the output immediately when receiving it:
output = os.read(self._fileno, 1000).decode()
Anthony Sottile contacted me to point out that this can go horribly wrong when reading (for example) UTF-8 data beyond the ASCII range. Since a character might be longer than a single byte, we could conceivably read only part of the character, since we arbitrarily stop reading after 1000 bytes (or because the program only flushed incomplete data for some reason). This makes the resulting byte string invalid UTF-8, which raises a UnicodeDecodeError and our nice program crashes. The solution is to only decode complete output. In my testing, of course, this never occurred because the lines were much shorter than 1000 bytes and contained few if any characters beyond the ASCII range.
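If you do want to decode chunk by chunk, an incremental decoder from the standard codecs module is one option (a sketch, not part of the post’s code): it buffers incomplete multi-byte sequences instead of raising.

```python
import codecs

# "ä" is the two bytes 0xc3 0xa4 in UTF-8.  Pretend os.read()
# returned the character split across two chunks:
chunk1 = b"Sp\xc3"
chunk2 = b"\xa4m\n"

# Plain chunk1.decode() would raise UnicodeDecodeError here.
# The incremental decoder holds the lone 0xc3 byte back ...
decoder = codecs.getincrementaldecoder("utf-8")()
part1 = decoder.decode(chunk1)
# ... and completes the character on the next chunk.
part2 = decoder.decode(chunk2)
print(repr(part1), repr(part2))
```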
Update 2023-07-01: Added resource cleanup
The OutStream class now automatically closes its file descriptor after the output is finished. This helps avoid running out of file descriptors when calling this code repeatedly in the same program without cleaning up the descriptors manually. You can of course adjust this if you need the file descriptor afterwards.
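One way to keep a usable descriptor without changing the class (a sketch using only the standard library, demonstrated here with a plain pipe) is to hand OutStream a duplicate made with os.dup(); closing the duplicate leaves the original open:

```python
import os

r, w = os.pipe()

dup_r = os.dup(r)  # give this copy to OutStream instead of r
os.close(dup_r)    # simulate OutStream closing its descriptor at EOF

# The original descriptor is unaffected and still usable:
os.write(w, b"still works\n")
os.close(w)
data = os.read(r, 100)
print(data)
os.close(r)
```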