Mirror of https://github.com/python/cpython.git, synced 2025-10-31 10:26:02 +00:00
Commit 6d64a8f49e
This PR deprecates explicit loop parameters in all public asyncio APIs. The issue is split into several steps to make review easier. Second step: streams.py. https://bugs.python.org/issue36373

1772 lines · 62 KiB · Python
__all__ = (
    'Stream', 'StreamMode',
    'open_connection', 'start_server',
    'connect', 'connect_read_pipe', 'connect_write_pipe',
    'StreamServer')

import enum
import socket
import sys
import warnings
import weakref

if hasattr(socket, 'AF_UNIX'):
    __all__ += ('open_unix_connection', 'start_unix_server',
                'connect_unix',
                'UnixStreamServer')

from . import coroutines
from . import events
from . import exceptions
from . import format_helpers
from . import protocols
from .log import logger
from . import tasks


_DEFAULT_LIMIT = 2 ** 16  # 64 KiB


class StreamMode(enum.Flag):
    READ = enum.auto()
    WRITE = enum.auto()
    READWRITE = READ | WRITE


def _ensure_can_read(mode):
    if not mode & StreamMode.READ:
        raise RuntimeError("The stream is write-only")


def _ensure_can_write(mode):
    if not mode & StreamMode.WRITE:
        raise RuntimeError("The stream is read-only")
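
# A minimal sketch (illustrative, not part of the module) of how the
# StreamMode flags compose and how the guard helpers above behave:
#
#     mode = StreamMode.READ | StreamMode.WRITE
#     assert mode is StreamMode.READWRITE
#     _ensure_can_read(mode)               # passes silently
#     _ensure_can_write(StreamMode.READ)   # raises RuntimeError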
| 
 | |
| 
 | |
| class _ContextManagerHelper:
 | |
|     __slots__ = ('_awaitable', '_result')
 | |
| 
 | |
|     def __init__(self, awaitable):
 | |
|         self._awaitable = awaitable
 | |
|         self._result = None
 | |
| 
 | |
|     def __await__(self):
 | |
|         return self._awaitable.__await__()
 | |
| 
 | |
|     async def __aenter__(self):
 | |
|         ret = await self._awaitable
 | |
|         result = await ret.__aenter__()
 | |
|         self._result = result
 | |
|         return result
 | |
| 
 | |
|     async def __aexit__(self, exc_type, exc_val, exc_tb):
 | |
|         return await self._result.__aexit__(exc_type, exc_val, exc_tb)
 | |
| 
 | |


def connect(host=None, port=None, *,
            limit=_DEFAULT_LIMIT,
            ssl=None, family=0, proto=0,
            flags=0, sock=None, local_addr=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
    # Design note:
    # Don't use the decorator approach but an explicit non-async
    # function to fail fast and explicitly
    # if the passed arguments don't match the function signature
    return _ContextManagerHelper(_connect(host, port, limit,
                                          ssl, family, proto,
                                          flags, sock, local_addr,
                                          server_hostname,
                                          ssl_handshake_timeout,
                                          happy_eyeballs_delay,
                                          interleave))
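
# A minimal usage sketch (host and port are illustrative; assumes a
# running event loop).  connect() can be awaited directly or used as an
# async context manager, courtesy of _ContextManagerHelper:
#
#     async def main():
#         async with connect('127.0.0.1', 8080) as stream:
#             await stream.write(b'ping')
#             reply = await stream.read(100)
#
#     # Or, without the context manager:
#     #     stream = await connect('127.0.0.1', 8080)
#     #     ...
#     #     await stream.close()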


async def _connect(host, port,
                   limit,
                   ssl, family, proto,
                   flags, sock, local_addr,
                   server_hostname,
                   ssl_handshake_timeout,
                   happy_eyeballs_delay, interleave):
    loop = events.get_running_loop()
    stream = Stream(mode=StreamMode.READWRITE,
                    limit=limit,
                    loop=loop,
                    _asyncio_internal=True)
    await loop.create_connection(
        lambda: _StreamProtocol(stream, loop=loop,
                                _asyncio_internal=True),
        host, port,
        ssl=ssl, family=family, proto=proto,
        flags=flags, sock=sock, local_addr=local_addr,
        server_hostname=server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout,
        happy_eyeballs_delay=happy_eyeballs_delay, interleave=interleave)
    return stream


def connect_read_pipe(pipe, *, limit=_DEFAULT_LIMIT):
    # Design note:
    # Don't use the decorator approach but an explicit non-async
    # function to fail fast and explicitly
    # if the passed arguments don't match the function signature
    return _ContextManagerHelper(_connect_read_pipe(pipe, limit))


async def _connect_read_pipe(pipe, limit):
    loop = events.get_running_loop()
    stream = Stream(mode=StreamMode.READ,
                    limit=limit,
                    loop=loop,
                    _asyncio_internal=True)
    await loop.connect_read_pipe(
        lambda: _StreamProtocol(stream, loop=loop,
                                _asyncio_internal=True),
        pipe)
    return stream


def connect_write_pipe(pipe, *, limit=_DEFAULT_LIMIT):
    # Design note:
    # Don't use the decorator approach but an explicit non-async
    # function to fail fast and explicitly
    # if the passed arguments don't match the function signature
    return _ContextManagerHelper(_connect_write_pipe(pipe, limit))


async def _connect_write_pipe(pipe, limit):
    loop = events.get_running_loop()
    stream = Stream(mode=StreamMode.WRITE,
                    limit=limit,
                    loop=loop,
                    _asyncio_internal=True)
    await loop.connect_write_pipe(
        lambda: _StreamProtocol(stream, loop=loop,
                                _asyncio_internal=True),
        pipe)
    return stream
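
# A minimal sketch (illustrative; assumes a Unix-like platform) of wiring
# a pipe's read end to a Stream with connect_read_pipe():
#
#     import os
#
#     async def read_side():
#         rfd, wfd = os.pipe()
#         async with connect_read_pipe(os.fdopen(rfd, 'rb')) as stream:
#             os.write(wfd, b'hello')
#             os.close(wfd)
#             data = await stream.read()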


async def open_connection(host=None, port=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """A wrapper for create_connection() returning a (reader, writer) pair.

    The reader returned is a StreamReader instance; the writer is a
    StreamWriter instance.

    The arguments are all the usual arguments to create_connection()
    except protocol_factory; most common are positional host and port,
    with various optional keyword arguments following.

    Additional optional keyword arguments are loop (to set the event loop
    instance to use) and limit (to set the buffer limit passed to the
    StreamReader).

    (If you want to customize the StreamReader and/or
    StreamReaderProtocol classes, just copy the code -- there's
    really nothing special here except some convenience.)
    """
    warnings.warn("open_connection() is deprecated since Python 3.8 "
                  "in favor of connect(), and scheduled for removal "
                  "in Python 3.10",
                  DeprecationWarning,
                  stacklevel=2)
    if loop is None:
        loop = events.get_event_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop, _asyncio_internal=True)
    transport, _ = await loop.create_connection(
        lambda: protocol, host, port, **kwds)
    writer = StreamWriter(transport, protocol, reader, loop)
    return reader, writer


async def start_server(client_connected_cb, host=None, port=None, *,
                       loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, call back for each client connected.

    The first parameter, `client_connected_cb`, takes two parameters:
    client_reader, client_writer.  client_reader is a StreamReader
    object, while client_writer is a StreamWriter object.  This
    parameter can either be a plain callback function or a coroutine;
    if it is a coroutine, it will be automatically converted into a
    Task.

    The rest of the arguments are all the usual arguments to
    loop.create_server() except protocol_factory; most common are
    positional host and port, with various optional keyword arguments
    following.

    Additional optional keyword arguments are loop (to set the event loop
    instance to use) and limit (to set the buffer limit passed to the
    StreamReader).

    The return value is the same as loop.create_server(), i.e. a
    Server object which can be used to stop the service.
    """
    warnings.warn("start_server() is deprecated since Python 3.8 "
                  "in favor of StreamServer(), and scheduled for removal "
                  "in Python 3.10",
                  DeprecationWarning,
                  stacklevel=2)
    if loop is None:
        loop = events.get_event_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    def factory():
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, client_connected_cb,
                                        loop=loop,
                                        _asyncio_internal=True)
        return protocol

    return await loop.create_server(factory, host, port, **kwds)
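
# A minimal echo-server sketch using the legacy API above (illustrative;
# triggers the DeprecationWarning emitted by start_server()):
#
#     async def handle(reader, writer):
#         data = await reader.readline()
#         writer.write(data)
#         await writer.drain()
#         writer.close()
#
#     async def main():
#         server = await start_server(handle, '127.0.0.1', 8888)
#         async with server:
#             await server.serve_forever()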


class _BaseStreamServer:
    # Design notes.
    # StreamServer and UnixStreamServer are exposed as FINAL classes,
    # not function factories, because
    #     async with serve(host, port) as server:
    #         server.start_serving()
    # looks ugly.
    # The class doesn't provide an API for enumerating connected streams;
    # that can be a subject for improvement in Python 3.9.

    _server_impl = None

    def __init__(self, client_connected_cb,
                 /,
                 limit=_DEFAULT_LIMIT,
                 shutdown_timeout=60,
                 _asyncio_internal=False):
        if not _asyncio_internal:
            raise RuntimeError("_ServerStream is a private asyncio class")
        self._client_connected_cb = client_connected_cb
        self._limit = limit
        self._loop = events.get_running_loop()
        self._streams = {}
        self._shutdown_timeout = shutdown_timeout

    def __init_subclass__(cls):
        if not cls.__module__.startswith('asyncio.'):
            raise TypeError(f"asyncio.{cls.__name__} "
                            "class cannot be inherited from")

    async def bind(self):
        if self._server_impl is not None:
            return
        self._server_impl = await self._bind()

    def is_bound(self):
        return self._server_impl is not None

    @property
    def sockets(self):
        # may be multiple values for a socket bound to both
        # IPv4 and IPv6 families
        if self._server_impl is None:
            return ()
        return self._server_impl.sockets

    def is_serving(self):
        if self._server_impl is None:
            return False
        return self._server_impl.is_serving()

    async def start_serving(self):
        await self.bind()
        await self._server_impl.start_serving()

    async def serve_forever(self):
        await self.start_serving()
        await self._server_impl.serve_forever()

    async def close(self):
        if self._server_impl is None:
            return
        self._server_impl.close()
        streams = list(self._streams.keys())
        active_tasks = list(self._streams.values())
        if streams:
            await tasks.wait([stream.close() for stream in streams])
        await self._server_impl.wait_closed()
        self._server_impl = None
        await self._shutdown_active_tasks(active_tasks)

    async def abort(self):
        if self._server_impl is None:
            return
        self._server_impl.close()
        streams = list(self._streams.keys())
        active_tasks = list(self._streams.values())
        if streams:
            await tasks.wait([stream.abort() for stream in streams])
        await self._server_impl.wait_closed()
        self._server_impl = None
        await self._shutdown_active_tasks(active_tasks)

    async def __aenter__(self):
        await self.bind()
        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        await self.close()

    def _attach(self, stream, task):
        self._streams[stream] = task

    def _detach(self, stream, task):
        del self._streams[stream]

    async def _shutdown_active_tasks(self, active_tasks):
        if not active_tasks:
            return
        # NOTE: tasks that finished with an exception are reported
        # by the Task.__del__() method.
        done, pending = await tasks.wait(active_tasks,
                                         timeout=self._shutdown_timeout)
        if not pending:
            return
        for task in pending:
            task.cancel()
        done, pending = await tasks.wait(pending,
                                         timeout=self._shutdown_timeout)
        for task in pending:
            self._loop.call_exception_handler({
                "message": (f'{task!r} ignored cancellation request '
                            f'from a closing {self!r}'),
                "stream_server": self
            })

    def __repr__(self):
        ret = [f'{self.__class__.__name__}']
        if self.is_serving():
            ret.append('serving')
        if self.sockets:
            ret.append(f'sockets={self.sockets!r}')
        return '<' + ' '.join(ret) + '>'

    def __del__(self, _warn=warnings.warn):
        if self._server_impl is not None:
            _warn(f"unclosed stream server {self!r}",
                  ResourceWarning, source=self)
            self._server_impl.close()


class StreamServer(_BaseStreamServer):

    def __init__(self, client_connected_cb, /, host=None, port=None, *,
                 limit=_DEFAULT_LIMIT,
                 family=socket.AF_UNSPEC,
                 flags=socket.AI_PASSIVE, sock=None, backlog=100,
                 ssl=None, reuse_address=None, reuse_port=None,
                 ssl_handshake_timeout=None,
                 shutdown_timeout=60):
        super().__init__(client_connected_cb,
                         limit=limit,
                         shutdown_timeout=shutdown_timeout,
                         _asyncio_internal=True)
        self._host = host
        self._port = port
        self._family = family
        self._flags = flags
        self._sock = sock
        self._backlog = backlog
        self._ssl = ssl
        self._reuse_address = reuse_address
        self._reuse_port = reuse_port
        self._ssl_handshake_timeout = ssl_handshake_timeout

    async def _bind(self):
        def factory():
            protocol = _ServerStreamProtocol(self,
                                             self._limit,
                                             self._client_connected_cb,
                                             loop=self._loop,
                                             _asyncio_internal=True)
            return protocol
        return await self._loop.create_server(
            factory,
            self._host,
            self._port,
            start_serving=False,
            family=self._family,
            flags=self._flags,
            sock=self._sock,
            backlog=self._backlog,
            ssl=self._ssl,
            reuse_address=self._reuse_address,
            reuse_port=self._reuse_port,
            ssl_handshake_timeout=self._ssl_handshake_timeout)
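
# A minimal sketch (illustrative) of the new-style server API defined
# above; the client_connected_cb receives a single Stream object:
#
#     async def handle(stream):
#         data = await stream.readline()
#         await stream.write(data)
#         await stream.close()
#
#     async def main():
#         async with StreamServer(handle, '127.0.0.1', 8888) as server:
#             await server.serve_forever()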


if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform

    async def open_unix_connection(path=None, *,
                                   loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets."""
        warnings.warn("open_unix_connection() is deprecated since Python 3.8 "
                      "in favor of connect_unix(), and scheduled for removal "
                      "in Python 3.10",
                      DeprecationWarning,
                      stacklevel=2)
        if loop is None:
            loop = events.get_event_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, loop=loop,
                                        _asyncio_internal=True)
        transport, _ = await loop.create_unix_connection(
            lambda: protocol, path, **kwds)
        writer = StreamWriter(transport, protocol, reader, loop)
        return reader, writer


    def connect_unix(path=None, *,
                     limit=_DEFAULT_LIMIT,
                     ssl=None, sock=None,
                     server_hostname=None,
                     ssl_handshake_timeout=None):
        """Similar to `connect()` but works with UNIX Domain Sockets."""
        # Design note:
        # Don't use the decorator approach but an explicit non-async
        # function to fail fast and explicitly
        # if the passed arguments don't match the function signature
        return _ContextManagerHelper(_connect_unix(path,
                                                   limit,
                                                   ssl, sock,
                                                   server_hostname,
                                                   ssl_handshake_timeout))


    async def _connect_unix(path,
                            limit,
                            ssl, sock,
                            server_hostname,
                            ssl_handshake_timeout):
        """Similar to `connect()` but works with UNIX Domain Sockets."""
        loop = events.get_running_loop()
        stream = Stream(mode=StreamMode.READWRITE,
                        limit=limit,
                        loop=loop,
                        _asyncio_internal=True)
        await loop.create_unix_connection(
            lambda: _StreamProtocol(stream,
                                    loop=loop,
                                    _asyncio_internal=True),
            path,
            ssl=ssl,
            sock=sock,
            server_hostname=server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        return stream


    async def start_unix_server(client_connected_cb, path=None, *,
                                loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets."""
        warnings.warn("start_unix_server() is deprecated since Python 3.8 "
                      "in favor of UnixStreamServer(), and scheduled "
                      "for removal in Python 3.10",
                      DeprecationWarning,
                      stacklevel=2)
        if loop is None:
            loop = events.get_event_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)

        def factory():
            reader = StreamReader(limit=limit, loop=loop)
            protocol = StreamReaderProtocol(reader, client_connected_cb,
                                            loop=loop,
                                            _asyncio_internal=True)
            return protocol

        return await loop.create_unix_server(factory, path, **kwds)

    class UnixStreamServer(_BaseStreamServer):

        def __init__(self, client_connected_cb, /, path=None, *,
                     limit=_DEFAULT_LIMIT,
                     sock=None,
                     backlog=100,
                     ssl=None,
                     ssl_handshake_timeout=None,
                     shutdown_timeout=60):
            super().__init__(client_connected_cb,
                             limit=limit,
                             shutdown_timeout=shutdown_timeout,
                             _asyncio_internal=True)
            self._path = path
            self._sock = sock
            self._backlog = backlog
            self._ssl = ssl
            self._ssl_handshake_timeout = ssl_handshake_timeout

        async def _bind(self):
            def factory():
                protocol = _ServerStreamProtocol(self,
                                                 self._limit,
                                                 self._client_connected_cb,
                                                 loop=self._loop,
                                                 _asyncio_internal=True)
                return protocol
            return await self._loop.create_unix_server(
                factory,
                self._path,
                start_serving=False,
                sock=self._sock,
                backlog=self._backlog,
                ssl=self._ssl,
                ssl_handshake_timeout=self._ssl_handshake_timeout)


class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for the _drain_helper() coroutine.
    """

    def __init__(self, loop=None, *, _asyncio_internal=False):
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        if not _asyncio_internal:
            # NOTE:
            # Avoid inheriting from FlowControlMixin.
            # Copy-paste the code to your project
            # if you need flow control helpers.
            warnings.warn(f"{self.__class__} should be instantiated "
                          "by asyncio internals only, "
                          "please avoid its creation from user code",
                          DeprecationWarning)
        self._paused = False
        self._drain_waiter = None
        self._connection_lost = False

    def pause_writing(self):
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        waiter = self._drain_waiter
        if waiter is not None:
            self._drain_waiter = None
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        self._connection_lost = True
        # Wake up the writer if currently paused.
        if not self._paused:
            return
        waiter = self._drain_waiter
        if waiter is None:
            return
        self._drain_waiter = None
        if waiter.done():
            return
        if exc is None:
            waiter.set_result(None)
        else:
            waiter.set_exception(exc)

    async def _drain_helper(self):
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._drain_waiter
        assert waiter is None or waiter.cancelled()
        waiter = self._loop.create_future()
        self._drain_waiter = waiter
        await waiter

    def _get_close_waiter(self, stream):
        raise NotImplementedError
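
# A minimal sketch (illustrative) of the flow-control contract implemented
# above: the transport calls pause_writing() when its buffer fills, and
# drain() then parks callers on _drain_waiter until resume_writing()
# releases them:
#
#     writer.write(large_chunk)    # may fill the transport's buffer
#     await writer.drain()         # blocks here while writing is paused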


# begin legacy stream APIs

class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader from
    accidentally calling inappropriate methods of the protocol.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None,
                 *, _asyncio_internal=False):
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False
        self._closed = self._loop.create_future()

    def connection_made(self, transport):
        self._stream_reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            self._stream_writer = StreamWriter(transport, self,
                                               self._stream_reader,
                                               self._loop)
            res = self._client_connected_cb(self._stream_reader,
                                            self._stream_writer)
            if coroutines.iscoroutine(res):
                self._loop.create_task(res)

    def connection_lost(self, exc):
        if self._stream_reader is not None:
            if exc is None:
                self._stream_reader.feed_eof()
            else:
                self._stream_reader.set_exception(exc)
        if not self._closed.done():
            if exc is None:
                self._closed.set_result(None)
            else:
                self._closed.set_exception(exc)
        super().connection_lost(exc)
        self._stream_reader = None
        self._stream_writer = None

    def data_received(self, data):
        self._stream_reader.feed_data(data)

    def eof_received(self):
        self._stream_reader.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

    def __del__(self):
        # Prevent reports about unhandled exceptions.
        # Better than the self._closed._log_traceback = False hack
        closed = self._closed
        if closed.done() and not closed.cancelled():
            closed.exception()


class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info() and close().  It adds drain() which returns an
    optional Future on which you can wait for flow control.  It also
    adds a transport property which references the Transport
    directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        info = [self.__class__.__name__, f'transport={self._transport!r}']
        if self._reader is not None:
            info.append(f'reader={self._reader!r}')
        return '<{}>'.format(' '.join(info))

    @property
    def transport(self):
        return self._transport

    def write(self, data):
        self._transport.write(data)

    def writelines(self, data):
        self._transport.writelines(data)

    def write_eof(self):
        return self._transport.write_eof()

    def can_write_eof(self):
        return self._transport.can_write_eof()

    def close(self):
        return self._transport.close()

    def is_closing(self):
        return self._transport.is_closing()

    async def wait_closed(self):
        await self._protocol._closed

    def get_extra_info(self, name, default=None):
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          await w.drain()
        """
        if self._reader is not None:
            exc = self._reader.exception()
            if exc is not None:
                raise exc
        if self._transport.is_closing():
            # Yield to the event loop so connection_lost() may be
            # called.  Without this, _drain_helper() would return
            # immediately, and code that calls
            #     write(...); await drain()
            # in a loop would never call connection_lost(), so it
            # would not see an error when the socket is closed.
            await tasks.sleep(0, loop=self._loop)
        await self._protocol._drain_helper()
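
# A common teardown sketch for the legacy writer above (illustrative):
#
#     writer.close()
#     await writer.wait_closed()   # resolves via the protocol's _closed future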


class StreamReader:

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.

        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._buffer = bytearray()
        self._eof = False    # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None
        self._transport = None
        self._paused = False

    def __repr__(self):
        info = ['StreamReader']
        if self._buffer:
            info.append(f'{len(self._buffer)} bytes')
        if self._eof:
            info.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            info.append(f'limit={self._limit}')
        if self._waiter:
            info.append(f'waiter={self._waiter!r}')
        if self._exception:
            info.append(f'exception={self._exception!r}')
        if self._transport:
            info.append(f'transport={self._transport!r}')
        if self._paused:
            info.append('paused')
        return '<{}>'.format(' '.join(info))

    def exception(self):
        return self._exception

    def set_exception(self, exc):
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wakeup read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If the stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have unexpected behaviour: it would not be possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused would deadlock, so prevent it.
        # This is essential for readexactly(n) when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

    async def readline(self):
        """Read a chunk of data from the stream until a newline (b'\n')
        is found.

        On success, return the chunk that ends with the newline. If only a
        partial line can be read due to EOF, return the incomplete line
        without the terminating newline. When EOF is reached and no bytes
        were read, return an empty bytes object.

        If the limit is reached, ValueError will be raised. In that case, if
        a newline was found, the complete line including the newline will be
        removed from the internal buffer. Otherwise, the internal buffer
        will be cleared. The limit is compared against the part of the line
        without the newline.

        If the stream was paused, this function will automatically resume
        it if needed.
        """
        sep = b'\n'
        seplen = len(sep)
        try:
            line = await self.readuntil(sep)
        except exceptions.IncompleteReadError as e:
            return e.partial
        except exceptions.LimitOverrunError as e:
            if self._buffer.startswith(sep, e.consumed):
                del self._buffer[:e.consumed + seplen]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(e.args[0])
        return line

    async def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed). Returned data will include the
        separator at the end.

        The configured stream limit is used to check the result. The limit
        sets the maximal length of data that can be returned, not counting
        the separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset.  The IncompleteReadError.partial attribute
        may contain the separator partially.

        If the data cannot be read because of over limit, a
        LimitOverrunError exception will be raised, and the data
        will be left in the internal buffer, so it can be read again.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume the whole buffer except the last bytes, whose length is
        # one less than seplen. Let's check the corner cases with
        # separator='SEPARATOR':
        # * we have received an almost complete separator (without its
        #   last byte), i.e. buffer='some textSEPARATO'. In this case we
        #   can safely consume len(separator) - 1 bytes.
        # * the last byte of the buffer is the first byte of the
        #   separator, i.e. buffer='abcdefghijklmnopqrS'. We may safely
        #   consume everything except that last byte, but this requires
        #   analyzing the bytes of the buffer that match a partial
        #   separator. This is slow and/or requires an FSM. For this case
        #   our implementation is not optimal, since it requires rescanning
        #   data that is known not to belong to the separator. In the
        #   real world, the separator will not be long enough to notice
        #   performance problems. Even when reading MIME-encoded
        #   messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of `separator`.
        offset = 0

        # Loop until we find `separator` in the buffer, exceed the buffer
        # size, or an EOF has happened.
        while True:
            buflen = len(self._buffer)

            # Check if we now have enough data in the buffer for `separator`
            # to fit.
            if buflen - offset >= seplen:
                isep = self._buffer.find(separator, offset)

                if isep != -1:
                    # `separator` is in the buffer. `isep` will be used
                    # later to retrieve the data.
                    break

                # see the comment above for an explanation.
                offset = buflen + 1 - seplen
                if offset > self._limit:
                    raise exceptions.LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        offset)

            # A complete message (with the full separator) may be present in
            # the buffer even when the EOF flag is set. This may happen when
            # the last chunk adds data which makes the separator be found.
            # That's why we check for EOF *after* inspecting the buffer.
            if self._eof:
                chunk = bytes(self._buffer)
                self._buffer.clear()
                raise exceptions.IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if the stream was paused.
            await self._wait_for_data('readuntil')

        if isep > self._limit:
            raise exceptions.LimitOverrunError(
                'Separator is found, but chunk is longer than limit', isep)

        chunk = self._buffer[:isep + seplen]
        del self._buffer[:isep + seplen]
        self._maybe_resume_transport()
        return bytes(chunk)
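
    # A minimal sketch (illustrative) of readuntil() semantics:
    #
    #     reader = StreamReader()
    #     reader.feed_data(b'HEADER\r\nBODY')
    #     line = await reader.readuntil(b'\r\n')   # -> b'HEADER\r\n'
    #     # b'BODY' stays buffered until more data or EOF arrives.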

    async def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If n is not provided, or set to -1, read until EOF and return all
        read bytes. If EOF was received and the internal buffer is empty,
        return an empty bytes object.

        If n is zero, return an empty bytes object immediately.

        If n is positive, this function tries to read `n` bytes, and may
        return fewer bytes than requested, but at least one byte. If EOF
        was received before any byte is read, this function returns an
        empty bytes object.

        The returned value is not limited by the limit configured at
        stream creation.

        If the stream was paused, this function will automatically resume
        it if needed.
        """

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = await self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._buffer and not self._eof:
            await self._wait_for_data('read')

        # This will work right even if the buffer is less than n bytes
        data = bytes(self._buffer[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data
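
    # A minimal sketch (illustrative) of read() semantics:
    #
    #     await reader.read(0)    # -> b'' immediately
    #     await reader.read(4)    # -> 1..4 bytes, or b'' at EOF
    #     await reader.read()     # -> everything until EOF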

    async def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise an IncompleteReadError if EOF is reached before `n` bytes
        can be read. The IncompleteReadError.partial attribute of the
        exception will contain the partially read bytes.

        If n is zero, return an empty bytes object.

        The returned value is not limited by the limit configured at
        stream creation.

        If the stream was paused, this function will automatically resume
        it if needed.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        while len(self._buffer) < n:
            if self._eof:
                incomplete = bytes(self._buffer)
                self._buffer.clear()
                raise exceptions.IncompleteReadError(incomplete, n)

            await self._wait_for_data('readexactly')

        if len(self._buffer) == n:
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            data = bytes(self._buffer[:n])
            del self._buffer[:n]
        self._maybe_resume_transport()
        return data

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val == b'':
            raise StopAsyncIteration
        return val
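
    # A minimal sketch (illustrative) of line iteration via the async
    # iterator protocol implemented above:
    #
    #     async for line in reader:
    #         handle(line)        # loop ends cleanly at EOF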


# end legacy stream APIs


class _BaseStreamProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader from
    accidentally calling inappropriate methods of the protocol.)
    """

    _stream = None  # initialized in derived classes

    def __init__(self, loop=None,
                 *, _asyncio_internal=False):
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        self._transport = None
        self._over_ssl = False
        self._closed = self._loop.create_future()

    def connection_made(self, transport):
        self._transport = transport
        self._over_ssl = transport.get_extra_info('sslcontext') is not None

    def connection_lost(self, exc):
        stream = self._stream
        if stream is not None:
            if exc is None:
                stream.feed_eof()
            else:
                stream.set_exception(exc)
        if not self._closed.done():
            if exc is None:
                self._closed.set_result(None)
            else:
                self._closed.set_exception(exc)
        super().connection_lost(exc)
        self._transport = None

    def data_received(self, data):
        stream = self._stream
        if stream is not None:
            stream.feed_data(data)

    def eof_received(self):
        stream = self._stream
        if stream is not None:
            stream.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

    def _get_close_waiter(self, stream):
        return self._closed

    def __del__(self):
        # Prevent reports about unhandled exceptions.
        # Better than the self._closed._log_traceback = False hack
        closed = self._get_close_waiter(self._stream)
        if closed.done() and not closed.cancelled():
            closed.exception()


class _StreamProtocol(_BaseStreamProtocol):
    _source_traceback = None

    def __init__(self, stream, loop=None,
                 *, _asyncio_internal=False):
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        self._source_traceback = stream._source_traceback
        self._stream_wr = weakref.ref(stream, self._on_gc)
        self._reject_connection = False

    def _on_gc(self, wr):
        transport = self._transport
        if transport is not None:
            # connection_made was called
            context = {
                'message': ('An open stream object is being garbage '
                            'collected; call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
        else:
            self._reject_connection = True
        self._stream_wr = None

    @property
    def _stream(self):
        if self._stream_wr is None:
            return None
        return self._stream_wr()

    def connection_made(self, transport):
        if self._reject_connection:
            context = {
                'message': ('An open stream was garbage collected prior to '
                            'establishing network connection; '
                            'call "stream.close()" explicitly.')
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
            transport.abort()
            return
        super().connection_made(transport)
        stream = self._stream
        if stream is None:
            return
        stream.set_transport(transport)
        stream._protocol = self

    def connection_lost(self, exc):
        super().connection_lost(exc)
        self._stream_wr = None


class _ServerStreamProtocol(_BaseStreamProtocol):
    def __init__(self, server, limit, client_connected_cb, loop=None,
                 *, _asyncio_internal=False):
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        assert self._closed
        self._client_connected_cb = client_connected_cb
        self._limit = limit
        self._server = server
        self._task = None

    def connection_made(self, transport):
        super().connection_made(transport)
        stream = Stream(mode=StreamMode.READWRITE,
                        transport=transport,
                        protocol=self,
                        limit=self._limit,
                        loop=self._loop,
                        is_server_side=True,
                        _asyncio_internal=True)
        self._stream = stream
        # If self._client_connected_cb(self._stream) fails,
        # the exception is logged by the transport
        self._task = self._loop.create_task(
            self._client_connected_cb(self._stream))
        self._server._attach(stream, self._task)

    def connection_lost(self, exc):
        super().connection_lost(exc)
        self._server._detach(self._stream, self._task)
        self._stream = None


class _OptionalAwait:
    # The class doesn't create a coroutine object
    # if the result is not awaited,
    # which prevents the "coroutine is never awaited" message.

    __slots__ = ('_method',)

    def __init__(self, method):
        self._method = method

    def __await__(self):
        return self._method().__await__()
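
# A minimal sketch (illustrative) of why _OptionalAwait exists: methods
# such as Stream.write() return one, so the result may be awaited for
# flow control or simply discarded without a warning:
#
#     stream.write(b'data')          # fire-and-forget, no warning
#     await stream.write(b'data')    # waits for drain when paused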


class Stream:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info() and close().  It adds drain() which returns an
    optional Future on which you can wait for flow control.  It also
    adds a transport property which references the Transport
    directly.
    """

    _source_traceback = None

    def __init__(self, mode, *,
                 transport=None,
                 protocol=None,
                 loop=None,
                 limit=_DEFAULT_LIMIT,
                 is_server_side=False,
                 _asyncio_internal=False):
        if not _asyncio_internal:
            warnings.warn(f"{self.__class__} should be instantiated "
                          "by asyncio internals only, "
                          "please avoid its creation from user code",
                          DeprecationWarning)
        self._mode = mode
        self._transport = transport
        self._protocol = protocol
        self._is_server_side = is_server_side

        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.

        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._buffer = bytearray()
        self._eof = False    # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None
        self._paused = False
        self._complete_fut = self._loop.create_future()
        self._complete_fut.set_result(None)

        if self._loop.get_debug():
            self._source_traceback = format_helpers.extract_stack(
                sys._getframe(1))
| 
 | |
    def __repr__(self):
        info = [self.__class__.__name__]
        info.append(f'mode={self._mode}')
        if self._buffer:
            info.append(f'{len(self._buffer)} bytes')
        if self._eof:
            info.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            info.append(f'limit={self._limit}')
        if self._waiter:
            info.append(f'waiter={self._waiter!r}')
        if self._exception:
            info.append(f'exception={self._exception!r}')
        if self._transport:
            info.append(f'transport={self._transport!r}')
        if self._paused:
            info.append('paused')
        return '<{}>'.format(' '.join(info))

    @property
    def mode(self):
        return self._mode

    def is_server_side(self):
        return self._is_server_side

    @property
    def transport(self):
        return self._transport

    def write(self, data):
        _ensure_can_write(self._mode)
        self._transport.write(data)
        return self._fast_drain()

    def writelines(self, data):
        _ensure_can_write(self._mode)
        self._transport.writelines(data)
        return self._fast_drain()

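    # Illustrative note (comment only): write() returns the result of
    # _fast_drain(), which is optionally awaitable, so both styles work:
    #
    #     stream.write(b'ping')           # fire-and-forget, no flow control
    #     await stream.write(b'ping')     # applies flow control via drain()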
    def _fast_drain(self):
        # This helper tries to take a fast path: return an already existing
        # completed future object if the underlying transport is not paused
        # and actually waiting for a writing-resume signal is not needed.
        exc = self.exception()
        if exc is not None:
            fut = self._loop.create_future()
            fut.set_exception(exc)
            return fut
        if not self._transport.is_closing():
            if self._protocol._connection_lost:
                fut = self._loop.create_future()
                fut.set_exception(ConnectionResetError('Connection lost'))
                return fut
            if not self._protocol._paused:
                # fast path, the stream is not paused,
                # no need to wait for a resume signal
                return self._complete_fut
        return _OptionalAwait(self.drain)

    def write_eof(self):
        _ensure_can_write(self._mode)
        return self._transport.write_eof()

    def can_write_eof(self):
        # StreamMode is a plain enum.Flag with no helper methods,
        # so test the WRITE bit directly.
        if not self._mode & StreamMode.WRITE:
            return False
        return self._transport.can_write_eof()

    def close(self):
        self._transport.close()
        return _OptionalAwait(self.wait_closed)

    def is_closing(self):
        return self._transport.is_closing()

    async def abort(self):
        self._transport.abort()
        await self.wait_closed()

    async def wait_closed(self):
        await self._protocol._get_close_waiter(self)

    def get_extra_info(self, name, default=None):
        return self._transport.get_extra_info(name, default)

    async def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          await w.drain()
        """
        _ensure_can_write(self._mode)
        exc = self.exception()
        if exc is not None:
            raise exc
        if self._transport.is_closing():
            # Wait for protocol.connection_lost() call
            # Raise connection closing error if any,
            # ConnectionResetError otherwise
            await tasks.sleep(0)
        await self._protocol._drain_helper()

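    # Illustrative note (comment only, not executed): drain() allows manual
    # flow control when many writes are issued without awaiting each one:
    #
    #     for chunk in chunks:            # `chunks` is a hypothetical iterable
    #         stream.write(chunk)
    #     await stream.drain()            # wait until the transport is ready
    #                                     # to accept more data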
    async def sendfile(self, file, offset=0, count=None, *, fallback=True):
        await self.drain()  # check for stream mode and exceptions
        return await self._loop.sendfile(self._transport, file,
                                         offset, count, fallback=fallback)

    async def start_tls(self, sslcontext, *,
                        server_hostname=None,
                        ssl_handshake_timeout=None):
        await self.drain()  # check for stream mode and exceptions
        transport = await self._loop.start_tls(
            self._transport, self._protocol, sslcontext,
            server_side=self._is_server_side,
            server_hostname=server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        self._transport = transport
        self._protocol._transport = transport
        self._protocol._over_ssl = True

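    # Usage sketch (comment only; `ctx` would be an ssl.SSLContext built
    # by the caller, and the host/port are placeholders):
    #
    #     stream = await connect('example.com', 12345)
    #     await stream.start_tls(ctx, server_hostname='example.com')
    #     # subsequent reads and writes use the upgraded TLS transport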
    def exception(self):
        return self._exception

    def set_exception(self, exc):
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wake up read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        if transport is self._transport:
            return
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        _ensure_can_read(self._mode)
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

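    # Flow-control thresholds (comment only): feed_data() pauses reading
    # once the buffer grows past 2 * limit, and _maybe_resume_transport()
    # resumes it when the buffer drops back to limit or below.  With the
    # default 64 KiB limit, reading pauses above 128 KiB buffered and
    # resumes at 64 KiB or less.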
    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If the stream was paused, automatically resume it.
        """
        # The stream uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have unexpected behaviour: it would not be possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused would cause a deadlock, so prevent it.
        # This is essential for readexactly(n) when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

    async def readline(self):
        """Read a chunk of data from the stream until a newline (b'\n') is found.

        On success, return the chunk that ends with the newline. If only a
        partial line can be read due to EOF, return the incomplete line
        without the terminating newline. If EOF was reached while no bytes
        were read, return an empty bytes object.

        If the limit is reached, a ValueError is raised. In that case, if a
        newline was found, the complete line including the newline is
        removed from the internal buffer. Otherwise, the internal buffer is
        cleared. The limit is compared against the part of the line without
        the newline.

        If the stream was paused, this function will automatically resume
        it if needed.
        """
        _ensure_can_read(self._mode)
        sep = b'\n'
        seplen = len(sep)
        try:
            line = await self.readuntil(sep)
        except exceptions.IncompleteReadError as e:
            return e.partial
        except exceptions.LimitOverrunError as e:
            if self._buffer.startswith(sep, e.consumed):
                del self._buffer[:e.consumed + seplen]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(e.args[0])
        return line

    async def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed). Returned data will include the
        separator at the end.

        The configured stream limit is used to check the result. The limit
        sets the maximal length of data that can be returned, not counting
        the separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset.  The IncompleteReadError.partial attribute
        may contain a portion of the separator.

        If the data cannot be read because the limit is overrun, a
        LimitOverrunError exception will be raised, and the data
        will be left in the internal buffer, so it can be read again.
        """
        _ensure_can_read(self._mode)
        seplen = len(separator)
        if seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume the whole buffer except the last bytes, whose length is
        # one less than seplen. Let's check the corner cases with
        # separator='SEPARATOR':
        # * we have received an almost complete separator (without its
        #   last byte), i.e. buffer='some textSEPARATO'. In this case we
        #   can safely consume len(separator) - 1 bytes.
        # * the last byte of the buffer is the first byte of the separator,
        #   i.e. buffer='abcdefghijklmnopqrS'. We may safely consume
        #   everything except that last byte, but this requires analyzing
        #   the bytes of the buffer that match a partial separator.
        #   This is slow and/or requires an FSM. For this case our
        #   implementation is not optimal, since it requires rescanning
        #   data that is known not to belong to the separator. In the
        #   real world, the separator will not be long enough to cause
        #   noticeable performance problems. Even when reading MIME-encoded
        #   messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of `separator`.
        offset = 0

        # Loop until we find `separator` in the buffer, exceed the buffer size,
        # or an EOF has happened.
        while True:
            buflen = len(self._buffer)

            # Check if we now have enough data in the buffer for `separator` to
            # fit.
            if buflen - offset >= seplen:
                isep = self._buffer.find(separator, offset)

                if isep != -1:
                    # `separator` is in the buffer. `isep` will be used later
                    # to retrieve the data.
                    break

                # See the comment above for an explanation.
                offset = buflen + 1 - seplen
                if offset > self._limit:
                    raise exceptions.LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        offset)

            # A complete message (with the full separator) may be present in
            # the buffer even when the EOF flag is set. This may happen when
            # the last chunk adds data which makes the separator be found.
            # That's why we check for EOF *after* inspecting the buffer.
            if self._eof:
                chunk = bytes(self._buffer)
                self._buffer.clear()
                raise exceptions.IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if the stream was paused.
            await self._wait_for_data('readuntil')

        if isep > self._limit:
            raise exceptions.LimitOverrunError(
                'Separator is found, but chunk is longer than limit', isep)

        chunk = self._buffer[:isep + seplen]
        del self._buffer[:isep + seplen]
        self._maybe_resume_transport()
        return bytes(chunk)

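    # Usage sketch (comment only): reading an HTTP-style header block,
    # assuming the peer terminates it with a blank line:
    #
    #     header = await stream.readuntil(b'\r\n\r\n')
    #
    # If no separator arrives within the configured limit,
    # LimitOverrunError is raised and the unread data stays buffered.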
    async def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If n is not provided, or set to -1, read until EOF and return all
        read bytes. If EOF was received and the internal buffer is empty,
        return an empty bytes object.

        If n is zero, return an empty bytes object immediately.

        If n is positive, this function tries to read `n` bytes, and may
        return fewer bytes than requested, but at least one byte. If EOF
        was received before any byte is read, this function returns an
        empty bytes object.

        The returned value is not limited by the limit configured at
        stream creation.

        If the stream was paused, this function will automatically resume
        it if needed.
        """
        _ensure_can_read(self._mode)

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = await self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._buffer and not self._eof:
            await self._wait_for_data('read')

        # This will work right even if the buffer is less than n bytes.
        data = bytes(self._buffer[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    async def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise an IncompleteReadError if EOF is reached before `n` bytes
        can be read. The IncompleteReadError.partial attribute of the
        exception will contain the partially read bytes.

        If n is zero, return an empty bytes object.

        The returned value is not limited by the limit configured at
        stream creation.

        If the stream was paused, this function will automatically resume
        it if needed.
        """
        _ensure_can_read(self._mode)
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        while len(self._buffer) < n:
            if self._eof:
                incomplete = bytes(self._buffer)
                self._buffer.clear()
                raise exceptions.IncompleteReadError(incomplete, n)

            await self._wait_for_data('readexactly')

        if len(self._buffer) == n:
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            data = bytes(self._buffer[:n])
            del self._buffer[:n]
        self._maybe_resume_transport()
        return data

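    # Usage sketch (comment only): a minimal length-prefixed framing
    # reader built on readexactly(), assuming 4-byte big-endian length
    # prefixes:
    #
    #     import struct
    #     size, = struct.unpack('!I', await stream.readexactly(4))
    #     payload = await stream.readexactly(size)
    #
    # An IncompleteReadError here means the peer closed mid-frame.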
    def __aiter__(self):
        _ensure_can_read(self._mode)
        return self

    async def __anext__(self):
        val = await self.readline()
        if val == b'':
            raise StopAsyncIteration
        return val

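    # Usage sketch (comment only): __aiter__/__anext__ make the stream
    # iterable with ``async for``, yielding lines until EOF:
    #
    #     async for line in stream:
    #         handle(line)    # `handle` is a hypothetical callback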
    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()
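

# End-to-end sketch (comment only; host, port and payload are
# placeholders): connect() plus Stream's async-context-manager support
# give the typical client pattern:
#
#     async def client():
#         async with connect('127.0.0.1', 8888) as stream:
#             await stream.write(b'ping')
#             reply = await stream.read(1024)
#         # __aexit__ awaited stream.close() for us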