mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 11:49:12 +00:00 
			
		
		
		
	Fix typos in the Lib directory as identified by codespell. Co-authored-by: Terry Jan Reedy <tjreedy@udel.edu>
		
			
				
	
	
		
			733 lines
		
	
	
	
		
			25 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			733 lines
		
	
	
	
		
			25 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import os
 | 
						|
import signal
 | 
						|
import sys
 | 
						|
import unittest
 | 
						|
import warnings
 | 
						|
from unittest import mock
 | 
						|
 | 
						|
import asyncio
 | 
						|
from asyncio import base_subprocess
 | 
						|
from asyncio import subprocess
 | 
						|
from test.test_asyncio import utils as test_utils
 | 
						|
from test import support
 | 
						|
from test.support import os_helper
 | 
						|
 | 
						|
if sys.platform != 'win32':
 | 
						|
    from asyncio import unix_events
 | 
						|
 | 
						|
# Program blocking
 | 
						|
PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']
 | 
						|
 | 
						|
# Program copying input to output
 | 
						|
PROGRAM_CAT = [
 | 
						|
    sys.executable, '-c',
 | 
						|
    ';'.join(('import sys',
 | 
						|
              'data = sys.stdin.buffer.read()',
 | 
						|
              'sys.stdout.buffer.write(data)'))]
 | 
						|
 | 
						|
 | 
						|
def tearDownModule():
 | 
						|
    asyncio.set_event_loop_policy(None)
 | 
						|
 | 
						|
 | 
						|
class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
 | 
						|
    def _start(self, *args, **kwargs):
 | 
						|
        self._proc = mock.Mock()
 | 
						|
        self._proc.stdin = None
 | 
						|
        self._proc.stdout = None
 | 
						|
        self._proc.stderr = None
 | 
						|
        self._proc.pid = -1
 | 
						|
 | 
						|
 | 
						|
class SubprocessTransportTests(test_utils.TestCase):
 | 
						|
    def setUp(self):
 | 
						|
        super().setUp()
 | 
						|
        self.loop = self.new_test_loop()
 | 
						|
        self.set_event_loop(self.loop)
 | 
						|
 | 
						|
    def create_transport(self, waiter=None):
 | 
						|
        protocol = mock.Mock()
 | 
						|
        protocol.connection_made._is_coroutine = False
 | 
						|
        protocol.process_exited._is_coroutine = False
 | 
						|
        transport = TestSubprocessTransport(
 | 
						|
                        self.loop, protocol, ['test'], False,
 | 
						|
                        None, None, None, 0, waiter=waiter)
 | 
						|
        return (transport, protocol)
 | 
						|
 | 
						|
    def test_proc_exited(self):
 | 
						|
        waiter = self.loop.create_future()
 | 
						|
        transport, protocol = self.create_transport(waiter)
 | 
						|
        transport._process_exited(6)
 | 
						|
        self.loop.run_until_complete(waiter)
 | 
						|
 | 
						|
        self.assertEqual(transport.get_returncode(), 6)
 | 
						|
 | 
						|
        self.assertTrue(protocol.connection_made.called)
 | 
						|
        self.assertTrue(protocol.process_exited.called)
 | 
						|
        self.assertTrue(protocol.connection_lost.called)
 | 
						|
        self.assertEqual(protocol.connection_lost.call_args[0], (None,))
 | 
						|
 | 
						|
        self.assertFalse(transport.is_closing())
 | 
						|
        self.assertIsNone(transport._loop)
 | 
						|
        self.assertIsNone(transport._proc)
 | 
						|
        self.assertIsNone(transport._protocol)
 | 
						|
 | 
						|
        # methods must raise ProcessLookupError if the process exited
 | 
						|
        self.assertRaises(ProcessLookupError,
 | 
						|
                          transport.send_signal, signal.SIGTERM)
 | 
						|
        self.assertRaises(ProcessLookupError, transport.terminate)
 | 
						|
        self.assertRaises(ProcessLookupError, transport.kill)
 | 
						|
 | 
						|
        transport.close()
 | 
						|
 | 
						|
    def test_subprocess_repr(self):
 | 
						|
        waiter = self.loop.create_future()
 | 
						|
        transport, protocol = self.create_transport(waiter)
 | 
						|
        transport._process_exited(6)
 | 
						|
        self.loop.run_until_complete(waiter)
 | 
						|
 | 
						|
        self.assertEqual(
 | 
						|
            repr(transport),
 | 
						|
            "<TestSubprocessTransport pid=-1 returncode=6>"
 | 
						|
        )
 | 
						|
        transport._returncode = None
 | 
						|
        self.assertEqual(
 | 
						|
            repr(transport),
 | 
						|
            "<TestSubprocessTransport pid=-1 running>"
 | 
						|
        )
 | 
						|
        transport._pid = None
 | 
						|
        transport._returncode = None
 | 
						|
        self.assertEqual(
 | 
						|
            repr(transport),
 | 
						|
            "<TestSubprocessTransport not started>"
 | 
						|
        )
 | 
						|
        transport.close()
 | 
						|
 | 
						|
 | 
						|
