mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 02:15:10 +00:00 
			
		
		
		
	 b261475a48
			
		
	
	
		b261475a48
		
	
	
	
	
		
			
			* PipeServer.close() now cancels the "accept pipe" future which cancels the overlapped operation. * Fix _SelectorTransport.__repr__() if the transport was closed * Fix debug log in BaseEventLoop.create_connection(): get the socket object from the transport because SSL transport closes the old socket and creates a new SSL socket object. Remove also the _SelectorSslTransport._rawsock attribute: it contained the closed socket (not very useful) and it was not used. * Issue #22063: socket operations (sock_recv, sock_sendall, sock_connect, sock_accept) of the proactor event loop don't raise an exception in debug mode if the socket are in blocking mode. Overlapped operations also work on blocking sockets. * Fix unit tests in debug mode: mock a non-blocking socket for socket operations which now raise an exception if the socket is blocking. * _fatal_error() method of _UnixReadPipeTransport and _UnixWritePipeTransport now log all exceptions in debug mode * Don't log expected errors in unit tests * Tulip issue 200: _WaitHandleFuture._unregister_wait() now catchs and logs exceptions. * Tulip issue 200: Log errors in debug mode instead of simply ignoring them.
		
			
				
	
	
		
			217 lines
		
	
	
	
		
			7.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			217 lines
		
	
	
	
		
			7.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from asyncio import subprocess
 | |
| from asyncio import test_utils
 | |
| import asyncio
 | |
| import signal
 | |
| import sys
 | |
| import unittest
 | |
| from test import support
 | |
| if sys.platform != 'win32':
 | |
|     from asyncio import unix_events
 | |
| 
 | |
| # Program blocking
 | |
| PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']
 | |
| 
 | |
| # Program copying input to output
 | |
| PROGRAM_CAT = [
 | |
|     sys.executable, '-c',
 | |
|     ';'.join(('import sys',
 | |
|               'data = sys.stdin.buffer.read()',
 | |
|               'sys.stdout.buffer.write(data)'))]
 | |
| 
 | |
| class SubprocessMixin:
 | |
| 
 | |
|     def test_stdin_stdout(self):
 | |
|         args = PROGRAM_CAT
 | |
| 
 | |
|         @asyncio.coroutine
 | |
|         def run(data):
 | |
|             proc = yield from asyncio.create_subprocess_exec(
 | |
|                                           *args,
 | |
|                                           stdin=subprocess.PIPE,
 | |
|                                           stdout=subprocess.PIPE,
 | |
|                                           loop=self.loop)
 | |
| 
 | |
|             # feed data
 | |
|             proc.stdin.write(data)
 | |
|             yield from proc.stdin.drain()
 | |
|             proc.stdin.close()
 | |
| 
 | |
|             # get output and exitcode
 | |
|             data = yield from proc.stdout.read()
 | |
|             exitcode = yield from proc.wait()
 | |
|             return (exitcode, data)
 | |
| 
 | |
|         task = run(b'some data')
 | |
|         task = asyncio.wait_for(task, 60.0, loop=self.loop)
 | |
|         exitcode, stdout = self.loop.run_until_complete(task)
 | |
|         self.assertEqual(exitcode, 0)
 | |
|         self.assertEqual(stdout, b'some data')
 | |
| 
 | |
|     def test_communicate(self):
 | |
|         args = PROGRAM_CAT
 | |
| 
 | |
|         @asyncio.coroutine
 | |
|         def run(data):
 | |
|             proc = yield from asyncio.create_subprocess_exec(
 | |
|                                           *args,
 | |
|                                           stdin=subprocess.PIPE,
 | |
|                                           stdout=subprocess.PIPE,
 | |
|                                           loop=self.loop)
 | |
|             stdout, stderr = yield from proc.communicate(data)
 | |
|             return proc.returncode, stdout
 | |
| 
 | |
|         task = run(b'some data')
 | |
|         task = asyncio.wait_for(task, 60.0, loop=self.loop)
 | |
|         exitcode, stdout = self.loop.run_until_complete(task)
 | |
|         self.assertEqual(exitcode, 0)
 | |
|         self.assertEqual(stdout, b'some data')
 | |
| 
 | |
|     def test_shell(self):
 | |
|         create = asyncio.create_subprocess_shell('exit 7',
 | |
|                                                  loop=self.loop)
 | |
|         proc = self.loop.run_until_complete(create)
 | |
|         exitcode = self.loop.run_until_complete(proc.wait())
 | |
|         self.assertEqual(exitcode, 7)
 | |
| 
 | |
|     def test_start_new_session(self):
 | |
|         # start the new process in a new session
 | |
|         create = asyncio.create_subprocess_shell('exit 8',
 | |
|                                                  start_new_session=True,
 | |
|                                                  loop=self.loop)
 | |
|         proc = self.loop.run_until_complete(create)
 | |
|         exitcode = self.loop.run_until_complete(proc.wait())
 | |
|         self.assertEqual(exitcode, 8)
 | |
| 
 | |
|     def test_kill(self):
 | |
|         args = PROGRAM_BLOCKED
 | |
|         create = asyncio.create_subprocess_exec(*args, loop=self.loop)
 | |
|         proc = self.loop.run_until_complete(create)
 | |
|         proc.kill()
 | |
|         returncode = self.loop.run_until_complete(proc.wait())
 | |
|         if sys.platform == 'win32':
 | |
|             self.assertIsInstance(returncode, int)
 | |
|             # expect 1 but sometimes get 0
 | |
|         else:
 | |
