mirror of
				https://github.com/python/cpython.git
				synced 2025-10-24 23:46:23 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			1002 lines
		
	
	
	
		
			36 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1002 lines
		
	
	
	
		
			36 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| import signal
 | |
| import sys
 | |
| import textwrap
 | |
| import unittest
 | |
| import warnings
 | |
| from unittest import mock
 | |
| 
 | |
| import asyncio
 | |
| from asyncio import base_subprocess
 | |
| from asyncio import subprocess
 | |
| from test.test_asyncio import utils as test_utils
 | |
| from test import support
 | |
| from test.support import os_helper
 | |
| 
 | |
| 
 | |
| if support.MS_WINDOWS:
 | |
|     import msvcrt
 | |
| else:
 | |
|     from asyncio import unix_events
 | |
| 
 | |
| 
 | |
| if support.check_sanitizer(address=True):
 | |
|     raise unittest.SkipTest("Exposes ASAN flakiness in GitHub CI")
 | |
| 
 | |
| # Program blocking
 | |
| PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']
 | |
| 
 | |
| # Program copying input to output
 | |
| PROGRAM_CAT = [
 | |
|     sys.executable, '-c',
 | |
|     ';'.join(('import sys',
 | |
|               'data = sys.stdin.buffer.read()',
 | |
|               'sys.stdout.buffer.write(data)'))]
 | |
| 
 | |
| 
 | |
| def tearDownModule():
 | |
|     asyncio.set_event_loop_policy(None)
 | |
| 
 | |
| 
 | |
| class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
 | |
|     def _start(self, *args, **kwargs):
 | |
|         self._proc = mock.Mock()
 | |
|         self._proc.stdin = None
 | |
|         self._proc.stdout = None
 | |
|         self._proc.stderr = None
 | |
|         self._proc.pid = -1
 | |
| 
 | |
| 
 | |
| class SubprocessTransportTests(test_utils.TestCase):
 | |
|     def setUp(self):
 | |
|         super().setUp()
 | |
|         self.loop = self.new_test_loop()
 | |
|         self.set_event_loop(self.loop)
 | |
| 
 | |
|     def create_transport(self, waiter=None):
 | |
|         protocol = mock.Mock()
 | |
|         transport = TestSubprocessTransport(
 | |
|                         self.loop, protocol, ['test'], False,
 | |
|                         None, None, None, 0, waiter=waiter)
 | |
|         return (transport, protocol)
 | |
| 
 | |
|     def test_proc_exited(self):
 | |
|         waiter = self.loop.create_future()
 | |
|         transport, protocol = self.create_transport(waiter)
 | |
|         transport._process_exited(6)
 | |
|         self.loop.run_until_complete(waiter)
 | |
| 
 | |
|         self.assertEqual(transport.get_returncode(), 6)
 | |
| 
 | |
|         self.assertTrue(protocol.connection_made.called)
 | |
|         self.assertTrue(protocol.process_exited.called)
 | |
|         self.assertTrue(protocol.connection_lost.called)
 | |
|         self.assertEqual(protocol.connection_lost.call_args[0], (None,))
 | |
| 
 | |
|         self.assertFalse(transport.is_closing())
 | |
|         self.assertIsNone(transport._loop)
 | |
|         self.assertIsNone(transport._proc)
 | |
|         self.assertIsNone(transport._protocol)
 | |
| 
 | |
|         # methods must raise ProcessLookupError if the process exited
 | |
|         self.assertRaises(ProcessLookupError,
 | |
|                           transport.send_signal, signal.SIGTERM)
 | |
|         self.assertRaises(ProcessLookupError, transport.terminate)
 | |
|         self.assertRaises(ProcessLookupError, transport.kill)
 | |
| 
 | |
|         transport.close()
 | |
| 
 | |
|     def test_subprocess_repr(self):
 | |
|         waiter = self.loop.create_future()
 | |
|         transport, protocol = self.create_transport(waiter)
 | |
|         transport._process_exited(6)
 | |
|         self.loop.run_until_complete(waiter)
 | |
| 
 | |
|         self.assertEqual(
 | |
|             repr(transport),
 | |
|             "<TestSubprocessTransport pid=-1 returncode=6>"
 | |
|         )
 | |
|         transport._returncode = None
 | |
|         self.assertEqual(
 | |
|             repr(transport),
 | |
|             "<TestSubprocessTransport pid=-1 running>"
 | |
|         )
 | |
|         transport._pid = None
 | |
|         transport._returncode = None
 | |
|         self.assertEqual(
 | |
|             repr(transport),
 | |
|             "<TestSubprocessTransport not started>"
 | |
|         )
 | |
|         transport.close()
 | |
| 
 | |
| 
 | |
| class SubprocessMixin:
 | |
| 
 | |
|     def test_stdin_stdout(self):
 | |
|         args = PROGRAM_CAT
 | |
| 
 | |
|         async def run(data):
 | |
|             proc = await asyncio.create_subprocess_exec(
 | |
|                 *args,
 | |
|                 stdin=subprocess.PIPE,
 | |
|                 stdout=subprocess.PIPE,
 | |
|             )
 | |
| 
 | |
|             # feed data
 | |
|             proc.stdin.write(data)
 | |
|             await proc.stdin.drain()
 | |
|             proc.stdin.close()
 | |
| 
 | |
|             # get output and exitcode
 | |
|             data = await proc.stdout.read()
 | |
|             exitcode = await proc.wait()
 | |
|             return (exitcode, data)
 | |
| 
 | |
|         task = run(b'some data')
 | |
|         task = asyncio.wait_for(task, 60.0)
 | |
|         exitcode, stdout = self.loop.run_until_complete(task)
 | |
|         self.assertEqual(exitcode, 0)
 | |
|         self.assertEqual(stdout, b'some data')
 | |
