gh-128384: Use a context variable for warnings.catch_warnings (gh-130010)

Make `warnings.catch_warnings()` use a context variable for holding
the warning filtering state if the `sys.flags.context_aware_warnings`
flag is set to true.  This makes using the context manager thread-safe in
multi-threaded programs.

Add the `sys.flags.thread_inherit_context` flag.  If true, starting a new
thread with `threading.Thread` will use a copy of the context
from the caller of `Thread.start()`.

Both these flags are set to true by default for the free-threaded build
and false for the default build.

Move the Python implementation of warnings.py into _py_warnings.py.

Make _contextvars a builtin module.

Co-authored-by: Kumar Aditya <kumaraditya@python.org>
This commit is contained in:
Neil Schemenauer 2025-04-09 16:18:54 -07:00 committed by GitHub
parent e5237541a0
commit d687900f98
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
41 changed files with 1851 additions and 960 deletions

View file

@ -152,3 +152,33 @@ to re-enable it in a thread-safe way in the 3.14 release. This overhead is
expected to be reduced in upcoming Python release. We are aiming for an
overhead of 10% or less on the pyperformance suite compared to the default
GIL-enabled build.
Behavioral changes
==================
This section describes CPython behavioural changes with the free-threaded
build.
Context variables
-----------------
In the free-threaded build, the flag :data:`~sys.flags.thread_inherit_context`
is set to true by default which causes threads created with
:class:`threading.Thread` to start with a copy of the
:class:`~contextvars.Context()` of the caller of
:meth:`~threading.Thread.start`. In the default GIL-enabled build, the flag
defaults to false so threads start with an
empty :class:`~contextvars.Context()`.
Warning filters
---------------
In the free-threaded build, the flag :data:`~sys.flags.context_aware_warnings`
is set to true by default. In the default GIL-enabled build, the flag defaults
to false. If the flag is true then the :class:`warnings.catch_warnings`
context manager uses a context variable for warning filters. If the flag is
false then :class:`~warnings.catch_warnings` modifies the global filters list,
which is not thread-safe. See the :mod:`warnings` module for more details.

View file

@ -1884,13 +1884,20 @@ the current thread.
If :func:`setcontext` has not been called before :func:`getcontext`, then
:func:`getcontext` will automatically create a new context for use in the
current thread.
current thread. New context objects have default values set from the
:data:`decimal.DefaultContext` object.
The new context is copied from a prototype context called *DefaultContext*. To
control the defaults so that each thread will use the same values throughout the
application, directly modify the *DefaultContext* object. This should be done
*before* any threads are started so that there won't be a race condition between
threads calling :func:`getcontext`. For example::
The :data:`sys.flags.thread_inherit_context` flag affects the context for
new threads. If the flag is false, new threads will start with an empty
context. In this case, :func:`getcontext` will create a new context object
when called and use the default values from *DefaultContext*. If the flag
is true, new threads will start with a copy of context from the caller of
:meth:`threading.Thread.start`.
To control the defaults so that each thread will use the same values throughout
the application, directly modify the *DefaultContext* object. This should be
done *before* any threads are started so that there won't be a race condition
between threads calling :func:`getcontext`. For example::
# Set applicationwide defaults for all threads about to be launched
DefaultContext.prec = 12

View file

@ -535,7 +535,8 @@ always available. Unless explicitly noted otherwise, all variables are read-only
.. data:: flags
The :term:`named tuple` *flags* exposes the status of command line
flags. The attributes are read only.
flags. Flags should only be accessed only by name and not by index. The
attributes are read only.
.. list-table::
@ -594,6 +595,18 @@ always available. Unless explicitly noted otherwise, all variables are read-only
* - .. attribute:: flags.warn_default_encoding
- :option:`-X warn_default_encoding <-X>`
* - .. attribute:: flags.gil
- :option:`-X gil <-X>` and :envvar:`PYTHON_GIL`
* - .. attribute:: flags.thread_inherit_context
- :option:`-X thread_inherit_context <-X>` and
:envvar:`PYTHON_THREAD_INHERIT_CONTEXT`
* - .. attribute:: flags.context_aware_warnings
- :option:`-X thread_inherit_context <-X>` and
:envvar:`PYTHON_CONTEXT_AWARE_WARNINGS`
.. versionchanged:: 3.2
Added ``quiet`` attribute for the new :option:`-q` flag.
@ -620,6 +633,15 @@ always available. Unless explicitly noted otherwise, all variables are read-only
.. versionchanged:: 3.11
Added the ``int_max_str_digits`` attribute.
.. versionchanged:: 3.13
Added the ``gil`` attribute.
.. versionchanged:: 3.14
Added the ``thread_inherit_context`` attribute.
.. versionchanged:: 3.14
Added the ``context_aware_warnings`` attribute.
.. data:: float_info

View file

@ -334,7 +334,7 @@ since it is impossible to detect the termination of alien threads.
.. class:: Thread(group=None, target=None, name=None, args=(), kwargs={}, *, \
daemon=None)
daemon=None, context=None)
This constructor should always be called with keyword arguments. Arguments
are:
@ -359,6 +359,16 @@ since it is impossible to detect the termination of alien threads.
If ``None`` (the default), the daemonic property is inherited from the
current thread.
*context* is the :class:`~contextvars.Context` value to use when starting
the thread. The default value is ``None`` which indicates that the
:data:`sys.flags.thread_inherit_context` flag controls the behaviour. If
the flag is true, threads will start with a copy of the context of the
caller of :meth:`~Thread.start`. If false, they will start with an empty
context. To explicitly start with an empty context, pass a new instance of
:class:`~contextvars.Context()`. To explicitly start with a copy of the
current context, pass the value from :func:`~contextvars.copy_context`. The
flag defaults true on free-threaded builds and false otherwise.
If the subclass overrides the constructor, it must make sure to invoke the
base class constructor (``Thread.__init__()``) before doing anything else to
the thread.
@ -369,6 +379,9 @@ since it is impossible to detect the termination of alien threads.
.. versionchanged:: 3.10
Use the *target* name if *name* argument is omitted.
.. versionchanged:: 3.14
Added the *context* parameter.
.. method:: start()
Start the thread's activity.

View file

@ -324,11 +324,13 @@ the warning using the :class:`catch_warnings` context manager::
While within the context manager all warnings will simply be ignored. This
allows you to use known-deprecated code without having to see the warning while
not suppressing the warning for other code that might not be aware of its use
of deprecated code. Note: this can only be guaranteed in a single-threaded
application. If two or more threads use the :class:`catch_warnings` context
manager at the same time, the behavior is undefined.
of deprecated code.
.. note::
See :ref:`warning-concurrent-safe` for details on the
concurrency-safety of the :class:`catch_warnings` context manager when
used in programs using multiple threads or async functions.
.. _warning-testing:
@ -364,10 +366,13 @@ the warning has been cleared.
Once the context manager exits, the warnings filter is restored to its state
when the context was entered. This prevents tests from changing the warnings
filter in unexpected ways between tests and leading to indeterminate test
results. The :func:`showwarning` function in the module is also restored to
its original value. Note: this can only be guaranteed in a single-threaded
application. If two or more threads use the :class:`catch_warnings` context
manager at the same time, the behavior is undefined.
results.
.. note::
See :ref:`warning-concurrent-safe` for details on the
concurrency-safety of the :class:`catch_warnings` context manager when
used in programs using multiple threads or async functions.
When testing multiple operations that raise the same kind of warning, it
is important to test them in a manner that confirms each operation is raising
@ -615,12 +620,71 @@ Available Context Managers
.. note::
The :class:`catch_warnings` manager works by replacing and
then later restoring the module's
:func:`showwarning` function and internal list of filter
specifications. This means the context manager is modifying
global state and therefore is not thread-safe.
See :ref:`warning-concurrent-safe` for details on the
concurrency-safety of the :class:`catch_warnings` context manager when
used in programs using multiple threads or async functions.
.. versionchanged:: 3.11
Added the *action*, *category*, *lineno*, and *append* parameters.
.. _warning-concurrent-safe:
Concurrent safety of Context Managers
-------------------------------------
The behavior of :class:`catch_warnings` context manager depends on the
:data:`sys.flags.context_aware_warnings` flag. If the flag is true, the
context manager behaves in a concurrent-safe fashion and otherwise not.
Concurrent-safe means that it is both thread-safe and safe to use within
:ref:`asyncio coroutines <coroutine>` and tasks. Being thread-safe means
that behavior is predictable in a multi-threaded program. The flag defaults
to true for free-threaded builds and false otherwise.
If the :data:`~sys.flags.context_aware_warnings` flag is false, then
:class:`catch_warnings` will modify the global attributes of the
:mod:`warnings` module. This is not safe if used within a concurrent program
(using multiple threads or using asyncio coroutines). For example, if two
or more threads use the :class:`catch_warnings` class at the same time, the
behavior is undefined.
If the flag is true, :class:`catch_warnings` will not modify global
attributes and will instead use a :class:`~contextvars.ContextVar` to
store the newly established warning filtering state. A context variable
provides thread-local storage and it makes the use of :class:`catch_warnings`
thread-safe.
The *record* parameter of the context handler also behaves differently
depending on the value of the flag. When *record* is true and the flag is
false, the context manager works by replacing and then later restoring the
module's :func:`showwarning` function. That is not concurrent-safe.
When *record* is true and the flag is true, the :func:`showwarning` function
is not replaced. Instead, the recording status is indicated by an internal
property in the context variable. In this case, the :func:`showwarning`
function will not be restored when exiting the context handler.
The :data:`~sys.flags.context_aware_warnings` flag can be set the :option:`-X
context_aware_warnings<-X>` command-line option or by the
:envvar:`PYTHON_CONTEXT_AWARE_WARNINGS` environment variable.
.. note::
It is likely that most programs that desire thread-safe
behaviour of the warnings module will also want to set the
:data:`~sys.flags.thread_inherit_context` flag to true. That flag
causes threads created by :class:`threading.Thread` to start
with a copy of the context variables from the thread starting
it. When true, the context established by :class:`catch_warnings`
in one thread will also apply to new threads started by it. If false,
new threads will start with an empty warnings context variable,
meaning that any filtering that was established by a
:class:`catch_warnings` context manager will no longer be active.
.. versionchanged:: 3.14
Added the :data:`sys.flags.context_aware_warnings` flag and the use of a
context variable for :class:`catch_warnings` if the flag is true. Previous
versions of Python acted as if the flag was always set to false.

View file

@ -639,6 +639,23 @@ Miscellaneous options
.. versionadded:: 3.13
* :samp:`-X thread_inherit_context={0,1}` causes :class:`~threading.Thread`
to, by default, use a copy of context of of the caller of
``Thread.start()`` when starting. Otherwise, threads will start
with an empty context. If unset, the value of this option defaults
to ``1`` on free-threaded builds and to ``0`` otherwise. See also
:envvar:`PYTHON_THREAD_INHERIT_CONTEXT`.
.. versionadded:: 3.14
* :samp:`-X context_aware_warnings={0,1}` causes the
:class:`warnings.catch_warnings` context manager to use a
:class:`~contextvars.ContextVar` to store warnings filter state. If
unset, the value of this option defaults to ``1`` on free-threaded builds
and to ``0`` otherwise. See also :envvar:`PYTHON_CONTEXT_AWARE_WARNINGS`.
.. versionadded:: 3.14
It also allows passing arbitrary values and retrieving them through the
:data:`sys._xoptions` dictionary.
@ -1241,6 +1258,26 @@ conflict.
.. versionadded:: 3.13
.. envvar:: PYTHON_THREAD_INHERIT_CONTEXT
If this variable is set to ``1`` then :class:`~threading.Thread` will,
by default, use a copy of context of of the caller of ``Thread.start()``
when starting. Otherwise, new threads will start with an empty context.
If unset, this variable defaults to ``1`` on free-threaded builds and to
``0`` otherwise. See also :option:`-X thread_inherit_context<-X>`.
.. versionadded:: 3.14
.. envvar:: PYTHON_CONTEXT_AWARE_WARNINGS
If set to ``1`` then the :class:`warnings.catch_warnings` context
manager will use a :class:`~contextvars.ContextVar` to store warnings
filter state. If unset, this variable defaults to ``1`` on
free-threaded builds and to ``0`` otherwise. See :option:`-X
context_aware_warnings<-X>`.
.. versionadded:: 3.14
Debug-mode variables
~~~~~~~~~~~~~~~~~~~~

View file

@ -180,6 +180,8 @@ typedef struct PyConfig {
int use_frozen_modules;
int safe_path;
int int_max_str_digits;
int thread_inherit_context;
int context_aware_warnings;
#ifdef __APPLE__
int use_system_logger;
#endif

View file

@ -753,6 +753,7 @@ _PyStaticObjects_CheckRefcnt(PyInterpreterState *interp) {
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_feature_version));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_field_types));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_fields_));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_filters));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_finalizing));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_find_and_load));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_fix_up_module));

View file