class SubprocessMixin:
 | 
						|
 | 
						|
    def test_stdin_stdout(self):
 | 
						|
        args = PROGRAM_CAT
 | 
						|
 | 
						|
        async def run(data):
 | 
						|
            proc = await asyncio.create_subprocess_exec(
 | 
						|
                *args,
 | 
						|
                stdin=subprocess.PIPE,
 | 
						|
                stdout=subprocess.PIPE,
 | 
						|
            )
 | 
						|
 | 
						|
            # feed data
 | 
						|
            proc.stdin.write(data)
 | 
						|
            await proc.stdin.drain()
 | 
						|
            proc.stdin.close()
 | 
						|
 | 
						|
            # get output and exitcode
 | 
						|
            data = await proc.stdout.read()
 | 
						|
            exitcode = await proc.wait()
 | 
						|
            return (exitcode, data)
 | 
						|
 | 
						|
        task = run(b'some data')
 | 
						|
        task = asyncio.wait_for(task, 60.0)
 | 
						|
        exitcode, stdout = self.loop.run_until_complete(task)
 | 
						|
        self.assertEqual(exitcode, 0)
 | 
						|
        self.assertEqual(stdout, b'some data')
 | 
						|
 | 
						|
    def test_communicate(self):
 | 
						|
        args = PROGRAM_CAT
 | 
						|
 | 
						|
        async def run(data):
 | 
						|
            proc = await asyncio.create_subprocess_exec(
 | 
						|
                *args,
 | 
						|
                stdin=subprocess.PIPE,
 | 
						|
                stdout=subprocess.PIPE,
 | 
						|
            )
 | 
						|
            stdout, stderr = await proc.communicate(data)
 | 
						|
            return proc.returncode, stdout
 | 
						|
 | 
						|
        task = run(b'some data')
 | 
						|
        task = asyncio.wait_for(task, support.LONG_TIMEOUT)
 | 
						|
        exitcode, stdout = self.loop.run_until_complete(task)
 | 
						|
        self.assertEqual(exitcode, 0)
 | 
						|
        self.assertEqual(stdout, b'some data')
 | 
						|
 | 
						|
    def test_shell(self):
 | 
						|
        proc = self.loop.run_until_complete(
 | 
						|
            asyncio.create_subprocess_shell('exit 7')
 | 
						|
        )
 | 
						|
        exitcode = self.loop.run_until_complete(proc.wait())
 | 
						|
        self.assertEqual(exitcode, 7)
 | 
						|
 | 
						|
    def test_start_new_session(self):
 | 
						|
        # start the new process in a new session
 | 
						|
        proc = self.loop.run_until_complete(
 | 
						|
            asyncio.create_subprocess_shell(
 | 
						|
                'exit 8',
 | 
						|
                start_new_session=True,
 | 
						|
            )
 | 
						|
        )
 | 
						|
        exitcode = self.loop.run_until_complete(proc.wait())
 | 
						|
        self.assertEqual(exitcode, 8)
 | 
						|
 | 
						|
    def test_kill(self):
 | 
						|
        args = PROGRAM_BLOCKED
 | 
						|
        proc = self.loop.run_until_complete(
 | 
						|
            asyncio.create_subprocess_exec(*args)
 | 
						|
        )
 | 
						|
        proc.kill()
 | 
						|
        returncode = self.loop.run_until_complete(proc.wait())
 | 
						|
        if sys.platform == 'win32':
 | 
						|
            self.assertIsInstance(returncode, int)
 | 
						|
            # expect 1 but sometimes get 0
 | 
						|
        else:
 | 
						|
            self.assertEqual(-signal.SIGKILL, returncode)
 | 
						|
 | 
						|
    def test_terminate(self):
 | 
						|
        args = PROGRAM_BLOCKED
 | 
						|
        proc = self.loop.run_until_complete(
 | 
						|
            asyncio.create_subprocess_exec(*args)
 | 
						|
        )
 | 
						|
        proc.terminate()
 | 
						|
        returncode = self.loop.run_until_complete(proc.wait())
 | 
						|
        if sys.platform == 'win32':
 | 
						|
            self.assertIsInstance(returncode, int)
 | 
						|
            # expect 1 but sometimes get 0
 | 
						|
        else:
 | 
						|
            self.assertEqual(-signal.SIGTERM, returncode)
 | 
						|
 | 
						|
    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
 | 
						|
    def test_send_signal(self):
 | 
						|
        # bpo-31034: Make sure that we get the default signal handler (killing
 | 
						|
        # the process). The parent process may have decided to ignore SIGHUP,
 | 
						|
        # and signal handlers are inherited.
 | 
						|
        old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
 | 
						|
        try:
 | 
						|
            code = 'import time; print("sleeping", flush=True); time.sleep(3600)'
 | 
						|
            args = [sys.executable, '-c', code]
 | 
						|
            proc = self.loop.run_until_complete(
 | 
						|
                asyncio.create_subprocess_exec(
 | 
						|
                    *args,
 | 
						|
                    stdout=subprocess.PIPE,
 | 
						|
                )
 | 
						|
            )
 | 
						|
 | 
						|
            async def send_signal(proc):
 | 
						|
                # basic synchronization to wait until the program is sleeping
 | 
						|
                line = await proc.stdout.readline()
 | 
						|
                self.assertEqual(line, b'sleeping\n')
 | 
						|
 | 
						|
                proc.send_signal(signal.SIGHUP)
 | 
						|
                returncode = await proc.wait()
 | 
						|
                return returncode
 | 
						|
 | 
						|
            returncode = self.loop.run_until_complete(send_signal(proc))
 | 
						|
            self.assertEqual(-signal.SIGHUP, returncode)
 | 
						|
        finally:
 | 
						|
            signal.signal(signal.SIGHUP, old_handler)
 | 
						|
 | 
						|
    def prepare_broken_pipe_test(self):
 | 
						|
        # buffer large enough to feed the whole pipe buffer
 | 
						|
        large_data = b'x' * support.PIPE_MAX_SIZE
 | 
						|
 | 
						|
        # the program ends before the stdin can be fed
 | 
						|
        proc = self.loop.run_until_complete(
 | 
						|
            asyncio.create_subprocess_exec(
 | 
						|
                sys.executable, '-c', 'pass',
 | 
						|
                stdin=subprocess.PIPE,
 | 
						|
            )
 | 
						|
        )
 | 
						|
 | 
						|
        return (proc, large_data)
 | 
						|
 | 
						|
    def test_stdin_broken_pipe(self):
 | 
						|
        proc, large_data = self.prepare_broken_pipe_test()
 | 
						|
 | 
						|
        async def write_stdin(proc, data):
 | 
						|
            await asyncio.sleep(0.5)
 | 
						|
            proc.stdin.write(data)
 | 
						|
            await proc.stdin.drain()
 | 
						|
 | 
						|
        coro = write_stdin(proc, large_data)
 | 
						|
        # drain() must raise BrokenPipeError or ConnectionResetError
 | 
						|
        with test_utils.disable_logger():
 | 
						|
            self.assertRaises((BrokenPipeError, ConnectionResetError),
 | 
						|
                              self.loop.run_until_complete, coro)
 | 
						|
        self.loop.run_until_complete(proc.wait())
 | 
						|
 | 
						|
    def test_communicate_ignore_broken_pipe(self):
 | 
						|
        proc, large_data = self.prepare_broken_pipe_test()
 | 
						|
 | 
						|
        # communicate() must ignore BrokenPipeError when feeding stdin
 | 
						|
        self.loop.set_exception_handler(lambda loop, msg: None)
 | 
						|
        self.loop.run_until_complete(proc.communicate(large_data))
 | 
						|
        self.loop.run_until_complete(proc.wait())
 | 
						|
 | 
						|
    def test_pause_reading(self):
 | 
						|
        limit = 10
 | 
						|
        size = (limit * 2 + 1)
 | 
						|
 | 
						|
        async def test_pause_reading():
 | 
						|
            code = '\n'.join((
 | 
						|
                'import sys',
 | 
						|
                'sys.stdout.write("x" * %s)' % size,
 | 
						|
                'sys.stdout.flush()',
 | 
						|
            ))
 | 
						|
 | 
						|
            connect_read_pipe = self.loop.connect_read_pipe
 | 
						|
 | 
						|
            async def connect_read_pipe_mock(*args, **kw):
 | 
						|
                transport, protocol = await connect_read_pipe(*args, **kw)
 | 
						|
                transport.pause_reading = mock.Mock()
 | 
						|
                transport.resume_reading = mock.Mock()
 | 
						|
                return (transport, protocol)
 | 
						|
 | 
						|
            self.loop.connect_read_pipe = connect_read_pipe_mock
 | 
						|
 | 
						|
            proc = await asyncio.create_subprocess_exec(
 | 
						|
                sys.executable, '-c', code,
 | 
						|
                stdin=asyncio.subprocess.PIPE,
 | 
						|
                stdout=asyncio.subprocess.PIPE,
 | 
						|
                limit=limit,
 | 
						|
            )
 | 
						|
            stdout_transport = proc._transport.get_pipe_transport(1)
 | 
						|
 | 
						|
            stdout, stderr = await proc.communicate()
 | 
						|
 | 
						|
            # The child process produced more than limit bytes of output,
 | 
						|
            # the stream reader transport should pause the protocol to not
 | 
						|
            # allocate too much memory.
 | 
						|
            return (stdout, stdout_transport)
 | 
						|
 | 
						|
        # Issue #22685: Ensure that the stream reader pauses the protocol
 | 
						|
        # when the child process produces too much data
 | 
						|
        stdout, transport = self.loop.run_until_complete(test_pause_reading())
 | 
						|
 | 
						|
        self.assertEqual(stdout, b'x' * size)
 | 
						|
        self.assertTrue(transport.pause_reading.called)
 | 
						|
        self.assertTrue(transport.resume_reading.called)
 | 
						|
 | 
						|
    def test_stdin_not_inheritable(self):
 | 
						|
        # asyncio issue #209: stdin must not be inheritable, otherwise
 | 
						|
        # the Process.communicate() hangs
 | 
						|
        async def len_message(message):
 | 
						|
            code = 'import sys; data = sys.stdin.read(); print(len(data))'
 | 
						|
            proc = await asyncio.create_subprocess_exec(
 | 
						|
                sys.executable, '-c', code,
 | 
						|
                stdin=asyncio.subprocess.PIPE,
 | 
						|
                stdout=asyncio.subprocess.PIPE,
 | 
						|
                stderr=asyncio.subprocess.PIPE,
 | 
						|
                close_fds=False,
 | 
						|
            )
 | 
						|
            stdout, stderr = await proc.communicate(message)
 | 
						|
            exitcode = await proc.wait()
 | 
						|
            return (stdout, exitcode)
 | 
						|
 | 
						|
        output, exitcode = self.loop.run_until_complete(len_message(b'abc'))
 | 
						|
        self.assertEqual(output.rstrip(), b'3')
 | 
						|
        self.assertEqual(exitcode, 0)
 | 
						|
 | 
						|
    def test_empty_input(self):
 | 
						|
 | 
						|
        async def empty_input():
 | 
						|
            code = 'import sys; data = sys.stdin.read(); print(len(data))'
 | 
						|
            proc = await asyncio.create_subprocess_exec(
 | 
						|
                sys.executable, '-c', code,
 | 
						|
                stdin=asyncio.subprocess.PIPE,
 | 
						|
                stdout=asyncio.subprocess.PIPE,
 | 
						|
                stderr=asyncio.subprocess.PIPE,
 | 
						|
                close_fds=False,
 | 
						|
            )
 | 
						|
            stdout, stderr = await proc.communicate(b'')
 | 
						|
            exitcode = await proc.wait()
 | 
						|
            return (stdout, exitcode)
 | 
						|
 | 
						|
        output, exitcode = self.loop.run_until_complete(empty_input())
 | 
						|
        self.assertEqual(output.rstrip(), b'0')
 | 
						|
        self.assertEqual(exitcode, 0)
 | 
						|
 | 
						|
    def test_devnull_input(self):
 | 
						|
 | 
						|
        async def empty_input():
 | 
						|
            code = 'import sys; data = sys.stdin.read(); print(len(data))'
 | 
						|
            proc = await asyncio.create_subprocess_exec(
 | 
						|
                sys.executable, '-c', code,
 | 
						|
                stdin=asyncio.subprocess.DEVNULL,
 | 
						|
                stdout=asyncio.subprocess.PIPE,
 | 
						|
                stderr=asyncio.subprocess.PIPE,
 | 
						|
                close_fds=False,
 | 
						|
            )
 | 
						|
            stdout, stderr = await proc.communicate()
 | 
						|
            exitcode = await proc.wait()
 | 
						|
            return (stdout, exitcode)
 | 
						|
 | 
						|
        output, exitcode = self.loop.run_until_complete(empty_input())
 | 
						|
        self.assertEqual(output.rstrip(), b'0')
 | 
						|
        self.assertEqual(exitcode, 0)
 | 
						|
 | 
						|
    def test_devnull_output(self):
 | 
						|
 | 
						|
        async def empty_output():
 | 
						|
            code = 'import sys; data = sys.stdin.read(); print(len(data))'
 | 
						|
            proc = await asyncio.create_subprocess_exec(
 | 
						|
                sys.executable, '-c', code,
 | 
						|
                stdin=asyncio.subprocess.PIPE,
 | 
						|
                stdout=asyncio.subprocess.DEVNULL,
 | 
						|
                stderr=asyncio.subprocess.PIPE,
 | 
						|
                close_fds=False,
 | 
						|
            )
 | 
						|
            stdout, stderr = await proc.communicate(b"abc")
 | 
						|
            exitcode = await proc.wait()
 | 
						|
            return (stdout, exitcode)
 | 
						|
 | 
						|
        output, exitcode = self.loop.run_until_complete(empty_output())
 | 
						|
        self.assertEqual(output, None)
 | 
						|
        self.assertEqual(exitcode, 0)
 | 
						|
 | 
						|
    def test_devnull_error(self):
 | 
						|
 | 
						|
        async def empty_error():
 | 
						|
            code = 'import sys; data = sys.stdin.read(); print(len(data))'
 | 
						|
            proc = await asyncio.create_subprocess_exec(
 | 
						|
                sys.executable, '-c', code,
 | 
						|
                stdin=asyncio.subprocess.PIPE,
 | 
						|
                stdout=asyncio.subprocess.PIPE,
 | 
						|
                stderr=asyncio.subprocess.DEVNULL,
 | 
						|
                close_fds=False,
 | 
						|
            )
 | 
						|
            stdout, stderr = await proc.communicate(b"abc")
 | 
						|
            exitcode = await proc.wait()
 | 
						|
            return (stderr, exitcode)
 | 
						|
 | 
						|
        output, exitcode = self.loop.run_until_complete(empty_error())
 | 
						|
        self.assertEqual(output, None)
 | 
						|
        self.assertEqual(exitcode, 0)
 | 
						|
 | 
						|
    def test_cancel_process_wait(self):
 | 
						|
        # Issue #23140: cancel Process.wait()
 | 
						|
 | 
						|
        async def cancel_wait():
 | 
						|
            proc = await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
 | 
						|
 | 
						|
            # Create an internal future waiting on the process exit
 | 
						|
            task = self.loop.create_task(proc.wait())
 | 
						|
            self.loop.call_soon(task.cancel)
 | 
						|
            try:
 | 
						|
                await task
 | 
						|
            except asyncio.CancelledError:
 | 
						|
                pass
 | 
						|
 | 
						|
            # Cancel the future
 | 
						|
            task.cancel()
 | 
						|
 | 
						|
            # Kill the process and wait until it is done
 | 
						|
            proc.kill()
 | 
						|
            await proc.wait()
 | 
						|
 | 
						|
        self.loop.run_until_complete(cancel_wait())
 | 
						|
 | 
						|
    def test_cancel_make_subprocess_transport_exec(self):
 | 
						|
 | 
						|
        async def cancel_make_transport():
 | 
						|
            coro = asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
 | 
						|
            task = self.loop.create_task(coro)
 | 
						|
 | 
						|
            self.loop.call_soon(task.cancel)
 | 
						|
            try:
 | 
						|
                await task
 | 
						|
            except asyncio.CancelledError:
 | 
						|
                pass
 | 
						|
 | 
						|
        # ignore the log:
 | 
						|
        # "Exception during subprocess creation, kill the subprocess"
 | 
						|
        with test_utils.disable_logger():
 | 
						|
            self.loop.run_until_complete(cancel_make_transport())
 | 
						|
 | 
						|
    def test_cancel_post_init(self):
 | 
						|
 | 
						|
        async def cancel_make_transport():
 | 
						|
            coro = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
 | 
						|
                                             *PROGRAM_BLOCKED)
 | 
						|
            task = self.loop.create_task(coro)
 | 
						|
 | 
						|
            self.loop.call_soon(task.cancel)
 | 
						|
            try:
 | 
						|
                await task
 | 
						|
            except asyncio.CancelledError:
 | 
						|
                pass
 | 
						|
 | 
						|
        # ignore the log:
 | 
						|
        # "Exception during subprocess creation, kill the subprocess"
 | 
						|
        with test_utils.disable_logger():
 | 
						|
            self.loop.run_until_complete(cancel_make_transport())
 | 
						|
            test_utils.run_briefly(self.loop)
 | 
						|
 | 
						|
    def test_close_kill_running(self):
 | 
						|
 | 
						|
        async def kill_running():
 | 
						|
            create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
 | 
						|
                                               *PROGRAM_BLOCKED)
 | 
						|
            transport, protocol = await create
 | 
						|
 | 
						|
            kill_called = False
 | 
						|
            def kill():
 | 
						|
                nonlocal kill_called
 | 
						|
                kill_called = True
 | 
						|
                orig_kill()
 | 
						|
 | 
						|
            proc = transport.get_extra_info('subprocess')
 | 
						|
            orig_kill = proc.kill
 | 
						|
            proc.kill = kill
 | 
						|
            returncode = transport.get_returncode()
 | 
						|
            transport.close()
 | 
						|
            await asyncio.wait_for(transport._wait(), 5)
 | 
						|
            return (returncode, kill_called)
 | 
						|
 | 
						|
        # Ignore "Close running child process: kill ..." log
 | 
						|
        with test_utils.disable_logger():
 | 
						|
            try:
 | 
						|
                returncode, killed = self.loop.run_until_complete(
 | 
						|
                    kill_running()
 | 
						|
                )
 | 
						|
            except asyncio.TimeoutError:
 | 
						|
                self.skipTest(
 | 
						|
                    "Timeout failure on waiting for subprocess stopping"
 | 
						|
                )
 | 
						|
        self.assertIsNone(returncode)
 | 
						|
 | 
						|
        # transport.close() must kill the process if it is still running
 | 
						|
        self.assertTrue(killed)
 | 
						|
        test_utils.run_briefly(self.loop)
 | 
						|
 | 
						|
    def test_close_dont_kill_finished(self):
 | 
						|
 | 
						|
        async def kill_running():
 | 
						|
            create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
 | 
						|
                                               *PROGRAM_BLOCKED)
 | 
						|
            transport, protocol = await create
 | 
						|
            proc = transport.get_extra_info('subprocess')
 | 
						|
 | 
						|
            # kill the process (but asyncio is not notified immediately)
 | 
						|
            proc.kill()
 | 
						|
            proc.wait()
 | 
						|
 | 
						|
            proc.kill = mock.Mock()
 | 
						|
            proc_returncode = proc.poll()
 | 
						|
            transport_returncode = transport.get_returncode()
 | 
						|
            transport.close()
 | 
						|
            return (proc_returncode, transport_returncode, proc.kill.called)
 | 
						|
 | 
						|
        # Ignore "Unknown child process pid ..." log of SafeChildWatcher,
 | 
						|
        # emitted because the test already consumes the exit status:
 | 
						|
        # proc.wait()
 | 
						|
        with test_utils.disable_logger():
 | 
						|
            result = self.loop.run_until_complete(kill_running())
 | 
						|
            test_utils.run_briefly(self.loop)
 | 
						|
 | 
						|
        proc_returncode, transport_return_code, killed = result
 | 
						|
 | 
						|
        self.assertIsNotNone(proc_returncode)
 | 
						|
        self.assertIsNone(transport_return_code)
 | 
						|
 | 
						|
        # transport.close() must not kill the process if it finished, even if
 | 
						|
        # the transport was not notified yet
 | 
						|
        self.assertFalse(killed)
 | 
						|
 | 
						|
        # Unlike SafeChildWatcher, FastChildWatcher does not pop the
 | 
						|
        # callbacks if waitpid() is called elsewhere. Let's clear them
 | 
						|
        # manually to avoid a warning when the watcher is detached.
 | 
						|
        if (sys.platform != 'win32' and
 | 
						|
                isinstance(self, SubprocessFastWatcherTests)):
 | 
						|
            asyncio.get_child_watcher()._callbacks.clear()
 | 
						|
 | 
						|
    async def _test_popen_error(self, stdin):
 | 
						|
        if sys.platform == 'win32':
 | 
						|
            target = 'asyncio.windows_utils.Popen'
 | 
						|
        else:
 | 
						|
            target = 'subprocess.Popen'
 | 
						|
        with mock.patch(target) as popen:
 | 
						|
            exc = ZeroDivisionError
 | 
						|
            popen.side_effect = exc
 | 
						|
 | 
						|
            with warnings.catch_warnings(record=True) as warns:
 | 
						|
                with self.assertRaises(exc):
 | 
						|
                    await asyncio.create_subprocess_exec(
 | 
						|
                        sys.executable,
 | 
						|
                        '-c',
 | 
						|
                        'pass',
 | 
						|
                        stdin=stdin
 | 
						|
                    )
 | 
						|
                self.assertEqual(warns, [])
 | 
						|
 | 
						|
    def test_popen_error(self):
 | 
						|
        # Issue #24763: check that the subprocess transport is closed
 | 
						|
        # when BaseSubprocessTransport fails
 | 
						|
        self.loop.run_until_complete(self._test_popen_error(stdin=None))
 | 
						|
 | 
						|
    def test_popen_error_with_stdin_pipe(self):
 | 
						|
        # Issue #35721: check that newly created socket pair is closed when
 | 
						|
        # Popen fails
 | 
						|
        self.loop.run_until_complete(
 | 
						|
            self._test_popen_error(stdin=subprocess.PIPE))
 | 
						|
 | 
						|
    def test_read_stdout_after_process_exit(self):
 | 
						|
 | 
						|
        async def execute():
 | 
						|
            code = '\n'.join(['import sys',
 | 
						|
                              'for _ in range(64):',
 | 
						|
                              '    sys.stdout.write("x" * 4096)',
 | 
						|
                              'sys.stdout.flush()',
 | 
						|
                              'sys.exit(1)'])
 | 
						|
 | 
						|
            process = await asyncio.create_subprocess_exec(
 | 
						|
                sys.executable, '-c', code,
 | 
						|
                stdout=asyncio.subprocess.PIPE,
 | 
						|
            )
 | 
						|
 | 
						|
            while True:
 | 
						|
                data = await process.stdout.read(65536)
 | 
						|
                if data:
 | 
						|
                    await asyncio.sleep(0.3)
 | 
						|
                else:
 | 
						|
                    break
 | 
						|
 | 
						|
        self.loop.run_until_complete(execute())
 | 
						|
 | 
						|
    def test_create_subprocess_exec_text_mode_fails(self):
 | 
						|
        async def execute():
 | 
						|
            with self.assertRaises(ValueError):
 | 
						|
                await subprocess.create_subprocess_exec(sys.executable,
 | 
						|
                                                        text=True)
 | 
						|
 | 
						|
            with self.assertRaises(ValueError):
 | 
						|
                await subprocess.create_subprocess_exec(sys.executable,
 | 
						|
                                                        encoding="utf-8")
 | 
						|
 | 
						|
            with self.assertRaises(ValueError):
 | 
						|
                await subprocess.create_subprocess_exec(sys.executable,
 | 
						|
                                                        errors="strict")
 | 
						|
 | 
						|
        self.loop.run_until_complete(execute())
 | 
						|
 | 
						|
    def test_create_subprocess_shell_text_mode_fails(self):
 | 
						|
 | 
						|
        async def execute():
 | 
						|
            with self.assertRaises(ValueError):
 | 
						|
                await subprocess.create_subprocess_shell(sys.executable,
 | 
						|
                                                         text=True)
 | 
						|
 | 
						|
            with self.assertRaises(ValueError):
 | 
						|
                await subprocess.create_subprocess_shell(sys.executable,
 | 
						|
                                                         encoding="utf-8")
 | 
						|
 | 
						|
            with self.assertRaises(ValueError):
 | 
						|
                await subprocess.create_subprocess_shell(sys.executable,
 | 
						|
                                                         errors="strict")
 | 
						|
 | 
						|
        self.loop.run_until_complete(execute())
 | 
						|
 | 
						|
    def test_create_subprocess_exec_with_path(self):
 | 
						|
        async def execute():
 | 
						|
            p = await subprocess.create_subprocess_exec(
 | 
						|
                os_helper.FakePath(sys.executable), '-c', 'pass')
 | 
						|
            await p.wait()
 | 
						|
            p = await subprocess.create_subprocess_exec(
 | 
						|
                sys.executable, '-c', 'pass', os_helper.FakePath('.'))
 | 
						|
            await p.wait()
 | 
						|
 | 
						|
        self.assertIsNone(self.loop.run_until_complete(execute()))
 | 
						|
 | 
						|
 | 
						|