|             self.assertEqual(-signal.SIGKILL, returncode)
 | |
| 
 | |
|     def test_terminate(self):
 | |
|         args = PROGRAM_BLOCKED
 | |
|         create = asyncio.create_subprocess_exec(*args, loop=self.loop)
 | |
|         proc = self.loop.run_until_complete(create)
 | |
|         proc.terminate()
 | |
|         returncode = self.loop.run_until_complete(proc.wait())
 | |
|         if sys.platform == 'win32':
 | |
|             self.assertIsInstance(returncode, int)
 | |
|             # expect 1 but sometimes get 0
 | |
|         else:
 | |
|             self.assertEqual(-signal.SIGTERM, returncode)
 | |
| 
 | |
|     @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
 | |
|     def test_send_signal(self):
 | |
|         code = 'import time; print("sleeping", flush=True); time.sleep(3600)'
 | |
|         args = [sys.executable, '-c', code]
 | |
|         create = asyncio.create_subprocess_exec(*args, loop=self.loop, stdout=subprocess.PIPE)
 | |
|         proc = self.loop.run_until_complete(create)
 | |
| 
 | |
|         @asyncio.coroutine
 | |
|         def send_signal(proc):
 | |
|             # basic synchronization to wait until the program is sleeping
 | |
|             line = yield from proc.stdout.readline()
 | |
|             self.assertEqual(line, b'sleeping\n')
 | |
| 
 | |
|             proc.send_signal(signal.SIGHUP)
 | |
|             returncode = (yield from proc.wait())
 | |
|             return returncode
 | |
| 
 | |
|         returncode = self.loop.run_until_complete(send_signal(proc))
 | |
|         self.assertEqual(-signal.SIGHUP, returncode)
 | |
| 
 | |
|     def prepare_broken_pipe_test(self):
 | |
|         # buffer large enough to feed the whole pipe buffer
 | |
|         large_data = b'x' * support.PIPE_MAX_SIZE
 | |
| 
 | |
|         # the program ends before the stdin can be feeded
 | |
|         create = asyncio.create_subprocess_exec(
 | |
|                              sys.executable, '-c', 'pass',
 | |
|                              stdin=subprocess.PIPE,
 | |
|                              loop=self.loop)
 | |
|         proc = self.loop.run_until_complete(create)
 | |
|         return (proc, large_data)
 | |
| 
 | |
|     def test_stdin_broken_pipe(self):
 | |
|         proc, large_data = self.prepare_broken_pipe_test()
 | |
| 
 | |
|         @asyncio.coroutine
 | |
|         def write_stdin(proc, data):
 | |
|             proc.stdin.write(data)
 | |
|             yield from proc.stdin.drain()
 | |
| 
 | |
|         coro = write_stdin(proc, large_data)
 | |
|         # drain() must raise BrokenPipeError or ConnectionResetError
 | |
|         with test_utils.disable_logger():
 | |
|             self.assertRaises((BrokenPipeError, ConnectionResetError),
 | |
|                               self.loop.run_until_complete, coro)
 | |
|         self.loop.run_until_complete(proc.wait())
 | |
| 
 | |
|     def test_communicate_ignore_broken_pipe(self):
 | |
|         proc, large_data = self.prepare_broken_pipe_test()
 | |
| 
 | |
|         # communicate() must ignore BrokenPipeError when feeding stdin
 | |
|         with test_utils.disable_logger():
 | |
|             self.loop.run_until_complete(proc.communicate(large_data))
 | |
|         self.loop.run_until_complete(proc.wait())
 | |
| 
 | |
| 
 | |
| if sys.platform != 'win32':
 | |
|     # Unix
 | |
|     class SubprocessWatcherMixin(SubprocessMixin):
 | |
| 
 | |
|         Watcher = None
 | |
| 
 | |
|         def setUp(self):
 | |
|             policy = asyncio.get_event_loop_policy()
 | |
|             self.loop = policy.new_event_loop()
 | |
| 
 | |
|             # ensure that the event loop is passed explicitly in asyncio
 | |
|             policy.set_event_loop(None)
 | |
| 
 | |
|             watcher = self.Watcher()
 | |
|             watcher.attach_loop(self.loop)
 | |
|             policy.set_child_watcher(watcher)
 | |
| 
 | |
|         def tearDown(self):
 | |
|             policy = asyncio.get_event_loop_policy()
 | |
|             policy.set_child_watcher(None)
 | |
|             self.loop.close()
 | |
|             super().tearDown()
 | |
| 
 | |
|     class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
 | |
|                                      test_utils.TestCase):
 | |
| 
 | |
|         Watcher = unix_events.SafeChildWatcher
 | |
| 
 | |
|     class SubprocessFastWatcherTests(SubprocessWatcherMixin,
 | |
|                                      test_utils.TestCase):
 | |
| 
 | |
|         Watcher = unix_events.FastChildWatcher
 | |
| 
 | |
| else:
 | |
|     # Windows
 | |
|     class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase):
 | |
| 
 | |
|         def setUp(self):
 | |
|             policy = asyncio.get_event_loop_policy()
 | |
|             self.loop = asyncio.ProactorEventLoop()
 | |
| 
 | |
|             # ensure that the event loop is passed explicitly in asyncio
 | |
|             policy.set_event_loop(None)
 | |
| 
 | |
|         def tearDown(self):
 | |
|             policy = asyncio.get_event_loop_policy()
 | |
|             self.loop.close()
 | |
|             policy.set_event_loop(None)
 | |
|             super().tearDown()
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     unittest.main()
 |