mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 18:28:49 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			344 lines
		
	
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			344 lines
		
	
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| __all__ = ['coroutine',
 | |
|            'iscoroutinefunction', 'iscoroutine']
 | |
| 
 | |
| import functools
 | |
| import inspect
 | |
| import opcode
 | |
| import os
 | |
| import sys
 | |
| import traceback
 | |
| import types
 | |
| 
 | |
| from . import compat
 | |
| from . import events
 | |
| from . import base_futures
 | |
| from .log import logger
 | |
| 
 | |
| 
 | |
| # Opcode of "yield from" instruction
 | |
| _YIELD_FROM = opcode.opmap['YIELD_FROM']
 | |
| 
 | |
| # If you set _DEBUG to true, @coroutine will wrap the resulting
 | |
| # generator objects in a CoroWrapper instance (defined below).  That
 | |
| # instance will log a message when the generator is never iterated
 | |
| # over, which may happen when you forget to use "yield from" with a
 | |
| # coroutine call.  Note that the value of the _DEBUG flag is taken
 | |
| # when the decorator is used, so to be of any use it must be set
 | |
| # before you define your coroutines.  A downside of using this feature
 | |
| # is that tracebacks show entries for the CoroWrapper.__next__ method
 | |
| # when _DEBUG is true.
 | |
| _DEBUG = (not sys.flags.ignore_environment and
 | |
|           bool(os.environ.get('PYTHONASYNCIODEBUG')))
 | |
| 
 | |
| 
 | |
# Native-coroutine support in the 'types' module.  Both names are bound
# together: if either attribute is missing (Python 3.4), both are None.
if hasattr(types, 'coroutine') and hasattr(types, 'CoroutineType'):
    _types_coroutine = types.coroutine
    _types_CoroutineType = types.CoroutineType
else:
    # Python 3.4
    _types_coroutine = None
    _types_CoroutineType = None

# On Python 3.4 'inspect' has no iscoroutinefunction(); fall back to a
# predicate that always answers False.
_inspect_iscoroutinefunction = getattr(
    inspect, 'iscoroutinefunction', lambda func: False)

# The Coroutine/Awaitable ABCs may be unavailable; use None for both
# when the import fails.
try:
    from collections.abc import (Coroutine as _CoroutineABC,
                                 Awaitable as _AwaitableABC)
except ImportError:
    _CoroutineABC = _AwaitableABC = None
 | |
| 
 | |
| 
 | |
# Check for CPython issue #21209
def has_yield_from_bug():
    """Return True if "yield from" alters a tuple sent to a custom iterator.

    A tuple is sent through a delegating generator into an iterator
    object whose send() records the positional arguments it receives.
    The result is True when the recorded arguments are not exactly
    ``(tuple,)``, i.e. the tuple did not arrive as one unchanged value.
    """
    class _SendRecorder:
        # Positional arguments of the most recent send() call.
        send_args = None

        def __iter__(self):
            return self

        def __next__(self):
            return 42

        def send(self, *what):
            self.send_args = what

    def _delegate(target):
        yield from target

    payload = (1, 2, 3)
    recorder = _SendRecorder()
    delegator = _delegate(recorder)
    # Advance to the first yield so that a value can be sent.
    next(delegator)
    delegator.send(payload)
    return recorder.send_args != (payload,)
 | |
| _YIELD_FROM_BUG = has_yield_from_bug()
 | |
| del has_yield_from_bug
 | |
| 
 | |
| 
 | |
def debug_wrapper(gen):
    """Wrap *gen* in a CoroWrapper that has no associated function.

    This function is called from 'sys.set_coroutine_wrapper'.
    Only coroutines defined via 'async def' syntax are wrapped here;
    generator-based coroutines are wrapped in the @coroutine decorator.
    """
    return CoroWrapper(gen, func=None)
 | |
| 
 | |
| 
 | |
class CoroWrapper:
    """Wrapper for coroutine object in _DEBUG mode.

    Forwards the iterator/generator protocol (and, when compat.PY35 is
    true, the coroutine attributes) to the wrapped object, remembers
    the stack at construction time, and logs an error from __del__ if
    the wrapped object's frame still has f_lasti == -1.
    """

    def __init__(self, gen, func=None):
        assert inspect.isgenerator(gen) or inspect.iscoroutine(gen), gen
        self.gen = gen
        self.func = func  # Used to unwrap @coroutine decorator
        # Capture the stack starting from the caller's frame.
        self._source_traceback = traceback.extract_stack(sys._getframe(1))
        # Mirror the wrapped object's names; None when it has none.
        for attr in ('__name__', '__qualname__'):
            setattr(self, attr, getattr(gen, attr, None))

    def __repr__(self):
        description = _format_coroutine(self)
        tb = self._source_traceback
        if tb:
            # The last entry is the innermost recorded frame.
            origin = tb[-1]
            description += ', created at %s:%s' % (origin[0], origin[1])
        return '<%s %s>' % (self.__class__.__name__, description)

    def __iter__(self):
        return self

    def __next__(self):
        return self.gen.send(None)

    if _YIELD_FROM_BUG:
        # Fix for CPython issue #21209: using "yield from" and a custom
        # generator, generator.send(tuple) unpacks the tuple instead of
        # passing the tuple unchanged. Check if the caller is a generator
        # using "yield from" to decide if the parameter should be unpacked
        # or not.
        def send(self, *value):
            caller = sys._getframe().f_back
            assert caller.f_lasti >= 0
            if caller.f_code.co_code[caller.f_lasti] != _YIELD_FROM:
                value = value[0]
            return self.gen.send(value)
    else:
        def send(self, value):
            return self.gen.send(value)

    def throw(self, type, value=None, traceback=None):
        return self.gen.throw(type, value, traceback)

    def close(self):
        return self.gen.close()

    # Read-only views of the wrapped generator's attributes.

    @property
    def gi_frame(self):
        return self.gen.gi_frame

    @property
    def gi_running(self):
        return self.gen.gi_running

    @property
    def gi_code(self):
        return self.gen.gi_code

    if compat.PY35:

        def __await__(self):
            pending = getattr(self.gen, 'cr_await', None)
            if pending is None:
                return self
            raise RuntimeError(
                "Cannot await on coroutine {!r} while it's "
                "awaiting for {!r}".format(self.gen, pending))

        @property
        def gi_yieldfrom(self):
            return self.gen.gi_yieldfrom

        # Read-only views of the wrapped coroutine's attributes.

        @property
        def cr_await(self):
            return self.gen.cr_await

        @property
        def cr_running(self):
            return self.gen.cr_running

        @property
        def cr_code(self):
            return self.gen.cr_code

        @property
        def cr_frame(self):
            return self.gen.cr_frame

    def __del__(self):
        # Be careful accessing self.gen.frame -- self.gen might not exist.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        if frame is None:
            frame = getattr(gen, 'cr_frame', None)
        # Only report objects whose frame exists and has f_lasti == -1.
        if frame is None or frame.f_lasti != -1:
            return

        msg = '%r was never yielded from' % self
        tb = getattr(self, '_source_traceback', ())
        if tb:
            formatted = ''.join(traceback.format_list(tb))
            msg += ('\nCoroutine object created at '
                    '(most recent call last):\n')
            msg += formatted.rstrip()
        logger.error(msg)
 | |
| 
 | |
| 
 | |
def coroutine(func):
    """Decorator to mark coroutines.

    Functions recognised by _inspect_iscoroutinefunction() are returned
    unchanged.  Generator functions are used as they are; any other
    callable is wrapped in a generator that delegates to a returned
    future, generator, CoroWrapper or awaitable.  The returned callable
    is tagged so that iscoroutinefunction() recognises it.

    In _DEBUG mode each call returns a CoroWrapper, which logs an error
    message if the coroutine is not yielded from before it is destroyed.
    """
    if _inspect_iscoroutinefunction(func):
        # In Python 3.5 that's all we need to do for coroutines
        # defined with "async def".
        # Wrapping in CoroWrapper will happen via
        # 'sys.set_coroutine_wrapper' function.
        return func

    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        @functools.wraps(func)
        def coro(*args, **kw):
            result = func(*args, **kw)
            delegate_directly = (base_futures.isfuture(result) or
                                 inspect.isgenerator(result) or
                                 isinstance(result, CoroWrapper))
            if delegate_directly:
                result = yield from result
            elif _AwaitableABC is not None:
                # If 'func' returns an Awaitable (new in 3.5) we
                # want to run it.
                try:
                    await_meth = result.__await__
                except AttributeError:
                    pass
                else:
                    if isinstance(result, _AwaitableABC):
                        result = yield from await_meth()
            return result

    if _DEBUG:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            w = CoroWrapper(coro(*args, **kwds), func=func)
            # Drop the last recorded stack entry, if any.
            if w._source_traceback:
                del w._source_traceback[-1]
            # Python < 3.5 does not implement __qualname__
            # on generator objects, so we set it manually.
            # We use getattr as some callables (such as
            # functools.partial) may lack __qualname__.
            w.__name__ = getattr(func, '__name__', None)
            w.__qualname__ = getattr(func, '__qualname__', None)
            return w
    elif _types_coroutine is None:
        wrapper = coro
    else:
        wrapper = _types_coroutine(coro)

    wrapper._is_coroutine = _is_coroutine  # For iscoroutinefunction().
    return wrapper
 | |