| 
 | |
|     def test_communicate(self):
 | |
|         args = PROGRAM_CAT
 | |
| 
 | |
|         async def run(data):
 | |
|             proc = await asyncio.create_subprocess_exec(
 | |
|                 *args,
 | |
|                 stdin=subprocess.PIPE,
 | |
|                 stdout=subprocess.PIPE,
 | |
|             )
 | |
|             stdout, stderr = await proc.communicate(data)
 | |
|             return proc.returncode, stdout
 | |
| 
 | |
|         task = run(b'some data')
 | |
|         task = asyncio.wait_for(task, support.LONG_TIMEOUT)
 | |
|         exitcode, stdout = self.loop.run_until_complete(task)
 | |
|         self.assertEqual(exitcode, 0)
 | |
|         self.assertEqual(stdout, b'some data')
 | |
| 
 | |
|     def test_communicate_none_input(self):
 | |
|         args = PROGRAM_CAT
 | |
| 
 | |
|         async def run():
 | |
|             proc = await asyncio.create_subprocess_exec(
 | |
|                 *args,
 | |
|                 stdin=subprocess.PIPE,
 | |
|                 stdout=subprocess.PIPE,
 | |
|             )
 | |
|             stdout, stderr = await proc.communicate()
 | |
|             return proc.returncode, stdout
 | |
| 
 | |
|         task = run()
 | |
|         task = asyncio.wait_for(task, support.LONG_TIMEOUT)
 | |
|         exitcode, stdout = self.loop.run_until_complete(task)
 | |
|         self.assertEqual(exitcode, 0)
 | |
|         self.assertEqual(stdout, b'')
 | |
| 
 | |
|     def test_shell(self):
 | |
|         proc = self.loop.run_until_complete(
 | |
|             asyncio.create_subprocess_shell('exit 7')
 | |
|         )
 | |
|         exitcode = self.loop.run_until_complete(proc.wait())
 | |
|         self.assertEqual(exitcode, 7)
 | |
| 
 | |
|     def test_start_new_session(self):
 | |
|         # start the new process in a new session
 | |
|         proc = self.loop.run_until_complete(
 | |
|             asyncio.create_subprocess_shell(
 | |
|                 'exit 8',
 | |
|                 start_new_session=True,
 | |
|             )
 | |
|         )
 | |
|         exitcode = self.loop.run_until_complete(proc.wait())
 | |
|         self.assertEqual(exitcode, 8)
 | |
| 
 | |
|     def test_kill(self):
 | |
|         args = PROGRAM_BLOCKED
 | |
|         proc = self.loop.run_until_complete(
 | |
|             asyncio.create_subprocess_exec(*args)
 | |
|         )
 | |
|         proc.kill()
 | |
|         returncode = self.loop.run_until_complete(proc.wait())
 | |
|         if sys.platform == 'win32':
 | |
|             self.assertIsInstance(returncode, int)
 | |
|             # expect 1 but sometimes get 0
 | |
|         else:
 | |
|             self.assertEqual(-signal.SIGKILL, returncode)
 | |
| 
 | |
|     def test_kill_issue43884(self):
 | |
|         if sys.platform == 'win32':
 | |
|             blocking_shell_command = f'{sys.executable} -c "import time; time.sleep(2)"'
 | |
|         else:
 | |
|             blocking_shell_command = 'sleep 1; sleep 1'
 | |
|         creationflags = 0
 | |
|         if sys.platform == 'win32':
 | |
|             from subprocess import CREATE_NEW_PROCESS_GROUP
 | |
|             # On windows create a new process group so that killing process
 | |
|             # kills the process and all its children.
 | |
|             creationflags = CREATE_NEW_PROCESS_GROUP
 | |
|         proc = self.loop.run_until_complete(
 | |
|             asyncio.create_subprocess_shell(blocking_shell_command, stdout=asyncio.subprocess.PIPE,
 | |
|             creationflags=creationflags)
 | |
|         )
 | |
|         self.loop.run_until_complete(asyncio.sleep(1))
 | |
|         if sys.platform == 'win32':
 | |
|             proc.send_signal(signal.CTRL_BREAK_EVENT)
 | |
|         # On windows it is an alias of terminate which sets the return code
 | |
|         proc.kill()
 | |
|         returncode = self.loop.run_until_complete(proc.wait())
 | |
|         if sys.platform == 'win32':
 | |
|             self.assertIsInstance(returncode, int)
 | |
|             # expect 1 but sometimes get 0
 | |
|         else:
 | |
|             self.assertEqual(-signal.SIGKILL, returncode)
 | |
| 
 | |
|     def test_terminate(self):
 | |
|         args = PROGRAM_BLOCKED
 | |
|         proc = self.loop.run_until_complete(
 | |
|             asyncio.create_subprocess_exec(*args)
 | |
|         )
 | |
|         proc.terminate()
 | |
|         returncode = self.loop.run_until_complete(proc.wait())
 | |
|         if sys.platform == 'win32':
 | |
|             self.assertIsInstance(returncode, int)
 | |
|             # expect 1 but sometimes get 0
 | |
|         else:
 | |
|             self.assertEqual(-signal.SIGTERM, returncode)
 | |
| 
 | |
|     @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
 | |
|     def test_send_signal(self):
 | |
|         # bpo-31034: Make sure that we get the default signal handler (killing
 | |
|         # the process). The parent process may have decided to ignore SIGHUP,
 | |
|         # and signal handlers are inherited.
 | |
|         old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
 | |
|         try:
 | |
|             code = 'import time; print("sleeping", flush=True); time.sleep(3600)'
 | |
|             args = [sys.executable, '-c', code]
 | |
|             proc = self.loop.run_until_complete(
 | |
|                 asyncio.create_subprocess_exec(
 | |
|                     *args,
 | |
|                     stdout=subprocess.PIPE,
 | |
|                 )
 | |
|             )
 | |
| 
 | |
|             async def send_signal(proc):
 | |
|                 # basic synchronization to wait until the program is sleeping
 | |
|                 line = await proc.stdout.readline()
 | |
|                 self.assertEqual(line, b'sleeping\n')
 | |
| 
 | |
|                 proc.send_signal(signal.SIGHUP)
 | |
|                 returncode = await proc.wait()
 | |
|                 return returncode
 | |
| 
 | |
|             returncode = self.loop.run_until_complete(send_signal(proc))
 | |
|             self.assertEqual(-signal.SIGHUP, returncode)
 | |
|         finally:
 | |
|             signal.signal(signal.SIGHUP, old_handler)
 | |
| 
 | |
|     def test_stdin_broken_pipe(self):
 | |
|         # buffer large enough to feed the whole pipe buffer
 | |
|         large_data = b'x' * support.PIPE_MAX_SIZE
 | |
| 
 | |
|         rfd, wfd = os.pipe()
 | |
|         self.addCleanup(os.close, rfd)
 | |
|         self.addCleanup(os.close, wfd)
 | |
|         if support.MS_WINDOWS:
 | |
|             handle = msvcrt.get_osfhandle(rfd)
 | |
|             os.set_handle_inheritable(handle, True)
 | |
|             code = textwrap.dedent(f'''
 | |
|                 import os, msvcrt
 | |
|                 handle = {handle}
 | |
|                 fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
 | |
|                 os.read(fd, 1)
 | |
|             ''')
 | |
|             from subprocess import STARTUPINFO
 | |
|             startupinfo = STARTUPINFO()
 | |
|             startupinfo.lpAttributeList = {"handle_list": [handle]}
 | |
|             kwargs = dict(startupinfo=startupinfo)
 | |
|         else:
 | |
|             code = f'import os; fd = {rfd}; os.read(fd, 1)'
 | |
|             kwargs = dict(pass_fds=(rfd,))
 | |
| 
 | |
|         # the program ends before the stdin can be fed
 | |
|         proc = self.loop.run_until_complete(
 | |
|             asyncio.create_subprocess_exec(
 | |
|                 sys.executable, '-c', code,
 | |
|                 stdin=subprocess.PIPE,
 | |
|                 **kwargs
 | |
|             )
 | |
|         )
 | |
| 
 | |
|         async def write_stdin(proc, data):
 | |
|             proc.stdin.write(data)
 | |
|             # Only exit the child process once the write buffer is filled
 | |
|             os.write(wfd, b'go')
 | |
|             await proc.stdin.drain()
 | |
| 
 | |
|         coro = write_stdin(proc, large_data)
 | |
|         # drain() must raise BrokenPipeError or ConnectionResetError
 | |
|         with test_utils.disable_logger():
 | |
|             self.assertRaises((BrokenPipeError, ConnectionResetError),
 | |
|                               self.loop.run_until_complete, coro)
 | |
|         self.loop.run_until_complete(proc.wait())
 | |
| 
 | |
|     def test_communicate_ignore_broken_pipe(self):
 | |
|         # buffer large enough to feed the whole pipe buffer
 | |
|         large_data = b'x' * support.PIPE_MAX_SIZE
 | |
| 
 | |
|         # the program ends before the stdin can be fed
 | |
|         proc = self.loop.run_until_complete(
 | |
|             asyncio.create_subprocess_exec(
 | |
|                 sys.executable, '-c', 'pass',
 | |
|                 stdin=subprocess.PIPE,
 | |
|             )
 | |
|         )
 | |
| 
 | |
|         # communicate() must ignore BrokenPipeError when feeding stdin
 | |
|         self.loop.set_exception_handler(lambda loop, msg: None)
 | |
|         self.loop.run_until_complete(proc.communicate(large_data))
 | |
|         self.loop.run_until_complete(proc.wait())
 | |
| 
 | |
|     def test_pause_reading(self):
 | |
|         limit = 10
 | |
|         size = (limit * 2 + 1)
 | |
| 
 | |
|         async def test_pause_reading():
 | |
|             code = '\n'.join((
 | |
|                 'import sys',
 | |
|                 'sys.stdout.write("x" * %s)' % size,
 | |
|                 'sys.stdout.flush()',
 | |
|             ))
 | |
| 
 | |
|             connect_read_pipe = self.loop.connect_read_pipe
 | |
| 
 | |
|             async def connect_read_pipe_mock(*args, **kw):
 | |
|                 transport, protocol = await connect_read_pipe(*args, **kw)
 | |
|                 transport.pause_reading = mock.Mock()
 | |
|                 transport.resume_reading = mock.Mock()
 | |
|                 return (transport, protocol)
 | |
| 
 | |
|             self.loop.connect_read_pipe = connect_read_pipe_mock
 | |
| 
 | |
|             proc = await asyncio.create_subprocess_exec(
 | |
|                 sys.executable, '-c', code,
 | |
|                 stdin=asyncio.subprocess.PIPE,
 | |
|                 stdout=asyncio.subprocess.PIPE,
 | |
|                 limit=limit,
 | |
|             )
 | |
|             stdout_transport = proc._transport.get_pipe_transport(1)
 | |
| 
 | |
|             stdout, stderr = await proc.communicate()
 | |
| 
 | |
|             # The child process produced more than limit bytes of output,
 | |
|             # the stream reader transport should pause the protocol to not
 | |
|             # allocate too much memory.
 | |
|             return (stdout, stdout_transport)
 | |
| 
 | |
|         # Issue #22685: Ensure that the stream reader pauses the protocol
 | |
|         # when the child process produces too much data
 | |
|         stdout, transport = self.loop.run_until_complete(test_pause_reading())
 | |
| 
 | |
|         self.assertEqual(stdout, b'x' * size)
 | |
|         self.assertTrue(transport.pause_reading.called)
 | |
|         self.assertTrue(transport.resume_reading.called)
 | |
| 
 | |
|     def test_stdin_not_inheritable(self):
 | |
|         # asyncio issue #209: stdin must not be inheritable, otherwise
 | |
|         # the Process.communicate() hangs
 | |
|         async def len_message(message):
 | |
|             code = 'import sys; data = sys.stdin.read(); print(len(data))'
 | |
|             proc = await asyncio.create_subprocess_exec(
 | |
|                 sys.executable, '-c', code,
 | |
|                 stdin=asyncio.subprocess.PIPE,
 | |
|                 stdout=asyncio.subprocess.PIPE,
 | |
|                 stderr=asyncio.subprocess.PIPE,
 | |
|                 close_fds=False,
 | |
|             )
 | |
|             stdout, stderr = await proc.communicate(message)
 | |
|             exitcode = await proc.wait()
 | |
|             return (stdout, exitcode)
 | |
| 
 | |
|         output, exitcode = self.loop.run_until_complete(len_message(b'abc'))
 | |
|         self.assertEqual(output.rstrip(), b'3')
 | |
|         self.assertEqual(exitcode, 0)
 | |
| 
 | |
|     def test_empty_input(self):
 | |
| 
 | |
|         async def empty_input():
 | |
|             code = 'import sys; data = sys.stdin.read(); print(len(data))'
 | |
|             proc = await asyncio.create_subprocess_exec(
 | |
|                 sys.executable, '-c', code,
 | |
|                 stdin=asyncio.subprocess.PIPE,
 | |
|                 stdout=asyncio.subprocess.PIPE,
 | |
|                 stderr=asyncio.subprocess.PIPE,
 | |
|                 close_fds=False,
 | |
|             )
 | |
|             stdout, stderr = await proc.communicate(b'')
 | |
|             exitcode = await proc.wait()
 | |
|             return (stdout, exitcode)
 | |
| 
 | |
|         output, exitcode = self.loop.run_until_complete(empty_input())
 | |
|         self.assertEqual(output.rstrip(), b'0')
 | |
|         self.assertEqual(exitcode, 0)
 | |
| 
 | |
|     def test_devnull_input(self):
 | |
| 
 | |
|         async def empty_input():
 | |
|             code = 'import sys; data = sys.stdin.read(); print(len(data))'
 | |
|             proc = await asyncio.create_subprocess_exec(
 | |
|                 sys.executable, '-c', code,
 | |
|                 stdin=asyncio.subprocess.DEVNULL,
 | |
|                 stdout=asyncio.subprocess.PIPE,
 | |
|                 stderr=asyncio.subprocess.PIPE,
 | |
|                 close_fds=False,
 | |
|             )
 | |
|             stdout, stderr = await proc.communicate()
 | |
|             exitcode = await proc.wait()
 | |
|             return (stdout, exitcode)
 | |
| 
 | |
|         output, exitcode = self.loop.run_until_complete(empty_input())
 | |
|         self.assertEqual(output.rstrip(), b'0')
 | |
|         self.assertEqual(exitcode, 0)
 | |
| 
 | |
|     def test_devnull_output(self):
 | |
| 
 | |
|         async def empty_output():
 | |
|             code = 'import sys; data = sys.stdin.read(); print(len(data))'
 | |
|             proc = await asyncio.create_subprocess_exec(
 | |
|                 sys.executable, '-c', code,
 | |
|                 stdin=asyncio.subprocess.PIPE,
 | |
|                 stdout=asyncio.subprocess.DEVNULL,
 | |
|                 stderr=asyncio.subprocess.PIPE,
 | |
|                 close_fds=False,
 | |
|             )
 | |
|             stdout, stderr = await proc.communicate(b"abc")
 | |
|             exitcode = await proc.wait()
 | |
|             return (stdout, exitcode)
 | |
| 
 | |
|         output, exitcode = self.loop.run_until_complete(empty_output())
 | |
|         self.assertEqual(output, None)
 | |
|         self.assertEqual(exitcode, 0)
 | |
| 
 | |
|     def test_devnull_error(self):
 | |
| 
 | |
|         async def empty_error():
 | |
|             code = 'import sys; data = sys.stdin.read(); print(len(data))'
 | |
|             proc = await asyncio.create_subprocess_exec(
 | |
|                 sys.executable, '-c', code,
 | |
|                 stdin=asyncio.subprocess.PIPE,
 | |
|                 stdout=asyncio.subprocess.PIPE,
 | |
|                 stderr=asyncio.subprocess.DEVNULL,
 | |
|                 close_fds=False,
 | |
|             )
 | |
|             stdout, stderr = await proc.communicate(b"abc")
 | |
|             exitcode = await proc.wait()
 | |
|             return (stderr, exitcode)
 | |
| 
 | |
|         output, exitcode = self.loop.run_until_complete(empty_error())
 | |
|         self.assertEqual(output, None)
 | |
|         self.assertEqual(exitcode, 0)
 | |
| 
 | |
|     @unittest.skipIf(sys.platform != 'linux', "Don't have /dev/stdin")
 | |
|     def test_devstdin_input(self):
 | |
| 
 | |
|         async def devstdin_input(message):
 | |
|             code = 'file = open("/dev/stdin"); data = file.read(); print(len(data))'
 | |
|             proc = await asyncio.create_subprocess_exec(
 | |
|                 sys.executable, '-c', code,
 | |
|                 stdin=asyncio.subprocess.PIPE,
 | |
|                 stdout=asyncio.subprocess.PIPE,
 | |
|                 stderr=asyncio.subprocess.PIPE,
 | |
|                 close_fds=False,
 | |
|             )
 | |
