mirror of
https://github.com/python/cpython.git
synced 2025-07-24 03:35:53 +00:00
(Merge 3.4) Closes #22685, asyncio: Set the transport of stdout and stderr
StreamReader objects in the SubprocessStreamProtocol. It allows to pause the transport to not buffer too much stdout or stderr data.
This commit is contained in:
commit
6a11e5e1ae
2 changed files with 44 additions and 5 deletions
|
@ -41,15 +41,22 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
|
|||
|
||||
def connection_made(self, transport):
|
||||
self._transport = transport
|
||||
if transport.get_pipe_transport(1):
|
||||
|
||||
stdout_transport = transport.get_pipe_transport(1)
|
||||
if stdout_transport is not None:
|
||||
self.stdout = streams.StreamReader(limit=self._limit,
|
||||
loop=self._loop)
|
||||
if transport.get_pipe_transport(2):
|
||||
self.stdout.set_transport(stdout_transport)
|
||||
|
||||
stderr_transport = transport.get_pipe_transport(2)
|
||||
if stderr_transport is not None:
|
||||
self.stderr = streams.StreamReader(limit=self._limit,
|
||||
loop=self._loop)
|
||||
stdin = transport.get_pipe_transport(0)
|
||||
if stdin is not None:
|
||||
self.stdin = streams.StreamWriter(stdin,
|
||||
self.stderr.set_transport(stderr_transport)
|
||||
|
||||
stdin_transport = transport.get_pipe_transport(0)
|
||||
if stdin_transport is not None:
|
||||
self.stdin = streams.StreamWriter(stdin_transport,
|
||||
protocol=self,
|
||||
reader=None,
|
||||
loop=self._loop)
|
||||
|
|
|
@ -4,6 +4,7 @@ import asyncio
|
|||
import signal
|
||||
import sys
|
||||
import unittest
|
||||
from unittest import mock
|
||||
from test import support
|
||||
if sys.platform != 'win32':
|
||||
from asyncio import unix_events
|
||||
|
@ -161,6 +162,37 @@ class SubprocessMixin:
|
|||
self.loop.run_until_complete(proc.communicate(large_data))
|
||||
self.loop.run_until_complete(proc.wait())
|
||||
|
||||
def test_pause_reading(self):
|
||||
@asyncio.coroutine
|
||||
def test_pause_reading():
|
||||
limit = 100
|
||||
|
||||
code = '\n'.join((
|
||||
'import sys',
|
||||
'sys.stdout.write("x" * %s)' % (limit * 2 + 1),
|
||||
'sys.stdout.flush()',
|
||||
))
|
||||
proc = yield from asyncio.create_subprocess_exec(
|
||||
sys.executable, '-c', code,
|
||||
stdin=asyncio.subprocess.PIPE,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
limit=limit,
|
||||
loop=self.loop)
|
||||
stdout_transport = proc._transport.get_pipe_transport(1)
|
||||
stdout_transport.pause_reading = mock.Mock()
|
||||
|
||||
yield from proc.wait()
|
||||
|
||||
# The child process produced more than limit bytes of output,
|
||||
# the stream reader transport should pause the protocol to not
|
||||
# allocate too much memory.
|
||||
return stdout_transport.pause_reading.called
|
||||
|
||||
# Issue #22685: Ensure that the stream reader pauses the protocol
|
||||
# when the child process produces too much data
|
||||
called = self.loop.run_until_complete(test_pause_reading())
|
||||
self.assertTrue(called)
|
||||
|
||||
|
||||
if sys.platform != 'win32':
|
||||
# Unix
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue