mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 11:49:12 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			184 lines
		
	
	
	
		
			6.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			184 lines
		
	
	
	
		
			6.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from asyncio import subprocess
 | 
						|
import asyncio
 | 
						|
import signal
 | 
						|
import sys
 | 
						|
import unittest
 | 
						|
from test import support
 | 
						|
if sys.platform != 'win32':
 | 
						|
    from asyncio import unix_events
 | 
						|
 | 
						|
# Program blocking
 | 
						|
PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']
 | 
						|
 | 
						|
# Program sleeping during 1 second
 | 
						|
PROGRAM_SLEEP_1SEC = [sys.executable, '-c', 'import time; time.sleep(1)']
 | 
						|
 | 
						|
# Program copying input to output
 | 
						|
PROGRAM_CAT = [
 | 
						|
    sys.executable, '-c',
 | 
						|
    ';'.join(('import sys',
 | 
						|
              'data = sys.stdin.buffer.read()',
 | 
						|
              'sys.stdout.buffer.write(data)'))]
 | 
						|
 | 
						|
class SubprocessMixin:
 | 
						|
 | 
						|
    def test_stdin_stdout(self):
 | 
						|
        args = PROGRAM_CAT
 | 
						|
 | 
						|
        @asyncio.coroutine
 | 
						|
        def run(data):
 | 
						|
            proc = yield from asyncio.create_subprocess_exec(
 | 
						|
                                          *args,
 | 
						|
                                          stdin=subprocess.PIPE,
 | 
						|
                                          stdout=subprocess.PIPE,
 | 
						|
                                          loop=self.loop)
 | 
						|
 | 
						|
            # feed data
 | 
						|
            proc.stdin.write(data)
 | 
						|
            yield from proc.stdin.drain()
 | 
						|
            proc.stdin.close()
 | 
						|
 | 
						|
            # get output and exitcode
 | 
						|
            data = yield from proc.stdout.read()
 | 
						|
            exitcode = yield from proc.wait()
 | 
						|
            return (exitcode, data)
 | 
						|
 | 
						|
        task = run(b'some data')
 | 
						|
        task = asyncio.wait_for(task, 10.0, loop=self.loop)
 | 
						|
        exitcode, stdout = self.loop.run_until_complete(task)
 | 
						|
        self.assertEqual(exitcode, 0)
 | 
						|
        self.assertEqual(stdout, b'some data')
 | 
						|
 | 
						|
    def test_communicate(self):
 | 
						|
        args = PROGRAM_CAT
 | 
						|
 | 
						|
        @asyncio.coroutine
 | 
						|
        def run(data):
 | 
						|
            proc = yield from asyncio.create_subprocess_exec(
 | 
						|
                                          *args,
 | 
						|
                                          stdin=subprocess.PIPE,
 | 
						|
                                          stdout=subprocess.PIPE,
 | 
						|
                                          loop=self.loop)
 | 
						|
            stdout, stderr = yield from proc.communicate(data)
 | 
						|
            return proc.returncode, stdout
 | 
						|
 | 
						|
        task = run(b'some data')
 | 
						|
        task = asyncio.wait_for(task, 10.0, loop=self.loop)
 | 
						|
        exitcode, stdout = self.loop.run_until_complete(task)
 | 
						|
        self.assertEqual(exitcode, 0)
 | 
						|
        self.assertEqual(stdout, b'some data')
 | 
						|
 | 
						|
    def test_shell(self):
 | 
						|
        create = asyncio.create_subprocess_shell('exit 7',
 | 
						|
                                                 loop=self.loop)
 | 
						|
        proc = self.loop.run_until_complete(create)
 | 
						|
        exitcode = self.loop.run_until_complete(proc.wait())
 | 
						|
        self.assertEqual(exitcode, 7)
 | 
						|
 | 
						|
    def test_start_new_session(self):
 | 
						|
        # start the new process in a new session
 | 
						|
        create = asyncio.create_subprocess_shell('exit 8',
 | 
						|
                                                 start_new_session=True,
 | 
						|
                                                 loop=self.loop)
 | 
						|
        proc = self.loop.run_until_complete(create)
 | 
						|
        exitcode = self.loop.run_until_complete(proc.wait())
 | 
						|
        self.assertEqual(exitcode, 8)
 | 
						|
 | 
						|
    def test_kill(self):
 | 
						|
        args = PROGRAM_BLOCKED
 | 
						|
        create = asyncio.create_subprocess_exec(*args, loop=self.loop)
 | 
						|
        proc = self.loop.run_until_complete(create)
 | 
						|
        proc.kill()
 | 
						|
        returncode = self.loop.run_until_complete(proc.wait())
 | 
						|
        if sys.platform == 'win32':
 | 
						|
            self.assertIsInstance(returncode, int)
 | 
						|
            # expect 1 but sometimes get 0
 | 
						|
        else:
 | 
						|
            self.assertEqual(-signal.SIGKILL, returncode)
 | 
						|
 | 
						|
    def test_terminate(self):
 | 
						|
        args = PROGRAM_BLOCKED
 | 
						|
        create = asyncio.create_subprocess_exec(*args, loop=self.loop)
 | 
						|
        proc = self.loop.run_until_complete(create)
 | 
						|
        proc.terminate()
 | 
						|
        returncode = self.loop.run_until_complete(proc.wait())
 | 
						|
        if sys.platform == 'win32':
 | 
						|
            self.assertIsInstance(returncode, int)
 | 
						|
            # expect 1 but sometimes get 0
 | 
						|
        else:
 | 
						|
            self.assertEqual(-signal.SIGTERM, returncode)
 | 
						|
 | 
						|
    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
 | 
						|
    def test_send_signal(self):
 | 
						|
        args = PROGRAM_BLOCKED
 | 
						|
        create = asyncio.create_subprocess_exec(*args, loop=self.loop)
 | 
						|
        proc = self.loop.run_until_complete(create)
 | 
						|
        proc.send_signal(signal.SIGHUP)
 | 
						|
        returncode = self.loop.run_until_complete(proc.wait())
 | 
						|
        self.assertEqual(-signal.SIGHUP, returncode)
 | 
						|
 | 
						|
    def test_broken_pipe(self):
 | 
						|
        large_data = b'x' * support.PIPE_MAX_SIZE
 | 
						|
 | 
						|
        create = asyncio.create_subprocess_exec(
 | 
						|
                             *PROGRAM_SLEEP_1SEC,
 | 
						|
                             stdin=subprocess.PIPE,
 | 
						|
                             loop=self.loop)
 | 
						|
        proc = self.loop.run_until_complete(create)
 | 
						|
        with self.assertRaises(BrokenPipeError):
 | 
						|
            self.loop.run_until_complete(proc.communicate(large_data))
 | 
						|
        self.loop.run_until_complete(proc.wait())
 | 
						|
 | 
						|
 | 
						|