if sys.platform != 'win32':
 | 
						|
    # Unix
 | 
						|
    class SubprocessWatcherMixin(SubprocessMixin):
 | 
						|
 | 
						|
        Watcher = None
 | 
						|
 | 
						|
        def setUp(self):
 | 
						|
            super().setUp()
 | 
						|
            policy = asyncio.get_event_loop_policy()
 | 
						|
            self.loop = policy.new_event_loop()
 | 
						|
            self.set_event_loop(self.loop)
 | 
						|
 | 
						|
            watcher = self.Watcher()
 | 
						|
            watcher.attach_loop(self.loop)
 | 
						|
            policy.set_child_watcher(watcher)
 | 
						|
 | 
						|
        def tearDown(self):
 | 
						|
            super().tearDown()
 | 
						|
            policy = asyncio.get_event_loop_policy()
 | 
						|
            watcher = policy.get_child_watcher()
 | 
						|
            policy.set_child_watcher(None)
 | 
						|
            watcher.attach_loop(None)
 | 
						|
            watcher.close()
 | 
						|
 | 
						|
    class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
 | 
						|
                                         test_utils.TestCase):
 | 
						|
 | 
						|
        Watcher = unix_events.ThreadedChildWatcher
 | 
						|
 | 
						|
    @unittest.skip("bpo-38323: MultiLoopChildWatcher has a race condition \
 | 
						|
                    and these tests can hang the test suite")
 | 
						|
    class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin,
 | 
						|
                                          test_utils.TestCase):
 | 
						|
 | 
						|
        Watcher = unix_events.MultiLoopChildWatcher
 | 
						|
 | 
						|
    class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
 | 
						|
                                     test_utils.TestCase):
 | 
						|
 | 
						|
        Watcher = unix_events.SafeChildWatcher
 | 
						|
 | 
						|
    class SubprocessFastWatcherTests(SubprocessWatcherMixin,
 | 
						|
                                     test_utils.TestCase):
 | 
						|
 | 
						|
        Watcher = unix_events.FastChildWatcher
 | 
						|
 | 
						|
    def has_pidfd_support():
 | 
						|
        if not hasattr(os, 'pidfd_open'):
 | 
						|
            return False
 | 
						|
        try:
 | 
						|
            os.close(os.pidfd_open(os.getpid()))
 | 
						|
        except OSError:
 | 
						|
            return False
 | 
						|
        return True
 | 
						|
 | 
						|
    @unittest.skipUnless(
 | 
						|
        has_pidfd_support(),
 | 
						|
        "operating system does not support pidfds",
 | 
						|
    )
 | 
						|
    class SubprocessPidfdWatcherTests(SubprocessWatcherMixin,
 | 
						|
                                      test_utils.TestCase):
 | 
						|
        Watcher = unix_events.PidfdChildWatcher
 | 
						|
 | 
						|
else:
 | 
						|
    # Windows
 | 
						|
    class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase):
 | 
						|
 | 
						|
        def setUp(self):
 | 
						|
            super().setUp()
 | 
						|
            self.loop = asyncio.ProactorEventLoop()
 | 
						|
            self.set_event_loop(self.loop)
 | 
						|
 | 
						|
 | 
						|
class GenericWatcherTests:
 | 
						|
 | 
						|
    def test_create_subprocess_fails_with_inactive_watcher(self):
 | 
						|
 | 
						|
        async def execute():
 | 
						|
            watcher = mock.create_authspec(asyncio.AbstractChildWatcher)
 | 
						|
            watcher.is_active.return_value = False
 | 
						|
            asyncio.set_child_watcher(watcher)
 | 
						|
 | 
						|
            with self.assertRaises(RuntimeError):
 | 
						|
                await subprocess.create_subprocess_exec(
 | 
						|
                    os_helper.FakePath(sys.executable), '-c', 'pass')
 | 
						|
 | 
						|
            watcher.add_child_handler.assert_not_called()
 | 
						|
 | 
						|
        self.assertIsNone(self.loop.run_until_complete(execute()))
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    unittest.main()
 |