import warnings
from typing import Union, Iterable, List, Tuple
from tqdm import tqdm
import aerosandbox.numpy as np
from aerosandbox.tools.pretty_plots.utilities.natural_univariate_spline import NaturalUnivariateSpline as Spline
from scipy import signal
from aerosandbox.tools.code_benchmarking import Timer
[docs]def estimate_noise_standard_deviation(
data: np.ndarray,
estimator_order: int = None,
) -> float:
"""
Estimates the standard deviation of the random noise in a time-series dataset.
Relies on several assumptions:
- The noise is normally-distributed and independent between samples (i.e. white noise).
- The noise is stationary and homoscedastic (i.e., the noise standard deviation is constant).
- The noise is uncorrelated with the signal.
- The sample rate of the data is significantly higher than the highest-frequency component of the signal. (In
practice, this ratio need not be more than ~5:1, if higher-order estimators are used. At a minimum, however,
this ratio must be greater than 2:1, corresponding to the Nyquist frequency.)
The algorithm used in this function is a highly-optimized version of the math described in this repository,
part of an upcoming paper: https://github.com/peterdsharpe/aircraft-polar-reconstruction-from-flight-test
Args:
data: A 1D NumPy array of time-series data.
estimator_order: The order of the estimator to use. Higher orders are generally more accurate, up to the
point where sample error starts to dominate. If None, a reasonable estimator order will be chosen automatically.
Returns: An estimate of the standard deviation of the data's noise component.
"""
if len(data) < 2:
raise ValueError("Data must have at least 2 points.")
if estimator_order is None:
estimator_order = int(np.clip(len(data) ** 0.5, 1, 1000))
##### Noise Variance Reconstruction #####
from scipy.special import gammaln
ln_factorial = lambda x: gammaln(x + 1)
### For speed, pre-compute the log-factorial of integers from 1 to estimator_order
# ln_f = ln_factorial(np.arange(estimator_order + 1))
ln_f = np.cumsum(
np.log(
np.concatenate([
[1],
np.arange(1, estimator_order + 1)
])
)
)
### Create a convolutional kernel to vectorize the summation
log_coeffs = (
2 * ln_f[estimator_order] - ln_f - ln_f[::-1] - 0.5 * ln_factorial(2 * estimator_order)
)
indices = np.nonzero(log_coeffs >= np.log(1e-20) + log_coeffs[estimator_order // 2])[0]
coefficients = np.exp(log_coeffs[indices[0]:indices[-1] + 1])
coefficients[::2] *= -1 # Flip the sign on every other coefficient
coefficients -= np.mean(coefficients) # Remove any bias introduced by floating-point error
# sample_stdev = signal.convolve(data, coefficients[::-1], 'valid')
sample_stdev = signal.oaconvolve(data, coefficients[::-1], 'valid')
return np.mean(sample_stdev ** 2) ** 0.5
[docs]def bootstrap_fits(
x: np.ndarray,
y: np.ndarray,
x_noise_stdev: Union[None, float] = 0.,
y_noise_stdev: Union[None, float] = None,
n_bootstraps: int = 2000,
fit_points: Union[int, Iterable[float], None] = 300,
spline_degree: int = 3,
normalize: bool = None,
) -> Union[Tuple[np.ndarray, np.ndarray], List[Spline]]:
"""
Bootstraps a time-series dataset and fits splines to each bootstrap resample.
Args:
x: The independent variable (e.g., time) of the dataset. A 1D NumPy array.
y: The dependent variable (e.g., altitude) of the dataset. A 1D NumPy array.
n_bootstraps: The number of bootstrap resamples to create.
fit_points: An optional variable that determines what to do with the splines after they are fit:
- If an integer, the splines will be evaluated at a linearly-spaced vector of points between the minimum
and maximum x-values of the dataset, with the number of points equal to `fit_points`. This is the default.
- If an iterable of floats (e.g. a 1D NumPy array), the splines will be evaluated at those points.
- If None, the splines won't be evaluated, and instead the splines are returned directly.
spline_degree: The degree of the splines to fit.
normalize: Whether or not to normalize the data before fitting. If True, the data will be normalized to
the range [0, 1] before fitting, and the splines will be un-normalized before being returned. If False,
the data will not be normalized before fitting.
- If None (the default), the data will be normalized if and only if `fit_points` is not None.
Returns: One of the following, depending on the value of `fit_points`:
- If `fit_points` is an integer or array, then this function returns a tuple of NumPy arrays:
- `x_fit`: A 1D NumPy array with the x-values at which the splines were evaluated.
- `y_bootstrap_fits`: A 2D NumPy array of shape (n_bootstraps, len(x_fit)) with the y-values of the
splines evaluated at each bootstrap resample and at each x-value.
- If `fit_points` is None, then this function returns a list of `n_bootstraps` splines, each of which is a
`NaturalUnivariateSpline`, which is a subclass of `scipy.interpolate.UnivariateSpline` with more sensible
extrapolation.
"""
### Set defaults
if normalize is None:
normalize = fit_points is not None
### Discard any NaN points
isnan = np.logical_or(
np.isnan(x),
np.isnan(y),
)
x = x[~isnan]
y = y[~isnan]
### Compute the standard deviation of the noise
if x_noise_stdev is None:
x_noise_stdev = estimate_noise_standard_deviation(x)
print(f"Estimated x-component of noise standard deviation: {x_noise_stdev}")
if y_noise_stdev is None:
y_noise_stdev = estimate_noise_standard_deviation(y)
print(f"Estimated y-component of noise standard deviation: {y_noise_stdev}")
### Sort the data by x-value
sort_indices = np.argsort(x)
x = x[sort_indices]
y = y[sort_indices]
### Prepare for normalization
x_min = np.min(x)
x_max = np.max(x)
x_rng = x_max - x_min
y_min = np.min(y)
y_max = np.max(y)
y_rng = y_max - y_min
if normalize:
x_normalize = lambda x: (x - x_min) / x_rng
y_normalize = lambda y: (y - y_min) / y_rng
# x_unnormalize = lambda x_n: x_n * x_rng + x_min
y_unnormalize = lambda y_n: y_n * y_rng + y_min
x_stdev_normalized = x_noise_stdev / x_rng
y_stdev_normalized = y_noise_stdev / y_rng
else:
x_normalize = lambda x: x
y_normalize = lambda y: y
# x_unnormalize = lambda x_n: x_n
y_unnormalize = lambda y_n: y_n
x_stdev_normalized = x_noise_stdev
y_stdev_normalized = y_noise_stdev
with tqdm(total=n_bootstraps, desc="Bootstrapping", unit=" samples") as progress_bar:
splines = []
n_valid_splines = 0
n_attempted_splines = 0
while n_valid_splines < n_bootstraps:
n_attempted_splines += 1
### Obtain a bootstrap resample
indices = np.random.choice(len(x), size=len(x), replace=True)
x_sample = x[indices] + np.random.normal(scale=x_noise_stdev, size=len(x))
y_sample = y[indices]
order = np.argsort(x_sample)
x_sample = x_sample[order]
y_sample = y_sample[order]
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UserWarning)
spline = Spline(
x=x_normalize(x_sample),
y=y_normalize(y_sample),
w=np.ones_like(x) / y_stdev_normalized,
s=len(x),
k=spline_degree,
)
if not np.isnan(spline(x_normalize((x_min + x_max) / 2))):
n_valid_splines += 1
progress_bar.update(1)
splines.append(spline)
else:
continue
if fit_points is None:
return splines
else:
### Determine which x-points to resample at
if fit_points is None:
x_fit = None
if normalize:
raise ValueError("If `fit_points` is None, `normalize` must be False.")
elif isinstance(fit_points, int):
x_fit = np.linspace(
np.min(x),
np.max(x),
fit_points
)
else:
x_fit = np.array(fit_points)
### Evaluate the splines at the x-points
y_bootstrap_fits = np.array([
y_unnormalize(spline(x_normalize(x_fit)))
for spline in splines
])
### Throw an error if all of the splines are NaN
if np.all(np.isnan(y_bootstrap_fits)):
raise ValueError("All of the splines are NaN. This is likely due to a poor choice of `spline_degree`.")
return x_fit, y_bootstrap_fits
if __name__ == '__main__':
np.random.seed(1)
f_sample_over_f_signal = 1000
t = np.arange(N)
y = np.sin(2 * np.pi / f_sample_over_f_signal * t) + 0.1 * np.random.randn(len(t))
print(estimate_noise_standard_deviation(y, 1))
# d = dict(np.load("raw_data.npz"))
#
# x = d["airspeed"]
# y = d["voltage"] * d["current"]
#
# # estimate_noise_standard_deviation(x)
# #
# # x_fit, y_bootstrap_fits = bootstrap_fits(
# # x, y,
# # x_stdev=None,
# # y_stdev=None,
# # n_bootstraps=20,
# # spline_degree=5,
# # )
# import matplotlib.pyplot as plt
# import aerosandbox.tools.pretty_plots as p
#
# fig, ax = plt.subplots(figsize=(7, 4))
#
# p.plot_with_bootstrapped_uncertainty(
# x, y,
# x_stdev=None,
# y_stdev=estimate_noise_standard_deviation(y[np.argsort(x)]),
# ci=[0.75, 0.95],
# color="coral",
# n_bootstraps=100,
# n_fit_points=200,
# # ci_to_alpha_mapping=lambda ci: 0.4,
# normalize=False,
# spline_degree=3,
# )
# plt.xlim(x.min(), x.max())
# plt.ylim(-10, 800)
# p.set_ticks(1, 0.25, 100, 25)
# plt.legend(
# loc="lower right"
# )
# p.show_plot(
# xlabel="Cruise Airspeed [m/s]",
# ylabel="Electrical Power Required [W]",
# title="Raw Data",
# legend=False,
# dpi=300
# )