Add a 'timeout' argument to subprocess.Popen.

If the timeout expires before the subprocess exits, the wait method and the
communicate method will raise a subprocess.TimeoutExpired exception.  When used
with communicate, it is possible to catch the exception, kill the process, and
retry the communicate and receive any output written to stdout or stderr.
This commit is contained in:
Reid Kleckner 2011-03-14 12:02:10 -04:00
parent 4169826a00
commit 31aa7dd141
4 changed files with 367 additions and 98 deletions

View file

@ -340,6 +340,7 @@ mswindows = (sys.platform == "win32")
import io
import os
import time
import traceback
import gc
import signal
@ -361,6 +362,19 @@ class CalledProcessError(Exception):
return "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode)
class TimeoutExpired(Exception):
"""This exception is raised when the timeout expires while waiting for a
child process.
"""
def __init__(self, cmd, output=None):
self.cmd = cmd
self.output = output
def __str__(self):
return ("Command '%s' timed out after %s seconds" %
(self.cmd, self.timeout))
if mswindows:
import threading
import msvcrt
@ -449,15 +463,21 @@ def _eintr_retry_call(func, *args):
raise
def call(*popenargs, **kwargs):
"""Run command with arguments. Wait for command to complete, then
return the returncode attribute.
def call(*popenargs, timeout=None, **kwargs):
"""Run command with arguments. Wait for command to complete or
timeout, then return the returncode attribute.
The arguments are the same as for the Popen constructor. Example:
retcode = call(["ls", "-l"])
"""
return Popen(*popenargs, **kwargs).wait()
p = Popen(*popenargs, **kwargs)
try:
return p.wait(timeout=timeout)
except TimeoutExpired:
p.kill()
p.wait()
raise
def check_call(*popenargs, **kwargs):
@ -466,7 +486,7 @@ def check_call(*popenargs, **kwargs):
CalledProcessError. The CalledProcessError object will have the
return code in the returncode attribute.
The arguments are the same as for the Popen constructor. Example:
The arguments are the same as for the call function. Example:
check_call(["ls", "-l"])
"""
@ -479,7 +499,7 @@ def check_call(*popenargs, **kwargs):
return 0
def check_output(*popenargs, **kwargs):
def check_output(*popenargs, timeout=None, **kwargs):
r"""Run command with arguments and return its output as a byte string.
If the exit code was non-zero it raises a CalledProcessError. The
@ -502,13 +522,15 @@ def check_output(*popenargs, **kwargs):
if 'stdout' in kwargs:
raise ValueError('stdout argument not allowed, it will be overridden.')
process = Popen(*popenargs, stdout=PIPE, **kwargs)
output, unused_err = process.communicate()
try:
output, unused_err = process.communicate(timeout=timeout)
except TimeoutExpired:
process.kill()
output, unused_err = process.communicate()
raise TimeoutExpired(process.args, output=output)
retcode = process.poll()
if retcode:
cmd = kwargs.get("args")
if cmd is None:
cmd = popenargs[0]
raise CalledProcessError(retcode, cmd, output=output)
raise CalledProcessError(retcode, process.args, output=output)
return output
@ -639,6 +661,8 @@ class Popen(object):
_cleanup()
self._child_created = False
self._input = None
self._communication_started = False
if bufsize is None:
bufsize = 0 # Restore default
if not isinstance(bufsize, int):
@ -673,6 +697,7 @@ class Popen(object):
raise ValueError("creationflags is only supported on Windows "
"platforms")
self.args = args
self.stdin = None
self.stdout = None
self.stderr = None
@ -771,7 +796,7 @@ class Popen(object):
_active.append(self)
def communicate(self, input=None):
def communicate(self, input=None, timeout=None):
"""Interact with process: Send data to stdin. Read data from
stdout and stderr, until end-of-file is reached. Wait for
process to terminate. The optional input argument should be a
@ -780,9 +805,19 @@ class Popen(object):
communicate() returns a tuple (stdout, stderr)."""
# Optimization: If we are only using one pipe, or no pipe at
# all, using select() or threads is unnecessary.
if [self.stdin, self.stdout, self.stderr].count(None) >= 2:
if self._communication_started and input:
raise ValueError("Cannot send input after starting communication")
if timeout is not None:
endtime = time.time() + timeout
else:
endtime = None
# Optimization: If we are not worried about timeouts, we haven't
# started communicating, and we have one or zero pipes, using select()
# or threads is unnecessary.
if (endtime is None and not self._communication_started and
[self.stdin, self.stdout, self.stderr].count(None) >= 2):
stdout = None
stderr = None
if self.stdin:
@ -798,13 +833,36 @@ class Popen(object):
self.wait()
return (stdout, stderr)
return self._communicate(input)
try:
stdout, stderr = self._communicate(input, endtime)
finally:
self._communication_started = True
sts = self.wait(timeout=self._remaining_time(endtime))
return (stdout, stderr)
def poll(self):
return self._internal_poll()
def _remaining_time(self, endtime):
"""Convenience for _communicate when computing timeouts."""
if endtime is None:
return None
else:
return endtime - time.time()
def _check_timeout(self, endtime):
"""Convenience for checking if a timeout has expired."""
if endtime is None:
return
if time.time() > endtime:
raise TimeoutExpired(self.args)
if mswindows:
#
# Windows methods
@ -987,12 +1045,17 @@ class Popen(object):
return self.returncode
def wait(self):
def wait(self, timeout=None):
"""Wait for child process to terminate. Returns returncode
attribute."""
if timeout is None:
timeout = _subprocess.INFINITE
else:
timeout = int(timeout * 1000)
if self.returncode is None:
_subprocess.WaitForSingleObject(self._handle,
_subprocess.INFINITE)
result = _subprocess.WaitForSingleObject(self._handle, timeout)
if result == _subprocess.WAIT_TIMEOUT:
raise TimeoutExpired(self.args)
self.returncode = _subprocess.GetExitCodeProcess(self._handle)
return self.returncode
@ -1002,32 +1065,51 @@ class Popen(object):
fh.close()
def _communicate(self, input):
stdout = None # Return
stderr = None # Return
if self.stdout:
stdout = []
stdout_thread = threading.Thread(target=self._readerthread,
args=(self.stdout, stdout))
stdout_thread.daemon = True
stdout_thread.start()
if self.stderr:
stderr = []
stderr_thread = threading.Thread(target=self._readerthread,
args=(self.stderr, stderr))
stderr_thread.daemon = True
stderr_thread.start()
def _communicate(self, input, endtime):
# Start reader threads feeding into a list hanging off of this
# object, unless they've already been started.
if self.stdout and not hasattr(self, "_stdout_buff"):
self._stdout_buff = []
self.stdout_thread = \
threading.Thread(target=self._readerthread,
args=(self.stdout, self._stdout_buff))
self.stdout_thread.daemon = True
self.stdout_thread.start()
if self.stderr and not hasattr(self, "_stderr_buff"):
self._stderr_buff = []
self.stderr_thread = \
threading.Thread(target=self._readerthread,
args=(self.stderr, self._stderr_buff))
self.stderr_thread.daemon = True
self.stderr_thread.start()
if self.stdin:
if input is not None:
self.stdin.write(input)
self.stdin.close()
# Wait for the reader threads, or time out. If we time out, the
# threads remain reading and the fds left open in case the user
# calls communicate again.
if self.stdout is not None:
self.stdout_thread.join(self._remaining_time(endtime))
if self.stdout_thread.isAlive():
raise TimeoutExpired(self.args)
if self.stderr is not None:
self.stderr_thread.join(self._remaining_time(endtime))
if self.stderr_thread.isAlive():
raise TimeoutExpired(self.args)
# Collect the output from and close both pipes, now that we know
# both have been read successfully.
stdout = None
stderr = None
if self.stdout:
stdout_thread.join()
stdout = self._stdout_buff
self.stdout.close()
if self.stderr:
stderr_thread.join()
stderr = self._stderr_buff
self.stderr.close()
# All data exchanged. Translate lists into strings.
if stdout is not None:
@ -1035,7 +1117,6 @@ class Popen(object):
if stderr is not None:
stderr = stderr[0]
self.wait()
return (stdout, stderr)
def send_signal(self, sig):
@ -1365,25 +1446,52 @@ class Popen(object):
return self.returncode
def wait(self):
def _try_wait(self, wait_flags):
try:
(pid, sts) = _eintr_retry_call(os.waitpid, self.pid, wait_flags)
except OSError as e:
if e.errno != errno.ECHILD:
raise
# This happens if SIGCLD is set to be ignored or waiting
# for child processes has otherwise been disabled for our
# process. This child is dead, we can't get the status.
pid = self.pid
sts = 0
return (pid, sts)
def wait(self, timeout=None, endtime=None):
"""Wait for child process to terminate. Returns returncode
attribute."""
if self.returncode is None:
try:
pid, sts = _eintr_retry_call(os.waitpid, self.pid, 0)
except OSError as e:
if e.errno != errno.ECHILD:
raise
# This happens if SIGCLD is set to be ignored or waiting
# for child processes has otherwise been disabled for our
# process. This child is dead, we can't get the status.
sts = 0
# If timeout was passed but not endtime, compute endtime in terms of
# timeout.
if endtime is None and timeout is not None:
endtime = time.time() + timeout
if self.returncode is not None:
return self.returncode
elif endtime is not None:
# Enter a busy loop if we have a timeout. This busy loop was
# cribbed from Lib/threading.py in Thread.wait() at r71065.
delay = 0.0005 # 500 us -> initial delay of 1 ms
while True:
(pid, sts) = self._try_wait(os.WNOHANG)
assert pid == self.pid or pid == 0
if pid == self.pid:
self._handle_exitstatus(sts)
break
remaining = self._remaining_time(endtime)
if remaining <= 0:
raise TimeoutExpired(self.args)
delay = min(delay * 2, remaining, .05)
time.sleep(delay)
elif self.returncode is None:
(pid, sts) = self._try_wait(0)
self._handle_exitstatus(sts)
return self.returncode
def _communicate(self, input):
if self.stdin:
def _communicate(self, input, endtime):
if self.stdin and not self._communication_started:
# Flush stdio buffer. This might block, if the user has
# been writing to .stdin in an uncontrolled fashion.
self.stdin.flush()
@ -1391,9 +1499,11 @@ class Popen(object):
self.stdin.close()
if _has_poll:
stdout, stderr = self._communicate_with_poll(input)
stdout, stderr = self._communicate_with_poll(input, endtime)
else:
stdout, stderr = self._communicate_with_select(input)
stdout, stderr = self._communicate_with_select(input, endtime)
self.wait(timeout=self._remaining_time(endtime))
# All data exchanged. Translate lists into strings.
if stdout is not None:
@ -1411,60 +1521,77 @@ class Popen(object):
stderr = self._translate_newlines(stderr,
self.stderr.encoding)
self.wait()
return (stdout, stderr)
def _communicate_with_poll(self, input):
def _communicate_with_poll(self, input, endtime):
stdout = None # Return
stderr = None # Return
fd2file = {}
fd2output = {}
if not self._communication_started:
self._fd2file = {}
poller = select.poll()
def register_and_append(file_obj, eventmask):
poller.register(file_obj.fileno(), eventmask)
fd2file[file_obj.fileno()] = file_obj
self._fd2file[file_obj.fileno()] = file_obj
def close_unregister_and_remove(fd):
poller.unregister(fd)
fd2file[fd].close()
fd2file.pop(fd)
self._fd2file[fd].close()
self._fd2file.pop(fd)
if self.stdin and input:
register_and_append(self.stdin, select.POLLOUT)
# Only create this mapping if we haven't already.
if not self._communication_started:
self._fd2output = {}
if self.stdout:
self._fd2output[self.stdout.fileno()] = []
if self.stderr:
self._fd2output[self.stderr.fileno()] = []
select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI
if self.stdout:
register_and_append(self.stdout, select_POLLIN_POLLPRI)
fd2output[self.stdout.fileno()] = stdout = []
stdout = self._fd2output[self.stdout.fileno()]
if self.stderr:
register_and_append(self.stderr, select_POLLIN_POLLPRI)
fd2output[self.stderr.fileno()] = stderr = []
stderr = self._fd2output[self.stderr.fileno()]
input_offset = 0
while fd2file:
# Save the input here so that if we time out while communicating,
# we can continue sending input if we retry.
if self.stdin and self._input is None:
self._input_offset = 0
self._input = input
if self.universal_newlines:
self._input = self._input.encode(self.stdin.encoding)
while self._fd2file:
try:
ready = poller.poll()
ready = poller.poll(self._remaining_time(endtime))
except select.error as e:
if e.args[0] == errno.EINTR:
continue
raise
self._check_timeout(endtime)
# XXX Rewrite these to use non-blocking I/O on the
# file objects; they are no longer using C stdio!
for fd, mode in ready:
if mode & select.POLLOUT:
chunk = input[input_offset : input_offset + _PIPE_BUF]
input_offset += os.write(fd, chunk)
if input_offset >= len(input):
chunk = self._input[self._input_offset :
self._input_offset + _PIPE_BUF]
self._input_offset += os.write(fd, chunk)
if self._input_offset >= len(self._input):
close_unregister_and_remove(fd)
elif mode & select_POLLIN_POLLPRI:
data = os.read(fd, 4096)
if not data:
close_unregister_and_remove(fd)
fd2output[fd].append(data)
self._fd2output[fd].append(data)
else:
# Ignore hang up or errors.
close_unregister_and_remove(fd)
@ -1472,53 +1599,76 @@ class Popen(object):
return (stdout, stderr)
def _communicate_with_select(self, input):
read_set = []
write_set = []
def _communicate_with_select(self, input, endtime):
if not self._communication_started:
self._read_set = []
self._write_set = []
if self.stdin and input:
self._write_set.append(self.stdin)
if self.stdout:
self._read_set.append(self.stdout)
if self.stderr:
self._read_set.append(self.stderr)
if self.stdin and self._input is None:
self._input_offset = 0
self._input = input
if self.universal_newlines:
self._input = self._input.encode(self.stdin.encoding)
stdout = None # Return
stderr = None # Return
if self.stdin and input:
write_set.append(self.stdin)
if self.stdout:
read_set.append(self.stdout)
stdout = []
if not self._communication_started:
self._stdout_buff = []
stdout = self._stdout_buff
if self.stderr:
read_set.append(self.stderr)
stderr = []
if not self._communication_started:
self._stderr_buff = []
stderr = self._stderr_buff
input_offset = 0
while read_set or write_set:
while self._read_set or self._write_set:
try:
rlist, wlist, xlist = select.select(read_set, write_set, [])
(rlist, wlist, xlist) = \
select.select(self._read_set, self._write_set, [],
self._remaining_time(endtime))
except select.error as e:
if e.args[0] == errno.EINTR:
continue
raise
# According to the docs, returning three empty lists indicates
# that the timeout expired.
if not (rlist or wlist or xlist):
raise TimeoutExpired(self.args)
# We also check what time it is ourselves for good measure.
self._check_timeout(endtime)
# XXX Rewrite these to use non-blocking I/O on the
# file objects; they are no longer using C stdio!
if self.stdin in wlist:
chunk = input[input_offset : input_offset + _PIPE_BUF]
chunk = self._input[self._input_offset :
self._input_offset + _PIPE_BUF]
bytes_written = os.write(self.stdin.fileno(), chunk)
input_offset += bytes_written
if input_offset >= len(input):
self._input_offset += bytes_written
if self._input_offset >= len(self._input):
self.stdin.close()
write_set.remove(self.stdin)
self._write_set.remove(self.stdin)
if self.stdout in rlist:
data = os.read(self.stdout.fileno(), 1024)
if not data:
self.stdout.close()
read_set.remove(self.stdout)
self._read_set.remove(self.stdout)
stdout.append(data)
if self.stderr in rlist:
data = os.read(self.stderr.fileno(), 1024)
if not data:
self.stderr.close()
read_set.remove(self.stderr)
self._read_set.remove(self.stderr)
stderr.append(data)
return (stdout, stderr)