mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 03:44:55 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			356 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			356 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""functools.py - Tools for working with functions and callable objects
 | 
						|
"""
 | 
						|
# Python module wrapper for _functools C module
 | 
						|
# to allow utilities written in Python to be added
 | 
						|
# to the functools module.
 | 
						|
# Written by Nick Coghlan <ncoghlan at gmail.com>
 | 
						|
# and Raymond Hettinger <python at rcn.com>
 | 
						|
#   Copyright (C) 2006-2010 Python Software Foundation.
 | 
						|
# See C source code for _functools credits/copyright
 | 
						|
 | 
						|
__all__ = ['update_wrapper', 'wraps', 'WRAPPER_ASSIGNMENTS', 'WRAPPER_UPDATES',
 | 
						|
           'total_ordering', 'cmp_to_key', 'lru_cache', 'reduce', 'partial']
 | 
						|
 | 
						|
try:
 | 
						|
    from _functools import reduce
 | 
						|
except ImportError:
 | 
						|
    pass
 | 
						|
from collections import namedtuple
 | 
						|
try:
 | 
						|
    from _thread import RLock
 | 
						|
except:
 | 
						|
    class RLock:
 | 
						|
        'Dummy reentrant lock for builds without threads'
 | 
						|
        def __enter__(self): pass
 | 
						|
        def __exit__(self, exctype, excinst, exctb): pass
 | 
						|
 | 
						|
 | 
						|
################################################################################
 | 
						|
### update_wrapper() and wraps() decorator
 | 
						|
################################################################################
 | 
						|
 | 
						|
# update_wrapper() and wraps() are tools to help write
 | 
						|
# wrapper functions that can handle naive introspection
 | 
						|
 | 
						|
WRAPPER_ASSIGNMENTS = ('__module__', '__name__', '__qualname__', '__doc__',
 | 
						|
                       '__annotations__')
 | 
						|
WRAPPER_UPDATES = ('__dict__',)
 | 
						|
def update_wrapper(wrapper,
 | 
						|
                   wrapped,
 | 
						|
                   assigned = WRAPPER_ASSIGNMENTS,
 | 
						|
                   updated = WRAPPER_UPDATES):
 | 
						|
    """Update a wrapper function to look like the wrapped function
 | 
						|
 | 
						|
       wrapper is the function to be updated
 | 
						|
       wrapped is the original function
 | 
						|
       assigned is a tuple naming the attributes assigned directly
 | 
						|
       from the wrapped function to the wrapper function (defaults to
 | 
						|
       functools.WRAPPER_ASSIGNMENTS)
 | 
						|
       updated is a tuple naming the attributes of the wrapper that
 | 
						|
       are updated with the corresponding attribute from the wrapped
 | 
						|
       function (defaults to functools.WRAPPER_UPDATES)
 | 
						|
    """
 | 
						|
    wrapper.__wrapped__ = wrapped
 | 
						|
    for attr in assigned:
 | 
						|
        try:
 | 
						|
            value = getattr(wrapped, attr)
 | 
						|
        except AttributeError:
 | 
						|
            pass
 | 
						|
        else:
 | 
						|
            setattr(wrapper, attr, value)
 | 
						|
    for attr in updated:
 | 
						|
        getattr(wrapper, attr).update(getattr(wrapped, attr, {}))
 | 
						|
    # Return the wrapper so this can be used as a decorator via partial()
 | 
						|
    return wrapper
 | 
						|
 | 
						|
def wraps(wrapped,
 | 
						|
          assigned = WRAPPER_ASSIGNMENTS,
 | 
						|
          updated = WRAPPER_UPDATES):
 | 
						|
    """Decorator factory to apply update_wrapper() to a wrapper function
 | 
						|
 | 
						|
       Returns a decorator that invokes update_wrapper() with the decorated
 | 
						|
       function as the wrapper argument and the arguments to wraps() as the
 | 
						|
       remaining arguments. Default arguments are as for update_wrapper().
 | 
						|
       This is a convenience function to simplify applying partial() to
 | 
						|
       update_wrapper().
 | 
						|
    """
 | 
						|
    return partial(update_wrapper, wrapped=wrapped,
 | 
						|
                   assigned=assigned, updated=updated)
 | 
						|
 | 
						|
 | 
						|
################################################################################
 | 
						|
### total_ordering class decorator
 | 
						|
################################################################################
 | 
						|
 | 
						|
def total_ordering(cls):
 | 
						|
    """Class decorator that fills in missing ordering methods"""
 | 
						|
    convert = {
 | 
						|
        '__lt__': [('__gt__', lambda self, other: not (self < other or self == other)),
 | 
						|
                   ('__le__', lambda self, other: self < other or self == other),
 | 
						|
                   ('__ge__', lambda self, other: not self < other)],
 | 
						|
        '__le__': [('__ge__', lambda self, other: not self <= other or self == other),
 | 
						|
                   ('__lt__', lambda self, other: self <= other and not self == other),
 | 
						|
                   ('__gt__', lambda self, other: not self <= other)],
 | 
						|
        '__gt__': [('__lt__', lambda self, other: not (self > other or self == other)),
 | 
						|
                   ('__ge__', lambda self, other: self > other or self == other),
 | 
						|
                   ('__le__', lambda self, other: not self > other)],
 | 
						|
        '__ge__': [('__le__', lambda self, other: (not self >= other) or self == other),
 | 
						|
                   ('__gt__', lambda self, other: self >= other and not self == other),
 | 
						|
                   ('__lt__', lambda self, other: not self >= other)]
 | 
						|
    }
 | 
						|
    # Find user-defined comparisons (not those inherited from object).
 | 
						|
    roots = [op for op in convert if getattr(cls, op, None) is not getattr(object, op, None)]
 | 
						|
    if not roots:
 | 
						|
        raise ValueError('must define at least one ordering operation: < > <= >=')
 | 
						|
    root = max(roots)       # prefer __lt__ to __le__ to __gt__ to __ge__
 | 
						|
    for opname, opfunc in convert[root]:
 | 
						|
        if opname not in roots:
 | 
						|
            opfunc.__name__ = opname
 | 
						|
            opfunc.__doc__ = getattr(int, opname).__doc__
 | 
						|
            setattr(cls, opname, opfunc)
 | 
						|
    return cls
 | 
						|
 | 
						|
 | 
						|
################################################################################
 | 
						|
### cmp_to_key() function converter
 | 
						|
################################################################################
 | 
						|
 | 
						|
def cmp_to_key(mycmp):
 | 
						|
    """Convert a cmp= function into a key= function"""
 | 
						|
    class K(object):
 | 
						|
        __slots__ = ['obj']
 | 
						|
        def __init__(self, obj):
 | 
						|
            self.obj = obj
 | 
						|
        def __lt__(self, other):
 | 
						|
            return mycmp(self.obj, other.obj) < 0
 | 
						|
        def __gt__(self, other):
 | 
						|
            return mycmp(self.obj, other.obj) > 0
 | 
						|
        def __eq__(self, other):
 | 
						|
            return mycmp(self.obj, other.obj) == 0
 | 
						|
        def __le__(self, other):
 | 
						|
            return mycmp(self.obj, other.obj) <= 0
 | 
						|
        def __ge__(self, other):
 | 
						|
            return mycmp(self.obj, other.obj) >= 0
 | 
						|
        def __ne__(self, other):
 | 
						|
            return mycmp(self.obj, other.obj) != 0
 | 
						|
        __hash__ = None
 | 
						|
    return K
 | 
						|
 | 
						|
try:
 | 
						|
    from _functools import cmp_to_key
 | 
						|
except ImportError:
 | 
						|
    pass
 | 
						|
 | 
						|
 | 
						|
################################################################################
 | 
						|
### partial() argument application
 | 
						|
################################################################################
 | 
						|
 | 
						|
def partial(func, *args, **keywords):
 | 
						|
    """new function with partial application of the given arguments
 | 
						|
    and keywords.
 | 
						|
    """
 | 
						|
    def newfunc(*fargs, **fkeywords):
 | 
						|
        newkeywords = keywords.copy()
 | 
						|
        newkeywords.update(fkeywords)
 | 
						|
        return func(*(args + fargs), **newkeywords)
 | 
						|
    newfunc.func = func
 | 
						|
    newfunc.args = args
 | 
						|
    newfunc.keywords = keywords
 | 
						|
    return newfunc
 | 
						|
 | 
						|
try:
 | 
						|
    from _functools import partial
 | 
						|
except ImportError:
 | 
						|
    pass
 | 
						|
 | 
						|
 | 
						|
################################################################################
 | 
						|
### LRU Cache function decorator
 | 
						|
################################################################################
 | 
						|
 | 
						|
_CacheInfo = namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"])
 | 
						|
 | 
						|
class _HashedSeq(list):
 | 
						|
    """ This class guarantees that hash() will be called no more than once
 | 
						|
        per element.  This is important because the lru_cache() will hash
 | 
						|
        the key multiple times on a cache miss.
 | 
						|
 | 
						|
    """
 | 
						|
 | 
						|
    __slots__ = 'hashvalue'
 | 
						|
 | 
						|
    def __init__(self, tup, hash=hash):
 | 
						|
        self[:] = tup
 | 
						|
        self.hashvalue = hash(tup)
 | 
						|
 | 
						|
    def __hash__(self):
 | 
						|
        return self.hashvalue
 | 
						|
 | 
						|
def _make_key(args, kwds, typed,
 | 
						|
             kwd_mark = (object(),),
 | 
						|
             fasttypes = {int, str, frozenset, type(None)},
 | 
						|
             sorted=sorted, tuple=tuple, type=type, len=len):
 | 
						|
    """Make a cache key from optionally typed positional and keyword arguments
 | 
						|
 | 
						|
    The key is constructed in a way that is flat as possible rather than
 | 
						|
    as a nested structure that would take more memory.
 | 
						|
 | 
						|
    If there is only a single argument and its data type is known to cache
 | 
						|
    its hash value, then that argument is returned without a wrapper.  This
 | 
						|
    saves space and improves lookup speed.
 | 
						|
 | 
						|
    """
 | 
						|
    key = args
 | 
						|
    if kwds:
 | 
						|
        sorted_items = sorted(kwds.items())
 | 
						|
        key += kwd_mark
 | 
						|
        for item in sorted_items:
 | 
						|
            key += item
 | 
						|
    if typed:
 | 
						|
        key += tuple(type(v) for v in args)
 | 
						|
        if kwds:
 | 
						|
            key += tuple(type(v) for k, v in sorted_items)
 | 
						|
    elif len(key) == 1 and type(key[0]) in fasttypes:
 | 
						|
        return key[0]
 | 
						|
    return _HashedSeq(key)
 | 
						|
 | 
						|
def lru_cache(maxsize=128, typed=False):
 | 
						|
    """Least-recently-used cache decorator.
 | 
						|
 | 
						|
    If *maxsize* is set to None, the LRU features are disabled and the cache
 | 
						|
    can grow without bound.
 | 
						|
 | 
						|
    If *typed* is True, arguments of different types will be cached separately.
 | 
						|
    For example, f(3.0) and f(3) will be treated as distinct calls with
 | 
						|
    distinct results.
 | 
						|
 | 
						|
    Arguments to the cached function must be hashable.
 | 
						|
 | 
						|
    View the cache statistics named tuple (hits, misses, maxsize, currsize)
 | 
						|
    with f.cache_info().  Clear the cache and statistics with f.cache_clear().
 | 
						|
    Access the underlying function with f.__wrapped__.
 | 
						|
 | 
						|
    See:  http://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used
 | 
						|
 | 
						|
    """
 | 
						|
 | 
						|
    # Users should only access the lru_cache through its public API:
 | 
						|
    #       cache_info, cache_clear, and f.__wrapped__
 | 
						|
    # The internals of the lru_cache are encapsulated for thread safety and
 | 
						|
    # to allow the implementation to change (including a possible C version).
 | 
						|
 | 
						|
    # Constants shared by all lru cache instances:
 | 
						|
    sentinel = object()          # unique object used to signal cache misses
 | 
						|
    make_key = _make_key         # build a key from the function arguments
 | 
						|
    PREV, NEXT, KEY, RESULT = 0, 1, 2, 3   # names for the link fields
 | 
						|
 | 
						|
    def decorating_function(user_function):
 | 
						|
        cache = {}
 | 
						|
        hits = misses = 0
 | 
						|
        full = False
 | 
						|
        cache_get = cache.get    # bound method to lookup a key or return None
 | 
						|
        lock = RLock()           # because linkedlist updates aren't threadsafe
 | 
						|
        root = []                # root of the circular doubly linked list
 | 
						|
        root[:] = [root, root, None, None]     # initialize by pointing to self
 | 
						|
 | 
						|
        if maxsize == 0:
 | 
						|
 | 
						|
            def wrapper(*args, **kwds):
 | 
						|
                # No caching -- just a statistics update after a successful call
 | 
						|
                nonlocal misses
 | 
						|
                result = user_function(*args, **kwds)
 | 
						|
                misses += 1
 | 
						|
                return result
 | 
						|
 | 
						|
        elif maxsize is None:
 | 
						|
 | 
						|
            def wrapper(*args, **kwds):
 | 
						|
                # Simple caching without ordering or size limit
 | 
						|
                nonlocal hits, misses
 | 
						|
                key = make_key(args, kwds, typed)
 | 
						|
                result = cache_get(key, sentinel)
 | 
						|
                if result is not sentinel:
 | 
						|
                    hits += 1
 | 
						|
                    return result
 | 
						|
                result = user_function(*args, **kwds)
 | 
						|
                cache[key] = result
 | 
						|
                misses += 1
 | 
						|
                return result
 | 
						|
 | 
						|
        else:
 | 
						|
 | 
						|
            def wrapper(*args, **kwds):
 | 
						|
                # Size limited caching that tracks accesses by recency
 | 
						|
                nonlocal root, hits, misses, full
 | 
						|
                key = make_key(args, kwds, typed)
 | 
						|
                with lock:
 | 
						|
                    link = cache_get(key)
 | 
						|
                    if link is not None:
 | 
						|
                        # Move the link to the front of the circular queue
 | 
						|
                        link_prev, link_next, _key, result = link
 | 
						|
                        link_prev[NEXT] = link_next
 | 
						|
                        link_next[PREV] = link_prev
 | 
						|
                        last = root[PREV]
 | 
						|
                        last[NEXT] = root[PREV] = link
 | 
						|
                        link[PREV] = last
 | 
						|
                        link[NEXT] = root
 | 
						|
                        hits += 1
 | 
						|
                        return result
 | 
						|
                result = user_function(*args, **kwds)
 | 
						|
                with lock:
 | 
						|
                    if key in cache:
 | 
						|
                        # Getting here means that this same key was added to the
 | 
						|
                        # cache while the lock was released.  Since the link
 | 
						|
                        # update is already done, we need only return the
 | 
						|
                        # computed result and update the count of misses.
 | 
						|
                        pass
 | 
						|
                    elif full:
 | 
						|
                        # Use the old root to store the new key and result.
 | 
						|
                        oldroot = root
 | 
						|
                        oldroot[KEY] = key
 | 
						|
                        oldroot[RESULT] = result
 | 
						|
                        # Empty the oldest link and make it the new root.
 | 
						|
                        # Keep a reference to the old key and old result to
 | 
						|
                        # prevent their ref counts from going to zero during the
 | 
						|
                        # update. That will prevent potentially arbitrary object
 | 
						|
                        # clean-up code (i.e. __del__) from running while we're
 | 
						|
                        # still adjusting the links.
 | 
						|
                        root = oldroot[NEXT]
 | 
						|
                        oldkey = root[KEY]
 | 
						|
                        oldresult = root[RESULT]
 | 
						|
                        root[KEY] = root[RESULT] = None
 | 
						|
                        # Now update the cache dictionary.
 | 
						|
                        del cache[oldkey]
 | 
						|
                        # Save the potentially reentrant cache[key] assignment
 | 
						|
                        # for last, after the root and links have been put in
 | 
						|
                        # a consistent state.
 | 
						|
                        cache[key] = oldroot
 | 
						|
                    else:
 | 
						|
                        # Put result in a new link at the front of the queue.
 | 
						|
                        last = root[PREV]
 | 
						|
                        link = [last, root, key, result]
 | 
						|
                        last[NEXT] = root[PREV] = cache[key] = link
 | 
						|
                        full = (len(cache) >= maxsize)
 | 
						|
                    misses += 1
 | 
						|
                return result
 | 
						|
 | 
						|
        def cache_info():
 | 
						|
            """Report cache statistics"""
 | 
						|
            with lock:
 | 
						|
                return _CacheInfo(hits, misses, maxsize, len(cache))
 | 
						|
 | 
						|
        def cache_clear():
 | 
						|
            """Clear the cache and cache statistics"""
 | 
						|
            nonlocal hits, misses, full
 | 
						|
            with lock:
 | 
						|
                cache.clear()
 | 
						|
                root[:] = [root, root, None, None]
 | 
						|
                hits = misses = 0
 | 
						|
                full = False
 | 
						|
 | 
						|
        wrapper.cache_info = cache_info
 | 
						|
        wrapper.cache_clear = cache_clear
 | 
						|
        return update_wrapper(wrapper, user_function)
 | 
						|
 | 
						|
    return decorating_function
 |