|             stdout, stderr = await proc.communicate(message)
 | |
|             exitcode = await proc.wait()
 | |
|             return (stdout, exitcode)
 | |
| 
 | |
|         output, exitcode = self.loop.run_until_complete(devstdin_input(b'abc'))
 | |
|         self.assertEqual(output.rstrip(), b'3')
 | |
|         self.assertEqual(exitcode, 0)
 | |
| 
 | |
|     def test_cancel_process_wait(self):
 | |
|         # Issue #23140: cancel Process.wait()
 | |
| 
 | |
|         async def cancel_wait():
 | |
|             proc = await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
 | |
| 
 | |
|             # Create an internal future waiting on the process exit
 | |
|             task = self.loop.create_task(proc.wait())
 | |
|             self.loop.call_soon(task.cancel)
 | |
|             try:
 | |
|                 await task
 | |
|             except asyncio.CancelledError:
 | |
|                 pass
 | |
| 
 | |
|             # Cancel the future
 | |
|             task.cancel()
 | |
| 
 | |
|             # Kill the process and wait until it is done
 | |
|             proc.kill()
 | |
|             await proc.wait()
 | |
| 
 | |
|         self.loop.run_until_complete(cancel_wait())
 | |
| 
 | |
|     def test_cancel_make_subprocess_transport_exec(self):
 | |
| 
 | |
|         async def cancel_make_transport():
 | |
|             coro = asyncio.create_subprocess_exec(*PROGRAM_BLOCKED)
 | |
|             task = self.loop.create_task(coro)
 | |
| 
 | |
|             self.loop.call_soon(task.cancel)
 | |
|             try:
 | |
|                 await task
 | |
|             except asyncio.CancelledError:
 | |
|                 pass
 | |
| 
 | |
|         # ignore the log:
 | |
|         # "Exception during subprocess creation, kill the subprocess"
 | |
|         with test_utils.disable_logger():
 | |
|             self.loop.run_until_complete(cancel_make_transport())
 | |
| 
 | |
|     def test_cancel_post_init(self):
 | |
| 
 | |
|         async def cancel_make_transport():
 | |
|             coro = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
 | |
|                                              *PROGRAM_BLOCKED)
 | |
|             task = self.loop.create_task(coro)
 | |
| 
 | |
|             self.loop.call_soon(task.cancel)
 | |
|             try:
 | |
|                 await task
 | |
|             except asyncio.CancelledError:
 | |
|                 pass
 | |
| 
 | |
|         # ignore the log:
 | |
|         # "Exception during subprocess creation, kill the subprocess"
 | |
|         with test_utils.disable_logger():
 | |
|             self.loop.run_until_complete(cancel_make_transport())
 | |
|             test_utils.run_briefly(self.loop)
 | |
| 
 | |
|     def test_close_kill_running(self):
 | |
| 
 | |
|         async def kill_running():
 | |
|             create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
 | |
|                                                *PROGRAM_BLOCKED)
 | |
|             transport, protocol = await create
 | |
| 
 | |
|             kill_called = False
 | |
|             def kill():
 | |
|                 nonlocal kill_called
 | |
|                 kill_called = True
 | |
|                 orig_kill()
 | |
| 
 | |
|             proc = transport.get_extra_info('subprocess')
 | |
|             orig_kill = proc.kill
 | |
|             proc.kill = kill
 | |
|             returncode = transport.get_returncode()
 | |
|             transport.close()
 | |
|             await asyncio.wait_for(transport._wait(), 5)
 | |
|             return (returncode, kill_called)
 | |
| 
 | |
|         # Ignore "Close running child process: kill ..." log
 | |
|         with test_utils.disable_logger():
 | |
|             try:
 | |
|                 returncode, killed = self.loop.run_until_complete(
 | |
|                     kill_running()
 | |
|                 )
 | |
|             except asyncio.TimeoutError:
 | |
|                 self.skipTest(
 | |
|                     "Timeout failure on waiting for subprocess stopping"
 | |
|                 )
 | |
|         self.assertIsNone(returncode)
 | |
| 
 | |
|         # transport.close() must kill the process if it is still running
 | |
|         self.assertTrue(killed)
 | |
|         test_utils.run_briefly(self.loop)
 | |
| 
 | |
|     def test_close_dont_kill_finished(self):
 | |
| 
 | |
|         async def kill_running():
 | |
|             create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
 | |
|                                                *PROGRAM_BLOCKED)
 | |
|             transport, protocol = await create
 | |
|             proc = transport.get_extra_info('subprocess')
 | |
| 
 | |
|             # kill the process (but asyncio is not notified immediately)
 | |
|             proc.kill()
 | |
|             proc.wait()
 | |
| 
 | |
|             proc.kill = mock.Mock()
 | |
|             proc_returncode = proc.poll()
 | |
|             transport_returncode = transport.get_returncode()
 | |
|             transport.close()
 | |
|             return (proc_returncode, transport_returncode, proc.kill.called)
 | |
| 
 | |
|         # Ignore "Unknown child process pid ..." log of SafeChildWatcher,
 | |
|         # emitted because the test already consumes the exit status:
 | |
|         # proc.wait()
 | |
|         with test_utils.disable_logger():
 | |
|             result = self.loop.run_until_complete(kill_running())
 | |
|             test_utils.run_briefly(self.loop)
 | |
| 
 | |
|         proc_returncode, transport_return_code, killed = result
 | |
| 
 | |
|         self.assertIsNotNone(proc_returncode)
 | |
|         self.assertIsNone(transport_return_code)
 | |
| 
 | |
|         # transport.close() must not kill the process if it finished, even if
 | |
|         # the transport was not notified yet
 | |
|         self.assertFalse(killed)
 | |
| 
 | |
|         # Unlike SafeChildWatcher, FastChildWatcher does not pop the
 | |