@ -244,6 +244,7 @@ struct _Py_global_strings {
STRUCT_FOR_ID(_feature_version)
STRUCT_FOR_ID(_field_types)
STRUCT_FOR_ID(_fields_)
STRUCT_FOR_ID(_filters)
STRUCT_FOR_ID(_finalizing)
STRUCT_FOR_ID(_find_and_load)
STRUCT_FOR_ID(_fix_up_module)

View file

@ -582,6 +582,7 @@ struct _warnings_runtime_state {
PyObject *default_action; /* String */
_PyRecursiveMutex lock;
long filters_version;
PyObject *context;
};
struct _Py_mem_interp_free_queue {

View file

@ -751,6 +751,7 @@ extern "C" {
INIT_ID(_feature_version), \
INIT_ID(_field_types), \
INIT_ID(_fields_), \
INIT_ID(_filters), \
INIT_ID(_finalizing), \
INIT_ID(_find_and_load), \
INIT_ID(_fix_up_module), \

View file

@ -764,6 +764,10 @@ _PyUnicode_InitStaticStrings(PyInterpreterState *interp) {
_PyUnicode_InternStatic(interp, &string);
assert(_PyUnicode_CheckConsistency(string, 1));
assert(PyUnicode_GET_LENGTH(string) != 1);
string = &_Py_ID(_filters);
_PyUnicode_InternStatic(interp, &string);
assert(_PyUnicode_CheckConsistency(string, 1));
assert(PyUnicode_GET_LENGTH(string) != 1);
string = &_Py_ID(_finalizing);
_PyUnicode_InternStatic(interp, &string);
assert(_PyUnicode_CheckConsistency(string, 1));

866
Lib/_py_warnings.py Normal file
View file

@ -0,0 +1,866 @@
"""Python part of the warnings subsystem."""
import sys
import _contextvars
import _thread
__all__ = ["warn", "warn_explicit", "showwarning",
"formatwarning", "filterwarnings", "simplefilter",
"resetwarnings", "catch_warnings", "deprecated"]
# Normally '_wm' is sys.modules['warnings'] but for unit tests it can be
# a different module. User code is allowed to reassign global attributes
# of the 'warnings' module, commonly 'filters' or 'showwarning'. So we
# need to lookup these global attributes dynamically on the '_wm' object,
# rather than binding them earlier. The code in this module consistently uses
# '_wm.<something>' rather than using the globals of this module. If the
# '_warnings' C extension is in use, some globals are replaced by functions
# and variables defined in that extension.
_wm = None
def _set_module(module):
global _wm
_wm = module
# filters contains a sequence of filter 5-tuples
# The components of the 5-tuple are:
# - an action: error, ignore, always, all, default, module, or once
# - a compiled regex that must match the warning message
# - a class representing the warning category
# - a compiled regex that must match the module that is being warned
# - a line number for the line being warning, or 0 to mean any line
# If either if the compiled regexs are None, match anything.
filters = []
defaultaction = "default"
onceregistry = {}
_lock = _thread.RLock()
_filters_version = 1
# If true, catch_warnings() will use a context var to hold the modified
# filters list. Otherwise, catch_warnings() will operate on the 'filters'
# global of the warnings module.
_use_context = sys.flags.context_aware_warnings
class _Context:
def __init__(self, filters):
self._filters = filters
self.log = None # if set to a list, logging is enabled
def copy(self):
context = _Context(self._filters[:])
if self.log is not None:
context.log = self.log
return context
def _record_warning(self, msg):
self.log.append(msg)
class _GlobalContext(_Context):
def __init__(self):
self.log = None
@property
def _filters(self):
# Since there is quite a lot of code that assigns to
# warnings.filters, this needs to return the current value of
# the module global.
try:
return _wm.filters
except AttributeError:
# 'filters' global was deleted. Do we need to actually handle this case?
return []
_global_context = _GlobalContext()
_warnings_context = _contextvars.ContextVar('warnings_context')
def _get_context():
if not _use_context:
return _global_context
try:
return _wm._warnings_context.get()
except LookupError:
return _global_context
def _set_context(context):
assert _use_context
_wm._warnings_context.set(context)
def _new_context():
assert _use_context
old_context = _wm._get_context()
new_context = old_context.copy()
_wm._set_context(new_context)
return old_context, new_context
def _get_filters():
"""Return the current list of filters. This is a non-public API used by
module functions and by the unit tests."""
return _wm._get_context()._filters
def _filters_mutated_lock_held():
_wm._filters_version += 1
def showwarning(message, category, filename, lineno, file=None, line=None):
"""Hook to write a warning to a file; replace if you like."""
msg = _wm.WarningMessage(message, category, filename, lineno, file, line)
_wm._showwarnmsg_impl(msg)
def formatwarning(message, category, filename, lineno, line=None):
"""Function to format a warning the standard way."""
msg = _wm.WarningMessage(message, category, filename, lineno, None, line)
return _wm._formatwarnmsg_impl(msg)
def _showwarnmsg_impl(msg):
context = _wm._get_context()
if context.log is not None:
context._record_warning(msg)
return
file = msg.file
if file is None:
file = sys.stderr
if file is None:
# sys.stderr is None when run with pythonw.exe:
# warnings get lost
return
text = _wm._formatwarnmsg(msg)
try:
file.write(text)
except OSError:
# the file (probably stderr) is invalid - this warning gets lost.
pass
def _formatwarnmsg_impl(msg):
category = msg.category.__name__
s = f"{msg.filename}:{msg.lineno}: {category}: {msg.message}\n"
if msg.line is None:
try:
import linecache
line = linecache.getline(msg.filename, msg.lineno)
except Exception:
# When a warning is logged during Python shutdown, linecache
# and the import machinery don't work anymore
line = None
linecache = None
else:
line = msg.line
if line:
line = line.strip()
s += " %s\n" % line
if msg.source is not None:
try:
import tracemalloc
# Logging a warning should not raise a new exception:
# catch Exception, not only ImportError and RecursionError.
except Exception:
# don't suggest to enable tracemalloc if it's not available
suggest_tracemalloc = False
tb = None
else:
try:
suggest_tracemalloc = not tracemalloc.is_tracing()
tb = tracemalloc.get_object_traceback(msg.source)
except Exception:
# When a warning is logged during Python shutdown, tracemalloc
# and the import machinery don't work anymore
suggest_tracemalloc = False
tb = None
if tb is not None:
s += 'Object allocated at (most recent call last):\n'
for frame in tb:
s += (' File "%s", lineno %s\n'
% (frame.filename, frame.lineno))
try:
if linecache is not None:
line = linecache.getline(frame.filename, frame.lineno)
else:
line = None
except Exception:
line = None
if line:
line = line.strip()
s += ' %s\n' % line
elif suggest_tracemalloc:
s += (f'{category}: Enable tracemalloc to get the object '
f'allocation traceback\n')
return s
# Keep a reference to check if the function was replaced
_showwarning_orig = showwarning
def _showwarnmsg(msg):
"""Hook to write a warning to a file; replace if you like."""
try:
sw = _wm.showwarning
except AttributeError:
pass
else:
if sw is not _showwarning_orig:
# warnings.showwarning() was replaced
if not callable(sw):
raise TypeError("warnings.showwarning() must be set to a "
"function or method")
sw(msg.message, msg.category, msg.filename, msg.lineno,
msg.file, msg.line)
return
_wm._showwarnmsg_impl(msg)
# Keep a reference to check if the function was replaced
_formatwarning_orig = formatwarning
def _formatwarnmsg(msg):
"""Function to format a warning the standard way."""
try:
fw = _wm.formatwarning
except AttributeError:
pass
else:
if fw is not _formatwarning_orig:
# warnings.formatwarning() was replaced
return fw(msg.message, msg.category,
msg.filename, msg.lineno, msg.line)
return _wm._formatwarnmsg_impl(msg)
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
append=False):
"""Insert an entry into the list of warnings filters (at the front).
'action' -- one of "error", "ignore", "always", "all", "default", "module",
or "once"
'message' -- a regex that the warning message must match
'category' -- a class that the warning must be a subclass of
'module' -- a regex that the module name must match
'lineno' -- an integer line number, 0 matches all warnings
'append' -- if true, append to the list of filters
"""
if action not in {"error", "ignore", "always", "all", "default", "module", "once"}:
raise ValueError(f"invalid action: {action!r}")
if not isinstance(message, str):
raise TypeError("message must be a string")
if not isinstance(category, type) or not issubclass(category, Warning):
raise TypeError("category must be a Warning subclass")
if not isinstance(module, str):
raise TypeError("module must be a string")
if not isinstance(lineno, int):
raise TypeError("lineno must be an int")
if lineno < 0:
raise ValueError("lineno must be an int >= 0")
if message or module:
import re
if message:
message = re.compile(message, re.I)
else:
message = None
if module:
module = re.compile(module)
else:
module = None
_wm._add_filter(action, message, category, module, lineno, append=append)
def simplefilter(action, category=Warning, lineno=0, append=False):
"""Insert a simple entry into the list of warnings filters (at the front).
A simple filter matches all modules and messages.
'action' -- one of "error", "ignore", "always", "all", "default", "module",
or "once"
'category' -- a class that the warning must be a subclass of
'lineno' -- an integer line number, 0 matches all warnings
'append' -- if true, append to the list of filters
"""
if action not in {"error", "ignore", "always", "all", "default", "module", "once"}:
raise ValueError(f"invalid action: {action!r}")
if not isinstance(lineno, int):
raise TypeError("lineno must be an int")
if lineno < 0:
raise ValueError("lineno must be an int >= 0")
_wm._add_filter(action, None, category, None, lineno, append=append)
def _filters_mutated():
# Even though this function is not part of the public API, it's used by
# a fair amount of user code.
with _wm._lock:
_wm._filters_mutated_lock_held()
def _add_filter(*item, append):
with _wm._lock:
filters = _wm._get_filters()
if not append:
# Remove possible duplicate filters, so new one will be placed
# in correct place. If append=True and duplicate exists, do nothing.
try:
filters.remove(item)
except ValueError:
pass
filters.insert(0, item)
else:
if item not in filters:
filters.append(item)
_wm._filters_mutated_lock_held()
def resetwarnings():
"""Clear the list of warning filters, so that no filters are active."""
with _wm._lock:
del _wm._get_filters()[:]
_wm._filters_mutated_lock_held()
class _OptionError(Exception):
"""Exception used by option processing helpers."""
pass
# Helper to process -W options passed via sys.warnoptions
def _processoptions(args):
for arg in args:
try:
_wm._setoption(arg)
except _wm._OptionError as msg:
print("Invalid -W option ignored:", msg, file=sys.stderr)
# Helper for _processoptions()
def _setoption(arg):
parts = arg.split(':')
if len(parts) > 5:
raise _wm._OptionError("too many fields (max 5): %r" % (arg,))
while len(parts) < 5:
parts.append('')
action, message, category, module, lineno = [s.strip()
for s in parts]
action = _wm._getaction(action)
category = _wm._getcategory(category)
if message or module:
import re
if message:
message = re.escape(message)
if module:
module = re.escape(module) + r'\Z'
if lineno:
try:
lineno = int(lineno)
if lineno < 0:
raise ValueError
except (ValueError, OverflowError):
raise _wm._OptionError("invalid lineno %r" % (lineno,)) from None
else:
lineno = 0
_wm.filterwarnings(action, message, category, module, lineno)
# Helper for _setoption()
def _getaction(action):
if not action:
return "default"
for a in ('default', 'always', 'all', 'ignore', 'module', 'once', 'error'):
if a.startswith(action):
return a
raise _wm._OptionError("invalid action: %r" % (action,))
# Helper for _setoption()
def _getcategory(category):
if not category:
return Warning
if '.' not in category:
import builtins as m
klass = category
else:
module, _, klass = category.rpartition('.')
try:
m = __import__(module, None, None, [klass])
except ImportError:
raise _wm._OptionError("invalid module name: %r" % (module,)) from None
try:
cat = getattr(m, klass)
except AttributeError:
raise _wm._OptionError("unknown warning category: %r" % (category,)) from None
if not issubclass(cat, Warning):
raise _wm._OptionError("invalid warning category: %r" % (category,))
return cat
def _is_internal_filename(filename):
return 'importlib' in filename and '_bootstrap' in filename
def _is_filename_to_skip(filename, skip_file_prefixes):
return any(filename.startswith(prefix) for prefix in skip_file_prefixes)
def _is_internal_frame(frame):
"""Signal whether the frame is an internal CPython implementation detail."""
return _is_internal_filename(frame.f_code.co_filename)
def _next_external_frame(frame, skip_file_prefixes):
"""Find the next frame that doesn't involve Python or user internals."""
frame = frame.f_back
while frame is not None and (
_is_internal_filename(filename := frame.f_code.co_filename) or
_is_filename_to_skip(filename, skip_file_prefixes)):
frame = frame.f_back
return frame
# Code typically replaced by _warnings
def warn(message, category=None, stacklevel=1, source=None,
*, skip_file_prefixes=()):
"""Issue a warning, or maybe ignore it or raise an exception."""
# Check if message is already a Warning object
if isinstance(message, Warning):
category = message.__class__
# Check category argument
if category is None:
category = UserWarning
if not (isinstance(category, type) and issubclass(category, Warning)):
raise TypeError("category must be a Warning subclass, "
"not '{:s}'".format(type(category).__name__))
if not isinstance(skip_file_prefixes, tuple):
# The C version demands a tuple for implementation performance.
raise TypeError('skip_file_prefixes must be a tuple of strs.')
if skip_file_prefixes:
stacklevel = max(2, stacklevel)
# Get context information
try:
if stacklevel <= 1 or _is_internal_frame(sys._getframe(1)):
# If frame is too small to care or if the warning originated in
# internal code, then do not try to hide any frames.
frame = sys._getframe(stacklevel)
else:
frame = sys._getframe(1)
# Look for one frame less since the above line starts us off.
for x in range(stacklevel-1):
frame = _next_external_frame(frame, skip_file_prefixes)
if frame is None:
raise ValueError
except ValueError:
globals = sys.__dict__
filename = "<sys>"
lineno = 0
else:
globals = frame.f_globals
filename = frame.f_code.co_filename
lineno = frame.f_lineno
if '__name__' in globals:
module = globals['__name__']
else:
module = "<string>"
registry = globals.setdefault("__warningregistry__", {})
_wm.warn_explicit(
message,
category,
filename,
lineno,
module,
registry,
globals,
source=source,
)
def warn_explicit(message, category, filename, lineno,
module=None, registry=None, module_globals=None,
source=None):
lineno = int(lineno)
if module is None:
module = filename or "<unknown>"
if module[-3:].lower() == ".py":
module = module[:-3] # XXX What about leading pathname?
if isinstance(message, Warning):
text = str(message)
category = message.__class__
else:
text = message
message = category(message)
key = (text, category, lineno)
with _wm._lock:
if registry is None:
registry = {}
if registry.get('version', 0) != _wm._filters_version:
registry.clear()
registry['version'] = _wm._filters_version
# Quick test for common case
if registry.get(key):
return
# Search the filters
for item in _wm._get_filters():
action, msg, cat, mod, ln = item
if ((msg is None or msg.match(text)) and
issubclass(category, cat) and
(mod is None or mod.match(module)) and
(ln == 0 or lineno == ln)):
break
else:
action = _wm.defaultaction
# Early exit actions
if action == "ignore":
return
if action == "error":
raise message
# Other actions
if action == "once":
registry[key] = 1
oncekey = (text, category)
if _wm.onceregistry.get(oncekey):
return
_wm.onceregistry[oncekey] = 1
elif action in {"always", "all"}:
pass
elif action == "module":
registry[key] = 1
altkey = (text, category, 0)
if registry.get(altkey):
return
registry[altkey] = 1
elif action == "default":
registry[key] = 1
else:
# Unrecognized actions are errors
raise RuntimeError(
"Unrecognized action (%r) in warnings.filters:\n %s" %
(action, item))
# Prime the linecache for formatting, in case the
# "file" is actually in a zipfile or something.
import linecache
linecache.getlines(filename, module_globals)
# Print message and context
msg = _wm.WarningMessage(message, category, filename, lineno, source=source)
_wm._showwarnmsg(msg)
class WarningMessage(object):
_WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
"line", "source")
def __init__(self, message, category, filename, lineno, file=None,
line=None, source=None):
self.message = message
self.category = category
self.filename = filename
self.lineno = lineno
self.file = file
self.line = line
self.source = source
self._category_name = category.__name__ if category else None
def __str__(self):
return ("{message : %r, category : %r, filename : %r, lineno : %s, "
"line : %r}" % (self.message, self._category_name,
self.filename, self.lineno, self.line))
class catch_warnings(object):
"""A context manager that copies and restores the warnings filter upon
exiting the context.
The 'record' argument specifies whether warnings should be captured by a
custom implementation of warnings.showwarning() and be appended to a list
returned by the context manager. Otherwise None is returned by the context
manager. The objects appended to the list are arguments whose attributes
mirror the arguments to showwarning().
The 'module' argument is to specify an alternative module to the module
named 'warnings' and imported under that name. This argument is only useful
when testing the warnings module itself.
If the 'action' argument is not None, the remaining arguments are passed
to warnings.simplefilter() as if it were called immediately on entering the
context.
"""
def __init__(self, *, record=False, module=None,
action=None, category=Warning, lineno=0, append=False):
"""Specify whether to record warnings and if an alternative module
should be used other than sys.modules['warnings'].
"""
self._record = record
self._module = sys.modules['warnings'] if module is None else module
self._entered = False
if action is None:
self._filter = None
else:
self._filter = (action, category, lineno, append)
def __repr__(self):
args = []
if self._record:
args.append("record=True")
if self._module is not sys.modules['warnings']:
args.append("module=%r" % self._module)
name = type(self).__name__
return "%s(%s)" % (name, ", ".join(args))
def __enter__(self):
if self._entered:
raise RuntimeError("Cannot enter %r twice" % self)
self._entered = True
with _wm._lock:
if _use_context:
self._saved_context, context = self._module._new_context()
else:
context = None
self._filters = self._module.filters
self._module.filters = self._filters[:]
self._showwarning = self._module.showwarning
self._showwarnmsg_impl = self._module._showwarnmsg_impl
self._module._filters_mutated_lock_held()
if self._record:
if _use_context:
context.log = log = []
else:
log = []
self._module._showwarnmsg_impl = log.append
# Reset showwarning() to the default implementation to make sure
# that _showwarnmsg() calls _showwarnmsg_impl()
self._module.showwarning = self._module._showwarning_orig
else:
log = None
if self._filter is not None:
self._module.simplefilter(*self._filter)
return log
def __exit__(self, *exc_info):
if not self._entered:
raise RuntimeError("Cannot exit %r without entering first" % self)
with _wm._lock:
if _use_context:
self._module._warnings_context.set(self._saved_context)
else:
self._module.filters = self._filters
self._module.showwarning = self._showwarning
self._module._showwarnmsg_impl = self._showwarnmsg_impl
self._module._filters_mutated_lock_held()
class deprecated:
"""Indicate that a class, function or overload is deprecated.
When this decorator is applied to an object, the type checker
will generate a diagnostic on usage of the deprecated object.
Usage:
@deprecated("Use B instead")
class A:
pass
@deprecated("Use g instead")
def f():
pass
@overload
@deprecated("int support is deprecated")
def g(x: int) -> int: ...
@overload
def g(x: str) -> int: ...
The warning specified by *category* will be emitted at runtime
on use of deprecated objects. For functions, that happens on calls;
for classes, on instantiation and on creation of subclasses.
If the *category* is ``None``, no warning is emitted at runtime.
The *stacklevel* determines where the
warning is emitted. If it is ``1`` (the default), the warning
is emitted at the direct caller of the deprecated object; if it
is higher, it is emitted further up the stack.
Static type checker behavior is not affected by the *category*
and *stacklevel* arguments.
The deprecation message passed to the decorator is saved in the
``__deprecated__`` attribute on the decorated object.
If applied to an overload, the decorator
must be after the ``@overload`` decorator for the attribute to
exist on the overload as returned by ``get_overloads()``.
See PEP 702 for details.
"""
def __init__(
self,
message: str,
/,
*,
category: type[Warning] | None = DeprecationWarning,
stacklevel: int = 1,
) -> None:
if not isinstance(message, str):
raise TypeError(
f"Expected an object of type str for 'message', not {type(message).__name__!r}"
)
self.message = message
self.category = category
self.stacklevel = stacklevel
def __call__(self, arg, /):
# Make sure the inner functions created below don't
# retain a reference to self.
msg = self.message
category = self.category
stacklevel = self.stacklevel
if category is None:
arg.__deprecated__ = msg
return arg
elif isinstance(arg, type):
import functools
from types import MethodType
original_new = arg.__new__
@functools.wraps(original_new)
def __new__(cls, /, *args, **kwargs):
if cls is arg:
_wm.warn(msg, category=category, stacklevel=stacklevel + 1)
if original_new is not object.__new__:
return original_new(cls, *args, **kwargs)
# Mirrors a similar check in object.__new__.
elif cls.__init__ is object.__init__ and (args or kwargs):
raise TypeError(f"{cls.__name__}() takes no arguments")
else:
return original_new(cls)
arg.__new__ = staticmethod(__new__)
original_init_subclass = arg.__init_subclass__
# We need slightly different behavior if __init_subclass__
# is a bound method (likely if it was implemented in Python)
if isinstance(original_init_subclass, MethodType):
original_init_subclass = original_init_subclass.__func__
@functools.wraps(original_init_subclass)
def __init_subclass__(*args, **kwargs):
_wm.warn(msg, category=category, stacklevel=stacklevel + 1)
return original_init_subclass(*args, **kwargs)
arg.__init_subclass__ = classmethod(__init_subclass__)
# Or otherwise, which likely means it's a builtin such as
# object's implementation of __init_subclass__.
else:
@functools.wraps(original_init_subclass)
def __init_subclass__(*args, **kwargs):
_wm.warn(msg, category=category, stacklevel=stacklevel + 1)
return original_init_subclass(*args, **kwargs)
arg.__init_subclass__ = __init_subclass__
arg.__deprecated__ = __new__.__deprecated__ = msg
__init_subclass__.__deprecated__ = msg
return arg
elif callable(arg):
import functools
import inspect
@functools.wraps(arg)
def wrapper(*args, **kwargs):
_wm.warn(msg, category=category, stacklevel=stacklevel + 1)
return arg(*args, **kwargs)
if inspect.iscoroutinefunction(arg):
wrapper = inspect.markcoroutinefunction(wrapper)
arg.__deprecated__ = wrapper.__deprecated__ = msg
return wrapper
else:
raise TypeError(
"@deprecated decorator with non-None category must be applied to "
f"a class or callable, not {arg!r}"
)
_DEPRECATED_MSG = "{name!r} is deprecated and slated for removal in Python {remove}"
def _deprecated(name, message=_DEPRECATED_MSG, *, remove, _version=sys.version_info):
"""Warn that *name* is deprecated or should be removed.
RuntimeError is raised if *remove* specifies a major/minor tuple older than
the current Python version or the same version but past the alpha.
The *message* argument is formatted with *name* and *remove* as a Python
version tuple (e.g. (3, 11)).
"""
remove_formatted = f"{remove[0]}.{remove[1]}"
if (_version[:2] > remove) or (_version[:2] == remove and _version[3] != "alpha"):
msg = f"{name!r} was slated for removal after Python {remove_formatted} alpha"
raise RuntimeError(msg)
else:
msg = message.format(name=name, remove=remove_formatted)
_wm.warn(msg, DeprecationWarning, stacklevel=3)
# Private utility function called by _PyErr_WarnUnawaitedCoroutine
def _warn_unawaited_coroutine(coro):
msg_lines = [
f"coroutine '{coro.__qualname__}' was never awaited\n"
]
if coro.cr_origin is not None:
import linecache, traceback
def extract():
for filename, lineno, funcname in reversed(coro.cr_origin):
line = linecache.getline(filename, lineno)
yield (filename, lineno, funcname, line)
msg_lines.append("Coroutine created at (most recent call last)\n")
msg_lines += traceback.format_list(list(extract()))
msg = "".join(msg_lines).rstrip("\n")
# Passing source= here means that if the user happens to have tracemalloc
# enabled and tracking where the coroutine was created, the warning will
# contain that traceback. This does mean that if they have *both*
# coroutine origin tracking *and* tracemalloc enabled, they'll get two
# partially-redundant tracebacks. If we wanted to be clever we could
# probably detect this case and avoid it, but for now we don't bother.
_wm.warn(
msg, category=RuntimeWarning, stacklevel=2, source=coro
)
def _setup_defaults():
# Several warning categories are ignored by default in regular builds
if hasattr(sys, 'gettotalrefcount'):
return
_wm.filterwarnings("default", category=DeprecationWarning, module="__main__", append=1)
_wm.simplefilter("ignore", category=DeprecationWarning, append=1)
_wm.simplefilter("ignore", category=PendingDeprecationWarning, append=1)
_wm.simplefilter("ignore", category=ImportWarning, append=1)
_wm.simplefilter("ignore", category=ResourceWarning, append=1)

View file

@ -2370,8 +2370,9 @@ def clear_ignored_deprecations(*tokens: object) -> None:
raise ValueError("Provide token or tokens returned by ignore_deprecations_from")
new_filters = []
old_filters = warnings._get_filters()
endswith = tuple(rf"(?#support{id(token)})" for token in tokens)
for action, message, category, module, lineno in warnings.filters:
for action, message, category, module, lineno in old_filters:
if action == "ignore" and category is DeprecationWarning:
if isinstance(message, re.Pattern):
msg = message.pattern
@ -2380,8 +2381,8 @@ def clear_ignored_deprecations(*tokens: object) -> None:
if msg.endswith(endswith):
continue
new_filters.append((action, message, category, module, lineno))
if warnings.filters != new_filters:
warnings.filters[:] = new_filters
if old_filters != new_filters:
old_filters[:] = new_filters
warnings._filters_mutated()

View file

@ -160,11 +160,12 @@ def _filterwarnings(filters, quiet=False):
registry = frame.f_globals.get('__warningregistry__')
if registry:
registry.clear()
with warnings.catch_warnings(record=True) as w:
# Set filter "always" to record all warnings. Because
# test_warnings swap the module, we need to look up in
# the sys.modules dictionary.
sys.modules['warnings'].simplefilter("always")
# Because test_warnings swap the module, we need to look up in the
# sys.modules dictionary.
wmod = sys.modules['warnings']
with wmod.catch_warnings(record=True) as w:
# Set filter "always" to record all warnings.
wmod.simplefilter("always")
yield WarningsRecorder(w)
# Filter the recorded warnings
reraise = list(w)

View file

@ -54,6 +54,8 @@ class CAPITests(unittest.TestCase):
("filesystem_errors", str, None),
("hash_seed", int, None),
("home", str | None, None),
("thread_inherit_context", int, None),
("context_aware_warnings", int, None),
("import_time", bool, None),
("inspect", bool, None),
("install_signal_handlers", bool, None),
@ -98,7 +100,7 @@ class CAPITests(unittest.TestCase):
]
if support.Py_DEBUG:
options.append(("run_presite", str | None, None))
if sysconfig.get_config_var('Py_GIL_DISABLED'):
if support.Py_GIL_DISABLED:
options.append(("enable_gil", int, None))
options.append(("tlbc_enabled", int, None))
if support.MS_WINDOWS:
@ -170,7 +172,7 @@ class CAPITests(unittest.TestCase):
("warn_default_encoding", "warn_default_encoding", False),
("safe_path", "safe_path", False),
("int_max_str_digits", "int_max_str_digits", False),
# "gil" is tested below
# "gil", "thread_inherit_context" and "context_aware_warnings" are tested below
):
with self.subTest(flag=flag, name=name, negate=negate):
value = config_get(name)
@ -182,11 +184,17 @@ class CAPITests(unittest.TestCase):
config_get('use_hash_seed') == 0
or config_get('hash_seed') != 0)
if sysconfig.get_config_var('Py_GIL_DISABLED'):
if support.Py_GIL_DISABLED:
value = config_get('enable_gil')
expected = (value if value != -1 else None)
self.assertEqual(sys.flags.gil, expected)
expected_inherit_context = 1 if support.Py_GIL_DISABLED else 0
self.assertEqual(sys.flags.thread_inherit_context, expected_inherit_context)
expected_safe_warnings = 1 if support.Py_GIL_DISABLED else 0
self.assertEqual(sys.flags.context_aware_warnings, expected_safe_warnings)
def test_config_get_non_existent(self):
# Test PyConfig_Get() on non-existent option name
config_get = _testcapi.config_get

