gh-121313: Limit the reading size from pipes to their default buffer size on POSIX systems (GH-121315)

See https://github.com/python/cpython/issues/121313 for the analysis. This change greatly reduces memory overallocation and overhead when multiprocessing sends larger-than-small payloads over its pipes between processes.
This commit is contained in:
Alexander P. 2024-08-31 07:57:22 +02:00 committed by GitHub
parent 1ce9e58803
commit 74bfb53e3a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 19 additions and 3 deletions

View file

@ -11,13 +11,14 @@ __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
import errno import errno
import io import io
import itertools
import os import os
import stat
import sys import sys
import socket import socket
import struct import struct
import time
import tempfile import tempfile
import itertools import time
from . import util from . import util
@ -360,6 +361,11 @@ if _winapi:
f.write(ov.getbuffer()) f.write(ov.getbuffer())
return f return f
"""
The default size of a pipe on Linux systems is 16 times the base page size:
https://man7.org/linux/man-pages/man7/pipe.7.html
"""
PAGES_PER_PIPE = 16
class Connection(_ConnectionBase): class Connection(_ConnectionBase):
""" """
@ -372,11 +378,14 @@ class Connection(_ConnectionBase):
_close(self._handle) _close(self._handle)
_write = _multiprocessing.send _write = _multiprocessing.send
_read = _multiprocessing.recv _read = _multiprocessing.recv
_default_pipe_size = 0
else: else:
def _close(self, _close=os.close): def _close(self, _close=os.close):
_close(self._handle) _close(self._handle)
_write = os.write _write = os.write
_read = os.read _read = os.read
_base_page_size = os.sysconf(os.sysconf_names['SC_PAGESIZE'])
_default_pipe_size = _base_page_size * PAGES_PER_PIPE
def _send(self, buf, write=_write): def _send(self, buf, write=_write):
remaining = len(buf) remaining = len(buf)
@ -391,8 +400,14 @@ class Connection(_ConnectionBase):
buf = io.BytesIO() buf = io.BytesIO()
handle = self._handle handle = self._handle
remaining = size remaining = size
is_pipe = False
if size > self._default_pipe_size > 0:
mode = os.fstat(handle).st_mode
is_pipe = stat.S_ISFIFO(mode)
limit = self._default_pipe_size if is_pipe else remaining
while remaining > 0: while remaining > 0:
chunk = read(handle, remaining) to_read = min(limit, remaining)
chunk = read(handle, to_read)
n = len(chunk) n = len(chunk)
if n == 0: if n == 0:
if remaining == size: if remaining == size:

View file

@ -0,0 +1 @@
Limit the read size in multiprocessing connection._recv for pipes to the default pipe size (16 times the base page size), in order to avoid memory overallocation and unnecessary memory-management system calls.