|         # callbacks if waitpid() is called elsewhere. Let's clear them
 | |
|         # manually to avoid a warning when the watcher is detached.
 | |
|         if (sys.platform != 'win32' and
 | |
|                 isinstance(self, SubprocessFastWatcherTests)):
 | |
|             with warnings.catch_warnings():
 | |
|                 warnings.simplefilter('ignore', DeprecationWarning)
 | |
|                 asyncio.get_child_watcher()._callbacks.clear()
 | |
| 
 | |
|     async def _test_popen_error(self, stdin):
 | |
|         if sys.platform == 'win32':
 | |
|             target = 'asyncio.windows_utils.Popen'
 | |
|         else:
 | |
|             target = 'subprocess.Popen'
 | |
|         with mock.patch(target) as popen:
 | |
|             exc = ZeroDivisionError
 | |
|             popen.side_effect = exc
 | |
| 
 | |
|             with warnings.catch_warnings(record=True) as warns:
 | |
|                 with self.assertRaises(exc):
 | |
|                     await asyncio.create_subprocess_exec(
 | |
|                         sys.executable,
 | |
|                         '-c',
 | |
|                         'pass',
 | |
|                         stdin=stdin
 | |
|                     )
 | |
|                 self.assertEqual(warns, [])
 | |
| 
 | |
|     def test_popen_error(self):
 | |
|         # Issue #24763: check that the subprocess transport is closed
 | |
|         # when BaseSubprocessTransport fails
 | |
|         self.loop.run_until_complete(self._test_popen_error(stdin=None))
 | |
| 
 | |
|     def test_popen_error_with_stdin_pipe(self):
 | |
|         # Issue #35721: check that newly created socket pair is closed when
 | |
|         # Popen fails
 | |
|         self.loop.run_until_complete(
 | |
|             self._test_popen_error(stdin=subprocess.PIPE))
 | |
| 
 | |
|     def test_read_stdout_after_process_exit(self):
 | |
| 
 | |
|         async def execute():
 | |
|             code = '\n'.join(['import sys',
 | |
|                               'for _ in range(64):',
 | |
|                               '    sys.stdout.write("x" * 4096)',
 | |
|                               'sys.stdout.flush()',
 | |
|                               'sys.exit(1)'])
 | |
| 
 | |
|             process = await asyncio.create_subprocess_exec(
 | |
|                 sys.executable, '-c', code,
 | |
|                 stdout=asyncio.subprocess.PIPE,
 | |
|             )
 | |
| 
 | |
|             while True:
 | |
|                 data = await process.stdout.read(65536)
 | |
|                 if data:
 | |
|                     await asyncio.sleep(0.3)
 | |
|                 else:
 | |
|                     break
 | |
| 
 | |
|         self.loop.run_until_complete(execute())
 | |
| 
 | |
|     def test_create_subprocess_exec_text_mode_fails(self):
 | |
|         async def execute():
 | |
|             with self.assertRaises(ValueError):
 | |
|                 await subprocess.create_subprocess_exec(sys.executable,
 | |
|                                                         text=True)
 | |
| 
 | |
|             with self.assertRaises(ValueError):
 | |
|                 await subprocess.create_subprocess_exec(sys.executable,
 | |
|                                                         encoding="utf-8")
 | |
| 
 | |
|             with self.assertRaises(ValueError):
 | |
|                 await subprocess.create_subprocess_exec(sys.executable,
 | |
|                                                         errors="strict")
 | |
| 
 | |
|         self.loop.run_until_complete(execute())
 | |
| 
 | |
|     def test_create_subprocess_shell_text_mode_fails(self):
 | |
| 
 | |
|         async def execute():
 | |
|             with self.assertRaises(ValueError):
 | |
|                 await subprocess.create_subprocess_shell(sys.executable,
 | |
|                                                          text=True)
 | |
| 
 | |
|             with self.assertRaises(ValueError):
 | |
|                 await subprocess.create_subprocess_shell(sys.executable,
 | |
|                                                          encoding="utf-8")
 | |
| 
 | |
|             with self.assertRaises(ValueError):
 | |
|                 await subprocess.create_subprocess_shell(sys.executable,
 | |
|                                                          errors="strict")
 | |
| 
 | |
|         self.loop.run_until_complete(execute())
 | |
| 
 | |
|     def test_create_subprocess_exec_with_path(self):
 | |
|         async def execute():
 | |
|             p = await subprocess.create_subprocess_exec(
 | |
|                 os_helper.FakePath(sys.executable), '-c', 'pass')
 | |
|             await p.wait()
 | |
|             p = await subprocess.create_subprocess_exec(
 | |
|                 sys.executable, '-c', 'pass', os_helper.FakePath('.'))
 | |
|             await p.wait()
 | |
| 
 | |
|         self.assertIsNone(self.loop.run_until_complete(execute()))
 | |
| 
 | |
|     async def check_stdout_output(self, coro, output):
 | |
|         proc = await coro
 | |
|         stdout, _ = await proc.communicate()
 | |
|         self.assertEqual(stdout, output)
 | |
|         self.assertEqual(proc.returncode, 0)
 | |
|         task = asyncio.create_task(proc.wait())
 | |
|         await asyncio.sleep(0)
 | |
|         self.assertEqual(task.result(), proc.returncode)
 | |
| 
 | |
|     def test_create_subprocess_env_shell(self) -> None:
 | |
|         async def main() -> None:
 | |
|             cmd = f'''{sys.executable} -c "import os, sys; sys.stdout.write(os.getenv('FOO'))"'''
 | |
|             env = os.environ.copy()
 | |
|             env["FOO"] = "bar"
 | |
|             proc = await asyncio.create_subprocess_shell(
 | |
|                 cmd, env=env, stdout=subprocess.PIPE
 | |
|             )
 | |