View file

@ -1,3 +1,4 @@
import sys
import collections.abc
import concurrent.futures
import contextvars
@ -392,6 +393,60 @@ class ContextTest(unittest.TestCase):
tp.shutdown()
self.assertEqual(results, list(range(10)))
@isolated_context
@threading_helper.requires_working_threading()
def test_context_thread_inherit(self):
import threading
cvar = contextvars.ContextVar('cvar')
def run_context_none():
if sys.flags.thread_inherit_context:
expected = 1
else:
expected = None
self.assertEqual(cvar.get(None), expected)
# By default, context is inherited based on the
# sys.flags.thread_inherit_context option.
cvar.set(1)
thread = threading.Thread(target=run_context_none)
thread.start()
thread.join()
# Passing 'None' explicitly should have same behaviour as not
# passing parameter.
thread = threading.Thread(target=run_context_none, context=None)
thread.start()
thread.join()
# An explicit Context value can also be passed
custom_ctx = contextvars.Context()
custom_var = None
def setup_context():
nonlocal custom_var
custom_var = contextvars.ContextVar('custom')
custom_var.set(2)
custom_ctx.run(setup_context)
def run_custom():
self.assertEqual(custom_var.get(), 2)
thread = threading.Thread(target=run_custom, context=custom_ctx)
thread.start()
thread.join()
# You can also pass a new Context() object to start with an empty context
def run_empty():
with self.assertRaises(LookupError):
cvar.get()
thread = threading.Thread(target=run_empty, context=contextvars.Context())
thread.start()
thread.join()
def test_token_contextmanager_with_default(self):
ctx = contextvars.Context()
c = contextvars.ContextVar('c', default=42)

View file

@ -45,6 +45,7 @@ from test.support import warnings_helper
import random
import inspect
import threading
import contextvars
if sys.platform == 'darwin':
@ -1726,8 +1727,13 @@ class ThreadingTest:
self.finish1 = threading.Event()
self.finish2 = threading.Event()
th1 = threading.Thread(target=thfunc1, args=(self,))
th2 = threading.Thread(target=thfunc2, args=(self,))
# This test wants to start threads with an empty context, no matter
# the setting of sys.flags.thread_inherit_context. We pass the
# 'context' argument explicitly with an empty context instance.
th1 = threading.Thread(target=thfunc1, args=(self,),
context=contextvars.Context())
th2 = threading.Thread(target=thfunc2, args=(self,),
context=contextvars.Context())
th1.start()
th2.start()

View file

@ -48,7 +48,7 @@ API_ISOLATED = 3
INIT_LOOPS = 4
MAX_HASH_SEED = 4294967295
ABI_THREAD = 't' if sysconfig.get_config_var('Py_GIL_DISABLED') else ''
ABI_THREAD = 't' if support.Py_GIL_DISABLED else ''
# PLATSTDLIB_LANDMARK copied from Modules/getpath.py
if os.name == 'nt':
PLATSTDLIB_LANDMARK = f'{sys.platlibdir}'
@ -58,6 +58,8 @@ else:
PLATSTDLIB_LANDMARK = (f'{sys.platlibdir}/python{VERSION_MAJOR}.'
f'{VERSION_MINOR}{ABI_THREAD}/lib-dynload')
DEFAULT_THREAD_INHERIT_CONTEXT = 1 if support.Py_GIL_DISABLED else 0
DEFAULT_CONTEXT_AWARE_WARNINGS = 1 if support.Py_GIL_DISABLED else 0
# If we are running from a build dir, but the stdlib has been installed,
# some tests need to expect different results.
@ -584,6 +586,8 @@ class InitConfigTests(EmbeddingTestsMixin, unittest.TestCase):
'tracemalloc': 0,
'perf_profiling': 0,
'import_time': False,
'thread_inherit_context': DEFAULT_THREAD_INHERIT_CONTEXT,
'context_aware_warnings': DEFAULT_CONTEXT_AWARE_WARNINGS,
'code_debug_ranges': True,
'show_ref_count': False,
'dump_refs': False,

View file

@ -5,6 +5,7 @@ import threading
import time
import unittest
import _testinternalcapi
import warnings
from test.support import threading_helper
@ -286,5 +287,37 @@ class TestRaces(TestBase):
do_race(something_recursive, set_recursion_limit)
@threading_helper.requires_working_threading()
class TestWarningsRaces(TestBase):
def setUp(self):
self.saved_filters = warnings.filters[:]
warnings.resetwarnings()
# Add multiple filters to the list to increase odds of race.
for lineno in range(20):
warnings.filterwarnings('ignore', message='not matched', category=Warning, lineno=lineno)
# Override showwarning() so that we don't actually show warnings.
def showwarning(*args):
pass
warnings.showwarning = showwarning
def tearDown(self):
warnings.filters[:] = self.saved_filters
warnings.showwarning = warnings._showwarning_orig
def test_racing_warnings_filter(self):
# Modifying the warnings.filters list while another thread is using
# warn() should not crash or race.
def modify_filters():
time.sleep(0)
warnings.filters[:] = [('ignore', None, UserWarning, None, 0)]
time.sleep(0)
warnings.filters[:] = self.saved_filters
def emit_warning():
warnings.warn('dummy message', category=UserWarning)
do_race(modify_filters, emit_warning)
if __name__ == "__main__":
unittest.main()

