gh-115532 Add kde_random() to the statistic module (#118210)

This commit is contained in:
Raymond Hettinger 2024-05-03 23:13:36 -05:00 committed by GitHub
parent 1b7e5e6e60
commit 42dc5b4ace
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 207 additions and 63 deletions

View file

@ -77,6 +77,7 @@ or sample.
:func:`geometric_mean` Geometric mean of data.
:func:`harmonic_mean` Harmonic mean of data.
:func:`kde` Estimate the probability density distribution of the data.
:func:`kde_random` Random sampling from the PDF generated by kde().
:func:`median` Median (middle value) of data.
:func:`median_low` Low median of data.
:func:`median_high` High median of data.
@ -311,6 +312,30 @@ However, for reading convenience, most of the examples show sorted sequences.
.. versionadded:: 3.13
.. function:: kde_random(data, h, kernel='normal', *, seed=None)
Return a function that makes a random selection from the estimated
probability density function produced by ``kde(data, h, kernel)``.
Providing a *seed* allows reproducible selections. In the future, the
values may change slightly as more accurate kernel inverse CDF estimates
are implemented. The seed may be an integer, float, str, or bytes.
A :exc:`StatisticsError` will be raised if the *data* sequence is empty.
Continuing the example for :func:`kde`, we can use
:func:`kde_random` to generate new random selections from an
estimated probability density function:
>>> data = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
>>> rand = kde_random(data, h=1.5, seed=8675309)
>>> new_selections = [rand() for i in range(10)]
>>> [round(x, 1) for x in new_selections]
[0.7, 6.2, 1.2, 6.9, 7.0, 1.8, 2.5, -0.5, -1.8, 5.6]
.. versionadded:: 3.13
.. function:: median(data)
Return the median (middle value) of numeric data, using the common "mean of
@ -1148,65 +1173,6 @@ The final prediction goes to the largest posterior. This is known as the
'female'
Sampling from kernel density estimation
***************************************
The :func:`kde()` function creates a continuous probability density
function from discrete samples. Some applications need a way to make
random selections from that distribution.
The technique is to pick a sample from a bandwidth scaled kernel
function and recenter the result around a randomly chosen point from
the input data. This can be done with any kernel that has a known or
accurately approximated inverse cumulative distribution function.
.. testcode::
from random import choice, random, seed
from math import sqrt, log, pi, tan, asin, cos, acos
from statistics import NormalDist
kernel_invcdfs = {
'normal': NormalDist().inv_cdf,
'logistic': lambda p: log(p / (1 - p)),
'sigmoid': lambda p: log(tan(p * pi/2)),
'rectangular': lambda p: 2*p - 1,
'triangular': lambda p: sqrt(2*p) - 1 if p < 0.5 else 1 - sqrt(2 - 2*p),
'parabolic': lambda p: 2 * cos((acos(2*p-1) + pi) / 3),
'cosine': lambda p: 2*asin(2*p - 1)/pi,
}
def kde_random(data, h, kernel='normal'):
'Return a function that samples from kde() smoothed data.'
kernel_invcdf = kernel_invcdfs[kernel]
def rand():
return h * kernel_invcdf(random()) + choice(data)
return rand
For example:
.. doctest::
>>> discrete_samples = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
>>> rand = kde_random(discrete_samples, h=1.5)
>>> seed(8675309)
>>> selections = [rand() for i in range(10)]
>>> [round(x, 1) for x in selections]
[4.7, 7.4, 1.2, 7.8, 6.9, -1.3, 5.8, 0.2, -1.4, 5.7]
.. testcode::
:hide:
from statistics import kde
from math import isclose
# Verify that cdf / invcdf will round trip
xarr = [i/100 for i in range(-100, 101)]
for kernel, invcdf in kernel_invcdfs.items():
cdf = kde([0.0], h=1.0, kernel=kernel, cumulative=True)
for x in xarr:
assert isclose(invcdf(cdf(x)), x, abs_tol=1E-9)
..
# This modelines must appear within the last ten lines of the file.
kate: indent-width 3; remove-trailing-space on; replace-tabs on; encoding utf-8;

View file

@ -745,7 +745,8 @@ statistics
* Add :func:`statistics.kde` for kernel density estimation.
This makes it possible to estimate a continuous probability density function
from a fixed number of discrete samples.
from a fixed number of discrete samples. Also added :func:`statistics.kde_random`
for sampling from the estimated probability density function.
(Contributed by Raymond Hettinger in :gh:`115863`.)
.. _whatsnew313-subprocess:

View file

@ -113,6 +113,7 @@ __all__ = [
'geometric_mean',
'harmonic_mean',
'kde',
'kde_random',
'linear_regression',
'mean',
'median',
@ -138,12 +139,13 @@ from decimal import Decimal
from itertools import count, groupby, repeat
from bisect import bisect_left, bisect_right
from math import hypot, sqrt, fabs, exp, erf, tau, log, fsum, sumprod
from math import isfinite, isinf, pi, cos, sin, cosh, atan
from math import isfinite, isinf, pi, cos, sin, tan, cosh, asin, atan, acos
from functools import reduce
from operator import itemgetter
from collections import Counter, namedtuple, defaultdict
_SQRT2 = sqrt(2.0)
_random = random
# === Exceptions ===
@ -978,11 +980,9 @@ def kde(data, h, kernel='normal', *, cumulative=False):
return sum(K((x - x_i) / h) for x_i in data) / (n * h)
def cdf(x):
n = len(data)
return sum(W((x - x_i) / h) for x_i in data) / n
else:
sample = sorted(data)
@ -1078,6 +1078,7 @@ def quantiles(data, *, n=4, method='exclusive'):
if ld == 1:
return data * (n - 1)
raise StatisticsError('must have at least one data point')
if method == 'inclusive':
m = ld - 1
result = []
@ -1086,6 +1087,7 @@ def quantiles(data, *, n=4, method='exclusive'):
interpolated = (data[j] * (n - delta) + data[j + 1] * delta) / n
result.append(interpolated)
return result
if method == 'exclusive':
m = ld + 1
result = []
@ -1096,6 +1098,7 @@ def quantiles(data, *, n=4, method='exclusive'):
interpolated = (data[j - 1] * (n - delta) + data[j] * delta) / n
result.append(interpolated)
return result
raise ValueError(f'Unknown method: {method!r}')
@ -1709,3 +1712,97 @@ class NormalDist:
def __setstate__(self, state):
self._mu, self._sigma = state
## kde_random() ##############################################################
def _newton_raphson(f_inv_estimate, f, f_prime, tolerance=1e-12):
def f_inv(y):
"Return x such that f(x) ≈ y within the specified tolerance."
x = f_inv_estimate(y)
while abs(diff := f(x) - y) > tolerance:
x -= diff / f_prime(x)
return x
return f_inv
def _quartic_invcdf_estimate(p):
sign, p = (1.0, p) if p <= 1/2 else (-1.0, 1.0 - p)
x = (2.0 * p) ** 0.4258865685331 - 1.0
if p >= 0.004 < 0.499:
x += 0.026818732 * sin(7.101753784 * p + 2.73230839482953)
return x * sign
_quartic_invcdf = _newton_raphson(
f_inv_estimate = _quartic_invcdf_estimate,
f = lambda t: 3/16 * t**5 - 5/8 * t**3 + 15/16 * t + 1/2,
f_prime = lambda t: 15/16 * (1.0 - t * t) ** 2)
def _triweight_invcdf_estimate(p):
sign, p = (1.0, p) if p <= 1/2 else (-1.0, 1.0 - p)
x = (2.0 * p) ** 0.3400218741872791 - 1.0
return x * sign
_triweight_invcdf = _newton_raphson(
f_inv_estimate = _triweight_invcdf_estimate,
f = lambda t: 35/32 * (-1/7*t**7 + 3/5*t**5 - t**3 + t) + 1/2,
f_prime = lambda t: 35/32 * (1.0 - t * t) ** 3)
_kernel_invcdfs = {
'normal': NormalDist().inv_cdf,
'logistic': lambda p: log(p / (1 - p)),
'sigmoid': lambda p: log(tan(p * pi/2)),
'rectangular': lambda p: 2*p - 1,
'parabolic': lambda p: 2 * cos((acos(2*p-1) + pi) / 3),
'quartic': _quartic_invcdf,
'triweight': _triweight_invcdf,
'triangular': lambda p: sqrt(2*p) - 1 if p < 1/2 else 1 - sqrt(2 - 2*p),
'cosine': lambda p: 2 * asin(2*p - 1) / pi,
}
_kernel_invcdfs['gauss'] = _kernel_invcdfs['normal']
_kernel_invcdfs['uniform'] = _kernel_invcdfs['rectangular']
_kernel_invcdfs['epanechnikov'] = _kernel_invcdfs['parabolic']
_kernel_invcdfs['biweight'] = _kernel_invcdfs['quartic']
def kde_random(data, h, kernel='normal', *, seed=None):
"""Return a function that makes a random selection from the estimated
probability density function created by kde(data, h, kernel).
Providing a *seed* allows reproducible selections within a single
thread. The seed may be an integer, float, str, or bytes.
A StatisticsError will be raised if the *data* sequence is empty.
Example:
>>> data = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
>>> rand = kde_random(data, h=1.5, seed=8675309)
>>> new_selections = [rand() for i in range(10)]
>>> [round(x, 1) for x in new_selections]
[0.7, 6.2, 1.2, 6.9, 7.0, 1.8, 2.5, -0.5, -1.8, 5.6]
"""
n = len(data)
if not n:
raise StatisticsError('Empty data sequence')
if not isinstance(data[0], (int, float)):
raise TypeError('Data sequence must contain ints or floats')
if h <= 0.0:
raise StatisticsError(f'Bandwidth h must be positive, not {h=!r}')
try:
kernel_invcdf = _kernel_invcdfs[kernel]
except KeyError:
raise StatisticsError(f'Unknown kernel name: {kernel!r}')
prng = _random.Random(seed)
random = prng.random
choice = prng.choice
def rand():
return choice(data) + h * kernel_invcdf(random())
rand.__doc__ = f'Random KDE selection with {h=!r} and {kernel=!r}'
return rand

View file

@ -2426,6 +2426,86 @@ class TestKDE(unittest.TestCase):
self.assertEqual(f_hat(-1.0), 1/2)
self.assertEqual(f_hat(1.0), 1/2)
def test_kde_kernel_invcdfs(self):
kernel_invcdfs = statistics._kernel_invcdfs
kde = statistics.kde
# Verify that cdf / invcdf will round trip
xarr = [i/100 for i in range(-100, 101)]
for kernel, invcdf in kernel_invcdfs.items():
with self.subTest(kernel=kernel):
cdf = kde([0.0], h=1.0, kernel=kernel, cumulative=True)
for x in xarr:
self.assertAlmostEqual(invcdf(cdf(x)), x, places=5)
def test_kde_random(self):
kde_random = statistics.kde_random
StatisticsError = statistics.StatisticsError
kernels = ['normal', 'gauss', 'logistic', 'sigmoid', 'rectangular',
'uniform', 'triangular', 'parabolic', 'epanechnikov',
'quartic', 'biweight', 'triweight', 'cosine']
sample = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
# Smoke test
for kernel in kernels:
with self.subTest(kernel=kernel):
rand = kde_random(sample, h=1.5, kernel=kernel)
selections = [rand() for i in range(10)]
# Check error cases
with self.assertRaises(StatisticsError):
kde_random([], h=1.0) # Empty dataset
with self.assertRaises(TypeError):
kde_random(['abc', 'def'], 1.5) # Non-numeric data
with self.assertRaises(TypeError):
kde_random(iter(sample), 1.5) # Data is not a sequence
with self.assertRaises(StatisticsError):
kde_random(sample, h=0.0) # Zero bandwidth
with self.assertRaises(StatisticsError):
kde_random(sample, h=0.0) # Negative bandwidth
with self.assertRaises(TypeError):
kde_random(sample, h='str') # Wrong bandwidth type
with self.assertRaises(StatisticsError):
kde_random(sample, h=1.0, kernel='bogus') # Invalid kernel
# Test name and docstring of the generated function
h = 1.5
kernel = 'cosine'
prng = kde_random(sample, h, kernel)
self.assertEqual(prng.__name__, 'rand')
self.assertIn(kernel, prng.__doc__)
self.assertIn(repr(h), prng.__doc__)
# Approximate distribution test: Compare a random sample to the expected distribution
data = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2, 7.8, 14.3, 15.1, 15.3, 15.8, 17.0]
n = 1_000_000
h = 1.75
dx = 0.1
def p_expected(x):
return F_hat(x + dx) - F_hat(x - dx)
def p_observed(x):
# P(x-dx <= X < x+dx) / (2*dx)
i = bisect.bisect_left(big_sample, x - dx)
j = bisect.bisect_right(big_sample, x + dx)
return (j - i) / len(big_sample)
for kernel in kernels:
with self.subTest(kernel=kernel):
F_hat = statistics.kde(data, h, kernel, cumulative=True)
rand = kde_random(data, h, kernel, seed=8675309**2)
big_sample = sorted([rand() for i in range(n)])
for x in range(-40, 190):
x /= 10
self.assertTrue(math.isclose(p_observed(x), p_expected(x), abs_tol=0.001))
class TestQuantiles(unittest.TestCase):