|             return proc
 | |
| 
 | |
|         self.loop.run_until_complete(self.check_stdout_output(main(), b'bar'))
 | |
| 
 | |
|     def test_create_subprocess_env_exec(self) -> None:
 | |
|         async def main() -> None:
 | |
|             cmd = [sys.executable, "-c",
 | |
|                    "import os, sys; sys.stdout.write(os.getenv('FOO'))"]
 | |
|             env = os.environ.copy()
 | |
|             env["FOO"] = "baz"
 | |
|             proc = await asyncio.create_subprocess_exec(
 | |
|                 *cmd, env=env, stdout=subprocess.PIPE
 | |
|             )
 | |
|             return proc
 | |
| 
 | |
|         self.loop.run_until_complete(self.check_stdout_output(main(), b'baz'))
 | |
| 
 | |
| 
 | |
|     def test_subprocess_concurrent_wait(self) -> None:
 | |
|         async def main() -> None:
 | |
|             proc = await asyncio.create_subprocess_exec(
 | |
|                 *PROGRAM_CAT,
 | |
|                 stdin=subprocess.PIPE,
 | |
|                 stdout=subprocess.PIPE,
 | |
|             )
 | |
|             stdout, _ = await proc.communicate(b'some data')
 | |
|             self.assertEqual(stdout, b"some data")
 | |
|             self.assertEqual(proc.returncode, 0)
 | |
|             self.assertEqual(await asyncio.gather(*[proc.wait() for _ in range(10)]),
 | |
|                              [proc.returncode] * 10)
 | |
| 
 | |
|         self.loop.run_until_complete(main())
 | |
| 
 | |
|     def test_subprocess_protocol_events(self):
 | |
|         # gh-108973: Test that all subprocess protocol methods are called.
 | |
|         # The protocol methods are not called in a determistic order.
 | |
|         # The order depends on the event loop and the operating system.
 | |
|         events = []
 | |
|         fds = [1, 2]
 | |
|         expected = [
 | |
|             ('pipe_data_received', 1, b'stdout'),
 | |
|             ('pipe_data_received', 2, b'stderr'),
 | |
|             ('pipe_connection_lost', 1),
 | |
|             ('pipe_connection_lost', 2),
 | |
|             'process_exited',
 | |
|         ]
 | |
|         per_fd_expected = [
 | |
|             'pipe_data_received',
 | |
|             'pipe_connection_lost',
 | |
|         ]
 | |
| 
 | |
|         class MyProtocol(asyncio.SubprocessProtocol):
 | |
|             def __init__(self, exit_future: asyncio.Future) -> None:
 | |
|                 self.exit_future = exit_future
 | |
| 
 | |
|             def pipe_data_received(self, fd, data) -> None:
 | |
|                 events.append(('pipe_data_received', fd, data))
 | |
|                 self.exit_maybe()
 | |
| 
 | |
|             def pipe_connection_lost(self, fd, exc) -> None:
 | |
|                 events.append(('pipe_connection_lost', fd))
 | |
|                 self.exit_maybe()
 | |
| 
 | |
|             def process_exited(self) -> None:
 | |
|                 events.append('process_exited')
 | |
|                 self.exit_maybe()
 | |
| 
 | |
|             def exit_maybe(self):
 | |
|                 # Only exit when we got all expected events
 | |
|                 if len(events) >= len(expected):
 | |
|                     self.exit_future.set_result(True)
 | |
| 
 | |
|         async def main() -> None:
 | |
|             loop = asyncio.get_running_loop()
 | |
|             exit_future = asyncio.Future()
 | |
|             code = 'import sys; sys.stdout.write("stdout"); sys.stderr.write("stderr")'
 | |
|             transport, _ = await loop.subprocess_exec(lambda: MyProtocol(exit_future),
 | |
|                                                       sys.executable, '-c', code, stdin=None)
 | |
|             await exit_future
 | |
|             transport.close()
 | |
| 
 | |
|             return events
 | |
| 
 | |
|         events = self.loop.run_until_complete(main())
 | |
| 
 | |
|         # First, make sure that we received all events
 | |
|         self.assertSetEqual(set(events), set(expected))
 | |
| 
 | |
|         # Second, check order of pipe events per file descriptor
 | |
|         per_fd_events = {fd: [] for fd in fds}
 | |
|         for event in events:
 | |
|             if event == 'process_exited':
 | |
|                 continue
 | |
|             name, fd = event[:2]
 | |
|             per_fd_events[fd].append(name)
 | |
| 
 | |
|         for fd in fds:
 | |
|             self.assertEqual(per_fd_events[fd], per_fd_expected, (fd, events))
 | |
| 
 | |
|     def test_subprocess_communicate_stdout(self):
 | |
|         # See https://github.com/python/cpython/issues/100133
 | |
|         async def get_command_stdout(cmd, *args):
 | |
|             proc = await asyncio.create_subprocess_exec(
 | |
|                 cmd, *args, stdout=asyncio.subprocess.PIPE,
 | |
|             )
 | |
|             stdout, _ = await proc.communicate()
 | |
|             return stdout.decode().strip()
 | |
| 
 | |
|         async def main():
 | |
|             outputs = [f'foo{i}' for i in range(10)]
 | |
|             res = await asyncio.gather(*[get_command_stdout(sys.executable, '-c',
 | |
|                                         f'print({out!r})') for out in outputs])
 | |
|             self.assertEqual(res, outputs)
 | |
| 
 | |
|         self.loop.run_until_complete(main())
 | |
| 
 | |
| 
 | |
| if sys.platform != 'win32':
 | |
|     # Unix
 | |
|     class SubprocessWatcherMixin(SubprocessMixin):
 | |
| 
 | |
|         Watcher = None
 | |
| 
 | |
|         def setUp(self):
 | |
|             super().setUp()
 | |