View file

@ -54,23 +54,23 @@ def _caplog():
class TestSupport(unittest.TestCase):
@classmethod
def setUpClass(cls):
orig_filter_len = len(warnings.filters)
orig_filter_len = len(warnings._get_filters())
cls._warnings_helper_token = support.ignore_deprecations_from(
"test.support.warnings_helper", like=".*used in test_support.*"
)
cls._test_support_token = support.ignore_deprecations_from(
__name__, like=".*You should NOT be seeing this.*"
)
assert len(warnings.filters) == orig_filter_len + 2
assert len(warnings._get_filters()) == orig_filter_len + 2
@classmethod
def tearDownClass(cls):
orig_filter_len = len(warnings.filters)
orig_filter_len = len(warnings._get_filters())
support.clear_ignored_deprecations(
cls._warnings_helper_token,
cls._test_support_token,
)
assert len(warnings.filters) == orig_filter_len - 2
assert len(warnings._get_filters()) == orig_filter_len - 2
def test_ignored_deprecations_are_silent(self):
"""Test support.ignore_deprecations_from() silences warnings"""

View file

@ -1890,8 +1890,10 @@ class SizeofTest(unittest.TestCase):
# symtable entry
# XXX
# sys.flags
# FIXME: The +1 will not be necessary once gh-122575 is fixed
check(sys.flags, vsize('') + self.P + self.P * (1 + len(sys.flags)))
# FIXME: The +3 is for the 'gil', 'thread_inherit_context' and
# 'context_aware_warnings' flags and will not be necessary once
# gh-122575 is fixed
check(sys.flags, vsize('') + self.P + self.P * (3 + len(sys.flags)))
def test_asyncgen_hooks(self):
old = sys.get_asyncgen_hooks()

View file