| 
 | |
| 
 | |
# A marker for iscoroutinefunction.
_is_coroutine = object()


def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function."""
    # Callables tagged with this module's marker object qualify directly.
    if getattr(func, '_is_coroutine', None) is _is_coroutine:
        return True
    # Otherwise defer to the inspect-based check.
    return _inspect_iscoroutinefunction(func)
 | |
| 
 | |
| 
 | |
# Types accepted by iscoroutine().  The native coroutine type, when
# available, comes first to speed up asyncio.iscoroutine; the Coroutine
# ABC, when available, comes last.
_COROUTINE_TYPES = (
    (() if _types_CoroutineType is None else (_types_CoroutineType,))
    + (types.GeneratorType, CoroWrapper)
    + (() if _CoroutineABC is None else (_CoroutineABC,))
)


def iscoroutine(obj):
    """Return True if obj is an instance of one of _COROUTINE_TYPES."""
    return isinstance(obj, _COROUTINE_TYPES)
 | |
| 
 | |
| 
 | |
def _format_coroutine(coro):
    """Return a short text description of the coroutine object *coro*.

    The text names the coroutine and says whether it is running or
    done, together with a file name and line number when the object
    exposes a code object.
    """
    assert iscoroutine(coro)

    if not (hasattr(coro, 'cr_code') or hasattr(coro, 'gi_code')):
        # Most likely a built-in type or a Cython coroutine.

        # Built-in types might not have __qualname__ or __name__.
        fallback = type(coro).__name__
        fallback = getattr(coro, '__name__', fallback)
        coro_name = '{}()'.format(getattr(coro, '__qualname__', fallback))

        # Use the first "running" flag the object provides, if any.
        running = False
        for attr in ('cr_running', 'gi_running'):
            try:
                running = getattr(coro, attr)
            except AttributeError:
                continue
            break

        if running:
            return '{} running'.format(coro_name)
        return coro_name

    wrapped = isinstance(coro, CoroWrapper)

    coro_name = None
    if wrapped:
        func = coro.func
        if coro.__qualname__ is not None:
            coro_name = '{}()'.format(coro.__qualname__)
    else:
        func = coro
    if coro_name is None:
        coro_name = events._format_callback(func, (), {})

    # The object exposes either the generator (gi_*) or the
    # coroutine (cr_*) flavour of these attributes.
    try:
        code = coro.gi_code
    except AttributeError:
        code = coro.cr_code

    try:
        frame = coro.gi_frame
    except AttributeError:
        frame = coro.cr_frame

    filename = code.co_filename

    if (wrapped and
            not inspect.isgeneratorfunction(coro.func) and
            coro.func is not None):
        # Report where the decorated function is defined when the
        # source lookup succeeds; the line number stays 0 otherwise.
        lineno = 0
        source = events._get_function_source(coro.func)
        if source is not None:
            filename, lineno = source
        if frame is None:
            return ('%s done, defined at %s:%s'
                    % (coro_name, filename, lineno))
        return ('%s running, defined at %s:%s'
                % (coro_name, filename, lineno))

    if frame is not None:
        return ('%s running at %s:%s'
                % (coro_name, filename, frame.f_lineno))

    return ('%s done, defined at %s:%s'
            % (coro_name, filename, code.co_firstlineno))
 | 