|             policy = asyncio.get_event_loop_policy()
 | |
|             self.loop = policy.new_event_loop()
 | |
|             self.set_event_loop(self.loop)
 | |
| 
 | |
|             watcher = self._get_watcher()
 | |
|             watcher.attach_loop(self.loop)
 | |
|             with warnings.catch_warnings():
 | |
|                 warnings.simplefilter('ignore', DeprecationWarning)
 | |
|                 policy.set_child_watcher(watcher)
 | |
| 
 | |
|         def tearDown(self):
 | |
|             super().tearDown()
 | |
|             policy = asyncio.get_event_loop_policy()
 | |
|             with warnings.catch_warnings():
 | |
|                 warnings.simplefilter('ignore', DeprecationWarning)
 | |
|                 watcher = policy.get_child_watcher()
 | |
|                 policy.set_child_watcher(None)
 | |
|             watcher.attach_loop(None)
 | |
|             watcher.close()
 | |
| 
 | |
|     class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
 | |
|                                          test_utils.TestCase):
 | |
| 
 | |
|         def _get_watcher(self):
 | |
|             return unix_events.ThreadedChildWatcher()
 | |
| 
 | |
|     class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
 | |
|                                      test_utils.TestCase):
 | |
| 
 | |
|         def _get_watcher(self):
 | |
|             with self.assertWarns(DeprecationWarning):
 | |
|                 return unix_events.SafeChildWatcher()
 | |
| 
 | |
|     class MultiLoopChildWatcherTests(test_utils.TestCase):
 | |
| 
 | |
|         def test_warns(self):
 | |
|             with self.assertWarns(DeprecationWarning):
 | |
|                 unix_events.MultiLoopChildWatcher()
 | |
| 
 | |
|     class SubprocessFastWatcherTests(SubprocessWatcherMixin,
 | |
|                                      test_utils.TestCase):
 | |
| 
 | |
|         def _get_watcher(self):
 | |
|             with self.assertWarns(DeprecationWarning):
 | |
|                 return unix_events.FastChildWatcher()
 | |
| 
 | |
|     @unittest.skipUnless(
 | |
|         unix_events.can_use_pidfd(),
 | |
|         "operating system does not support pidfds",
 | |
|     )
 | |
|     class SubprocessPidfdWatcherTests(SubprocessWatcherMixin,
 | |
|                                       test_utils.TestCase):
 | |
| 
 | |
|         def _get_watcher(self):
 | |
|             return unix_events.PidfdChildWatcher()
 | |
| 
 | |
| 
 | |
|     class GenericWatcherTests(test_utils.TestCase):
 | |
| 
 | |
|         def test_create_subprocess_fails_with_inactive_watcher(self):
 | |
|             watcher = mock.create_autospec(asyncio.AbstractChildWatcher)
 | |
|             watcher.is_active.return_value = False
 | |
| 
 | |
|             async def execute():
 | |
|                 asyncio.set_child_watcher(watcher)
 | |
| 
 | |
|                 with self.assertRaises(RuntimeError):
 | |
|                     await subprocess.create_subprocess_exec(
 | |
|                         os_helper.FakePath(sys.executable), '-c', 'pass')
 | |
| 
 | |
|                 watcher.add_child_handler.assert_not_called()
 | |
| 
 | |
|             with asyncio.Runner(loop_factory=asyncio.new_event_loop) as runner:
 | |
|                 with warnings.catch_warnings():
 | |
|                     warnings.simplefilter('ignore', DeprecationWarning)
 | |
|                     self.assertIsNone(runner.run(execute()))
 | |
|             self.assertListEqual(watcher.mock_calls, [
 | |
|                 mock.call.__enter__(),
 | |
|                 mock.call.is_active(),
 | |
|                 mock.call.__exit__(RuntimeError, mock.ANY, mock.ANY),
 | |
|             ], watcher.mock_calls)
 | |
| 
 | |
| 
 | |
|         @unittest.skipUnless(
 | |
|             unix_events.can_use_pidfd(),
 | |
|             "operating system does not support pidfds",
 | |
|         )
 | |
|         def test_create_subprocess_with_pidfd(self):
 | |
|             async def in_thread():
 | |
|                 proc = await asyncio.create_subprocess_exec(
 | |
|                     *PROGRAM_CAT,
 | |
|                     stdin=subprocess.PIPE,
 | |
|                     stdout=subprocess.PIPE,
 | |
|                 )
 | |
|                 stdout, stderr = await proc.communicate(b"some data")
 | |
|                 return proc.returncode, stdout
 | |
| 
 | |
|             async def main():
 | |
|                 # asyncio.Runner did not call asyncio.set_event_loop()
 | |
|                 with self.assertRaises(RuntimeError):
 | |
|                     asyncio.get_event_loop_policy().get_event_loop()
 | |
|                 return await asyncio.to_thread(asyncio.run, in_thread())
 | |
|             with self.assertWarns(DeprecationWarning):
 | |
|                 asyncio.set_child_watcher(asyncio.PidfdChildWatcher())
 | |
|             try:
 | |
|                 with asyncio.Runner(loop_factory=asyncio.new_event_loop) as runner:
 | |
|                     returncode, stdout = runner.run(main())
 | |
|                 self.assertEqual(returncode, 0)
 | |
|                 self.assertEqual(stdout, b'some data')
 | |
|             finally:
 | |
|                 with self.assertWarns(DeprecationWarning):
 | |
|                     asyncio.set_child_watcher(None)
 | |
| else:
 | |
|     # Windows
 | |
|     class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase):
 | |
| 
 | |
|         def setUp(self):
 | |
|             super().setUp()
 | |
|             self.loop = asyncio.ProactorEventLoop()
 | |
|             self.set_event_loop(self.loop)
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     unittest.main()
 | 