@ -24,10 +24,13 @@ import warnings as original_warnings
from warnings import deprecated
py_warnings = import_helper.import_fresh_module('warnings',
blocked=['_warnings'])
c_warnings = import_helper.import_fresh_module('warnings',
fresh=['_warnings'])
py_warnings = import_helper.import_fresh_module('_py_warnings')
py_warnings._set_module(py_warnings)
c_warnings = import_helper.import_fresh_module(
"warnings", fresh=["_warnings", "_py_warnings"]
)
c_warnings._set_module(c_warnings)
@contextmanager
def warnings_state(module):
@ -43,15 +46,21 @@ def warnings_state(module):
except NameError:
pass
original_warnings = warning_tests.warnings
original_filters = module.filters
try:
if module._use_context:
saved_context, context = module._new_context()
else:
original_filters = module.filters
module.filters = original_filters[:]
try:
module.simplefilter("once")
warning_tests.warnings = module
yield
finally:
warning_tests.warnings = original_warnings
module.filters = original_filters
if module._use_context:
module._set_context(saved_context)
else:
module.filters = original_filters
class TestWarning(Warning):
@ -111,14 +120,14 @@ class FilterTests(BaseTest):
"""Testing the filtering functionality."""
def test_error(self):
with original_warnings.catch_warnings(module=self.module) as w:
with self.module.catch_warnings() as w:
self.module.resetwarnings()
self.module.filterwarnings("error", category=UserWarning)
self.assertRaises(UserWarning, self.module.warn,
"FilterTests.test_error")
def test_error_after_default(self):
with original_warnings.catch_warnings(module=self.module) as w:
with self.module.catch_warnings() as w:
self.module.resetwarnings()
message = "FilterTests.test_ignore_after_default"
def f():
@ -136,8 +145,7 @@ class FilterTests(BaseTest):
self.assertRaises(UserWarning, f)
def test_ignore(self):
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
self.module.filterwarnings("ignore", category=UserWarning)
self.module.warn("FilterTests.test_ignore", UserWarning)
@ -145,8 +153,7 @@ class FilterTests(BaseTest):
self.assertEqual(list(__warningregistry__), ['version'])
def test_ignore_after_default(self):
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
message = "FilterTests.test_ignore_after_default"
def f():
@ -159,8 +166,7 @@ class FilterTests(BaseTest):
def test_always_and_all(self):
for mode in {"always", "all"}:
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
self.module.filterwarnings(mode, category=UserWarning)
message = "FilterTests.test_always_and_all"
@ -175,8 +181,7 @@ class FilterTests(BaseTest):
def test_always_and_all_after_default(self):
for mode in {"always", "all"}:
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
message = "FilterTests.test_always_and_all_after_ignore"
def f():
@ -195,8 +200,7 @@ class FilterTests(BaseTest):
self.assertEqual(w[-1].message.args[0], message)
def test_default(self):
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
self.module.filterwarnings("default", category=UserWarning)
message = UserWarning("FilterTests.test_default")
@ -211,8 +215,7 @@ class FilterTests(BaseTest):
raise ValueError("loop variant unhandled")
def test_module(self):
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
self.module.filterwarnings("module", category=UserWarning)
message = UserWarning("FilterTests.test_module")
@ -223,8 +226,7 @@ class FilterTests(BaseTest):
self.assertEqual(len(w), 0)
def test_once(self):
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
self.module.filterwarnings("once", category=UserWarning)
message = UserWarning("FilterTests.test_once")
@ -240,8 +242,7 @@ class FilterTests(BaseTest):
self.assertEqual(len(w), 0)
def test_module_globals(self):
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.simplefilter("always", UserWarning)
# bpo-33509: module_globals=None must not crash
@ -261,15 +262,14 @@ class FilterTests(BaseTest):
self.assertEqual(len(w), 2)
def test_inheritance(self):
with original_warnings.catch_warnings(module=self.module) as w:
with self.module.catch_warnings() as w:
self.module.resetwarnings()
self.module.filterwarnings("error", category=Warning)
self.assertRaises(UserWarning, self.module.warn,
"FilterTests.test_inheritance", UserWarning)
def test_ordering(self):
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
self.module.filterwarnings("ignore", category=UserWarning)
self.module.filterwarnings("error", category=UserWarning,
@ -284,8 +284,7 @@ class FilterTests(BaseTest):
def test_filterwarnings(self):
# Test filterwarnings().
# Implicitly also tests resetwarnings().
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.filterwarnings("error", "", Warning, "", 0)
self.assertRaises(UserWarning, self.module.warn, 'convert to error')
@ -309,8 +308,7 @@ class FilterTests(BaseTest):
self.assertIs(w[-1].category, UserWarning)
def test_message_matching(self):
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.simplefilter("ignore", UserWarning)
self.module.filterwarnings("error", "match", UserWarning)
self.assertRaises(UserWarning, self.module.warn, "match")
@ -326,54 +324,52 @@ class FilterTests(BaseTest):
L[:] = []
L = [("default",X(),UserWarning,X(),0) for i in range(2)]
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.filters = L
self.module.warn_explicit(UserWarning("b"), None, "f.py", 42)
self.assertEqual(str(w[-1].message), "b")
def test_filterwarnings_duplicate_filters(self):
with original_warnings.catch_warnings(module=self.module):
with self.module.catch_warnings():
self.module.resetwarnings()
self.module.filterwarnings("error", category=UserWarning)
self.assertEqual(len(self.module.filters), 1)
self.assertEqual(len(self.module._get_filters()), 1)
self.module.filterwarnings("ignore", category=UserWarning)
self.module.filterwarnings("error", category=UserWarning)
self.assertEqual(
len(self.module.filters), 2,
len(self.module._get_filters()), 2,
"filterwarnings inserted duplicate filter"
)
self.assertEqual(
self.module.filters[0][0], "error",
self.module._get_filters()[0][0], "error",
"filterwarnings did not promote filter to "
"the beginning of list"
)
def test_simplefilter_duplicate_filters(self):
with original_warnings.catch_warnings(module=self.module):
with self.module.catch_warnings():
self.module.resetwarnings()
self.module.simplefilter("error", category=UserWarning)
self.assertEqual(len(self.module.filters), 1)
self.assertEqual(len(self.module._get_filters()), 1)
self.module.simplefilter("ignore", category=UserWarning)
self.module.simplefilter("error", category=UserWarning)
self.assertEqual(
len(self.module.filters), 2,
len(self.module._get_filters()), 2,
"simplefilter inserted duplicate filter"
)
self.assertEqual(
self.module.filters[0][0], "error",
self.module._get_filters()[0][0], "error",
"simplefilter did not promote filter to the beginning of list"
)
def test_append_duplicate(self):
with original_warnings.catch_warnings(module=self.module,
record=True) as w:
with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
self.module.simplefilter("ignore")
self.module.simplefilter("error", append=True)
self.module.simplefilter("ignore", append=True)
self.module.warn("test_append_duplicate", category=UserWarning)
self.assertEqual(len(self.module.filters), 2,
self.assertEqual(len(self.module._get_filters()), 2,
"simplefilter inserted duplicate filter"
)
self.assertEqual(len(w), 0,
@ -403,19 +399,17 @@ class FilterTests(BaseTest):
self.module.simplefilter('ignore', lineno=-1)
def test_catchwarnings_with_simplefilter_ignore(self):
with original_warnings.catch_warnings(module=self.module):
with self.module.catch_warnings(module=self.module):
self.module.resetwarnings()
self.module.simplefilter("error")
with self.module.catch_warnings(
module=self.module, action="ignore"
):
with self.module.catch_warnings(action="ignore"):
self.module.warn("This will be ignored")
def test_catchwarnings_with_simplefilter_error(self):
with original_warnings.catch_warnings(module=self.module):
with self.module.catch_warnings():
self.module.resetwarnings()
with self.module.catch_warnings(
module=self.module, action="error", category=FutureWarning
action="error", category=FutureWarning
):
with support.captured_stderr() as stderr:
error_msg = "Other types of warnings are not errors"
@ -437,8 +431,7 @@ class WarnTests(BaseTest):
"""Test warnings.warn() and warnings.warn_explicit()."""
def test_message(self):
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.simplefilter("once")
for i in range(4):
text = 'multi %d' %i # Different text on each call.
@ -450,8 +443,7 @@ class WarnTests(BaseTest):
def test_warn_nonstandard_types(self):
# warn() should handle non-standard types without issue.
for ob in (Warning, None, 42):
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.simplefilter("once")
self.module.warn(ob)
# Don't directly compare objects since
@ -460,8 +452,7 @@ class WarnTests(BaseTest):
def test_filename(self):
with warnings_state(self.module):
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
warning_tests.inner("spam1")
self.assertEqual(os.path.basename(w[-1].filename),
"stacklevel.py")
@ -473,8 +464,7 @@ class WarnTests(BaseTest):
# Test stacklevel argument
# make sure all messages are different, so the warning won't be skipped
with warnings_state(self.module):
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
warning_tests.inner("spam3", stacklevel=1)
self.assertEqual(os.path.basename(w[-1].filename),
"stacklevel.py")
@ -500,8 +490,7 @@ class WarnTests(BaseTest):
# Issue #24305: With stacklevel=2, module-level warnings should work.
import_helper.unload('test.test_warnings.data.import_warning')
with warnings_state(self.module):
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.simplefilter('always')
import test.test_warnings.data.import_warning # noqa: F401
self.assertEqual(len(w), 1)
@ -509,8 +498,7 @@ class WarnTests(BaseTest):
def test_skip_file_prefixes(self):
with warnings_state(self.module):
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.simplefilter('always')
# Warning never attributed to the data/ package.
@ -537,9 +525,7 @@ class WarnTests(BaseTest):
# see: gh-126209
with warnings_state(self.module):
skipped = warning_tests.__file__
with original_warnings.catch_warnings(
record=True, module=self.module,
) as w:
with self.module.catch_warnings(record=True) as w:
warning_tests.outer("msg", skip_file_prefixes=(skipped,))
self.assertEqual(len(w), 1)
@ -560,14 +546,13 @@ class WarnTests(BaseTest):
codeobj = compile(("import warnings\n"
"warnings.warn('hello', UserWarning)"),
filename, "exec")
with original_warnings.catch_warnings(record=True) as w:
with self.module.catch_warnings(record=True) as w:
self.module.simplefilter("always", category=UserWarning)
exec(codeobj)
self.assertEqual(w[0].filename, filename)
def test_warn_explicit_non_ascii_filename(self):
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
self.module.filterwarnings("always", category=UserWarning)
filenames = ["nonascii\xe9\u20ac"]
@ -637,7 +622,7 @@ class WarnTests(BaseTest):
self.assertIn('category must be a Warning subclass, not ',
str(cm.exception))
with original_warnings.catch_warnings(module=self.module):
with self.module.catch_warnings():
self.module.resetwarnings()
self.module.filterwarnings('default')
with self.assertWarns(MyWarningClass) as cm:
@ -653,7 +638,7 @@ class WarnTests(BaseTest):
self.assertIsInstance(cm.warning, Warning)
def check_module_globals(self, module_globals):
with original_warnings.catch_warnings(module=self.module, record=True) as w:
with self.module.catch_warnings(record=True) as w:
self.module.filterwarnings('default')
self.module.warn_explicit(
'eggs', UserWarning, 'bar', 1,
@ -666,7 +651,7 @@ class WarnTests(BaseTest):
if self.module is py_warnings:
self.check_module_globals(module_globals)
return
with original_warnings.catch_warnings(module=self.module, record=True) as w:
with self.module.catch_warnings(record=True) as w:
self.module.filterwarnings('always')
with self.assertRaisesRegex(errtype, re.escape(errmsg)):
self.module.warn_explicit(
@ -678,7 +663,7 @@ class WarnTests(BaseTest):
if self.module is py_warnings:
self.check_module_globals(module_globals)
return
with original_warnings.catch_warnings(module=self.module, record=True) as w:
with self.module.catch_warnings(record=True) as w:
self.module.filterwarnings('always')
self.module.warn_explicit(
'eggs', UserWarning, 'bar', 1,
@ -767,7 +752,7 @@ class WCmdLineTests(BaseTest):
def test_improper_input(self):
# Uses the private _setoption() function to test the parsing
# of command-line warning arguments
with original_warnings.catch_warnings(module=self.module):
with self.module.catch_warnings():
self.assertRaises(self.module._OptionError,
self.module._setoption, '1:2:3:4:5:6')
self.assertRaises(self.module._OptionError,
@ -786,7 +771,7 @@ class WCmdLineTests(BaseTest):
self.assertRaises(UserWarning, self.module.warn, 'convert to error')
def test_import_from_module(self):
with original_warnings.catch_warnings(module=self.module):
with self.module.catch_warnings():
self.module._setoption('ignore::Warning')
with self.assertRaises(self.module._OptionError):
self.module._setoption('ignore::TestWarning')
@ -829,7 +814,7 @@ class _WarningsTests(BaseTest, unittest.TestCase):
def test_filter(self):
# Everything should function even if 'filters' is not in warnings.
with original_warnings.catch_warnings(module=self.module) as w:
with self.module.catch_warnings() as w:
self.module.filterwarnings("error", "", Warning, "", 0)
self.assertRaises(UserWarning, self.module.warn,
'convert to error')
@ -844,8 +829,7 @@ class _WarningsTests(BaseTest, unittest.TestCase):
try:
original_registry = self.module.onceregistry
__warningregistry__ = {}
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
self.module.filterwarnings("once", category=UserWarning)
self.module.warn_explicit(message, UserWarning, "file", 42)
@ -872,8 +856,7 @@ class _WarningsTests(BaseTest, unittest.TestCase):
message = UserWarning("defaultaction test")
original = self.module.defaultaction
try:
with original_warnings.catch_warnings(record=True,
module=self.module) as w:
with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
registry = {}
self.module.warn_explicit(message, UserWarning, "<test>", 42,
@ -906,8 +889,12 @@ class _WarningsTests(BaseTest, unittest.TestCase):
def test_showwarning_missing(self):
# Test that showwarning() missing is okay.
if self.module._use_context:
# If _use_context is true, the warnings module does not
# override/restore showwarning()
return
text = 'del showwarning test'
with original_warnings.catch_warnings(module=self.module):
with self.module.catch_warnings():
self.module.filterwarnings("always", category=UserWarning)
del self.module.showwarning
with support.captured_output('stderr') as stream:
@ -918,7 +905,7 @@ class _WarningsTests(BaseTest, unittest.TestCase):
def test_showwarnmsg_missing(self):
# Test that _showwarnmsg() missing is okay.
text = 'del _showwarnmsg test'
with original_warnings.catch_warnings(module=self.module):
with self.module.catch_warnings():
self.module.filterwarnings("always", category=UserWarning)
show = self.module._showwarnmsg
@ -932,7 +919,7 @@ class _WarningsTests(BaseTest, unittest.TestCase):
self.assertIn(text, result)
def test_showwarning_not_callable(self):
with original_warnings.catch_warnings(module=self.module):
with self.module.catch_warnings():
self.module.filterwarnings("always", category=UserWarning)
self.module.showwarning = print
with support.captured_output('stdout'):
@ -943,7 +930,7 @@ class _WarningsTests(BaseTest, unittest.TestCase):
def test_show_warning_output(self):
# With showwarning() missing, make sure that output is okay.
text = 'test show_warning'
with original_warnings.catch_warnings(module=self.module):
with self.module.catch_warnings():
self.module.filterwarnings("always", category=UserWarning)
del self.module.showwarning
with support.captured_output('stderr') as stream:
@ -968,12 +955,11 @@ class _WarningsTests(BaseTest, unittest.TestCase):
globals_dict = globals()
oldfile = globals_dict['__file__']
try:
catch = original_warnings.catch_warnings(record=True,
module=self.module)
catch = self.module.catch_warnings(record=True)
with catch as w:
self.module.filterwarnings("always", category=UserWarning)
globals_dict['__file__'] = None
original_warnings.warn('test', UserWarning)
self.module.warn('test', UserWarning)
self.assertTrue(len(w))
finally:
globals_dict['__file__'] = oldfile
@ -1010,7 +996,7 @@ class _WarningsTests(BaseTest, unittest.TestCase):
wmod = self.module
with original_warnings.catch_warnings(module=wmod):
with wmod.catch_warnings():
wmod.filterwarnings('default', category=UserWarning)
linecache.clearcache()
@ -1037,7 +1023,7 @@ class _WarningsTests(BaseTest, unittest.TestCase):
# warn_explicit() shouldn't raise a SystemError in case
# warnings.onceregistry isn't a dictionary.
wmod = self.module
with original_warnings.catch_warnings(module=wmod):
with wmod.catch_warnings():
wmod.filterwarnings('once')
with support.swap_attr(wmod, 'onceregistry', None):
with self.assertRaises(TypeError):
@ -1048,12 +1034,12 @@ class _WarningsTests(BaseTest, unittest.TestCase):
# warn_explicit() shouldn't cause an assertion failure in case of a
# bad warnings.filters or warnings.defaultaction.
wmod = self.module
with original_warnings.catch_warnings(module=wmod):
wmod.filters = [(None, None, Warning, None, 0)]
with wmod.catch_warnings():
wmod._get_filters()[:] = [(None, None, Warning, None, 0)]
with self.assertRaises(TypeError):
wmod.warn_explicit('foo', Warning, 'bar', 1)
wmod.filters = []
wmod._get_filters()[:] = []
with support.swap_attr(wmod, 'defaultaction', None), \
self.assertRaises(TypeError):
wmod.warn_explicit('foo', Warning, 'bar', 1)
@ -1062,7 +1048,7 @@ class _WarningsTests(BaseTest, unittest.TestCase):
def test_issue31566(self):
# warn() shouldn't cause an assertion failure in case of a bad
# __name__ global.
with original_warnings.catch_warnings(module=self.module):
with self.module.catch_warnings():
self.module.filterwarnings('error', category=UserWarning)
with support.swap_item(globals(), '__name__', b'foo'), \
support.swap_item(globals(), '__file__', None):
@ -1190,16 +1176,18 @@ class CatchWarningTests(BaseTest):
"""Test catch_warnings()."""
def test_catch_warnings_restore(self):
if self.module._use_context:
return # test disabled if using context vars
wmod = self.module
orig_filters = wmod.filters
orig_showwarning = wmod.showwarning
# Ensure both showwarning and filters are restored when recording
with wmod.catch_warnings(module=wmod, record=True):
with wmod.catch_warnings(record=True):
wmod.filters = wmod.showwarning = object()
self.assertIs(wmod.filters, orig_filters)
self.assertIs(wmod.showwarning, orig_showwarning)
# Same test, but with recording disabled
with wmod.catch_warnings(module=wmod, record=False):
with wmod.catch_warnings(record=False):
wmod.filters = wmod.showwarning = object()
self.assertIs(wmod.filters, orig_filters)
self.assertIs(wmod.showwarning, orig_showwarning)
@ -1207,7 +1195,7 @@ class CatchWarningTests(BaseTest):
def test_catch_warnings_recording(self):
wmod = self.module
# Ensure warnings are recorded when requested
with wmod.catch_warnings(module=wmod, record=True) as w:
with wmod.catch_warnings(record=True) as w:
self.assertEqual(w, [])
self.assertIs(type(w), list)
wmod.simplefilter("always")
@ -1221,44 +1209,48 @@ class CatchWarningTests(BaseTest):
self.assertEqual(w, [])
# Ensure warnings are not recorded when not requested
orig_showwarning = wmod.showwarning
with wmod.catch_warnings(module=wmod, record=False) as w:
with wmod.catch_warnings(record=False) as w:
self.assertIsNone(w)
self.assertIs(wmod.showwarning, orig_showwarning)
def test_catch_warnings_reentry_guard(self):
wmod = self.module
# Ensure catch_warnings is protected against incorrect usage
x = wmod.catch_warnings(module=wmod, record=True)
x = wmod.catch_warnings(record=True)
self.assertRaises(RuntimeError, x.__exit__)
with x:
self.assertRaises(RuntimeError, x.__enter__)
# Same test, but with recording disabled
x = wmod.catch_warnings(module=wmod, record=False)
x = wmod.catch_warnings(record=False)
self.assertRaises(RuntimeError, x.__exit__)
with x:
self.assertRaises(RuntimeError, x.__enter__)
def test_catch_warnings_defaults(self):
wmod = self.module
orig_filters = wmod.filters
orig_filters = wmod._get_filters()
orig_showwarning = wmod.showwarning
# Ensure default behaviour is not to record warnings
with wmod.catch_warnings(module=wmod) as w:
with wmod.catch_warnings() as w:
self.assertIsNone(w)
self.assertIs(wmod.showwarning, orig_showwarning)
self.assertIsNot(wmod.filters, orig_filters)
self.assertIs(wmod.filters, orig_filters)
self.assertIsNot(wmod._get_filters(), orig_filters)
self.assertIs(wmod._get_filters(), orig_filters)
if wmod is sys.modules['warnings']:
# Ensure the default module is this one
with wmod.catch_warnings() as w:
self.assertIsNone(w)
self.assertIs(wmod.showwarning, orig_showwarning)
self.assertIsNot(wmod.filters, orig_filters)
self.assertIs(wmod.filters, orig_filters)
self.assertIsNot(wmod._get_filters(), orig_filters)
self.assertIs(wmod._get_filters(), orig_filters)
def test_record_override_showwarning_before(self):
# Issue #28835: If warnings.showwarning() was overridden, make sure
# that catch_warnings(record=True) overrides it again.
if self.module._use_context:
# If _use_context is true, the warnings module does not restore
# showwarning()
return
text = "This is a warning"
wmod = self.module
my_log = []
@ -1269,7 +1261,7 @@ class CatchWarningTests(BaseTest):
# Override warnings.showwarning() before calling catch_warnings()
with support.swap_attr(wmod, 'showwarning', my_logger):
with wmod.catch_warnings(module=wmod, record=True) as log:
with wmod.catch_warnings(record=True) as log:
self.assertIsNot(wmod.showwarning, my_logger)
wmod.simplefilter("always")
@ -1284,6 +1276,10 @@ class CatchWarningTests(BaseTest):
def test_record_override_showwarning_inside(self):
# Issue #28835: It is possible to override warnings.showwarning()
# in the catch_warnings(record=True) context manager.
if self.module._use_context:
# If _use_context is true, the warnings module does not restore
# showwarning()
return
text = "This is a warning"
wmod = self.module
my_log = []
@ -1292,7 +1288,7 @@ class CatchWarningTests(BaseTest):
nonlocal my_log
my_log.append(message)
with wmod.catch_warnings(module=wmod, record=True) as log:
with wmod.catch_warnings(record=True) as log:
wmod.simplefilter("always")
wmod.showwarning = my_logger
wmod.warn(text)
@ -1406,7 +1402,7 @@ class EnvironmentVariableTests(BaseTest):
code = "import sys; sys.modules.pop('warnings', None); sys.modules['_warnings'] = None; "
else:
code = ""
code += "import warnings; [print(f) for f in warnings.filters]"
code += "import warnings; [print(f) for f in warnings._get_filters()]"
rc, stdout, stderr = assert_python_ok("-c", code, __isolated=True)
stdout_lines = [line.strip() for line in stdout.splitlines()]
@ -1532,6 +1528,169 @@ a=A()
self.assertTrue(err.startswith(expected), ascii(err))
class AsyncTests(BaseTest):
"""Verifies that the catch_warnings() context manager behaves
as expected when used inside async co-routines. This requires
that the context_aware_warnings flag is enabled, so that
the context manager uses a context variable.
"""
def setUp(self):
super().setUp()
self.module.resetwarnings()
@unittest.skipIf(not sys.flags.context_aware_warnings,
"requires context aware warnings")
def test_async_context(self):
import asyncio
# Events to force the execution interleaving we want.
step_a1 = asyncio.Event()
step_a2 = asyncio.Event()
step_b1 = asyncio.Event()
step_b2 = asyncio.Event()
async def run_a():
with self.module.catch_warnings(record=True) as w:
await step_a1.wait()
# The warning emitted here should be caught be the enclosing
# context manager.
self.module.warn('run_a warning', UserWarning)
step_b1.set()
await step_a2.wait()
self.assertEqual(len(w), 1)
self.assertEqual(w[0].message.args[0], 'run_a warning')
step_b2.set()
async def run_b():
with self.module.catch_warnings(record=True) as w:
step_a1.set()
await step_b1.wait()
# The warning emitted here should be caught be the enclosing
# context manager.
self.module.warn('run_b warning', UserWarning)
step_a2.set()
await step_b2.wait()
self.assertEqual(len(w), 1)
self.assertEqual(w[0].message.args[0], 'run_b warning')
async def run_tasks():
await asyncio.gather(run_a(), run_b())
asyncio.run(run_tasks())
@unittest.skipIf(not sys.flags.context_aware_warnings,
"requires context aware warnings")
def test_async_task_inherit(self):
"""Check that a new asyncio task inherits warnings context from the
coroutine that spawns it.
"""
import asyncio
step1 = asyncio.Event()
step2 = asyncio.Event()
async def run_child1():
await step1.wait()
# This should be recorded by the run_parent() catch_warnings
# context.
self.module.warn('child warning', UserWarning)
step2.set()
async def run_child2():
# This establishes a new catch_warnings() context. The
# run_child1() task should still be using the context from
# run_parent() if context-aware warnings are enabled.
with self.module.catch_warnings(record=True) as w:
step1.set()
await step2.wait()
async def run_parent():
with self.module.catch_warnings(record=True) as w:
await asyncio.gather(run_child1(), run_child2())
self.assertEqual(len(w), 1)
self.assertEqual(w[0].message.args[0], 'child warning')
asyncio.run(run_parent())
class CAsyncTests(AsyncTests, unittest.TestCase):
module = c_warnings
class PyAsyncTests(AsyncTests, unittest.TestCase):
module = py_warnings
class ThreadTests(BaseTest):
"""Verifies that the catch_warnings() context manager behaves as
expected when used within threads. This requires that both the
context_aware_warnings flag and thread_inherit_context flags are enabled.
"""
ENABLE_THREAD_TESTS = (sys.flags.context_aware_warnings and
sys.flags.thread_inherit_context)
def setUp(self):
super().setUp()
self.module.resetwarnings()
@unittest.skipIf(not ENABLE_THREAD_TESTS,
"requires thread-safe warnings flags")
def test_threaded_context(self):
import threading
barrier = threading.Barrier(2)
def run_a():
with self.module.catch_warnings(record=True) as w:
barrier.wait()
# The warning emitted here should be caught be the enclosing
# context manager.
self.module.warn('run_a warning', UserWarning)
barrier.wait()
self.assertEqual(len(w), 1)
self.assertEqual(w[0].message.args[0], 'run_a warning')
# Should be caught be the catch_warnings() context manager of run_threads()
self.module.warn('main warning', UserWarning)
def run_b():
with self.module.catch_warnings(record=True) as w:
barrier.wait()
# The warning emitted here should be caught be the enclosing
# context manager.
barrier.wait()
self.module.warn('run_b warning', UserWarning)
self.assertEqual(len(w), 1)
self.assertEqual(w[0].message.args[0], 'run_b warning')
# Should be caught be the catch_warnings() context manager of run_threads()
self.module.warn('main warning', UserWarning)
def run_threads():
threads = [
threading.Thread(target=run_a),
threading.Thread(target=run_b),
]
with self.module.catch_warnings(record=True) as w:
for thread in threads:
thread.start()
for thread in threads:
thread.join()
self.assertEqual(len(w), 2)
self.assertEqual(w[0].message.args[0], 'main warning')
self.assertEqual(w[1].message.args[0], 'main warning')
run_threads()
class CThreadTests(ThreadTests, unittest.TestCase):
module = c_warnings
class PyThreadTests(ThreadTests, unittest.TestCase):
module = py_warnings
class DeprecatedTests(PyPublicAPITests):
def test_dunder_deprecated(self):
@deprecated("A will go away soon")

View file

@ -3,6 +3,7 @@
import os as _os
import sys as _sys
import _thread
import _contextvars
from time import monotonic as _time
from _weakrefset import WeakSet
@ -876,7 +877,7 @@ class Thread:
_initialized = False
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
args=(), kwargs=None, *, daemon=None, context=None):
"""This constructor should always be called with keyword arguments. Arguments are:
*group* should be None; reserved for future extension when a ThreadGroup
@ -893,6 +894,14 @@ class Thread:
*kwargs* is a dictionary of keyword arguments for the target
invocation. Defaults to {}.
*context* is the contextvars.Context value to use for the thread.
The default value is None, which means to check
sys.flags.thread_inherit_context. If that flag is true, use a copy
of the context of the caller. If false, use an empty context. To
explicitly start with an empty context, pass a new instance of
contextvars.Context(). To explicitly start with a copy of the current
context, pass the value from contextvars.copy_context().
If a subclass overrides the constructor, it must make sure to invoke
the base class constructor (Thread.__init__()) before doing anything
else to the thread.
@ -922,6 +931,7 @@ class Thread:
self._daemonic = daemon
else:
self._daemonic = current_thread().daemon
self._context = context
self._ident = None
if _HAVE_THREAD_NATIVE_ID:
self._native_id = None
@ -977,6 +987,16 @@ class Thread:
with _active_limbo_lock:
_limbo[self] = self
if self._context is None:
# No context provided
if _sys.flags.thread_inherit_context:
# start with a copy of the context of the caller
self._context = _contextvars.copy_context()
else:
# start with an empty context
self._context = _contextvars.Context()
try:
# Start joinable thread
_start_joinable_thread(self._bootstrap, handle=self._handle,
@ -1056,7 +1076,7 @@ class Thread:
_sys.setprofile(_profile_hook)
try:
self.run()
self._context.run(self.run)
except:
self._invoke_excepthook(self)
finally:

View file

@ -1,721 +1,79 @@
"""Python part of the warnings subsystem."""
import sys
__all__ = [
"warn",
"warn_explicit",
"showwarning",
"formatwarning",
"filterwarnings",
"simplefilter",
"resetwarnings",
"catch_warnings",
"deprecated",
]
from _py_warnings import (
WarningMessage,
_DEPRECATED_MSG,
_OptionError,
_add_filter,
_deprecated,
_filters_mutated,
_filters_mutated_lock_held,
_filters_version,
_formatwarning_orig,
_formatwarnmsg,
_formatwarnmsg_impl,
_get_context,
_get_filters,
_getaction,
_getcategory,
_is_filename_to_skip,
_is_internal_filename,
_is_internal_frame,
_lock,
_new_context,
_next_external_frame,
_processoptions,
_set_context,
_set_module,
_setoption,
_setup_defaults,
_showwarning_orig,
_showwarnmsg,
_showwarnmsg_impl,
_use_context,
_warn_unawaited_coroutine,
_warnings_context,
catch_warnings,
defaultaction,
deprecated,
filters,
filterwarnings,
formatwarning,
onceregistry,
resetwarnings,
showwarning,
simplefilter,
warn,
warn_explicit,
)
__all__ = ["warn", "warn_explicit", "showwarning",
"formatwarning", "filterwarnings", "simplefilter",
"resetwarnings", "catch_warnings", "deprecated"]
def showwarning(message, category, filename, lineno, file=None, line=None):
"""Hook to write a warning to a file; replace if you like."""
msg = WarningMessage(message, category, filename, lineno, file, line)
_showwarnmsg_impl(msg)
def formatwarning(message, category, filename, lineno, line=None):
"""Function to format a warning the standard way."""
msg = WarningMessage(message, category, filename, lineno, None, line)
return _formatwarnmsg_impl(msg)
def _showwarnmsg_impl(msg):
file = msg.file
if file is None:
file = sys.stderr
if file is None:
# sys.stderr is None when run with pythonw.exe:
# warnings get lost
return
text = _formatwarnmsg(msg)
try:
file.write(text)
except OSError:
# the file (probably stderr) is invalid - this warning gets lost.
pass
def _formatwarnmsg_impl(msg):
category = msg.category.__name__
s = f"{msg.filename}:{msg.lineno}: {category}: {msg.message}\n"
if msg.line is None:
try:
import linecache
line = linecache.getline(msg.filename, msg.lineno)
except Exception:
# When a warning is logged during Python shutdown, linecache
# and the import machinery don't work anymore
line = None
linecache = None
else:
line = msg.line
if line:
line = line.strip()
s += " %s\n" % line
if msg.source is not None:
try:
import tracemalloc
# Logging a warning should not raise a new exception:
# catch Exception, not only ImportError and RecursionError.
except Exception:
# don't suggest to enable tracemalloc if it's not available
suggest_tracemalloc = False
tb = None
else:
try:
suggest_tracemalloc = not tracemalloc.is_tracing()
tb = tracemalloc.get_object_traceback(msg.source)
except Exception:
# When a warning is logged during Python shutdown, tracemalloc
# and the import machinery don't work anymore
suggest_tracemalloc = False
tb = None
if tb is not None:
s += 'Object allocated at (most recent call last):\n'
for frame in tb:
s += (' File "%s", lineno %s\n'
% (frame.filename, frame.lineno))
try:
if linecache is not None:
line = linecache.getline(frame.filename, frame.lineno)
else:
line = None
except Exception:
line = None
if line:
line = line.strip()
s += ' %s\n' % line
elif suggest_tracemalloc:
s += (f'{category}: Enable tracemalloc to get the object '
f'allocation traceback\n')
return s
# Keep a reference to check if the function was replaced
_showwarning_orig = showwarning
def _showwarnmsg(msg):
"""Hook to write a warning to a file; replace if you like."""
try:
sw = showwarning
except NameError:
pass
else:
if sw is not _showwarning_orig:
# warnings.showwarning() was replaced
if not callable(sw):
raise TypeError("warnings.showwarning() must be set to a "
"function or method")
sw(msg.message, msg.category, msg.filename, msg.lineno,
msg.file, msg.line)
return
_showwarnmsg_impl(msg)
# Keep a reference to check if the function was replaced
_formatwarning_orig = formatwarning
def _formatwarnmsg(msg):
"""Function to format a warning the standard way."""
try:
fw = formatwarning
except NameError:
pass
else:
if fw is not _formatwarning_orig:
# warnings.formatwarning() was replaced
return fw(msg.message, msg.category,
msg.filename, msg.lineno, msg.line)
return _formatwarnmsg_impl(msg)
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
append=False):
"""Insert an entry into the list of warnings filters (at the front).
'action' -- one of "error", "ignore", "always", "all", "default", "module",
or "once"
'message' -- a regex that the warning message must match
'category' -- a class that the warning must be a subclass of
'module' -- a regex that the module name must match
'lineno' -- an integer line number, 0 matches all warnings
'append' -- if true, append to the list of filters
"""
if action not in {"error", "ignore", "always", "all", "default", "module", "once"}:
raise ValueError(f"invalid action: {action!r}")
if not isinstance(message, str):
raise TypeError("message must be a string")
if not isinstance(category, type) or not issubclass(category, Warning):
raise TypeError("category must be a Warning subclass")
if not isinstance(module, str):
raise TypeError("module must be a string")
if not isinstance(lineno, int):
raise TypeError("lineno must be an int")
if lineno < 0:
raise ValueError("lineno must be an int >= 0")
if message or module:
import re
if message:
message = re.compile(message, re.I)
else:
message = None
if module:
module = re.compile(module)
else:
module = None
_add_filter(action, message, category, module, lineno, append=append)
def simplefilter(action, category=Warning, lineno=0, append=False):
"""Insert a simple entry into the list of warnings filters (at the front).
A simple filter matches all modules and messages.
'action' -- one of "error", "ignore", "always", "all", "default", "module",
or "once"
'category' -- a class that the warning must be a subclass of
'lineno' -- an integer line number, 0 matches all warnings
'append' -- if true, append to the list of filters
"""
if action not in {"error", "ignore", "always", "all", "default", "module", "once"}:
raise ValueError(f"invalid action: {action!r}")
if not isinstance(lineno, int):
raise TypeError("lineno must be an int")
if lineno < 0:
raise ValueError("lineno must be an int >= 0")
_add_filter(action, None, category, None, lineno, append=append)
def _filters_mutated():
# Even though this function is not part of the public API, it's used by
# a fair amount of user code.
with _lock:
_filters_mutated_lock_held()
def _add_filter(*item, append):
with _lock:
if not append:
# Remove possible duplicate filters, so new one will be placed
# in correct place. If append=True and duplicate exists, do nothing.
try:
filters.remove(item)
except ValueError:
pass
filters.insert(0, item)
else:
if item not in filters:
filters.append(item)
_filters_mutated_lock_held()
def resetwarnings():
"""Clear the list of warning filters, so that no filters are active."""
with _lock:
filters[:] = []
_filters_mutated_lock_held()
class _OptionError(Exception):
"""Exception used by option processing helpers."""
pass
# Helper to process -W options passed via sys.warnoptions
def _processoptions(args):
for arg in args:
try:
_setoption(arg)
except _OptionError as msg:
print("Invalid -W option ignored:", msg, file=sys.stderr)
# Helper for _processoptions()
def _setoption(arg):
parts = arg.split(':')
if len(parts) > 5:
raise _OptionError("too many fields (max 5): %r" % (arg,))
while len(parts) < 5:
parts.append('')
action, message, category, module, lineno = [s.strip()
for s in parts]
action = _getaction(action)
category = _getcategory(category)
if message or module:
import re
if message:
message = re.escape(message)
if module:
module = re.escape(module) + r'\Z'
if lineno:
try:
lineno = int(lineno)
if lineno < 0:
raise ValueError
except (ValueError, OverflowError):
raise _OptionError("invalid lineno %r" % (lineno,)) from None
else:
lineno = 0
filterwarnings(action, message, category, module, lineno)
# Helper for _setoption()
def _getaction(action):
if not action:
return "default"
for a in ('default', 'always', 'all', 'ignore', 'module', 'once', 'error'):
if a.startswith(action):
return a
raise _OptionError("invalid action: %r" % (action,))
# Helper for _setoption()
def _getcategory(category):
if not category:
return Warning
if '.' not in category:
import builtins as m
klass = category
else:
module, _, klass = category.rpartition('.')
try:
m = __import__(module, None, None, [klass])
except ImportError:
raise _OptionError("invalid module name: %r" % (module,)) from None
try:
cat = getattr(m, klass)
except AttributeError:
raise _OptionError("unknown warning category: %r" % (category,)) from None
if not issubclass(cat, Warning):
raise _OptionError("invalid warning category: %r" % (category,))
return cat
def _is_internal_filename(filename):
return 'importlib' in filename and '_bootstrap' in filename
def _is_filename_to_skip(filename, skip_file_prefixes):
return any(filename.startswith(prefix) for prefix in skip_file_prefixes)
def _is_internal_frame(frame):
"""Signal whether the frame is an internal CPython implementation detail."""
return _is_internal_filename(frame.f_code.co_filename)
def _next_external_frame(frame, skip_file_prefixes):
"""Find the next frame that doesn't involve Python or user internals."""
frame = frame.f_back
while frame is not None and (
_is_internal_filename(filename := frame.f_code.co_filename) or
_is_filename_to_skip(filename, skip_file_prefixes)):
frame = frame.f_back
return frame
# Code typically replaced by _warnings
def warn(message, category=None, stacklevel=1, source=None,
*, skip_file_prefixes=()):
"""Issue a warning, or maybe ignore it or raise an exception."""
# Check if message is already a Warning object
if isinstance(message, Warning):
category = message.__class__
# Check category argument
if category is None:
category = UserWarning
if not (isinstance(category, type) and issubclass(category, Warning)):
raise TypeError("category must be a Warning subclass, "
"not '{:s}'".format(type(category).__name__))
if not isinstance(skip_file_prefixes, tuple):
# The C version demands a tuple for implementation performance.
raise TypeError('skip_file_prefixes must be a tuple of strs.')
if skip_file_prefixes:
stacklevel = max(2, stacklevel)
# Get context information
try:
if stacklevel <= 1 or _is_internal_frame(sys._getframe(1)):
# If frame is too small to care or if the warning originated in
# internal code, then do not try to hide any frames.
frame = sys._getframe(stacklevel)
else:
frame = sys._getframe(1)
# Look for one frame less since the above line starts us off.
for x in range(stacklevel-1):
frame = _next_external_frame(frame, skip_file_prefixes)
if frame is None:
raise ValueError
except ValueError:
globals = sys.__dict__
filename = "<sys>"
lineno = 0
else:
globals = frame.f_globals
filename = frame.f_code.co_filename
lineno = frame.f_lineno
if '__name__' in globals:
module = globals['__name__']
else:
module = "<string>"
registry = globals.setdefault("__warningregistry__", {})
warn_explicit(message, category, filename, lineno, module, registry,
globals, source)
def warn_explicit(message, category, filename, lineno,
module=None, registry=None, module_globals=None,
source=None):
lineno = int(lineno)
if module is None:
module = filename or "<unknown>"
if module[-3:].lower() == ".py":
module = module[:-3] # XXX What about leading pathname?
if isinstance(message, Warning):
text = str(message)
category = message.__class__
else:
text = message
message = category(message)
key = (text, category, lineno)
with _lock:
if registry is None:
registry = {}
if registry.get('version', 0) != _filters_version:
registry.clear()
registry['version'] = _filters_version
# Quick test for common case
if registry.get(key):
return
# Search the filters
for item in filters:
action, msg, cat, mod, ln = item
if ((msg is None or msg.match(text)) and
issubclass(category, cat) and
(mod is None or mod.match(module)) and
(ln == 0 or lineno == ln)):
break
else:
action = defaultaction
# Early exit actions
if action == "ignore":
return
if action == "error":
raise message
# Other actions
if action == "once":
registry[key] = 1
oncekey = (text, category)
if onceregistry.get(oncekey):
return
onceregistry[oncekey] = 1
elif action in {"always", "all"}:
pass
elif action == "module":
registry[key] = 1
altkey = (text, category, 0)
if registry.get(altkey):
return
registry[altkey] = 1
elif action == "default":
registry[key] = 1
else:
# Unrecognized actions are errors
raise RuntimeError(
"Unrecognized action (%r) in warnings.filters:\n %s" %
(action, item))
# Prime the linecache for formatting, in case the
# "file" is actually in a zipfile or something.
import linecache
linecache.getlines(filename, module_globals)
# Print message and context
msg = WarningMessage(message, category, filename, lineno, source=source)
_showwarnmsg(msg)
class WarningMessage(object):
_WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
"line", "source")
def __init__(self, message, category, filename, lineno, file=None,
line=None, source=None):
self.message = message
self.category = category
self.filename = filename
self.lineno = lineno
self.file = file
self.line = line
self.source = source
self._category_name = category.__name__ if category else None
def __str__(self):
return ("{message : %r, category : %r, filename : %r, lineno : %s, "
"line : %r}" % (self.message, self._category_name,
self.filename, self.lineno, self.line))
class catch_warnings(object):
"""A context manager that copies and restores the warnings filter upon
exiting the context.
The 'record' argument specifies whether warnings should be captured by a
custom implementation of warnings.showwarning() and be appended to a list
returned by the context manager. Otherwise None is returned by the context
manager. The objects appended to the list are arguments whose attributes
mirror the arguments to showwarning().
The 'module' argument is to specify an alternative module to the module
named 'warnings' and imported under that name. This argument is only useful
when testing the warnings module itself.
If the 'action' argument is not None, the remaining arguments are passed
to warnings.simplefilter() as if it were called immediately on entering the
context.
"""
def __init__(self, *, record=False, module=None,
action=None, category=Warning, lineno=0, append=False):
"""Specify whether to record warnings and if an alternative module
should be used other than sys.modules['warnings'].
"""
self._record = record
self._module = sys.modules['warnings'] if module is None else module
self._entered = False
if action is None:
self._filter = None
else:
self._filter = (action, category, lineno, append)
def __repr__(self):
args = []
if self._record:
args.append("record=True")
if self._module is not sys.modules['warnings']:
args.append("module=%r" % self._module)
name = type(self).__name__
return "%s(%s)" % (name, ", ".join(args))
def __enter__(self):
if self._entered:
raise RuntimeError("Cannot enter %r twice" % self)
self._entered = True
with _lock:
self._filters = self._module.filters
self._module.filters = self._filters[:]
self._module._filters_mutated_lock_held()
self._showwarning = self._module.showwarning
self._showwarnmsg_impl = self._module._showwarnmsg_impl
if self._record:
log = []
self._module._showwarnmsg_impl = log.append
# Reset showwarning() to the default implementation to make sure
# that _showwarnmsg() calls _showwarnmsg_impl()
self._module.showwarning = self._module._showwarning_orig
else:
log = None
if self._filter is not None:
simplefilter(*self._filter)
return log
def __exit__(self, *exc_info):
if not self._entered:
raise RuntimeError("Cannot exit %r without entering first" % self)
with _lock:
self._module.filters = self._filters
self._module._filters_mutated_lock_held()
self._module.showwarning = self._showwarning
self._module._showwarnmsg_impl = self._showwarnmsg_impl
class deprecated:
"""Indicate that a class, function or overload is deprecated.
When this decorator is applied to an object, the type checker
will generate a diagnostic on usage of the deprecated object.
Usage:
@deprecated("Use B instead")
class A:
pass
@deprecated("Use g instead")
def f():
pass
@overload
@deprecated("int support is deprecated")
def g(x: int) -> int: ...
@overload
def g(x: str) -> int: ...
The warning specified by *category* will be emitted at runtime
on use of deprecated objects. For functions, that happens on calls;
for classes, on instantiation and on creation of subclasses.
If the *category* is ``None``, no warning is emitted at runtime.
The *stacklevel* determines where the
warning is emitted. If it is ``1`` (the default), the warning
is emitted at the direct caller of the deprecated object; if it
is higher, it is emitted further up the stack.
Static type checker behavior is not affected by the *category*
and *stacklevel* arguments.
The deprecation message passed to the decorator is saved in the
``__deprecated__`` attribute on the decorated object.
If applied to an overload, the decorator
must be after the ``@overload`` decorator for the attribute to
exist on the overload as returned by ``get_overloads()``.
See PEP 702 for details.
"""
def __init__(
self,
message: str,
/,
*,
category: type[Warning] | None = DeprecationWarning,
stacklevel: int = 1,
) -> None:
if not isinstance(message, str):
raise TypeError(
f"Expected an object of type str for 'message', not {type(message).__name__!r}"
)
self.message = message
self.category = category
self.stacklevel = stacklevel
def __call__(self, arg, /):
# Make sure the inner functions created below don't
# retain a reference to self.
msg = self.message
category = self.category
stacklevel = self.stacklevel
if category is None:
arg.__deprecated__ = msg
return arg
elif isinstance(arg, type):
import functools
from types import MethodType
original_new = arg.__new__
@functools.wraps(original_new)
def __new__(cls, /, *args, **kwargs):
if cls is arg:
warn(msg, category=category, stacklevel=stacklevel + 1)
if original_new is not object.__new__:
return original_new(cls, *args, **kwargs)
# Mirrors a similar check in object.__new__.
elif cls.__init__ is object.__init__ and (args or kwargs):
raise TypeError(f"{cls.__name__}() takes no arguments")
else:
return original_new(cls)
arg.__new__ = staticmethod(__new__)
original_init_subclass = arg.__init_subclass__
# We need slightly different behavior if __init_subclass__
# is a bound method (likely if it was implemented in Python)
if isinstance(original_init_subclass, MethodType):
original_init_subclass = original_init_subclass.__func__
@functools.wraps(original_init_subclass)
def __init_subclass__(*args, **kwargs):
warn(msg, category=category, stacklevel=stacklevel + 1)
return original_init_subclass(*args, **kwargs)
arg.__init_subclass__ = classmethod(__init_subclass__)
# Or otherwise, which likely means it's a builtin such as
# object's implementation of __init_subclass__.
else:
@functools.wraps(original_init_subclass)
def __init_subclass__(*args, **kwargs):
warn(msg, category=category, stacklevel=stacklevel + 1)
return original_init_subclass(*args, **kwargs)
arg.__init_subclass__ = __init_subclass__
arg.__deprecated__ = __new__.__deprecated__ = msg
__init_subclass__.__deprecated__ = msg
return arg
elif callable(arg):
import functools
import inspect
@functools.wraps(arg)
def wrapper(*args, **kwargs):
warn(msg, category=category, stacklevel=stacklevel + 1)
return arg(*args, **kwargs)
if inspect.iscoroutinefunction(arg):
wrapper = inspect.markcoroutinefunction(wrapper)
arg.__deprecated__ = wrapper.__deprecated__ = msg
return wrapper
else:
raise TypeError(
"@deprecated decorator with non-None category must be applied to "
f"a class or callable, not {arg!r}"
)
_DEPRECATED_MSG = "{name!r} is deprecated and slated for removal in Python {remove}"
def _deprecated(name, message=_DEPRECATED_MSG, *, remove, _version=sys.version_info):
"""Warn that *name* is deprecated or should be removed.
RuntimeError is raised if *remove* specifies a major/minor tuple older than
the current Python version or the same version but past the alpha.
The *message* argument is formatted with *name* and *remove* as a Python
version tuple (e.g. (3, 11)).
"""
remove_formatted = f"{remove[0]}.{remove[1]}"
if (_version[:2] > remove) or (_version[:2] == remove and _version[3] != "alpha"):
msg = f"{name!r} was slated for removal after Python {remove_formatted} alpha"
raise RuntimeError(msg)
else:
msg = message.format(name=name, remove=remove_formatted)
warn(msg, DeprecationWarning, stacklevel=3)
# Private utility function called by _PyErr_WarnUnawaitedCoroutine
def _warn_unawaited_coroutine(coro):
msg_lines = [
f"coroutine '{coro.__qualname__}' was never awaited\n"
]
if coro.cr_origin is not None:
import linecache, traceback
def extract():
for filename, lineno, funcname in reversed(coro.cr_origin):
line = linecache.getline(filename, lineno)
yield (filename, lineno, funcname, line)
msg_lines.append("Coroutine created at (most recent call last)\n")
msg_lines += traceback.format_list(list(extract()))
msg = "".join(msg_lines).rstrip("\n")
# Passing source= here means that if the user happens to have tracemalloc
# enabled and tracking where the coroutine was created, the warning will
# contain that traceback. This does mean that if they have *both*
# coroutine origin tracking *and* tracemalloc enabled, they'll get two
# partially-redundant tracebacks. If we wanted to be clever we could
# probably detect this case and avoid it, but for now we don't bother.
warn(msg, category=RuntimeWarning, stacklevel=2, source=coro)
# filters contains a sequence of filter 5-tuples
# The components of the 5-tuple are:
# - an action: error, ignore, always, all, default, module, or once
# - a compiled regex that must match the warning message
# - a class representing the warning category
# - a compiled regex that must match the module that is being warned
# - a line number for the line being warning, or 0 to mean any line
# If either if the compiled regexs are None, match anything.
try:
from _warnings import (filters, _defaultaction, _onceregistry,
warn, warn_explicit,
_filters_mutated_lock_held,
_acquire_lock, _release_lock,
# Try to use the C extension, this will replace some parts of the
# _py_warnings implementation imported above.
from _warnings import (
_acquire_lock,
_defaultaction as defaultaction,
_filters_mutated_lock_held,
_onceregistry as onceregistry,
_release_lock,
_warnings_context,
filters,
warn,
warn_explicit,
)
defaultaction = _defaultaction
onceregistry = _onceregistry
_warnings_defaults = True
class _Lock:
@ -727,35 +85,15 @@ try:
_release_lock()
_lock = _Lock()
except ImportError:
filters = []
defaultaction = "default"
onceregistry = {}
import _thread
_lock = _thread.RLock()
_filters_version = 1
def _filters_mutated_lock_held():
global _filters_version
_filters_version += 1
_warnings_defaults = False
# Module initialization
_set_module(sys.modules[__name__])
_processoptions(sys.warnoptions)
if not _warnings_defaults:
# Several warning categories are ignored by default in regular builds
if not hasattr(sys, 'gettotalrefcount'):
filterwarnings("default", category=DeprecationWarning,
module="__main__", append=1)
simplefilter("ignore", category=DeprecationWarning, append=1)
simplefilter("ignore", category=PendingDeprecationWarning, append=1)
simplefilter("ignore", category=ImportWarning, append=1)
simplefilter("ignore", category=ResourceWarning, append=1)
_setup_defaults()
del _warnings_defaults
del _setup_defaults

View file

@ -424,6 +424,7 @@ PARSER_HEADERS= \
# Python
PYTHON_OBJS= \
Python/_contextvars.o \
Python/_warnings.o \
Python/Python-ast.o \
Python/Python-tokenize.o \

View file

@ -0,0 +1,16 @@
Add the :data:`sys.flags.thread_inherit_context` flag.
* This flag is set to true by default on the free-threaded build
and false otherwise. If the flag is true, starting a new thread using
:class:`threading.Thread` will, by default, use a copy of the
:class:`contextvars.Context` from the caller of
:meth:`threading.Thread.start` rather than using an empty context.
* Add the :option:`-X thread_inherit_context <-X>` command-line option and
:envvar:`PYTHON_THREAD_INHERIT_CONTEXT` environment variable, which set the
:data:`~sys.flags.thread_inherit_context` flag.
* Add the ``context`` keyword parameter to :class:`~threading.Thread`. It can
be used to explicitly pass a context value to be used by a new thread.
* Make the ``_contextvars`` module built-in.

View file

@ -0,0 +1,7 @@
Make :class:`warnings.catch_warnings` use a context variable for holding
the warning filtering state if the :data:`sys.flags.context_aware_warnings`
flag is set to true. This makes using the context manager thread-safe in
multi-threaded programs. The flag is true by default in free-threaded builds
and is otherwise false. The value of the flag can be overridden by the
the :option:`-X context_aware_warnings <-X>` command-line option or by the
:envvar:`PYTHON_CONTEXT_AWARE_WARNINGS` environment variable.

View file

@ -132,7 +132,6 @@ PYTHONPATH=$(COREPYTHONPATH)
#_asyncio _asynciomodule.c
#_bisect _bisectmodule.c
#_contextvars _contextvarsmodule.c
#_csv _csv.c
#_datetime _datetimemodule.c
#_decimal _decimal/_decimal.c

View file

@ -31,7 +31,6 @@
@MODULE_ARRAY_TRUE@array arraymodule.c
@MODULE__ASYNCIO_TRUE@_asyncio _asynciomodule.c
@MODULE__BISECT_TRUE@_bisect _bisectmodule.c
@MODULE__CONTEXTVARS_TRUE@_contextvars _contextvarsmodule.c
@MODULE__CSV_TRUE@_csv _csv.c
@MODULE__HEAPQ_TRUE@_heapq _heapqmodule.c
@MODULE__JSON_TRUE@_json _json.c

View file

@ -19,6 +19,7 @@ extern PyObject* PyInit__imp(void);
extern PyObject* PyInit_gc(void);
extern PyObject* PyInit__ast(void);
extern PyObject* PyInit__tokenize(void);
extern PyObject* PyInit__contextvars(void);
extern PyObject* _PyWarnings_Init(void);
extern PyObject* PyInit__string(void);
@ -45,6 +46,9 @@ struct _inittab _PyImport_Inittab[] = {
/* This lives in gcmodule.c */
{"gc", PyInit_gc},
/* This lives in Python/_contextvars.c */
{"_contextvars", PyInit__contextvars},
/* This lives in _warnings.c */
{"_warnings", _PyWarnings_Init},

View file

@ -421,7 +421,6 @@
</ClCompile>
<ClCompile Include="..\Modules\_codecsmodule.c" />
<ClCompile Include="..\Modules\_collectionsmodule.c" />
<ClCompile Include="..\Modules\_contextvarsmodule.c" />
<ClCompile Include="..\Modules\_csv.c" />
<ClCompile Include="..\Modules\_functoolsmodule.c" />
<ClCompile Include="..\Modules\_hacl\Hacl_Hash_MD5.c" />
@ -572,6 +571,7 @@
<ClCompile Include="..\PC\config.c" />
<ClCompile Include="..\PC\msvcrtmodule.c" />
<ClCompile Include="..\Python\pyhash.c" />
<ClCompile Include="..\Python\_contextvars.c" />
<ClCompile Include="..\Python\_warnings.c" />
<ClCompile Include="..\Python\asdl.c" />
<ClCompile Include="..\Python\assemble.c" />

View file

@ -1304,6 +1304,9 @@
<ClCompile Include="..\PC\msvcrtmodule.c">
<Filter>PC</Filter>
</ClCompile>
<ClCompile Include="..\Python\_contextvars.c">
<Filter>Python</Filter>
</ClCompile>
<ClCompile Include="..\Python\_warnings.c">
<Filter>Python</Filter>
</ClCompile>
@ -1571,9 +1574,6 @@
<ClCompile Include="..\Objects\odictobject.c">
<Filter>Objects</Filter>
</ClCompile>
<ClCompile Include="..\Modules\_contextvarsmodule.c">
<Filter>Modules</Filter>
</ClCompile>
<ClCompile Include="$(zlibDir)\adler32.c">
<Filter>Modules\zlib</Filter>
</ClCompile>

View file

@ -1,6 +1,6 @@
#include "Python.h"
#include "clinic/_contextvarsmodule.c.h"
#include "clinic/_contextvars.c.h"
/*[clinic input]
module _contextvars

View file

@ -69,6 +69,7 @@ warnings_clear_state(WarningsState *st)
Py_CLEAR(st->filters);
Py_CLEAR(st->once_registry);
Py_CLEAR(st->default_action);
Py_CLEAR(st->context);
}
#ifndef Py_DEBUG
@ -156,6 +157,13 @@ _PyWarnings_InitState(PyInterpreterState *interp)
}
}
if (st->context == NULL) {
st->context = PyContextVar_New("_warnings_context", NULL);
if (st->context == NULL) {
return -1;
}
}
st->filters_version = 0;
return 0;
}
@ -256,6 +264,68 @@ warnings_lock_held(WarningsState *st)
return PyMutex_IsLocked(&st->lock.mutex);
}
static PyObject *
get_warnings_context(PyInterpreterState *interp)
{
WarningsState *st = warnings_get_state(interp);
assert(PyContextVar_CheckExact(st->context));
PyObject *ctx;
if (PyContextVar_Get(st->context, NULL, &ctx) < 0) {
return NULL;
}
if (ctx == NULL) {
Py_RETURN_NONE;
}
return ctx;
}
static PyObject *
get_warnings_context_filters(PyInterpreterState *interp)
{
PyObject *ctx = get_warnings_context(interp);
if (ctx == NULL) {
return NULL;
}
if (ctx == Py_None) {
Py_RETURN_NONE;
}
PyObject *context_filters = PyObject_GetAttr(ctx, &_Py_ID(_filters));
Py_DECREF(ctx);
if (context_filters == NULL) {
return NULL;
}
if (!PyList_Check(context_filters)) {
PyErr_SetString(PyExc_ValueError,
"_filters of warnings._warnings_context must be a list");
Py_DECREF(context_filters);
return NULL;
}
return context_filters;
}
// Returns a borrowed reference to the list.
static PyObject *
get_warnings_filters(PyInterpreterState *interp)
{
WarningsState *st = warnings_get_state(interp);
PyObject *warnings_filters = GET_WARNINGS_ATTR(interp, filters, 0);
if (warnings_filters == NULL) {
if (PyErr_Occurred())
return NULL;
}
else {
Py_SETREF(st->filters, warnings_filters);
}
PyObject *filters = st->filters;
if (filters == NULL || !PyList_Check(filters)) {
PyErr_SetString(PyExc_ValueError,
MODULE_NAME ".filters must be a list");
return NULL;
}
return filters;
}
/*[clinic input]
_acquire_lock as warnings_acquire_lock
@ -349,35 +419,17 @@ get_default_action(PyInterpreterState *interp)
return default_action;
}
/* The item is a new reference. */
static PyObject*
get_filter(PyInterpreterState *interp, PyObject *category,
PyObject *text, Py_ssize_t lineno,
PyObject *module, PyObject **item)
{
WarningsState *st = warnings_get_state(interp);
assert(st != NULL);
assert(warnings_lock_held(st));
PyObject *warnings_filters = GET_WARNINGS_ATTR(interp, filters, 0);
if (warnings_filters == NULL) {
if (PyErr_Occurred())
return NULL;
}
else {
Py_SETREF(st->filters, warnings_filters);
}
PyObject *filters = st->filters;
if (filters == NULL || !PyList_Check(filters)) {
PyErr_SetString(PyExc_ValueError,
MODULE_NAME ".filters must be a list");
return NULL;
}
/* WarningsState.filters could change while we are iterating over it. */
/* Search filters list of match, returns false on error. If no match
* then 'matched_action' is NULL. */
static bool
filter_search(PyInterpreterState *interp, PyObject *category,
PyObject *text, Py_ssize_t lineno,
PyObject *module, char *list_name, PyObject *filters,
PyObject **item, PyObject **matched_action) {
bool result = true;
*matched_action = NULL;
/* Avoid the filters list changing while we iterate over it. */
Py_BEGIN_CRITICAL_SECTION(filters);
for (Py_ssize_t i = 0; i < PyList_GET_SIZE(filters); i++) {
PyObject *tmp_item, *action, *msg, *cat, *mod, *ln_obj;
Py_ssize_t ln;
@ -386,8 +438,9 @@ get_filter(PyInterpreterState *interp, PyObject *category,
tmp_item = PyList_GET_ITEM(filters, i);
if (!PyTuple_Check(tmp_item) || PyTuple_GET_SIZE(tmp_item) != 5) {
PyErr_Format(PyExc_ValueError,
MODULE_NAME ".filters item %zd isn't a 5-tuple", i);
return NULL;
"warnings.%s item %zd isn't a 5-tuple", list_name, i);
result = false;
break;
}
/* Python code: action, msg, cat, mod, ln = item */
@ -403,42 +456,102 @@ get_filter(PyInterpreterState *interp, PyObject *category,
"action must be a string, not '%.200s'",
Py_TYPE(action)->tp_name);
Py_DECREF(tmp_item);
return NULL;
result = false;
break;
}
good_msg = check_matched(interp, msg, text);
if (good_msg == -1) {
Py_DECREF(tmp_item);
return NULL;
result = false;
break;
}
good_mod = check_matched(interp, mod, module);
if (good_mod == -1) {
Py_DECREF(tmp_item);
return NULL;
result = false;
break;
}
is_subclass = PyObject_IsSubclass(category, cat);
if (is_subclass == -1) {
Py_DECREF(tmp_item);
return NULL;
result = false;
break;
}
ln = PyLong_AsSsize_t(ln_obj);
if (ln == -1 && PyErr_Occurred()) {
Py_DECREF(tmp_item);
return NULL;
result = false;
break;
}
if (good_msg && is_subclass && good_mod && (ln == 0 || lineno == ln)) {
*item = tmp_item;
return action;
*matched_action = action;
result = true;
break;
}
Py_DECREF(tmp_item);
}
Py_END_CRITICAL_SECTION();
return result;
}
PyObject *action = get_default_action(interp);
/* The item is a new reference. */
static PyObject*
get_filter(PyInterpreterState *interp, PyObject *category,
PyObject *text, Py_ssize_t lineno,
PyObject *module, PyObject **item)
{
#ifdef Py_DEBUG
WarningsState *st = warnings_get_state(interp);
assert(st != NULL);
assert(warnings_lock_held(st));
#endif
/* check _warning_context _filters list */
PyObject *context_filters = get_warnings_context_filters(interp);
if (context_filters == NULL) {
return NULL;
}
bool use_global_filters = false;
if (context_filters == Py_None) {
use_global_filters = true;
} else {
PyObject *context_action = NULL;
if (!filter_search(interp, category, text, lineno, module, "_warnings_context _filters",
context_filters, item, &context_action)) {
Py_DECREF(context_filters);
return NULL;
}
Py_DECREF(context_filters);
if (context_action != NULL) {
return context_action;
}
}
PyObject *action;
if (use_global_filters) {
/* check warnings.filters list */
PyObject *filters = get_warnings_filters(interp);
if (filters == NULL) {
return NULL;
}
if (!filter_search(interp, category, text, lineno, module, "filters",
filters, item, &action)) {
return NULL;
}
if (action != NULL) {
return action;
}
}
action = get_default_action(interp);
if (action != NULL) {
*item = Py_NewRef(Py_None);
return action;
@ -1540,6 +1653,9 @@ warnings_module_exec(PyObject *module)
if (PyModule_AddObjectRef(module, "_defaultaction", st->default_action) < 0) {
return -1;
}
if (PyModule_AddObjectRef(module, "_warnings_context", st->context) < 0) {
return -1;
}
return 0;
}

View file

@ -151,6 +151,8 @@ static const PyConfigSpec PYCONFIG_SPEC[] = {
SPEC(filesystem_errors, WSTR, READ_ONLY, NO_SYS),
SPEC(hash_seed, ULONG, READ_ONLY, NO_SYS),
SPEC(home, WSTR_OPT, READ_ONLY, NO_SYS),
SPEC(thread_inherit_context, INT, READ_ONLY, NO_SYS),
SPEC(context_aware_warnings, INT, READ_ONLY, NO_SYS),
SPEC(import_time, BOOL, READ_ONLY, NO_SYS),
SPEC(install_signal_handlers, BOOL, READ_ONLY, NO_SYS),
SPEC(isolated, BOOL, READ_ONLY, NO_SYS), // sys.flags.isolated
@ -339,6 +341,14 @@ The following implementation-specific options are available:\n\
PYTHON_TLBC\n"
#endif
"\
-X thread_inherit_context=[0|1]: enable (1) or disable (0) threads inheriting\n\
context vars by default; enabled by default in the free-threaded\n\
build and disabled otherwise; also PYTHON_THREAD_INHERIT_CONTEXT\n\
-X context_aware_warnings=[0|1]: if true (1) then the warnings module will\n\
use a context variables; if false (0) then the warnings module will\n\
use module globals, which is not concurrent-safe; set to true for\n\
free-threaded builds and false otherwise; also\n\
PYTHON_CONTEXT_AWARE_WARNINGS\n\
-X tracemalloc[=N]: trace Python memory allocations; N sets a traceback limit\n \
of N frames (default: 1); also PYTHONTRACEMALLOC=N\n\
-X utf8[=0|1]: enable (1) or disable (0) UTF-8 mode; also PYTHONUTF8\n\
@ -426,6 +436,10 @@ static const char usage_envvars[] =
#ifdef Py_GIL_DISABLED
"PYTHON_TLBC : when set to 0, disables thread-local bytecode (-X tlbc)\n"
#endif
"PYTHON_THREAD_INHERIT_CONTEXT: if true (1), threads inherit context vars\n"
" (-X thread_inherit_context)\n"
"PYTHON_CONTEXT_AWARE_WARNINGS: if true (1), enable thread-safe warnings module\n"
" behaviour (-X context_aware_warnings)\n"
"PYTHONTRACEMALLOC: trace Python memory allocations (-X tracemalloc)\n"
"PYTHONUNBUFFERED: disable stdout/stderr buffering (-u)\n"
"PYTHONUTF8 : control the UTF-8 mode (-X utf8)\n"
@ -923,6 +937,8 @@ config_check_consistency(const PyConfig *config)
assert(config->cpu_count != 0);
// config->use_frozen_modules is initialized later
// by _PyConfig_InitImportConfig().
assert(config->thread_inherit_context >= 0);
assert(config->context_aware_warnings >= 0);
#ifdef __APPLE__
assert(config->use_system_logger >= 0);
#endif
@ -1029,6 +1045,13 @@ _PyConfig_InitCompatConfig(PyConfig *config)
config->_is_python_build = 0;
config->code_debug_ranges = 1;
config->cpu_count = -1;
#ifdef Py_GIL_DISABLED
config->thread_inherit_context = 1;
config->context_aware_warnings = 1;
#else
config->thread_inherit_context = 0;
config->context_aware_warnings = 0;
#endif
#ifdef __APPLE__
config->use_system_logger = USE_SYSTEM_LOGGER_DEFAULT;
#endif
@ -1061,6 +1084,13 @@ config_init_defaults(PyConfig *config)
#ifdef MS_WINDOWS
config->legacy_windows_stdio = 0;
#endif
#ifdef Py_GIL_DISABLED
config->thread_inherit_context = 1;
config->context_aware_warnings = 1;
#else
config->thread_inherit_context = 0;
config->context_aware_warnings = 0;
#endif
#ifdef __APPLE__
config->use_system_logger = USE_SYSTEM_LOGGER_DEFAULT;
#endif
@ -1095,6 +1125,11 @@ PyConfig_InitIsolatedConfig(PyConfig *config)
config->int_max_str_digits = _PY_LONG_DEFAULT_MAX_STR_DIGITS;
config->safe_path = 1;
config->pathconfig_warnings = 0;
#ifdef Py_GIL_DISABLED
config->thread_inherit_context = 1;
#else
config->thread_inherit_context = 0;
#endif
#ifdef MS_WINDOWS
config->legacy_windows_stdio = 0;
#endif
@ -1924,6 +1959,58 @@ error:
"n must be greater than 0");
}
static PyStatus
config_init_thread_inherit_context(PyConfig *config)
{
const char *env = config_get_env(config, "PYTHON_THREAD_INHERIT_CONTEXT");
if (env) {
int enabled;
if (_Py_str_to_int(env, &enabled) < 0 || (enabled < 0) || (enabled > 1)) {
return _PyStatus_ERR(
"PYTHON_THREAD_INHERIT_CONTEXT=N: N is missing or invalid");
}
config->thread_inherit_context = enabled;
}
const wchar_t *xoption = config_get_xoption(config, L"thread_inherit_context");
if (xoption) {
int enabled;
const wchar_t *sep = wcschr(xoption, L'=');
if (!sep || (config_wstr_to_int(sep + 1, &enabled) < 0) || (enabled < 0) || (enabled > 1)) {
return _PyStatus_ERR(
"-X thread_inherit_context=n: n is missing or invalid");
}
config->thread_inherit_context = enabled;
}
return _PyStatus_OK();
}
static PyStatus
config_init_context_aware_warnings(PyConfig *config)
{
const char *env = config_get_env(config, "PYTHON_CONTEXT_AWARE_WARNINGS");
if (env) {
int enabled;
if (_Py_str_to_int(env, &enabled) < 0 || (enabled < 0) || (enabled > 1)) {
return _PyStatus_ERR(
"PYTHON_CONTEXT_AWARE_WARNINGS=N: N is missing or invalid");
}
config->context_aware_warnings = enabled;
}
const wchar_t *xoption = config_get_xoption(config, L"context_aware_warnings");
if (xoption) {
int enabled;
const wchar_t *sep = wcschr(xoption, L'=');
if (!sep || (config_wstr_to_int(sep + 1, &enabled) < 0) || (enabled < 0) || (enabled > 1)) {
return _PyStatus_ERR(
"-X context_aware_warnings=n: n is missing or invalid");
}
config->context_aware_warnings = enabled;
}
return _PyStatus_OK();
}
static PyStatus
config_init_tlbc(PyConfig *config)
{
@ -2232,6 +2319,16 @@ config_read_complex_options(PyConfig *config)
}
#endif
status = config_init_thread_inherit_context(config);
if (_PyStatus_EXCEPTION(status)) {
return status;
}
status = config_init_context_aware_warnings(config);
if (_PyStatus_EXCEPTION(status)) {
return status;
}
status = config_init_tlbc(config);
if (_PyStatus_EXCEPTION(status)) {
return status;

View file

@ -64,6 +64,7 @@ static const char* _Py_stdlib_module_names[] = {
"_posixshmem",
"_posixsubprocess",
"_py_abc",
"_py_warnings",
"_pydatetime",
"_pydecimal",
"_pyio",

View file

@ -3335,6 +3335,8 @@ static PyStructSequence_Field flags_fields[] = {
{"safe_path", "-P"},
{"int_max_str_digits", "-X int_max_str_digits"},
{"gil", "-X gil"},
{"thread_inherit_context", "-X thread_inherit_context"},
{"context_aware_warnings", "-X context_aware_warnings"},
{0}
};
@ -3435,6 +3437,8 @@ set_flags_from_config(PyInterpreterState *interp, PyObject *flags)
#else
SetFlagObj(PyLong_FromLong(1));
#endif
SetFlag(config->thread_inherit_context);
SetFlag(config->context_aware_warnings);
#undef SetFlagObj
#undef SetFlag
return 0;

28
configure generated vendored
View file

@ -805,8 +805,6 @@ MODULE__HEAPQ_FALSE
MODULE__HEAPQ_TRUE
MODULE__CSV_FALSE
MODULE__CSV_TRUE
MODULE__CONTEXTVARS_FALSE
MODULE__CONTEXTVARS_TRUE
MODULE__BISECT_FALSE
MODULE__BISECT_TRUE
MODULE__ASYNCIO_FALSE
@ -30768,28 +30766,6 @@ then :
fi
if test "$py_cv_module__contextvars" != "n/a"
then :
py_cv_module__contextvars=yes
fi
if test "$py_cv_module__contextvars" = yes; then
MODULE__CONTEXTVARS_TRUE=
MODULE__CONTEXTVARS_FALSE='#'
else
MODULE__CONTEXTVARS_TRUE='#'
MODULE__CONTEXTVARS_FALSE=
fi
as_fn_append MODULE_BLOCK "MODULE__CONTEXTVARS_STATE=$py_cv_module__contextvars$as_nl"
if test "x$py_cv_module__contextvars" = xyes
then :
fi
@ -33703,10 +33679,6 @@ if test -z "${MODULE__BISECT_TRUE}" && test -z "${MODULE__BISECT_FALSE}"; then
as_fn_error $? "conditional \"MODULE__BISECT\" was never defined.
Usually this means the macro was only invoked conditionally." "$LINENO" 5
fi
if test -z "${MODULE__CONTEXTVARS_TRUE}" && test -z "${MODULE__CONTEXTVARS_FALSE}"; then
as_fn_error $? "conditional \"MODULE__CONTEXTVARS\" was never defined.
Usually this means the macro was only invoked conditionally." "$LINENO" 5
fi
if test -z "${MODULE__CSV_TRUE}" && test -z "${MODULE__CSV_FALSE}"; then
as_fn_error $? "conditional \"MODULE__CSV\" was never defined.
Usually this means the macro was only invoked conditionally." "$LINENO" 5

View file

@ -7776,7 +7776,6 @@ dnl always enabled extension modules
PY_STDLIB_MOD_SIMPLE([array])
PY_STDLIB_MOD_SIMPLE([_asyncio])
PY_STDLIB_MOD_SIMPLE([_bisect])
PY_STDLIB_MOD_SIMPLE([_contextvars])
PY_STDLIB_MOD_SIMPLE([_csv])
PY_STDLIB_MOD_SIMPLE([_heapq])
PY_STDLIB_MOD_SIMPLE([_json])