mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 19:34:08 +00:00 
			
		
		
		
	Rewrite asyncio test to be more meaningful (#4363)
This commit is contained in:
		
							parent
							
								
									9f914a01af
								
							
						
					
					
						commit
						4652bf2acc
					
				
					 1 changed files with 18 additions and 13 deletions
				
			
		| 
						 | 
				
			
			@ -530,20 +530,25 @@ class BaseEventLoopTests(test_utils.TestCase):
 | 
			
		|||
            other_loop.run_until_complete, task)
 | 
			
		||||
 | 
			
		||||
    def test_run_until_complete_loop_orphan_future_close_loop(self):
 | 
			
		||||
        async def foo(sec=0):
 | 
			
		||||
            await asyncio.sleep(sec)
 | 
			
		||||
 | 
			
		||||
        self.loop.close()
 | 
			
		||||
        loop = asyncio.new_event_loop()
 | 
			
		||||
        asyncio.set_event_loop(loop)
 | 
			
		||||
        try:
 | 
			
		||||
            with mock.patch('asyncio.base_events.BaseEventLoop.run_forever', 
 | 
			
		||||
                            side_effect=Exception):
 | 
			
		||||
                loop.run_until_complete(foo())
 | 
			
		||||
        except:
 | 
			
		||||
        class ShowStopper(BaseException):
 | 
			
		||||
            pass
 | 
			
		||||
        loop.run_until_complete(foo(0.1))
 | 
			
		||||
        loop.close()
 | 
			
		||||
 | 
			
		||||
        async def foo(delay):
 | 
			
		||||
            await asyncio.sleep(delay, loop=self.loop)
 | 
			
		||||
 | 
			
		||||
        def throw():
 | 
			
		||||
            raise ShowStopper
 | 
			
		||||
 | 
			
		||||
        self.loop._process_events = mock.Mock()
 | 
			
		||||
        self.loop.call_soon(throw)
 | 
			
		||||
        try:
 | 
			
		||||
            self.loop.run_until_complete(foo(0.1))
 | 
			
		||||
        except ShowStopper:
 | 
			
		||||
            pass
 | 
			
		||||
 | 
			
		||||
        # This call fails if run_until_complete does not clean up
 | 
			
		||||
        # done-callback for the previous future.
 | 
			
		||||
        self.loop.run_until_complete(foo(0.2))
 | 
			
		||||
 | 
			
		||||
    def test_subprocess_exec_invalid_args(self):
 | 
			
		||||
        args = [sys.executable, '-c', 'pass']
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue