mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 11:49:12 +00:00 
			
		
		
		
	After some failures in AMD64 FreeBSD CURRENT Debug 3.x buildbots regarding tests in test_multiprocessing_spawn and after examining similar failures in test_socket, some errors in the calculation of ancillary data buffers were found in multiprocessing.reduction. CMSG_LEN() can often be used as the buffer size for recvmsg() to receive a single item of ancillary data, but RFC 3542 requires portable applications to use CMSG_SPACE() and thus include space for padding, even when the item will be the last in the buffer. The failures we experience are due to the usage of CMSG_LEN() instead of CMSG_SPACE().
		
			
				
	
	
		
			281 lines
		
	
	
	
		
			9.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			281 lines
		
	
	
	
		
			9.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#
 | 
						|
# Module which deals with pickling of objects.
 | 
						|
#
 | 
						|
# multiprocessing/reduction.py
 | 
						|
#
 | 
						|
# Copyright (c) 2006-2008, R Oudkerk
 | 
						|
# Licensed to PSF under a Contributor Agreement.
 | 
						|
#
 | 
						|
 | 
						|
from abc import ABCMeta
 | 
						|
import copyreg
 | 
						|
import functools
 | 
						|
import io
 | 
						|
import os
 | 
						|
import pickle
 | 
						|
import socket
 | 
						|
import sys
 | 
						|
 | 
						|
from . import context
 | 
						|
 | 
						|
__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
 | 
						|
 | 
						|
 | 
						|
HAVE_SEND_HANDLE = (sys.platform == 'win32' or
 | 
						|
                    (hasattr(socket, 'CMSG_LEN') and
 | 
						|
                     hasattr(socket, 'SCM_RIGHTS') and
 | 
						|
                     hasattr(socket.socket, 'sendmsg')))
 | 
						|
 | 
						|
#
 | 
						|
# Pickler subclass
 | 
						|
#
 | 
						|
 | 
						|
class ForkingPickler(pickle.Pickler):
 | 
						|
    '''Pickler subclass used by multiprocessing.'''
 | 
						|
    _extra_reducers = {}
 | 
						|
    _copyreg_dispatch_table = copyreg.dispatch_table
 | 
						|
 | 
						|
    def __init__(self, *args):
 | 
						|
        super().__init__(*args)
 | 
						|
        self.dispatch_table = self._copyreg_dispatch_table.copy()
 | 
						|
        self.dispatch_table.update(self._extra_reducers)
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def register(cls, type, reduce):
 | 
						|
        '''Register a reduce function for a type.'''
 | 
						|
        cls._extra_reducers[type] = reduce
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def dumps(cls, obj, protocol=None):
 | 
						|
        buf = io.BytesIO()
 | 
						|
        cls(buf, protocol).dump(obj)
 | 
						|
        return buf.getbuffer()
 | 
						|
 | 
						|
    loads = pickle.loads
 | 
						|
 | 
						|
register = ForkingPickler.register
 | 
						|
 | 
						|
def dump(obj, file, protocol=None):
 | 
						|
    '''Replacement for pickle.dump() using ForkingPickler.'''
 | 
						|
    ForkingPickler(file, protocol).dump(obj)
 | 
						|
 | 
						|
#
 | 
						|
# Platform specific definitions
 | 
						|
#
 | 
						|
 | 
						|
if sys.platform == 'win32':
 | 
						|
    # Windows
 | 
						|
    __all__ += ['DupHandle', 'duplicate', 'steal_handle']
 | 
						|
    import _winapi
 | 
						|
 | 
						|
    def duplicate(handle, target_process=None, inheritable=False,
 | 
						|
                  *, source_process=None):
 | 
						|
        '''Duplicate a handle.  (target_process is a handle not a pid!)'''
 | 
						|
        current_process = _winapi.GetCurrentProcess()
 | 
						|
        if source_process is None:
 | 
						|
            source_process = current_process
 | 
						|
        if target_process is None:
 | 
						|
            target_process = current_process
 | 
						|
        return _winapi.DuplicateHandle(
 | 
						|
            source_process, handle, target_process,
 | 
						|
            0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
 | 
						|
 | 
						|
    def steal_handle(source_pid, handle):
 | 
						|
        '''Steal a handle from process identified by source_pid.'''
 | 
						|
        source_process_handle = _winapi.OpenProcess(
 | 
						|
            _winapi.PROCESS_DUP_HANDLE, False, source_pid)
 | 
						|
        try:
 | 
						|
            return _winapi.DuplicateHandle(
 | 
						|
                source_process_handle, handle,
 | 
						|
                _winapi.GetCurrentProcess(), 0, False,
 | 
						|
                _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
 | 
						|
        finally:
 | 
						|
            _winapi.CloseHandle(source_process_handle)
 | 
						|
 | 
						|
    def send_handle(conn, handle, destination_pid):
 | 
						|
        '''Send a handle over a local connection.'''
 | 
						|
        dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
 | 
						|
        conn.send(dh)
 | 
						|
 | 
						|
    def recv_handle(conn):
 | 
						|
        '''Receive a handle over a local connection.'''
 | 
						|
        return conn.recv().detach()
 | 
						|
 | 
						|
    class DupHandle(object):
 | 
						|
        '''Picklable wrapper for a handle.'''
 | 
						|
        def __init__(self, handle, access, pid=None):
 | 
						|
            if pid is None:
 | 
						|
                # We just duplicate the handle in the current process and
 | 
						|
                # let the receiving process steal the handle.
 | 
						|
                pid = os.getpid()
 | 
						|
            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
 | 
						|
            try:
 | 
						|
                self._handle = _winapi.DuplicateHandle(
 | 
						|
                    _winapi.GetCurrentProcess(),
 | 
						|
                    handle, proc, access, False, 0)
 | 
						|
            finally:
 | 
						|
                _winapi.CloseHandle(proc)
 | 
						|
            self._access = access
 | 
						|
            self._pid = pid
 | 
						|
 | 
						|
        def detach(self):
 | 
						|
            '''Get the handle.  This should only be called once.'''
 | 
						|
            # retrieve handle from process which currently owns it
 | 
						|
            if self._pid == os.getpid():
 | 
						|
                # The handle has already been duplicated for this process.
 | 
						|
                return self._handle
 | 
						|
            # We must steal the handle from the process whose pid is self._pid.
 | 
						|
            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
 | 
						|
                                       self._pid)
 | 
						|
            try:
 | 
						|
                return _winapi.DuplicateHandle(
 | 
						|
                    proc, self._handle, _winapi.GetCurrentProcess(),
 | 
						|
                    self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
 | 
						|
            finally:
 | 
						|
                _winapi.CloseHandle(proc)
 | 
						|
 | 
						|
else:
 | 
						|
    # Unix
 | 
						|
    __all__ += ['DupFd', 'sendfds', 'recvfds']
 | 
						|
    import array
 | 
						|
 | 
						|
    # On MacOSX we should acknowledge receipt of fds -- see Issue14669
 | 
						|
    ACKNOWLEDGE = sys.platform == 'darwin'
 | 
						|
 | 
						|
    def sendfds(sock, fds):
 | 
						|
        '''Send an array of fds over an AF_UNIX socket.'''
 | 
						|
        fds = array.array('i', fds)
 | 
						|
        msg = bytes([len(fds) % 256])
 | 
						|
        sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
 | 
						|
        if ACKNOWLEDGE and sock.recv(1) != b'A':
 | 
						|
            raise RuntimeError('did not receive acknowledgement of fd')
 | 
						|
 | 
						|
    def recvfds(sock, size):
 | 
						|
        '''Receive an array of fds over an AF_UNIX socket.'''
 | 
						|
        a = array.array('i')
 | 
						|
        bytes_size = a.itemsize * size
 | 
						|
        msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_SPACE(bytes_size))
 | 
						|
        if not msg and not ancdata:
 | 
						|
            raise EOFError
 | 
						|
        try:
 | 
						|
            if ACKNOWLEDGE:
 | 
						|
                sock.send(b'A')
 | 
						|
            if len(ancdata) != 1:
 | 
						|
                raise RuntimeError('received %d items of ancdata' %
 | 
						|
                                   len(ancdata))
 | 
						|
            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
 | 
						|
            if (cmsg_level == socket.SOL_SOCKET and
 | 
						|
                cmsg_type == socket.SCM_RIGHTS):
 | 
						|
                if len(cmsg_data) % a.itemsize != 0:
 | 
						|
                    raise ValueError
 | 
						|
                a.frombytes(cmsg_data)
 | 
						|
                if len(a) % 256 != msg[0]:
 | 
						|
                    raise AssertionError(
 | 
						|
                        "Len is {0:n} but msg[0] is {1!r}".format(
 | 
						|
                            len(a), msg[0]))
 | 
						|
                return list(a)
 | 
						|
        except (ValueError, IndexError):
 | 
						|
            pass
 | 
						|
        raise RuntimeError('Invalid data received')
 | 
						|
 | 
						|
    def send_handle(conn, handle, destination_pid):
 | 
						|
        '''Send a handle over a local connection.'''
 | 
						|
        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
 | 
						|
            sendfds(s, [handle])
 | 
						|
 | 
						|
    def recv_handle(conn):
 | 
						|
        '''Receive a handle over a local connection.'''
 | 
						|
        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
 | 
						|
            return recvfds(s, 1)[0]
 | 
						|
 | 
						|
    def DupFd(fd):
 | 
						|
        '''Return a wrapper for an fd.'''
 | 
						|
        popen_obj = context.get_spawning_popen()
 | 
						|
        if popen_obj is not None:
 | 
						|
            return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
 | 
						|
        elif HAVE_SEND_HANDLE:
 | 
						|
            from . import resource_sharer
 | 
						|
            return resource_sharer.DupFd(fd)
 | 
						|
        else:
 | 
						|
            raise ValueError('SCM_RIGHTS appears not to be available')
 | 
						|
 | 
						|
#
 | 
						|
# Try making some callable types picklable
 | 
						|
#
 | 
						|
 | 
						|
def _reduce_method(m):
 | 
						|
    if m.__self__ is None:
 | 
						|
        return getattr, (m.__class__, m.__func__.__name__)
 | 
						|
    else:
 | 
						|
        return getattr, (m.__self__, m.__func__.__name__)
 | 
						|
class _C:
 | 
						|
    def f(self):
 | 
						|
        pass
 | 
						|
register(type(_C().f), _reduce_method)
 | 
						|
 | 
						|
 | 
						|
def _reduce_method_descriptor(m):
 | 
						|
    return getattr, (m.__objclass__, m.__name__)
 | 
						|
register(type(list.append), _reduce_method_descriptor)
 | 
						|
register(type(int.__add__), _reduce_method_descriptor)
 | 
						|
 | 
						|
 | 
						|
def _reduce_partial(p):
 | 
						|
    return _rebuild_partial, (p.func, p.args, p.keywords or {})
 | 
						|
def _rebuild_partial(func, args, keywords):
 | 
						|
    return functools.partial(func, *args, **keywords)
 | 
						|
register(functools.partial, _reduce_partial)
 | 
						|
 | 
						|
#
 | 
						|
# Make sockets picklable
 | 
						|
#
 | 
						|
 | 
						|
if sys.platform == 'win32':
 | 
						|
    def _reduce_socket(s):
 | 
						|
        from .resource_sharer import DupSocket
 | 
						|
        return _rebuild_socket, (DupSocket(s),)
 | 
						|
    def _rebuild_socket(ds):
 | 
						|
        return ds.detach()
 | 
						|
    register(socket.socket, _reduce_socket)
 | 
						|
 | 
						|
else:
 | 
						|
    def _reduce_socket(s):
 | 
						|
        df = DupFd(s.fileno())
 | 
						|
        return _rebuild_socket, (df, s.family, s.type, s.proto)
 | 
						|
    def _rebuild_socket(df, family, type, proto):
 | 
						|
        fd = df.detach()
 | 
						|
        return socket.socket(family, type, proto, fileno=fd)
 | 
						|
    register(socket.socket, _reduce_socket)
 | 
						|
 | 
						|
 | 
						|
class AbstractReducer(metaclass=ABCMeta):
 | 
						|
    '''Abstract base class for use in implementing a Reduction class
 | 
						|
    suitable for use in replacing the standard reduction mechanism
 | 
						|
    used in multiprocessing.'''
 | 
						|
    ForkingPickler = ForkingPickler
 | 
						|
    register = register
 | 
						|
    dump = dump
 | 
						|
    send_handle = send_handle
 | 
						|
    recv_handle = recv_handle
 | 
						|
 | 
						|
    if sys.platform == 'win32':
 | 
						|
        steal_handle = steal_handle
 | 
						|
        duplicate = duplicate
 | 
						|
        DupHandle = DupHandle
 | 
						|
    else:
 | 
						|
        sendfds = sendfds
 | 
						|
        recvfds = recvfds
 | 
						|
        DupFd = DupFd
 | 
						|
 | 
						|
    _reduce_method = _reduce_method
 | 
						|
    _reduce_method_descriptor = _reduce_method_descriptor
 | 
						|
    _rebuild_partial = _rebuild_partial
 | 
						|
    _reduce_socket = _reduce_socket
 | 
						|
    _rebuild_socket = _rebuild_socket
 | 
						|
 | 
						|
    def __init__(self, *args):
 | 
						|
        register(type(_C().f), _reduce_method)
 | 
						|
        register(type(list.append), _reduce_method_descriptor)
 | 
						|
        register(type(int.__add__), _reduce_method_descriptor)
 | 
						|
        register(functools.partial, _reduce_partial)
 | 
						|
        register(socket.socket, _reduce_socket)
 |