if sys.platform != 'win32':
 | 
						|
    # Unix
 | 
						|
    class SubprocessWatcherMixin(SubprocessMixin):
 | 
						|
 | 
						|
        Watcher = None
 | 
						|
 | 
						|
        def setUp(self):
 | 
						|
            policy = asyncio.get_event_loop_policy()
 | 
						|
            self.loop = policy.new_event_loop()
 | 
						|
 | 
						|
            # ensure that the event loop is passed explicitly in the code
 | 
						|
            policy.set_event_loop(None)
 | 
						|
 | 
						|
            watcher = self.Watcher()
 | 
						|
            watcher.attach_loop(self.loop)
 | 
						|
            policy.set_child_watcher(watcher)
 | 
						|
 | 
						|
        def tearDown(self):
 | 
						|
            policy = asyncio.get_event_loop_policy()
 | 
						|
            policy.set_child_watcher(None)
 | 
						|
            self.loop.close()
 | 
						|
            policy.set_event_loop(None)
 | 
						|
 | 
						|
    class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
 | 
						|
                                     unittest.TestCase):
 | 
						|
 | 
						|
        Watcher = unix_events.SafeChildWatcher
 | 
						|
 | 
						|
    class SubprocessFastWatcherTests(SubprocessWatcherMixin,
 | 
						|
                                     unittest.TestCase):
 | 
						|
 | 
						|
        Watcher = unix_events.FastChildWatcher
 | 
						|
 | 
						|
else:
 | 
						|
    # Windows
 | 
						|
    class SubprocessProactorTests(SubprocessMixin, unittest.TestCase):
 | 
						|
 | 
						|
        def setUp(self):
 | 
						|
            policy = asyncio.get_event_loop_policy()
 | 
						|
            self.loop = asyncio.ProactorEventLoop()
 | 
						|
 | 
						|
            # ensure that the event loop is passed explicitly in the code
 | 
						|
            policy.set_event_loop(None)
 | 
						|
 | 
						|
        def tearDown(self):
 | 
						|
            policy = asyncio.get_event_loop_policy()
 | 
						|
            self.loop.close()
 | 
						|
            policy.set_event_loop(None)
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    unittest.main()
 |