mirror of
				https://github.com/python/cpython.git
				synced 2025-10-25 07:48:51 +00:00 
			
		
		
		
	 29ad0111bd
			
		
	
	
		29ad0111bd
		
	
	
	
	
		
			
			* PipeHandle now uses None instead of -1 for a closed handle * Sort imports in windows_utils. * Fix test_events on Python older than 3.5. Skip SSL tests on the ProactorEventLoop if ssl.MemoryIO is missing * Fix BaseEventLoop._create_connection_transport(). Close the transport if the creation of the transport (if the waiter) gets an exception. * _ProactorBasePipeTransport now sets _sock to None when the transport is closed. * Fix BaseSubprocessTransport.close(). Ignore pipes for which the protocol is not set yet (still equal to None). * TestLoop.close() now calls the close() method of the parent class (BaseEventLoop). * Cleanup BaseSelectorEventLoop: create the protocol on a separated line for readability and ease debugging. * Fix BaseSubprocessTransport._kill_wait(). Set the _returncode attribute, so close() doesn't try to terminate the process. * Tests: explicitly close event loops and transports * UNIX pipe transports: add closed/closing in repr(). Add "closed" or "closing" state in the __repr__() method of _UnixReadPipeTransport and _UnixWritePipeTransport classes.
		
			
				
	
	
		
			473 lines
		
	
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			473 lines
		
	
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Tests for futures.py."""
 | |
| 
 | |
| import concurrent.futures
 | |
| import re
 | |
| import sys
 | |
| import threading
 | |
| import unittest
 | |
| from unittest import mock
 | |
| 
 | |
| import asyncio
 | |
| from asyncio import test_utils
 | |
| try:
 | |
|     from test import support
 | |
| except ImportError:
 | |
|     from asyncio import test_support as support
 | |
| 
 | |
| 
 | |
| def _fakefunc(f):
 | |
|     return f
 | |
| 
 | |
def first_cb():
    # No-op placeholder; only its name and source location matter to the
    # repr tests that register it as the first done-callback.
    return None
 | |
| 
 | |
def last_cb():
    # No-op placeholder; only its name and source location matter to the
    # repr tests that register it as the last done-callback.
    return None
 | |
| 
 | |
| 
 | |
class FutureTests(test_utils.TestCase):
    # Tests for asyncio.Future itself: state transitions, repr(), the
    # "exception was never retrieved" logging, and wrap_future().

    def setUp(self):
        # Each test runs on its own test loop, closed again on cleanup.
        self.loop = self.new_test_loop()
        self.addCleanup(self.loop.close)

    def test_initial_state(self):
        # A fresh future is neither cancelled nor done.
        f = asyncio.Future(loop=self.loop)
        self.assertFalse(f.cancelled())
        self.assertFalse(f.done())
        f.cancel()
        self.assertTrue(f.cancelled())

    def test_init_constructor_default_loop(self):
        # Without an explicit loop, Future() must use the current one.
        asyncio.set_event_loop(self.loop)
        # Undo the global change so it cannot leak into later tests.
        self.addCleanup(asyncio.set_event_loop, None)
        f = asyncio.Future()
        self.assertIs(f._loop, self.loop)

    def test_constructor_positional(self):
        # Make sure Future doesn't accept a positional argument
        self.assertRaises(TypeError, asyncio.Future, 42)

    def test_cancel(self):
        # After cancel(): result()/exception() raise CancelledError and
        # the state can no longer be changed.
        f = asyncio.Future(loop=self.loop)
        self.assertTrue(f.cancel())
        self.assertTrue(f.cancelled())
        self.assertTrue(f.done())
        self.assertRaises(asyncio.CancelledError, f.result)
        self.assertRaises(asyncio.CancelledError, f.exception)
        self.assertRaises(asyncio.InvalidStateError, f.set_result, None)
        self.assertRaises(asyncio.InvalidStateError, f.set_exception, None)
        # A second cancel() reports that nothing was cancelled.
        self.assertFalse(f.cancel())

    def test_result(self):
        f = asyncio.Future(loop=self.loop)
        # Asking for the result of a pending future is an error.
        self.assertRaises(asyncio.InvalidStateError, f.result)

        f.set_result(42)
        self.assertFalse(f.cancelled())
        self.assertTrue(f.done())
        self.assertEqual(f.result(), 42)
        self.assertIsNone(f.exception())
        # Once finished, the future is immutable and cannot be cancelled.
        self.assertRaises(asyncio.InvalidStateError, f.set_result, None)
        self.assertRaises(asyncio.InvalidStateError, f.set_exception, None)
        self.assertFalse(f.cancel())

    def test_exception(self):
        exc = RuntimeError()
        f = asyncio.Future(loop=self.loop)
        # Asking for the exception of a pending future is an error.
        self.assertRaises(asyncio.InvalidStateError, f.exception)

        f.set_exception(exc)
        self.assertFalse(f.cancelled())
        self.assertTrue(f.done())
        # result() re-raises the stored exception.
        self.assertRaises(RuntimeError, f.result)
        self.assertEqual(f.exception(), exc)
        # Once finished, the future is immutable and cannot be cancelled.
        self.assertRaises(asyncio.InvalidStateError, f.set_result, None)
        self.assertRaises(asyncio.InvalidStateError, f.set_exception, None)
        self.assertFalse(f.cancel())

    def test_exception_class(self):
        # Passing an exception class must store an instance of it.
        f = asyncio.Future(loop=self.loop)
        f.set_exception(RuntimeError)
        self.assertIsInstance(f.exception(), RuntimeError)

    def test_yield_from_twice(self):
        f = asyncio.Future(loop=self.loop)

        def fixture():
            yield 'A'
            x = yield from f
            yield 'B', x
            y = yield from f
            yield 'C', y

        g = fixture()
        self.assertEqual(next(g), 'A')  # yield 'A'.
        self.assertEqual(next(g), f)  # First yield from f.
        f.set_result(42)
        self.assertEqual(next(g), ('B', 42))  # yield 'B', x.
        # The second "yield from f" does not yield f.
        self.assertEqual(next(g), ('C', 42))  # yield 'C', y.

    def test_future_repr(self):
        # Debug mode: repr() includes where the future was created.
        self.loop.set_debug(True)
        f_pending_debug = asyncio.Future(loop=self.loop)
        frame = f_pending_debug._source_traceback[-1]
        self.assertEqual(repr(f_pending_debug),
                         '<Future pending created at %s:%s>'
                         % (frame[0], frame[1]))
        f_pending_debug.cancel()

        # Non-debug mode: one repr per state.
        self.loop.set_debug(False)
        f_pending = asyncio.Future(loop=self.loop)
        self.assertEqual(repr(f_pending), '<Future pending>')
        f_pending.cancel()

        f_cancelled = asyncio.Future(loop=self.loop)
        f_cancelled.cancel()
        self.assertEqual(repr(f_cancelled), '<Future cancelled>')

        f_result = asyncio.Future(loop=self.loop)
        f_result.set_result(4)
        self.assertEqual(repr(f_result), '<Future finished result=4>')
        self.assertEqual(f_result.result(), 4)

        exc = RuntimeError()
        f_exception = asyncio.Future(loop=self.loop)
        f_exception.set_exception(exc)
        self.assertEqual(repr(f_exception),
                         '<Future finished exception=RuntimeError()>')
        self.assertIs(f_exception.exception(), exc)

        def func_repr(func):
            # Build the (regex-escaped) text expected for one callback.
            filename, lineno = test_utils.get_function_source(func)
            text = '%s() at %s:%s' % (func.__qualname__, filename, lineno)
            return re.escape(text)

        # One callback: listed in the repr while pending, gone once
        # the future is cancelled.
        f_one_callbacks = asyncio.Future(loop=self.loop)
        f_one_callbacks.add_done_callback(_fakefunc)
        fake_repr = func_repr(_fakefunc)
        self.assertRegex(repr(f_one_callbacks),
                         r'<Future pending cb=\[%s\]>' % fake_repr)
        f_one_callbacks.cancel()
        self.assertEqual(repr(f_one_callbacks),
                         '<Future cancelled>')

        # Two callbacks: both listed, in registration order.
        f_two_callbacks = asyncio.Future(loop=self.loop)
        f_two_callbacks.add_done_callback(first_cb)
        f_two_callbacks.add_done_callback(last_cb)
        first_repr = func_repr(first_cb)
        last_repr = func_repr(last_cb)
        self.assertRegex(repr(f_two_callbacks),
                         r'<Future pending cb=\[%s, %s\]>'
                         % (first_repr, last_repr))

        # Ten callbacks: only the first and last are shown, the eight
        # in between are summarized as "<8 more>".
        f_many_callbacks = asyncio.Future(loop=self.loop)
        f_many_callbacks.add_done_callback(first_cb)
        for i in range(8):
            f_many_callbacks.add_done_callback(_fakefunc)
        f_many_callbacks.add_done_callback(last_cb)
        cb_regex = r'%s, <8 more>, %s' % (first_repr, last_repr)
        self.assertRegex(repr(f_many_callbacks),
                         r'<Future pending cb=\[%s\]>' % cb_regex)
        f_many_callbacks.cancel()
        self.assertEqual(repr(f_many_callbacks),
                         '<Future cancelled>')

    def test_copy_state(self):
        # Test the internal _copy_state method since it's being directly
        # invoked in other modules.
        f = asyncio.Future(loop=self.loop)
        f.set_result(10)

        newf = asyncio.Future(loop=self.loop)
        newf._copy_state(f)
        self.assertTrue(newf.done())
        self.assertEqual(newf.result(), 10)

        f_exception = asyncio.Future(loop=self.loop)
        f_exception.set_exception(RuntimeError())

        newf_exception = asyncio.Future(loop=self.loop)
        newf_exception._copy_state(f_exception)
        self.assertTrue(newf_exception.done())
        self.assertRaises(RuntimeError, newf_exception.result)

        f_cancelled = asyncio.Future(loop=self.loop)
        f_cancelled.cancel()

        newf_cancelled = asyncio.Future(loop=self.loop)
        newf_cancelled._copy_state(f_cancelled)
        self.assertTrue(newf_cancelled.cancelled())

    def test_iter(self):
        fut = asyncio.Future(loop=self.loop)

        def coro():
            yield from fut

        def test():
            # Tuple unpacking iterates the generator directly; with the
            # future still pending this is expected to trip an assertion.
            arg1, arg2 = coro()

        self.assertRaises(AssertionError, test)
        fut.cancel()

    @mock.patch('asyncio.base_events.logger')
    def test_tb_logger_abandoned(self, m_log):
        # Dropping a pending future logs nothing.
        fut = asyncio.Future(loop=self.loop)
        del fut
        self.assertFalse(m_log.error.called)

    @mock.patch('asyncio.base_events.logger')
    def test_tb_logger_result_unretrieved(self, m_log):
        # Dropping a future with an unread *result* logs nothing.
        fut = asyncio.Future(loop=self.loop)
        fut.set_result(42)
        del fut
        self.assertFalse(m_log.error.called)

    @mock.patch('asyncio.base_events.logger')
    def test_tb_logger_result_retrieved(self, m_log):
        # Dropping a future whose result was read logs nothing.
        fut = asyncio.Future(loop=self.loop)
        fut.set_result(42)
        fut.result()
        del fut
        self.assertFalse(m_log.error.called)

    @mock.patch('asyncio.base_events.logger')
    def test_tb_logger_exception_unretrieved(self, m_log):
        # Dropping a future whose exception was never read must log an
        # error.
        fut = asyncio.Future(loop=self.loop)
        fut.set_exception(RuntimeError('boom'))
        del fut
        test_utils.run_briefly(self.loop)
        # Don't rely on "del" finalizing the future immediately; force a
        # collection like check_future_exception_never_retrieved() does.
        support.gc_collect()
        self.assertTrue(m_log.error.called)

    @mock.patch('asyncio.base_events.logger')
    def test_tb_logger_exception_retrieved(self, m_log):
        # Reading the exception via exception() suppresses the log.
        fut = asyncio.Future(loop=self.loop)
        fut.set_exception(RuntimeError('boom'))
        fut.exception()
        del fut
        self.assertFalse(m_log.error.called)

    @mock.patch('asyncio.base_events.logger')
    def test_tb_logger_exception_result_retrieved(self, m_log):
        # Having result() raise the exception also suppresses the log.
        fut = asyncio.Future(loop=self.loop)
        fut.set_exception(RuntimeError('boom'))
        self.assertRaises(RuntimeError, fut.result)
        del fut
        self.assertFalse(m_log.error.called)

    def test_wrap_future(self):

        def run(arg):
            return (arg, threading.get_ident())
        ex = concurrent.futures.ThreadPoolExecutor(1)
        # Join the worker thread when the test ends, pass or fail, so it
        # is not leaked into the rest of the test run.
        self.addCleanup(ex.shutdown, wait=True)
        f1 = ex.submit(run, 'oi')
        f2 = asyncio.wrap_future(f1, loop=self.loop)
        res, ident = self.loop.run_until_complete(f2)
        self.assertIsInstance(f2, asyncio.Future)
        self.assertEqual(res, 'oi')
        # The job must have run in the executor's thread, not this one.
        self.assertNotEqual(ident, threading.get_ident())

    def test_wrap_future_future(self):
        # Wrapping an asyncio future returns the very same object.
        f1 = asyncio.Future(loop=self.loop)
        f2 = asyncio.wrap_future(f1)
        self.assertIs(f1, f2)

    @mock.patch('asyncio.futures.events')
    def test_wrap_future_use_global_loop(self, m_events):
        def run(arg):
            return (arg, threading.get_ident())
        ex = concurrent.futures.ThreadPoolExecutor(1)
        # Join the worker thread when the test ends, pass or fail, so it
        # is not leaked into the rest of the test run.
        self.addCleanup(ex.shutdown, wait=True)
        f1 = ex.submit(run, 'oi')
        f2 = asyncio.wrap_future(f1)
        # With no loop argument, the loop comes from get_event_loop().
        self.assertIs(m_events.get_event_loop.return_value, f2._loop)

    def test_wrap_future_cancel(self):
        # Cancelling the wrapper cancels a still-pending wrapped future.
        f1 = concurrent.futures.Future()
        f2 = asyncio.wrap_future(f1, loop=self.loop)
        f2.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(f1.cancelled())
        self.assertTrue(f2.cancelled())

    def test_wrap_future_cancel2(self):
        # Cancelling the wrapper leaves an already finished wrapped
        # future, and its result, untouched.
        f1 = concurrent.futures.Future()
        f2 = asyncio.wrap_future(f1, loop=self.loop)
        f1.set_result(42)
        f2.cancel()
        test_utils.run_briefly(self.loop)
        self.assertFalse(f1.cancelled())
        self.assertEqual(f1.result(), 42)
        self.assertTrue(f2.cancelled())

    def test_future_source_traceback(self):
        self.loop.set_debug(True)

        # NOTE: the "lineno" computation assumes it sits on the line
        # directly below the Future() call; keep the two adjacent.
        future = asyncio.Future(loop=self.loop)
        lineno = sys._getframe().f_lineno - 1
        self.assertIsInstance(future._source_traceback, list)
        self.assertEqual(future._source_traceback[-1][:3],
                         (__file__,
                          lineno,
                          'test_future_source_traceback'))

    @mock.patch('asyncio.base_events.logger')
    def check_future_exception_never_retrieved(self, debug, m_log):
        # Shared body of the two test_future_exception_never_retrieved*
        # tests: checks the message logged when a future holding an
        # exception is dropped, with loop debug mode on or off.
        self.loop.set_debug(debug)

        def memory_error():
            # Raise for real so that the exception has a __traceback__.
            try:
                raise MemoryError()
            except BaseException as exc:
                return exc
        exc = memory_error()

        # NOTE: the debug regexes below match the text of the next
        # statement as it appears in the traceback; do not reformat it.
        future = asyncio.Future(loop=self.loop)
        if debug:
            source_traceback = future._source_traceback
        future.set_exception(exc)
        # Drop the only reference so the future gets finalized.
        future = None
        test_utils.run_briefly(self.loop)
        support.gc_collect()

        if sys.version_info >= (3, 4):
            if debug:
                frame = source_traceback[-1]
                regex = (r'^Future exception was never retrieved\n'
                         r'future: <Future finished exception=MemoryError\(\) '
                             r'created at {filename}:{lineno}>\n'
                         r'source_traceback: Object '
                            r'created at \(most recent call last\):\n'
                         r'  File'
                         r'.*\n'
                         r'  File "{filename}", line {lineno}, '
                            r'in check_future_exception_never_retrieved\n'
                         r'    future = asyncio\.Future\(loop=self\.loop\)$'
                         ).format(filename=re.escape(frame[0]),
                                  lineno=frame[1])
            else:
                regex = (r'^Future exception was never retrieved\n'
                         r'future: '
                            r'<Future finished exception=MemoryError\(\)>$'
                         )
            exc_info = (type(exc), exc, exc.__traceback__)
            m_log.error.assert_called_once_with(mock.ANY, exc_info=exc_info)
        else:
            if debug:
                frame = source_traceback[-1]
                regex = (r'^Future/Task exception was never retrieved\n'
                         r'Future/Task created at \(most recent call last\):\n'
                         r'  File'
                         r'.*\n'
                         r'  File "{filename}", line {lineno}, '
                            r'in check_future_exception_never_retrieved\n'
                         r'    future = asyncio\.Future\(loop=self\.loop\)\n'
                         r'Traceback \(most recent call last\):\n'
                         r'.*\n'
                         r'MemoryError$'
                         ).format(filename=re.escape(frame[0]),
                                  lineno=frame[1])
            else:
                regex = (r'^Future/Task exception was never retrieved\n'
                         r'Traceback \(most recent call last\):\n'
                         r'.*\n'
                         r'MemoryError$'
                         )
            m_log.error.assert_called_once_with(mock.ANY, exc_info=False)
        # The first positional argument of logger.error() is the message.
        message = m_log.error.call_args[0][0]
        self.assertRegex(message, re.compile(regex, re.DOTALL))

    def test_future_exception_never_retrieved(self):
        self.check_future_exception_never_retrieved(False)

    def test_future_exception_never_retrieved_debug(self):
        self.check_future_exception_never_retrieved(True)

    def test_set_result_unless_cancelled(self):
        # On a cancelled future the helper must not raise and must
        # leave the future cancelled.
        fut = asyncio.Future(loop=self.loop)
        fut.cancel()
        fut._set_result_unless_cancelled(2)
        self.assertTrue(fut.cancelled())
 | |
| 
 | |
| 
 | |
class FutureDoneCallbackTests(test_utils.TestCase):
    # Tests for Future.add_done_callback() / remove_done_callback().

    def setUp(self):
        self.loop = self.new_test_loop()
        # Close the loop explicitly, as FutureTests.setUp() does.
        self.addCleanup(self.loop.close)

    def run_briefly(self):
        # Give the loop a chance to run the scheduled callbacks.
        test_utils.run_briefly(self.loop)

    def _make_callback(self, bag, thing):
        # Create a callback function that appends thing to bag.
        def bag_appender(future):
            bag.append(thing)
        return bag_appender

    def _new_future(self):
        return asyncio.Future(loop=self.loop)

    def test_callbacks_invoked_on_set_result(self):
        bag = []
        f = self._new_future()
        f.add_done_callback(self._make_callback(bag, 42))
        f.add_done_callback(self._make_callback(bag, 17))

        # Nothing runs before the future is finished ...
        self.assertEqual(bag, [])
        f.set_result('foo')

        self.run_briefly()

        # ... then the callbacks run in registration order.
        self.assertEqual(bag, [42, 17])
        self.assertEqual(f.result(), 'foo')

    def test_callbacks_invoked_on_set_exception(self):
        bag = []
        f = self._new_future()
        f.add_done_callback(self._make_callback(bag, 100))

        self.assertEqual(bag, [])
        exc = RuntimeError()
        f.set_exception(exc)

        self.run_briefly()

        self.assertEqual(bag, [100])
        self.assertEqual(f.exception(), exc)

    def test_remove_done_callback(self):
        bag = []
        f = self._new_future()
        cb1 = self._make_callback(bag, 1)
        cb2 = self._make_callback(bag, 2)
        cb3 = self._make_callback(bag, 3)

        # Add one cb1 and one cb2.
        f.add_done_callback(cb1)
        f.add_done_callback(cb2)

        # One instance of cb2 removed. Now there's only one cb1.
        self.assertEqual(f.remove_done_callback(cb2), 1)

        # Never had any cb3 in there.
        self.assertEqual(f.remove_done_callback(cb3), 0)

        # After this there will be 6 instances of cb1 and one of cb2.
        f.add_done_callback(cb2)
        for i in range(5):
            f.add_done_callback(cb1)

        # Remove all instances of cb1. One cb2 remains.
        self.assertEqual(f.remove_done_callback(cb1), 6)

        self.assertEqual(bag, [])
        f.set_result('foo')

        self.run_briefly()

        # Only the surviving cb2 was invoked.
        self.assertEqual(bag, [2])
        self.assertEqual(f.result(), 'foo')
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     unittest.main()
 |