mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 11:49:12 +00:00 
			
		
		
		
	* bpo-38148: Add slots to asyncio transports * Update Misc/NEWS.d/next/Library/2019-09-13-08-55-43.bpo-38148.Lnww6D.rst Co-Authored-By: Kyle Stanley <aeros167@gmail.com>
		
			
				
	
	
		
			97 lines
		
	
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			97 lines
		
	
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Tests for transports.py."""
 | 
						|
 | 
						|
import unittest
 | 
						|
from unittest import mock
 | 
						|
 | 
						|
import asyncio
 | 
						|
from asyncio import transports
 | 
						|
 | 
						|
 | 
						|
class TransportTests(unittest.TestCase):
 | 
						|
 | 
						|
    def test_ctor_extra_is_none(self):
 | 
						|
        transport = asyncio.Transport()
 | 
						|
        self.assertEqual(transport._extra, {})
 | 
						|
 | 
						|
    def test_get_extra_info(self):
 | 
						|
        transport = asyncio.Transport({'extra': 'info'})
 | 
						|
        self.assertEqual('info', transport.get_extra_info('extra'))
 | 
						|
        self.assertIsNone(transport.get_extra_info('unknown'))
 | 
						|
 | 
						|
        default = object()
 | 
						|
        self.assertIs(default, transport.get_extra_info('unknown', default))
 | 
						|
 | 
						|
    def test_writelines(self):
 | 
						|
        writer = mock.Mock()
 | 
						|
 | 
						|
        class MyTransport(asyncio.Transport):
 | 
						|
            def write(self, data):
 | 
						|
                writer(data)
 | 
						|
 | 
						|
        transport = MyTransport()
 | 
						|
 | 
						|
        transport.writelines([b'line1',
 | 
						|
                              bytearray(b'line2'),
 | 
						|
                              memoryview(b'line3')])
 | 
						|
        self.assertEqual(1, writer.call_count)
 | 
						|
        writer.assert_called_with(b'line1line2line3')
 | 
						|
 | 
						|
    def test_not_implemented(self):
 | 
						|
        transport = asyncio.Transport()
 | 
						|
 | 
						|
        self.assertRaises(NotImplementedError,
 | 
						|
                          transport.set_write_buffer_limits)
 | 
						|
        self.assertRaises(NotImplementedError, transport.get_write_buffer_size)
 | 
						|
        self.assertRaises(NotImplementedError, transport.write, 'data')
 | 
						|
        self.assertRaises(NotImplementedError, transport.write_eof)
 | 
						|
        self.assertRaises(NotImplementedError, transport.can_write_eof)
 | 
						|
        self.assertRaises(NotImplementedError, transport.pause_reading)
 | 
						|
        self.assertRaises(NotImplementedError, transport.resume_reading)
 | 
						|
        self.assertRaises(NotImplementedError, transport.is_reading)
 | 
						|
        self.assertRaises(NotImplementedError, transport.close)
 | 
						|
        self.assertRaises(NotImplementedError, transport.abort)
 | 
						|
 | 
						|
    def test_dgram_not_implemented(self):
 | 
						|
        transport = asyncio.DatagramTransport()
 | 
						|
 | 
						|
        self.assertRaises(NotImplementedError, transport.sendto, 'data')
 | 
						|
        self.assertRaises(NotImplementedError, transport.abort)
 | 
						|
 | 
						|
    def test_subprocess_transport_not_implemented(self):
 | 
						|
        transport = asyncio.SubprocessTransport()
 | 
						|
 | 
						|
        self.assertRaises(NotImplementedError, transport.get_pid)
 | 
						|
        self.assertRaises(NotImplementedError, transport.get_returncode)
 | 
						|
        self.assertRaises(NotImplementedError, transport.get_pipe_transport, 1)
 | 
						|
        self.assertRaises(NotImplementedError, transport.send_signal, 1)
 | 
						|
        self.assertRaises(NotImplementedError, transport.terminate)
 | 
						|
        self.assertRaises(NotImplementedError, transport.kill)
 | 
						|
 | 
						|
    def test_flowcontrol_mixin_set_write_limits(self):
 | 
						|
 | 
						|
        class MyTransport(transports._FlowControlMixin,
 | 
						|
                          transports.Transport):
 | 
						|
 | 
						|
            def get_write_buffer_size(self):
 | 
						|
                return 512
 | 
						|
 | 
						|
        loop = mock.Mock()
 | 
						|
        transport = MyTransport(loop=loop)
 | 
						|
        transport._protocol = mock.Mock()
 | 
						|
 | 
						|
        self.assertFalse(transport._protocol_paused)
 | 
						|
 | 
						|
        with self.assertRaisesRegex(ValueError, 'high.*must be >= low'):
 | 
						|
            transport.set_write_buffer_limits(high=0, low=1)
 | 
						|
 | 
						|
        transport.set_write_buffer_limits(high=1024, low=128)
 | 
						|
        self.assertFalse(transport._protocol_paused)
 | 
						|
        self.assertEqual(transport.get_write_buffer_limits(), (128, 1024))
 | 
						|
 | 
						|
        transport.set_write_buffer_limits(high=256, low=128)
 | 
						|
        self.assertTrue(transport._protocol_paused)
 | 
						|
        self.assertEqual(transport.get_write_buffer_limits(), (128, 256))
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    unittest.main()
 |