Source code for pyyeti.dsp

# -*- coding: utf-8 -*-
"""
Digital signal processing tools.
"""

import math
import itertools
from collections import abc
from warnings import warn
from types import SimpleNamespace
import numbers
import numpy as np
import pandas as pd
import scipy.signal as signal
import scipy.interpolate as interp
import matplotlib.patches as mpatches
from pyyeti.ytools import _check_makeplot


try:
    import numba
except ImportError:
    HAVE_NUMBA = False
else:
    HAVE_NUMBA = True


# FIXME: We need the str/repr formatting used in Numpy < 1.14.
try:
    np.set_printoptions(legacy="1.13")
except TypeError:
    pass


[docs] def resample(data, p, q, *, axis=-1, beta=14, pts=10, t=None, getfir=False): """ Change sample rate of data by a rational factor using Lanczos resampling. Parameters ---------- data : nd array_like Data to be resampled. The resampling is done along axis `axis`. p : integer The upsample factor. q : integer The downsample factor. axis : int, optional Axis along which to operate. beta : scalar The beta value for the Kaiser window. See :func:`scipy.signal.windows.kaiser`. pts : integer Number of points in data to average from each side of current data point. For example, if ``pts == 10``, a total 21 points of original data are used for averaging. t : array_like If `t` is given, it is assumed to be the sample positions associated with the signal data in `data` and the new (resampled) positions are returned. getfir : bool If True, the FIR filter coefficients are returned. Returns ------- rdata : nd ndarray The resampled data. If the signal(s) in `data` have `n` samples, the signal(s) in `rdata` have ``ceil(n*p/q)`` samples. tnew : 1d ndarray; optional The resampled positions, same length as `rdata`. Only returned if `t` is input. fir : 1d ndarray; optional The FIR filter coefficients. ``len(fir) = 2*pts*max(p, q)/gcd(p, q) + 1``. ``gcd(p, q)`` is the greatest common denominator of `p` and `q`. `fir` is only returned if `getfir` is True. Notes ----- This routine takes care not to introduce new frequency content when upsampling and not to alias higher frequency content into the lower frequencies when downsampling. It performs these basic steps: 0. Removes the mean from `data`: ``mdata = data-mean(data)``. 1. Inserts ``p-1`` zeros after every point in `mdata`. 2. Forms an averaging, anti-aliasing FIR filter based on the 'sinc' function and the Kaiser window to filter `mdata`. a. Each original point gets retained as-is (it gets multiplied by 1.0 and the other original data points get multiplied by 0.0). b. The new zero points are a weighted average (by distance) of the original points. The averaging coefficients (in the FIR filter) sum to 1.0 c. The frequency cutoff for the filter is the minimum of the original sample rate divided by two, or the final sample rate divided by two. This ensures there is no aliasing. 3. Downsamples by selecting every `q` points in filtered data. 4. Adds the mean value(s) back on to final result. Using more points in the averaging results in more accuracy at the cost of run-time. From tests, upsampling with this routine approaches the output of :func:`scipy.signal.resample` as `pts` is increased except near the end points, where the periodic nature of the FFT (used in :func:`scipy.signal.resample`) becomes evident. See the first example below. When upsampling by a factor, the original points in `data` are retained. See also -------- :func:`scipy.signal.resample` Examples -------- The first example upsamples a hand-picked data vector to show how this routine compares and contrasts with the FFT method in :func:`scipy.signal.resample`. .. plot:: :context: close-figs >>> from pyyeti import dsp >>> import scipy.signal as signal >>> import numpy as np >>> import matplotlib.pyplot as plt >>> import numpy.fft as fft >>> p = 3 >>> q = 1 >>> data = [0., -0.08, 0.8, 1.6, -1.7, -1.8, 2, 0., 0.7, -1.5] >>> n = len(data) >>> x = np.arange(n) >>> upx = np.arange(n*p)/p >>> _ = plt.figure('Example', figsize=(10, 8), clear=True, ... layout='constrained') >>> _ = plt.subplot(211) >>> _ = plt.plot(x, data, 'o-', label='Original') >>> res = {} >>> for pts, m in zip([3, 5, 7, 10], ... ['^', 'v', '<', '>']): ... res[pts], up2 = dsp.resample(data, p, q, pts=pts, t=x) ... lab = f'Resample, pts={pts}' ... _ = plt.plot(upx, res[pts], '-', label=lab, marker=m) >>> resfft = signal.resample(data, p*n) >>> _ = plt.plot(upx, resfft, '-D', label='scipy.signal.resample') >>> _ = plt.legend(loc='best') >>> _ = plt.title('Signal') >>> _ = plt.xlabel('Time') >>> _ = plt.subplot(212) >>> n2 = len(upx) >>> frq = fft.rfftfreq(n, 1) >>> frqup = fft.rfftfreq(n2, 1/p) >>> _ = plt.plot(frq, 2*np.abs(fft.rfft(data))/n, ... label='Original') >>> _ = plt.plot(frqup, 2*np.abs(fft.rfft(res[5]))/n2, ... label='Resample, pts=5') >>> _ = plt.plot(frqup, 2*np.abs(fft.rfft(res[10]))/n2, ... label='Resample, pts=10') >>> _ = plt.plot(frqup, 2*np.abs(fft.rfft(resfft))/n2, ... label='scipy.signal.resample') >>> _ = plt.legend(loc='best') >>> _ = plt.title('FFT Mag.') >>> _ = plt.xlabel('Frequency - fraction of original sample rate') >>> np.allclose(up2, upx) True >>> np.allclose(res[5][::p], data) # original data still here True .. plot:: :context: close-figs For another example, downsample some random data: >>> p = 1 >>> q = 5 >>> n = 530 >>> rng = np.random.default_rng(seed=10) >>> data = rng.normal(size=n) >>> x = np.arange(n) >>> dndata, dnx = dsp.resample(data, p, q, t=x) >>> fig = plt.figure('Example 2', clear=True, ... layout='constrained') >>> _ = plt.subplot(211) >>> _ = plt.plot(x, data, 'o-', label='Original', alpha=0.3) >>> _ = plt.plot(dnx, dndata, label='Resample', lw=2) >>> resfft = signal.resample(data, int(np.ceil(n/q))) >>> _ = plt.plot( ... dnx, resfft, label='scipy.signal.resample', lw=2 ... ) >>> _ = plt.legend(loc='best') >>> _ = plt.title('Signal') >>> _ = plt.xlabel('Time') >>> _ = plt.subplot(212) >>> n2 = len(dnx) >>> frq = fft.rfftfreq(n, 1) >>> frqdn = fft.rfftfreq(n2, q) >>> _ = plt.plot( ... frq, 2*np.abs(fft.rfft(data))/n, 'o-', alpha=.3 ... ) >>> _ = plt.plot(frqdn, 2*np.abs(fft.rfft(dndata))/n2, lw=2) >>> _ = plt.plot(frqdn, 2*np.abs(fft.rfft(resfft))/n2, lw=2) >>> _ = plt.title('FFT Mag.') >>> xlbl = 'Frequency - fraction of original sample rate' >>> _ = plt.xlabel(xlbl) >>> _ = plt.xlim(-0.01, 0.13) """ data = np.atleast_1d(data) ln = data.shape[axis] if not (axis == -1 or axis == data.ndim - 1): # Move the axis containing the data to the end data = np.swapaxes(data, axis, data.ndim - 1) # setup FIR filter for upsampling given the following parameters: gf = math.gcd(p, q) p = p // gf q = q // gf M = 2 * pts * max(p, q) w = signal.windows.kaiser(M + 1, beta) # w = signal.hann(M+1) n = np.arange(M + 1) # compute cutoff relative to highest sample rate (P*sr where sr=1) # eg, if Q = 1, cutoff = 0.5 of old sample rate = 0.5/P of new # if P = 1, cutoff = 0.5 of new sample rate = 0.5/Q of old cutoff = min(1 / q, 1 / p) / 2 # sinc(x) = sin(pi*x)/(pi*x) s = 2 * cutoff * np.sinc(2 * cutoff * (n - M / 2)) fir = p * w * s m = np.mean(data, axis=-1, keepdims=True) # insert zeros shape = [*data.shape] if p > 1: shape[-1] = ln * p updata1 = np.zeros(shape) updata1[..., ::p] = data - m else: updata1 = data - m # take care of lag by shifting with zeros: nz = M // 2 shape[-1] = nz z = np.zeros(shape) updata1 = np.concatenate((z, updata1, z), axis=-1) updata = signal.lfilter(fir, 1, updata1, axis=-1) updata = updata[..., M:] # downsample: n = int(np.ceil(ln * p / q)) if q > 1: shape[-1] = n RData = np.zeros(shape) RData = updata[..., ::q] + m else: RData = updata + m if not (axis == -1 or axis == data.ndim - 1): # Move the axis back to where it was RData = np.swapaxes(RData, axis, data.ndim - 1) if t is None: if getfir: return RData, fir return RData tnew = np.arange(n) * np.mean(np.diff(t)) * ln / n + t[0] if getfir: return RData, tnew, fir return RData, tnew
def _get_timedata(data): """ Check for value time/data input for :func:`fixtime` and :func:`aligntime` """ if isinstance(data, np.ndarray): if data.ndim != 2 or data.shape[1] != 2: raise ValueError( "incorrectly sized ndarray for " "time/data input (must be 2d with 2 " "columns)" ) t = data[:, 0] d = data[:, 1] isndarray = True else: if len(data) != 2: raise ValueError("incorrectly defined time/data input") t, d = np.atleast_1d(*data) if len(t) != len(d): raise ValueError("time and data vectors are incompatibly sized") isndarray = False return t, d, isndarray def _get_prev_index(vec, val): """Finds previous index for scalar `val`""" p = np.searchsorted(vec, val) - 1 if p < 0: return 0 return p
[docs] def exclusive_sgfilter(x, n, exclude_point="first", axis=-1): """ 1-d moving average that excludes selected point More specifically, this is a 0th order 1-d Savitzky-Golay FIR filter that has been modified such that it excludes the selected point. This is helpful to find outliers. Parameters ---------- x : nd array_like Array to filter n : odd integer Number of points for filter; if even, it is reset to ``n+1`` exclude_point : string or int or None; optional Defines which point to exclude in each moving average window. If integer, it must be in [0, n), specifying the point to exclude. If string, it must be 'first', 'middle', or 'last' (which is the same as ``0``, ``n // 2``, and ``n-1``, respectively). If None, no point will be excluded (this is primarily for testing and should match a standard 0th order Savitzky-Golay filter). axis : integer; optional Axis along which to apply filter; each subarray along this axis is filtered. For example, to filter each column in a 2d array, set `axis` to 0. Returns ------- x_f : nd ndarray Filtered version of `x` Notes ----- The end windows cannot all exclude the selected point. For example, if the excluded point is the first point in each window, the last ``n-1`` windows cannot follow this rule. To illustrate, let signal `x` be ``np.arange(9)``, `n` be 5, and `exclude_point` be 'first' (or 0). In this scenario, the last 4 windows cannot exclude the first point (the "-" denotes the excluded point for each window):: x = [0, 1, 2, 3, 4, 5, 6, 7, 8] 1st ave: - + + + + 2nd ave: - + + + + 3rd ave: - + + + + 4th ave: - + + + + 5th ave: - + + + + 6th ave: + - + + + 7th ave: + + - + + 8th ave: + + + - + 9th ave: + + + + - If `exclude_point` is 'middle' or ``n // 2``:: x = [0, 1, 2, 3, 4, 5, 6, 7, 8] 1st ave: - + + + + 2nd ave: + - + + + 3rd ave: + + - + + 4th ave: + + - + + 5th ave: + + - + + 6th ave: + + - + + 7th ave: + + - + + 8th ave: + + + - + 9th ave: + + + + - If `exclude_point` is 'last' or ``n-1``:: x = [0, 1, 2, 3, 4, 5, 6, 7, 8] 1st ave: - + + + + 2nd ave: + - + + + 3rd ave: + + - + + 4th ave: + + + - + 5th ave: + + + + - 6th ave: + + + + - 7th ave: + + + + - 8th ave: + + + + - 9th ave: + + + + - Examples -------- >>> import numpy as np >>> from pyyeti.dsp import exclusive_sgfilter >>> x = np.arange(6.) >>> x[3] *= 2 >>> x array([ 0., 1., 2., 6., 4., 5.]) >>> for point in ('first', 'middle', 'last'): ... print(exclusive_sgfilter(x, 3, exclude_point=point)) [ 1.5 4. 5. 4.5 5.5 5. ] [ 1.5 1. 3.5 3. 5.5 5. ] [ 1.5 1. 0.5 1.5 4. 5. ] Equivalent run using indexes: >>> for point in (0, 1, 2): ... print(exclusive_sgfilter(x, 3, exclude_point=point)) [ 1.5 4. 5. 4.5 5.5 5. ] [ 1.5 1. 3.5 3. 5.5 5. ] [ 1.5 1. 0.5 1.5 4. 5. ] If `exclude_point` is None, this is the same as a normal 0th order Savitzky-Golay filter: >>> from scipy.signal import savgol_filter >>> savgol_filter(x, 3, polyorder=0) array([ 1., 1., 3., 4., 5., 5.]) >>> exclusive_sgfilter(x, 3, exclude_point=None) array([ 1., 1., 3., 4., 5., 5.]) """ x = np.atleast_1d(x) n = min(x.size - 1, n) | 1 b = np.empty(n) if isinstance(exclude_point, str): if exclude_point == "first": n_pt = 0 elif exclude_point == "middle": n_pt = n // 2 elif exclude_point == "last": n_pt = n - 1 else: raise ValueError("invalid `exclude_point` string") else: n_pt = exclude_point if n_pt is None: b[:] = 1 / n n_pt = n // 2 else: if not 0 <= n_pt <= n - 1: raise ValueError("invalid `exclude_point` integer") b[:] = 1 / (n - 1) # b is applied in reverse: y[1] = b[0]*x[1] + b[1]*x[0] b[n - n_pt - 1] = 0.0 if not (axis == -1 or axis == x.ndim - 1): # Move the axis containing the data to the end x = np.swapaxes(x, axis, x.ndim - 1) # Append pieces of x onto the front and back so the averages on # the ends work out properly. For example, if n is 5 and x is: # x = [0, 1, 2, 3, 4, 5, 6, 7, 8] # and n_pt is 2, then x2 is: # x2 = [3, 4, 0, 1, 2, 3, 4, 5, 6, 7, 8, 4, 5] # 1st ave: + + - + + # 2nd ave: + + - + + # 3rd ave: + + - + + # ... last ave: + + - + + # and n_pt is 1, then x2 is: # x2 = [4, 0, 1, 2, 3, 4, 5, 6, 7, 8, 4, 5, 6] # 1st ave: + - + + + # 2nd ave: + - + + + # 3rd ave: + - + + + # ... last ave: + - + + + x2 = np.concatenate((x[..., n - n_pt : n], x, x[..., -n : -(n_pt + 1)]), axis=-1) d = signal.lfilter(b, 1, x2)[..., n - 1 :] if not (axis == -1 or axis == x.ndim - 1): # Move the axis back to where it was d = np.swapaxes(d, axis, x.ndim - 1) return d
def _get_min_limit(x, n, threshold_sigma, threshold_value): if threshold_value is not None: return threshold_value ave = exclusive_sgfilter(x, n, exclude_point=None) return threshold_sigma * np.std(x - ave) def _sweep_out_priors(y, i, limit, ave): # see if we can consider points before the detected outlier # also as outliers: pv = [i] lim = limit[i] av = ave[i] for k in range(i - 1, -1, -1): if abs(y[k] - av) <= lim: break pv.append(k) pv.reverse() return pv def _sweep_out_nexts(y, i, limit, ave): # see if we can consider points after the detected outlier # also as outliers: pv = [i] lim = limit[i] av = ave[i] for k in range(i + 1, y.size): if abs(y[k] - av) <= lim: break pv.append(k) return pv def _get_stats_full(y, n, sigma, min_limit, xp): ave = exclusive_sgfilter(y, n, exclude_point=xp) y_delta = abs(y - ave) var = exclusive_sgfilter(y**2, n, exclude_point=xp) - ave**2 # use abs to care of negative numerical zeros: std = np.sqrt(abs(var)) limit = np.fmax(sigma * std, min_limit) return ave, y_delta, var, std, limit def _outs_first(y, n, sigma, min_limit, xp, ave, y_delta, var, std, limit): PV = y_delta > limit while True: pv = PV.nonzero()[0] if pv.size == 0: yield None, ave + limit, ave - limit # we're done # keep only last one ... previous ones can change pv = _sweep_out_priors(y, pv[-1], limit, ave) yield pv, ave + limit, ave - limit i, j = pv[0], pv[-1] if i == 0: yield None, ave + limit, ave - limit # we're done PV[i : j + 1] = False # To determine if point before i is a spike, need n-1 # valid points after j: k = min(y.size, j + n) count = k - (j + 1) # n-1 if away from end # shift good points backward in time to get rid of spikes: # <--- # ......ssss+++++ ==> ......+++++ # i j y[i : i + count] = y[j + 1 : k] # update only sections that need it: from i-n to i j = i i = max(i - n, 0) ave[i:j] = exclusive_sgfilter(y[i:k], n, exclude_point=xp)[: j - i] y_delta[i:j] = abs(y[i:j] - ave[i:j]) avsq = exclusive_sgfilter(y[i:k] ** 2, n, exclude_point=xp)[: j - i] var[i:j] = avsq - ave[i:j] ** 2 # use abs to care of negative numerical zeros: std[i:j] = np.sqrt(abs(var[i:j])) limit[i:j] = np.fmax(sigma * std[i:j], min_limit) PV[i:j] = y_delta[i:j] > limit[i:j] def _outs_last(y, n, sigma, min_limit, xp, ave, y_delta, var, std, limit): PV = y_delta > limit while True: pv = PV.nonzero()[0] if pv.size == 0: yield None, ave + limit, ave - limit # we're done # keep only first one ... later ones can change pv = _sweep_out_nexts(y, pv[0], limit, ave) yield pv, ave + limit, ave - limit i, j = pv[0], pv[-1] if j == y.size - 1: yield None, ave + limit, ave - limit # we're done PV[i : j + 1] = False # To determine if point after j is a spike, need n-1 # valid points before i: k = max(0, i - n + 1) count = i - k # n-1 if away from start # shift good points forward in time to get rid of spikes: # ---> # ......ssss+++++ ==> ......+++++ # i j y[j - count + 1 : j + 1] = y[k:i] # update only sections that need it: from j to j+n i = j j = min(j + n, y.size) m = i - j # -(j-i) ... keep last j-i points ave[i:j] = exclusive_sgfilter(y[k:j], n, exclude_point=xp)[m:] y_delta[i:j] = abs(y[i:j] - ave[i:j]) avsq = exclusive_sgfilter(y[k:j] ** 2, n, exclude_point=xp)[m:] var[i:j] = avsq - ave[i:j] ** 2 # use abs to care of negative numerical zeros: std[i:j] = np.sqrt(abs(var[i:j])) limit[i:j] = np.fmax(sigma * std[i:j], min_limit) PV[i:j] = y_delta[i:j] > limit[i:j] def _outs_gen(y, n, sigma, min_limit, xp, ave, y_delta, limit): PV = np.zeros(y.size, bool) hi = ave + limit lo = ave - limit while True: pv = y_delta > limit if not pv.any(): yield None, hi, lo # we're done PV[~PV] = pv yield PV.nonzero()[0], hi, lo y = y[~pv] ave, y_delta, var, std, limit = _get_stats_full(y, n, sigma, min_limit, xp) hi[~PV] = ave + limit lo[~PV] = ave - limit def _find_outlier_peaks(y, n, sigma, min_limit, xp): ave, y_delta, var, std, limit = _get_stats_full(y, n, sigma, min_limit, xp) if xp in ("first", 0): y = y.copy() yield from _outs_first( y, n, sigma, min_limit, xp, ave, y_delta, var, std, limit ) elif xp in ("last", n - 1): y = y.copy() yield from _outs_last(y, n, sigma, min_limit, xp, ave, y_delta, var, std, limit) else: yield from _outs_gen(y, n, sigma, min_limit, xp, ave, y_delta, limit)
[docs] def despike( x, n, sigma=8.0, maxiter=-1, threshold_sigma=2.0, threshold_value=None, exclude_point="first", **kwargs, ): """ Delete outlier data points from signal Parameters ---------- x : 1d array_like Signal to de-spike. n : odd integer Number of points for moving average; if even, it is reset to ``n+1``. If greater than the dimension of `x`, it is reset to the dimension or 1 less. sigma : real scalar; optional Number of standard deviations beyond which a point is considered an outlier. The default value is quite high; this is possible because the point itself is excluded from the calculations. maxiter : integer; optional Maximum number of iterations of outlier removal allowed. If `exclude_point` is 'first', only the last spike is removed on each iteration; if it is 'last', only the first spike is removed on each iteration. It is done this way because removing a spike can expose other points as spikes (but didn't appear to be because the removed spike was present). If <= 0, there is no set limit and the looping will stop when no more outliers are detected. Routine will always run at least 1 loop (setting `maxiter` to 0 is the same as setting it to 1). threshold_sigma : scalar; optional Number of standard deviations below which all data is kept. This standard deviation is of the entire input signal minus the moving average (using a window of `n` size). This value exists to avoid deleting small deviations such as bit toggles. Set to 0.0 to not use a threshold. `threshold_value` overrides `threshold_sigma` if it is not None. threshold_value : scalar or None; optional Optional method for specifying a minimum threshold. If not None, this scalar is used as an absolute minimum deviation from the moving average for a value to be considered a spike. Overrides `threshold_sigma`. Set to 0.0 to not use a threshold. exclude_point : string or int or None; optional Defines where, within each window, the point that is being considered as a potential outlier is. For example, 'first' compares the first point in each window the rest in that window to test if it is an outlier. This option is passed directly to :func:`exclusive_sgfilter`. If integer, it must be in [0, n), specifying the point to exclude. If string, it must be 'first', 'middle', or 'last' (which is the same as ``0``, ``n // 2``, and ``n-1``, respectively). If None, the point will be in the middle of the window and will not be excluded from the statistics (this is not recommended). **kwargs : other args are ignored This is here to accommodate :func:`fixtime`. Returns ------- A SimpleNamespace with the members: x : 1d ndarray Despiked version of input `x`. Will be shorter than input `x` if any spikes were deleted; otherwise, it will equal input `x`. pv : bool 1d ndarray; same size as input `x` Has True where an outlier was detected hilim : 1d ndarray; same size as input `x` This is the upper limit: ``mean + sigma*std`` lolim : 1d ndarray; same size as input `x` This is the lower limit: ``mean - sigma*std`` niter : integer Number of iterations executed Notes ----- Uses :func:`exclusive_sgfilter` to exclude the point being tested from the moving average and the moving standard deviation calculations. Each point is tested. The points near the ends of the signal may not be at the requested position in the window (see :func:`exclusive_sgfilter` for more information on this). To not use a threshold, set `threshold_sigma` to 0.0 (or set `threshold_value` to 0.0). .. note:: If you plan to use both :func:`fixtime` and :func:`despike`, it is recommended that you let :func:`fixtime` call :func:`despike` (via the `delspikes` option) instead of calling it directly. This is preferable because the ideal time to run :func:`despike` is in the middle of :func:`fixtime`: after drop-outs have been deleted but before gaps are filled. Examples -------- Compare `exclude_point` 'first' and 'middle' options. An explanation follows: >>> import numpy as np >>> from pyyeti import dsp >>> x = [1, 1, 1, 1, 5, 5, 1, 1, 1, 1] >>> s = dsp.despike(x, n=5, exclude_point='first') >>> s.x array([1, 1, 1, 1, 1, 1, 1, 1]) >>> s = dsp.despike(x, n=5, exclude_point='middle') >>> s.x array([1, 1, 1, 1, 5, 5, 1, 1, 1, 1]) The two 5 points get deleted when using 'first' but not when using 'middle'. This is logical because, when using 'first', the second 5 is compared to following four 1 values (the window is ``[5, 1, 1, 1, 1]``. The second loop then catches the other 5. But when 'middle' is used, the window for the first 5 is ``[1, 1, 5, 5, 1]`` and the window for the second 5 is ``[1, 5, 5, 1, 1]``. For both points, the other 5 in the window prevents the center 5 from being considered an outlier. For another example, make up some data and, with carefully chosen inputs, demonstrate how the routine runs by plotting one iteration at a time: .. plot:: :context: close-figs >>> import matplotlib.pyplot as plt >>> np.set_printoptions(linewidth=65) >>> x = [100, 2, 3, -4, 25, -6, 6, 3, -2, 4, -2, -100] >>> _ = plt.figure('Example', figsize=(8, 11), clear=True, ... layout='constrained') >>> for i in range(5): ... s = dsp.despike(x, n=9, sigma=2, maxiter=1, ... threshold_sigma=0.1, ... exclude_point='middle') ... _ = plt.subplot(5, 1, i+1) ... _ = plt.plot(x) ... _ = plt.plot(s.hilim, 'k--') ... _ = plt.plot(s.lolim, 'k--') ... _ = plt.title(f'Iteration {i+1}') ... x = s.x >>> s.x array([ 2, 3, 6, 3, -2, 4, -2]) Run all iterations at once to see what ``s.pv`` looks like: >>> x = [100, 2, 3, -4, 25, -6, 6, 3, -2, 4, -2, -100] >>> s = dsp.despike(x, n=9, sigma=2, ... threshold_sigma=0.1, ... exclude_point='middle') >>> s.x array([ 2, 3, 6, 3, -2, 4, -2]) >>> s.pv array([ True, False, False, True, True, True, False, False, False, False, False, True], dtype=bool) """ x = np.atleast_1d(x) if x.ndim > 1: raise ValueError("`x` must be 1d") if n > x.size: n = x.size - 1 min_limit = _get_min_limit(x, n, threshold_sigma, threshold_value) PV = np.zeros(x.shape, bool) # start generator: gen = _find_outlier_peaks(x, n, sigma, min_limit, xp=exclude_point) for i, (pv, hi, lo) in zip(itertools.count(1), gen): if pv is None: break PV[pv] = True if maxiter > 0 and i >= maxiter: break return SimpleNamespace(x=x[~PV], pv=PV, hilim=hi, lolim=lo, niter=i)
def _sweep_out_priors_diff(y, i, limit, ave): # see if we can consider points before the detected outlier # also as outliers: pv = [i] lim = limit[i] av = ave[i] next_y = y[i + 1] for k in range(i - 1, -1, -1): new_dy = next_y - y[k] if abs(new_dy - av) <= lim: break pv.append(k) pv.reverse() return pv def _sweep_out_nexts_diff(y, i, limit, ave): # see if we can consider points after the detected outlier # also as outliers: pv = [i] lim = limit[i - 1] av = ave[i - 1] prev_y = y[i - 1] for k in range(i + 1, y.size): new_dy = y[k] - prev_y if abs(new_dy - av) <= lim: break pv.append(k) return pv def _outs_first_diff( y, dy, n, sigma, min_limit, xp, ave, dy_delta, var, std, limit, dpv ): while True: if dpv.any(): # keep only last one ... previous ones can change i = dpv.nonzero()[0][-1] # since we're grabbing last spike in dy, that index # is also what we need for y: # dy -> y # 0 -> 1-0 # 1 -> 2-1 # 2 -> 3-2 # say 1 of dy is last spike ... 2 isn't. So 3-2 # of original is okay. spike in original has to be 1. pv = _sweep_out_priors_diff(y, i, limit, ave) else: pv = None yield pv i, j = pv[0], pv[-1] if i == 0: yield None # we're done dpv[i : j + 1] = False # To determine if point before i is a spike, need n-1 # valid points after j: k = min(y.size, j + n) count = k - (j + 1) # n-1 if away from end # shift good points backward in time to get rid of # spikes: # <--- # ......ssss+++++ ==> ......+++++ # i j y[i : i + count] = y[j + 1 : k] # update only sections that need it: from i-n to i j = i i = max(i - n, 0) dy[i:k] = np.diff(y[i : k + 1]) ave[i:j] = exclusive_sgfilter(dy[i:k], n, exclude_point=xp)[: j - i] dy_delta[i:j] = abs(dy[i:j] - ave[i:j]) avsq = exclusive_sgfilter(dy[i:k] ** 2, n, exclude_point=xp)[: j - i] var[i:j] = avsq - ave[i:j] ** 2 # use abs to care of negative numerical zeros: std[i:j] = np.sqrt(abs(var[i:j])) limit[i:j] = np.fmax(sigma * std[i:j], min_limit) dpv[i:j] = dy_delta[i:j] > limit[i:j] def _outs_last_diff( y, dy, n, sigma, min_limit, xp, ave, dy_delta, var, std, limit, dpv ): while True: if dpv.any(): # keep only first one ... later ones can change i = dpv.nonzero()[0][0] # since we're grabbing first spike in dy, that index # plus 1 is what we need for y: # dy -> y # 0 -> 1-0 # 1 -> 2-1 # 2 -> 3-2 # say 1 of dy is first spike ... 0 isn't. So 1-0 # of original is okay. spike in original has to be 2. pv = _sweep_out_nexts_diff(y, i + 1, limit, ave) else: pv = None yield pv i, j = pv[0], pv[-1] if j == dy.size: yield None # we're done dpv[i - 1 : j] = False # To determine if point after j is a spike, need n-1 # valid points before i: k = max(0, i - n + 1) count = i - k # n-1 if away from start # shift good points forward in time to get rid of spikes: # ---> # ......ssss+++++ ==> ......+++++ # i j y[j - count + 1 : j + 1] = y[k:i] # update only sections that need it: from j to j+n i = j j = min(j + n, dy.size) m = i - j # -(j-i) ... keep last j-i points dy[k:j] = np.diff(y[k : j + 1]) ave[i:j] = exclusive_sgfilter(dy[k:j], n, exclude_point=xp)[m:] dy_delta[i:j] = abs(dy[i:j] - ave[i:j]) avsq = exclusive_sgfilter(dy[k:j] ** 2, n, exclude_point=xp)[m:] var[i:j] = avsq - ave[i:j] ** 2 # use abs to care of negative numerical zeros: std[i:j] = np.sqrt(abs(var[i:j])) limit[i:j] = np.fmax(sigma * std[i:j], min_limit) dpv[i:j] = dy_delta[i:j] > limit[i:j] def _find_outlier_peaks_diff(y, dy, n, sigma, min_limit, xp): ave = exclusive_sgfilter(dy, n, exclude_point=xp) dy_delta = abs(dy - ave) var = exclusive_sgfilter(dy**2, n, exclude_point=xp) - ave**2 # use abs to care of negative numerical zeros: std = np.sqrt(abs(var)) limit = np.fmax(sigma * std, min_limit) dpv = dy_delta > limit if xp in ("first", 0): yield from _outs_first_diff( y, dy, n, sigma, min_limit, xp, ave, dy_delta, var, std, limit, dpv ) elif xp in ("last", n - 1): yield from _outs_last_diff( y, dy, n, sigma, min_limit, xp, ave, dy_delta, var, std, limit, dpv ) else: raise ValueError("invalid `exclude_point` for :func:`despike_diff` routine")
[docs] def despike_diff( x, n, sigma=8.0, maxiter=-1, threshold_sigma=2.0, threshold_value=None, exclude_point="first", **kwargs, ): """ Delete outlier data points from signal based on level changes Parameters ---------- x : 1d array_like Signal to de-spike. n : odd integer Number of points for moving average; if even, it is reset to ``n+1``. If greater than the dimension of `x`, it is reset to the dimension or 1 less. sigma : real scalar; optional Number of standard deviations beyond which a point is considered an outlier. The default value is quite high; this is possible because the point itself is excluded from the calculations. maxiter : integer; optional Maximum number of iterations of outlier removal allowed. If `exclude_point` is 'first', only the last spike is removed on each iteration; if it is 'last', only the first spike is removed on each iteration. It is done this way because removing a spike can expose other points as spikes (but didn't appear to be because the removed spike was present). If <= 0, there is no set limit and the looping will stop when no more outliers are detected. Routine will always run at least 1 loop (setting `maxiter` to 0 is the same as setting it to 1). threshold_sigma : scalar; optional Number of standard deviations below which all data is kept. This standard deviation is computed from `x`. Let ``dx = np.diff(x)``, the standard deviation is ``std(dx - moving_average(dx))``. The moving average uses a window of `n` size. This value exists to avoid deleting small deviations such as bit toggles. Set to 0.0 to not use a threshold. `threshold_value` overrides `threshold_sigma` if it is not None. threshold_value : scalar or None; optional Optional method for specifying a minimum threshold. If not None, this scalar is used as an absolute minimum deviation from the moving average for a value to be considered a spike. Overrides `threshold_sigma`. Set to 0.0 to not use a threshold. exclude_point : string or int; optional Defines where, within each window, the point that is being considered as a potential outlier is. For this routine, `exclude_point` must be either 'first' (or 0) or 'last' (or ``n-1``). For example, 'first' compares the first point in each window the rest in that window to test if it is an outlier. **kwargs : other args are ignored This is here to accommodate :func:`fixtime`. Returns ------- A SimpleNamespace with the members: x : 1d ndarray Despiked version of input `x`. Will be shorter than input `x` if any spikes were deleted; otherwise, it will equal input `x`. pv : bool 1d ndarray; same size as input `x` Has True where an outlier was detected niter : integer Number of iterations executed Notes ----- Uses :func:`exclusive_sgfilter` to exclude the point being tested from the moving average and the moving standard deviation calculations. Each point is tested. The points near the ends of the signal may not be at the requested position in the window (see :func:`exclusive_sgfilter` for more information on this). To not use a threshold, set `threshold_sigma` to 0.0 (or set `threshold_value` to 0.0). .. note:: If you plan to use both :func:`fixtime` and :func:`despike_diff`, it is recommended that you let :func:`fixtime` call :func:`despike_diff` (via the `delspikes` option) instead of calling it directly. This is preferable because the ideal time to run :func:`despike_diff` is in the middle of :func:`fixtime`: after drop-outs have been deleted but before gaps are filled. Examples -------- Set `threshold_value` to catch second spike but not first. The threshold is based on differences: >>> import numpy as np >>> from pyyeti import dsp >>> x = [2, 2, 2, 2, 5, 2, 2, 2, 2, 7, 2, 2, 2, 2, 2] >>> s = dsp.despike_diff(x, n=5, threshold_value=4) >>> s.x array([2, 2, 2, 2, 5, 2, 2, 2, 2, 2, 2, 2, 2, 2]) >>> s.pv array([False, False, False, False, False, False, False, False, False, True, False, False, False, False, False], dtype=bool) >>> s.niter 2 """ x = np.atleast_1d(x) if x.ndim > 1: raise ValueError("`x` must be 1d") dx = np.diff(x) if n > dx.size: n = dx.size - 1 min_limit = _get_min_limit(dx, n, threshold_sigma, threshold_value) PV = np.zeros(x.shape, bool) # start generator: gen = _find_outlier_peaks_diff(x.copy(), dx, n, sigma, min_limit, xp=exclude_point) for i, pv in zip(itertools.count(1), gen): if pv is None: break PV[pv] = True if maxiter > 0 and i >= maxiter: break return SimpleNamespace(x=x[~PV], pv=PV, niter=i)
def _chk_negsteps(t, data, negmethod): difft = np.diff(t) negs = difft < 0 if negs.any(): nneg = np.count_nonzero(negs) npos = difft.size - nneg if npos == 0: raise ValueError( "there are no positive steps in the entire time vector. " "Cannot fix this." ) if negmethod == "stop": raise ValueError(f"There are {nneg:d} negative time steps. Stopping.") if negmethod == "sort": warn( f"there are {nneg} negative time steps. Sorting the data. " "This may be a poor way to handle the current data, so " "please check the results carefully.", RuntimeWarning, ) j = t.argsort() # unsort = j.argsort() t = t[j] data = data[j] difft = np.diff(t) else: j = None return t, data, difft, j def _find_drops(d, dropval): dropouts = np.logical_or(np.isnan(d), np.isinf(d)) if np.isfinite(dropval): d = d[~dropouts] dropouts[~dropouts] = abs(d - dropval) < abs(dropval) / 100 # dropouts = np.logical_or( # dropouts, abs(d-dropval) < abs(dropval)/100) return dropouts def _del_loners(dropouts, n, nz=3): """Delete "loner-ish" points amongst dropouts. dropouts : 1d bool ndarray of dropouts; True for drops n : integer; window size nz : integer; number of points in `n` point range that will cause all points between to True values to be turned to True """ pv = dropouts.nonzero()[0] if pv.size > 2: # delete 1-of loners first (this is necessary if # method="despike_diff" because a middle spike # will be left behind if it is smaller than the two # surrounding spikes) loners = (np.diff(pv) == 2).nonzero()[0] if loners.size > 0: dropouts[pv[loners] + 1] = True s = dropouts.size ind = [] for i in pv[: -(nz - 1)]: j = i + n j = j if j < s else s d_ij = dropouts[i:j] if d_ij.sum() >= nz: while not dropouts[j - 1]: j -= 1 ind.append(slice(i, j)) for ij in ind: dropouts[ij] = True def _del_drops(olddata, dropval, delspikes): dropouts = _find_drops(olddata, dropval) if dropouts.any(): if delspikes: _del_loners(dropouts, delspikes["n"]) keep = ~dropouts keep = keep.nonzero()[0] dropouts = dropouts.nonzero()[0] if keep.size == 0: warn("there are only drop-outs!", RuntimeWarning) return keep, dropouts def _del_outtimes(told, keep, delouttimes): t = told[keep] mn = t.mean() sig = 3 * t.std(ddof=1) pv = np.logical_or(t < mn - sig, t > mn + sig) outtimes = keep[pv] if pv.any(): if delouttimes: warn( f"there are {pv.sum()} outlier times being deleted. These are" " times more than 3-sigma away from the mean. " "This may be a poor way to handle the current data, so " "please check the results carefully.", RuntimeWarning, ) keep = keep[~pv] else: warn( f"there are {pv.sum()} outlier times that are NOT being deleted" " because `delouttimes` is False. These are times more than " "3-sigma away from the mean.", RuntimeWarning, ) return keep, outtimes def _sr_calcs(difft, sr, verbose): min_ts = difft.min() max_ts = difft.max() ave_ts = difft.mean() max_sr = 1 / min_ts min_sr = 1 / max_ts ave_sr = 1 / ave_ts # get mode of all sample rates: Ldiff = len(difft) difft2 = difft[difft != 0] sr_all = 1 / difft2 sr1 = sr_all.min() if sr1 > 5: dsr = 5 else: dsr = round(10 * max(sr1, 0.1)) / 10 # pandas is very fast for computing mode: counts = pd.Series(np.round(sr_all / dsr)).value_counts() mode_sr = counts.index[0] * dsr mode_pct = counts.iat[0] / Ldiff * 100 mode_ts = 1 / mode_sr sr_stats = np.array([max_sr, min_sr, ave_sr, mode_sr, mode_pct]) if not sr: # pragma: no cover verbose = True if verbose: print("==> Info: [min, max, ave, count (% occurrence)] time step:") print( f"==> [{min_ts:g}, {max_ts:g}, {ave_ts:g}, " f"{mode_ts:g} ({mode_pct:.1f}%)]" ) print("==> Corresponding sample rates:") print("==> [{:g}, {:g}, {:g}, {:g} ({:.1f}%)]".format(*sr_stats)) print('==> Note: "count" shows most frequent sample rate to') print(f" nearest {dsr} samples/sec.") if mode_pct > 90 or abs(mode_sr - ave_sr) < dsr: defsr = round(mode_sr / dsr) * dsr else: defsr = round(ave_sr / dsr) * dsr if sr == "auto": sr = defsr elif not sr: # pragma: no cover ssr = input(f"==> Enter desired sample rate [{defsr:g}]: ") if not ssr: sr = defsr else: sr = float(ssr) if verbose: print(f"==> Using sample rate = {sr:g}") return sr, sr_stats def _prep_delspikes(delspikes): def _dict_default(dct, **kwargs): for k, v in kwargs.items(): if k not in dct: dct[k] = v if not isinstance(delspikes, abc.MutableMapping): delspikes = dict() else: delspikes = dict(delspikes) # make a copy _dict_default(delspikes, sigma=8, n=15, method="despike_diff", maxiter=-1) return delspikes def _post_despike(pv, keep, delspikes, niter): if pv.any(): _del_loners(pv, delspikes["n"]) spikes = keep[pv] keep = keep[~pv] despike_info = SimpleNamespace(delspikes=delspikes, niter=niter) return keep, spikes, despike_info def _simple_filter(olddata, keep, delspikes): d = olddata[keep] PV = np.ones(d.size, bool) n = delspikes["n"] sigma = delspikes["sigma"] maxiter = delspikes["maxiter"] for i in itertools.count(1): ave = exclusive_sgfilter( d[PV], n, # exclude_point='middle') exclude_point=None, ) delta = d[PV] - ave pv = abs(delta) > sigma * np.std(delta) if pv.any(): PV[PV] = ~pv else: break if maxiter > 0 and i >= maxiter: break return _post_despike(~PV, keep, delspikes, i + 1) def _del_spikes(olddata, keep, delspikes): method = delspikes["method"] if method == "despike_diff": s = despike_diff(olddata[keep], **delspikes) return _post_despike(s.pv, keep, delspikes, s.niter) elif method == "despike": s = despike(olddata[keep], **delspikes) return _post_despike(s.pv, keep, delspikes, s.niter) elif method == "simple": return _simple_filter(olddata, keep, delspikes) else: raise ValueError(f"unknown `method` ({method})") def _get_alldrops( told, olddata, sortvec, dropouts, outtimes, spikes, despike_info, delouttimes ): alldrops = np.zeros(told.size, bool) if dropouts is not None: alldrops[dropouts] = True if delouttimes: alldrops[outtimes] = True if spikes is not None: alldrops[spikes] = True _del_loners(alldrops, despike_info.delspikes["n"], 3) else: spikes = None keep = ~alldrops t = told[keep] data = olddata[keep] alldrops = alldrops.nonzero()[0] def _apply_sortvec(sortvec, *args): args = list(args) for i, arg in enumerate(args): if arg is not None: args[i] = np.sort(sortvec[arg]) return args if sortvec is not None: alldrops, dropouts, outtimes, spikes = _apply_sortvec( sortvec, alldrops, dropouts, outtimes, spikes ) return ( t, data, SimpleNamespace( dropouts=dropouts, outtimes=outtimes, spikes=spikes, alldrops=alldrops ), ) def _check_dt_size(difft, dt): n = len(difft) nsmall = (difft < 0.93 * dt).sum() / n nlarge = (difft > 1.07 * dt).sum() / n for n, s1, s2 in zip((nsmall, nlarge), ("smaller", "larger"), ("low", "high")): if n > 0.01: warn( f"there are a large ({n * 100:.2f}%) number of time " f"steps {s1:s} than {dt:g} by more than 7%. Double " f"check the sample rate; it might be too {s2:s}.", RuntimeWarning, ) def _get_time_shifts(told, dt): tp = np.empty(len(told), bool) tp[0] = True tp[1:] = abs(np.diff(told) - dt) > dt / 4 tp[:-1] |= tp[1:] tp[-1] = True tp = np.nonzero(tp)[0] if len(tp) - 2 > len(told) // 2: # -2 to ignore ends align = False p = (len(tp) - 2) / len(told) * 100 msg = ( "there are too many time-step changes ('turning points')" f" ({p:.2f}%) to align the largest section." ) warn(msg, RuntimeWarning) else: align = True return tp, align def _mk_initial_tnew(told, sr, dt, difft): L = int(round((told[-1] - told[0]) * sr)) + 1 tnew = np.arange(L) / sr + told[0] # get turning points and see if we should try to align: tp, align = _get_time_shifts(told, dt) if align: # align with the largest "good" range in `told`: j = np.argmax(np.diff(tp)) told_good = told[tp[j] : tp[j + 1] + 1] lold = len(told_good) p = _get_prev_index(tnew, told_good[0] + dt / 2) n = _get_prev_index(tnew, told_good[-1] + dt / 2) tnew_good = tnew[p : n + 1] if (lnew := len(tnew_good)) != lold: lohi = "low" if lnew < lold else "high" warn( "when trying to align best sections of data, lengths of old" f" time vector and new time vector do not match ({lold} vs " f"{lnew}); sample rate used too {lohi}? Only aligning on " "first data point of 'good' section.", RuntimeWarning, ) delt = told_good[0] - tnew_good[0] else: delt = np.mean(told_good - tnew_good) tnew += delt return tnew, tp if not HAVE_NUMBA: def _find_closest_times(told, tnew): # - note this simple method doesn't work if there are large # gaps in `told`: # index = np.searchsorted(told, tnew - dt / 2) index = np.searchsorted(told, tnew) # told[index] <= tnew (except possibly at ends) lold = len(told) index[index == lold] = lold - 1 delta = told[index] - tnew # check 1 time-step earlier to see if it's closer: delta_1 = told[index - 1] - tnew pv = abs(delta_1) <= abs(delta) index[pv] -= 1 return index def _find_closest_previous_times(told, tnew): index = np.searchsorted(told, tnew) - 1 index[index < 0] = 0 return index else: @numba.njit(cache=True) def _find_closest_times(told, tnew): # both vectors are monotonically ascending lold = len(told) lnew = len(tnew) # find first i such that told[i] >= tnew[0]: v = tnew[0] for i in range(lold): if told[i] >= v: break else: return np.zeros(lnew, np.int64) index = np.empty(lnew, np.int64) if i > 0 and v - told[i - 1] <= told[i] - v: index[0] = i - 1 else: index[0] = i for j in range(1, lnew): v = tnew[j] for i in range(i, lold): if told[i] >= v: break if i > 0 and v - told[i - 1] <= told[i] - v: index[j] = i - 1 else: index[j] = i return index @numba.njit(cache=True) def _find_closest_previous_times(told, tnew): # both vectors are monotonically ascending lold = len(told) lnew = len(tnew) # find first i such that told[i] > tnew[0]: v = tnew[0] for i in range(lold): if told[i] > v: break else: return np.zeros(lnew, np.int64) index = np.empty(lnew, np.int64) if i > 0: index[0] = i - 1 else: index[0] = i for j in range(1, lnew): v = tnew[j] for i in range(i, lold): if told[i] > v: break else: i = lold if i > 0: index[j] = i - 1 else: index[j] = i return index def _return(t, data, alldrops, sr_stats, tp, getall, return_ndarray, despike_info): if return_ndarray: newdata = np.vstack((t, data)).T else: newdata = (t, data) if getall: fixinfo = SimpleNamespace( sr_stats=sr_stats, tp=tp, alldrops=alldrops, despike_info=despike_info ) return newdata, fixinfo return newdata
[docs] def fixtime( olddata, sr=None, *, negmethod="sort", deldrops=True, dropval=-1.40130e-45, delouttimes=True, delspikes=False, base=None, hold_previous_value=False, previous_value_tol=1e-3, getall=False, verbose=True, ): """ Process recorded data to make an even time vector. Parameters ---------- olddata : 2d ndarray or 2-element tuple/list If ndarray, it must have 2 columns: ``[time, signal]``. Otherwise, it must be a 2-element tuple or list, eg: ``(time, signal)`` sr : scalar, string or None; optional If scalar, specifies the sample rate. If 'auto', the algorithm chooses a "best" fit. If None, user is prompted after displaying some statistics on sample rate. negmethod : string; optional Specifies how to handle negative time steps: =========== ========== `negmethod` Action =========== ========== "stop" Error out "sort" Sort data =========== ========== deldrops : bool; optional If True, dropouts are deleted from the data; otherwise, they are left in. dropval : scalar; optional The numerical value of drop-outs. Note that any `np.nan` or `np.inf` values in the data are treated as drop-outs in any case. delouttimes : bool; optional If True, outlier times are deleted from the data; otherwise, they are left in. delspikes : bool or dict; optional If False, do not delete spikes. If True, delete spikes by calling :func:`despike_diff` with inputs as defined below. If a dict, you can take complete control. You can specify one of 3 methods for despiking: =============== ====================================== `method` Action =============== ====================================== "despike_diff" Call :func:`despike_diff` (default) "despike" Call :func:`despike` "simple" Detect outliers by standard deviations from a moving average through signal. =============== ====================================== For example, to set the method to "despike", the number of standard deviations to 12, the window size to 25, the maximum iterations to 100, and the threshold_value to 0.25:: delspikes=dict(method='despike', sigma=12, n=25, maxiter=100, threshold_value=0.25) Defaults are defined for some parameters (others are accepted from the definition of :func:`despike_diff` or :func:`despike` ... 'simple' only uses these three). The defaults are:: method = 'despike_diff' n = 15 sigma = 8 maxiter = -1 # negative value means no limit base : scalar or None; optional Scalar value that new time vector would hit exactly if within range. If None, new time vector is aligned to longest section of "good" data. hold_previous_value : bool; optional If True, hold previous value instead of finding closest value (but see `previous_value_tol`). The default is False; find closest value and, in case of a tie, use previous value. For example:: olddata = ([0.0, 4.0], [10.0, 20]) t, y1 = fixtime(olddata, sr=1.0) t, y2 = fixtime(olddata, sr=1.0, hold_previous_value=True) Gives:: t --> array([ 0., 1., 2., 3., 4.]) y1 --> array([ 10., 10., 10., 20., 20.]) y2 --> array([ 10., 10., 10., 10., 20.]) previous_value_tol : float; optional If `hold_previous_value` is True, a new time value is considered equal to an old time value if it is within ``previous_value_tol * dt`` of it, where ``dt`` is the new time step. Must be within [0.0, 1.0], inclusive. getall : bool; optional If True, return `fixinfo`; otherwise only `newdata` is returned. verbose : bool; optional If True, sample rate statistics are printed. Note that if `sr` is None, `verbose` is internally set to True. Returns ------- newdata : 2d ndarray or tuple Cleaned up version of `olddata`. Will be 2d ndarray if `olddata` was ndarray; otherwise it is a tuple: ``(time, data)``. fixinfo : SimpleNamespace; optional Only returned if `getall` is True. Members: - `sr_stats` : 1d ndarray Five-element vector with the sample rate statistics; useful to help user select best sample rate or to compare against `sr`. The five elements are:: [max_sr, min_sr, ave_sr, max_count_sr, max_count_percent] The `max_count_sr` is the sample rate that occurred most often. This is usually the 'correct' sample rate. `max_count_percent` gives the percent occurrence of `max_count_sr`. - `tp` : 1d ndarray Contains indices into old time vector of where time-step shifts ("turning points") were done to align the new time vector against the old. - `alldrops` : SimpleNamespace or None Has 1d indexing arrays into `olddata` showing the drops: ========== ========================================== `dropouts` shows infs, nans, and `dropvals` (None if ``not deldrops``) `outtimes` shows where outlier times were found in `olddata` (whether they were deleted or not) `spikes` shows where spikes were found in `olddata` (None if ``not delspikes``) `alldrops` merger of `dropouts` and `spikes` plus possible points in between those ========== ========================================== - `despike_info` : SimpleNamespace or None If `delspikes` is True or a dict, `despike_info` contains: =========== ======================================= `delspikes` Dict of values used for spike removal (input to :func:`despike` for the "despike" methods) `niter` Number of iterations of spike removal =========== ======================================= Notes ----- This algorithm works as follows: 1. Find and delete drop-outs if `deldrops` is True. 2. Delete outlier times if `delouttimes` is True. These are points with times that are more than 3 standard deviations away from the mean. A warning message is printed if any such times are found. Note that on a perfect time vector, the end points are at 1.73 sigma (eg: ``mean + 1.73*sigma = 0.5 + 1.73*0.2887 = 1.0``). 3. Check for positive time steps, and if there are none, error out. 4. Check the time vector for negative steps. Sort or error out as specified by `negmethod`. Warnings are printed in any case. 5. Compute and print sample rates for user review. Perhaps the most useful of these printed numbers is the one based on the count. :func:`numpy.histogram` is used to count which sample rate occurs most often (to the nearest multiple of 5 in most cases). If there is a high percentage printed with that sample rate, it is likely the correct value to use (at least within 5 samples/second). If `sr` is not input, prompt user for `sr`. 6. Call selected despiker if requested to delete data spikes. 7. Count number of small time-steps defined as those that are less than ``0.93/sr``. If more than 1% of the steps are small, print a warning. 8. Count number of large time-steps defines as those that are greater than ``1.07/sr``. If more than 1% of the steps are large, print a warning. 9. Make a new, evenly spaced time vector according to the new sample rate that spans the range of time in `olddata`. 10. Find the "turning points" in the old time vector. These are where the step differs by more than 1/4 step from the ideal. Will issue warning if the number of turning points is greater than 50% of total points. 11. If step 10 did not issue a warning about too many turning points, the new time vector is shifted to align with the longest section of "good" old time steps. 12. Loop over the segments defined by the turning points. Each segment will shifted left or right to fit with the new time vector. The longest section is not shifted due to step 12 (unless that step was skipped because of too many turning points). 13. If `base` is not None, the new time vector is shifted by up to a half time step such that it would hit `base` exactly (if it was in range). 14. Fill in new data vector using best fit times. This means that gaps are filled with flat lines using the closest value if `hold_previous_value` is False, or the previous value if `hold_previous_value` is True. This routine does not do any linear interpolation. If despiking is not producing good results: 1. Spikes very near the ends of the signal (in the first or last window) can cause trouble for the :func:`despike_diff` and :func:`despike` routines. If `exclude_point` is 'first', spikes in the last window should be avoided (the routine works backward); conversely, if `exclude_point` is 'last', spikes in the first window should be avoided (the routine works forward). 2. Try increasing/decreasing `sigma` to make it more/less picky. 3. If bit toggles or similar small spikes are being considered spikes (which can also make the routine take a very long time to run), setting `threshold_value` to a suitable value for the current data is often a good solution. Increasing `threshold_sigma` can also protect these small spikes. Note that the threshold settings are not available for the "simple" `delspikes` method. 4. Try a different window size. 5. Try a different method. They all have strengths and weaknesses, so experiment. Examples -------- >>> from pyyeti import dsp >>> t = [0., 1., 5., 6.] >>> y = [1., 2., 3., 4.] >>> tn, yn = dsp.fixtime((t, y), sr=1) ==> Info: [min, max, ave, count (% occurrence)] time step: ==> [1, 4, 2, 1 (66.7%)] ==> Corresponding sample rates: ==> [1, 0.25, 0.5, 1 (66.7%)] ==> Note: "count" shows most frequent sample rate to nearest 0.2 samples/sec. ==> Using sample rate = 1 >>> tn array([ 0., 1., 2., 3., 4., 5., 6.]) >>> yn array([ 1., 2., 2., 2., 3., 3., 4.]) Repeat, but with `hold_previous_value` set to True: >>> tn, yn = dsp.fixtime( ... (t, y), sr=1, hold_previous_value=True, verbose=False ... ) >>> tn array([ 0., 1., 2., 3., 4., 5., 6.]) >>> yn array([ 1., 2., 2., 2., 2., 3., 4.]) """ # begin main routine told, olddata, return_ndarray = _get_timedata(olddata) # check for negative steps: told, olddata, difft, sortvec = _chk_negsteps(told, olddata, negmethod) # prep `delspikes` if needed: if delspikes: delspikes = _prep_delspikes(delspikes) # check for drop outs: sr_stats = tp = None if deldrops: keep, dropouts = _del_drops(olddata, dropval, delspikes) if len(keep) == 0: alldrops = _get_alldrops( told, olddata, sortvec, dropouts, None, None, None, delouttimes ) return _return( told, olddata, alldrops, sr_stats, tp, getall, return_ndarray, None ) else: keep = np.arange(told.size) dropouts = None # check for outlier times ... outside 3-sigma keep, outtimes = _del_outtimes(told, keep, delouttimes) # sample rate calculations: difft = np.diff(told[keep]) sr, sr_stats = _sr_calcs(difft, sr, verbose) dt = 1 / sr # delete spikes if requested: if delspikes: keep, spikes, despike_info = _del_spikes(olddata, keep, delspikes) difft = np.diff(told[keep]) else: spikes = None despike_info = None # get partition vector for drops, spikes, and remaining "loners": told, olddata, alldrops = _get_alldrops( told, olddata, sortvec, dropouts, outtimes, spikes, despike_info, delouttimes ) difft = np.diff(told) # check for small and large time steps: _check_dt_size(difft, dt) # make initial new time vector aligned with longest range in # told of "good" time steps (tp: turning points): tnew, tp = _mk_initial_tnew(told, sr, dt, difft) # build a best-fit index by finding closest new time (no # interpolation) if hold_previous_value: if previous_value_tol < 0.0 or previous_value_tol > 1.0: raise ValueError( "`previous_value_tol` must be in [0.0, 1.0];" f" it is {previous_value_tol}" ) index = _find_closest_previous_times(told - dt * previous_value_tol, tnew) else: index = _find_closest_times(told, tnew) # fill in new data vector with closest old data: newdata = olddata[index] # if want new time to exactly hit base (if base were in range): if base is not None: t0 = tnew[0] t1 = base - t0 - round((base - t0) * sr) / sr tnew += t1 return _return( tnew, newdata, alldrops, sr_stats, tp, getall, return_ndarray, despike_info )
[docs] def aligntime(dct, channels=None, mode="truncate", value=0): """ Aligns the time vectors for specified channels in dct. Parameters ---------- dct : dictionary Dictionary of channels where each channel is either 2d ndarray or 2-element tuple. If ndarray, it must have 2 columns: ``[time, signal]``. Otherwise, it must be a 2-element tuple or list, eg: ``(time, signal)``. See notes below. channels : list or None; optional List of names defining which channels to synchronize in time. If None, all channels in `dct` will be synchronized. mode : string; optional Method of aligning: =========== ================================================= `mode` Description =========== ================================================= 'truncate' Keep only data where all channels overlap 'expand' Expand all channels to maximum time range of all channels. Channels are expanded by stuffing in `value`'s. =========== ================================================= value : scalar; optional Used for the 'expand' mode. Returns ------- dctout : dictionary Dictionary containing only those channels specified in `channels`. Each channel will be a 1d ndarray. The time vector is also a 1d array and is named 't'. Notes ----- This routine operates under these assumptions: 1. The time vector for each channel is perfect (ie, after :func:`fixtime`) 2. All the time vectors have the same step size 3. They would all hit the same time point if they overlapped The first two assumptions are checked. The third is not checked and could cause indexing failures if it is not true. """ # check for channels: if channels is not None: err = 0 for item in channels: if item not in dct: err = 1 print(f"Channel {item} not found in `dct`.") if err: raise ValueError( "`dct` does not contain all requested channels. See above." ) parms = channels else: parms = list(dct.keys()) # get time step: t, d, isarr = _get_timedata(dct[parms[0]]) dt = (t[-1] - t[0]) / (len(t) - 1) if mode == "truncate": # loop to determine maximum overlap: tmin = t[0] tmax = t[-1] for key in parms: t, d, isarr = _get_timedata(dct[key]) if t[0] > tmax or t[-1] < tmin: raise ValueError("not all inputs overlap in time.") if not np.allclose(np.diff(t), dt): raise ValueError(f"not all time steps in {key} match {dt}") tmin = max(tmin, t[0]) tmax = min(tmax, t[-1]) n = int(np.ceil((tmax - tmin) / dt)) if (dt * n + tmin) < (tmax + dt / 2): n += 1 pv = np.arange(n) dctout = {} dctout["t"] = pv * dt + tmin start = tmin + dt / 2 # so index finds closest point for key in parms: t, d, isarr = _get_timedata(dct[key]) i = _get_prev_index(t, start) dctout[key] = d[i + pv] else: # loop to determine maximum range: tmin = t[0] tmax = t[-1] for key in parms: t, d, isarr = _get_timedata(dct[key]) if not np.allclose(np.diff(t), dt): raise ValueError(f"not all time steps in {key} match {dt}") tmin = min(tmin, t[0]) tmax = max(tmax, t[-1]) n = int(np.ceil((tmax - tmin) / dt)) if (dt * n + tmin) < (tmax + dt / 2): n += 1 dctout = {} t = dctout["t"] = np.arange(n) * dt + tmin for key in parms: old_t, old_d, isarr = _get_timedata(dct[key]) i = _get_prev_index(t, old_t[0] + dt / 2) new_d = np.empty(n) new_d[:] = value old_n = len(old_t) if i + old_n > n: old_n = n - i new_d[i : i + old_n] = old_d[:old_n] dctout[key] = new_d return dctout
def _vector_to_axis(v, ndim, axis): """ Reshape 1d `v` to nd where the only non-unity dimension is `axis`. Useful for broadcasting when, for example, multiplying a vector along a certain axis of an nd array. _vector_to_axis(np.arange(5), 3, 1).shape --> (1, 5, 1) """ dims = np.ones(ndim, int) dims[axis] = -1 return v.reshape(*dims)
[docs] def windowends(sig, portion=0.01, ends="front", axis=-1): """ Apply parts of a cosine window to the ends of a signal. This is also called the "Tukey" window. The window values are computed from: ``(1 - cos)/2``. Parameters ---------- sig : 1d or 2d ndarray Vector or matrix; input time signal(s). portion : scalar, optional If > 1, specifies the number of points to window at each end. If in (0, 1], specifies the fraction of signal to window at each end: ``npts = int(portion * np.size(sig, axis))``. ends : string, optional Specify which ends of signal to operate on: ======= ==================================== `ends` Action ======= ==================================== 'none' no windowing (no change to `signal`) 'front' window front end only 'back' window back end only 'both' window both ends ======= ==================================== axis : int, optional Axis along which to operate. Returns ------- Returns the windowed signal(s); same size as the input. Notes ----- The minimum number of points that can be windowed is 3. Therefore, `portion` is internally adjusted to 3 if the input value is (or the computed value turns out to be) less than 3. Examples -------- >>> from pyyeti import dsp >>> import numpy as np >>> np.set_printoptions(precision=2, suppress=True) >>> dsp.windowends(np.ones(8), 4) array([ 0. , 0.25, 0.75, 1. , 1. , 1. , 1. , 1. ]) >>> dsp.windowends(np.ones(8), .7, ends='back') array([ 1. , 1. , 1. , 1. , 0.85, 0.5 , 0.15, 0. ]) >>> dsp.windowends(np.ones(8), .5, ends='both') array([ 0. , 0.25, 0.75, 1. , 1. , 0.75, 0.25, 0. ]) .. plot:: :context: close-figs >>> import matplotlib.pyplot as plt >>> _ = plt.figure('Example', figsize=[8, 3], clear=True, ... layout='constrained') >>> sig = np.ones(100) >>> wesig = dsp.windowends(sig, 5, ends='both') >>> _ = plt.plot(sig, label='Original') >>> _ = plt.plot(wesig, label='windowends (ends="both")') >>> _ = plt.ylim(0, 1.2) """ if ends == "none": return sig sig = np.asarray(sig) ln = sig.shape[axis] if portion <= 1: n = int(portion * ln) else: n = int(portion) if n < 3: n = 3 v = np.ones(ln, float) w = 0.5 - 0.5 * np.cos(2 * np.pi * np.arange(n) / (2 * n - 2)) if ends == "front" or ends == "both": v[:n] = w if ends == "back" or ends == "both": v[-n:] = w[::-1] v = _vector_to_axis(v, sig.ndim, axis) return sig * v
def _proc_timeslice(timeslice, sr, n): # work with integers for slicing: if isinstance(timeslice, str): ntimeslice = int(timeslice) timeslice = ntimeslice / sr else: ntimeslice = int(round(sr * timeslice)) if ntimeslice > n: ntimeslice = n timeslice = ntimeslice / sr return ntimeslice, timeslice
[docs] def waterfall( sig, sr, timeslice, tsoverlap, func, which, freq, t0=0.0, args=None, kwargs=None, slicefunc=None, sliceargs=None, slicekwargs=None, ): """ Compute a 'waterfall' map over time and frequency (typically) using a user-supplied function. Parameters ---------- sig : 1d array_like Time series of measurement values. sr : scalar Sample rate. timeslice : scalar or string-integer If scalar, it is the length in seconds for each slice. If string, it contains the integer number of points for each slice. For example, if `sr` is 1000 samples/second, ``timeslice=0.75`` is equivalent to ``timeslice="750"``. tsoverlap : scalar in [0, 1) or string-integer If scalar, is the fraction of each time-slice to overlap. If string, it contains the integer number of points to overlap. For example, if `sr` is 1000 samples/second, ``tsoverlap=0.5`` and ``tsoverlap="500"`` each specify 50% overlap. func : function This function is called for each time slice (denoted as "sig_slice" here) and is expected to return amplitude values across the frequency range. It can return just the amplitudes, or it can have multiple outputs (see also `which` and `freq`). The call is: ``func(sig_slice, *args, **kwargs)``. Note that the "sig_slice" input is first passed through `slicefunc` if one is provided (see below). which : None or integer Set to None if `func` only returns the amplitudes. Otherwise, if `func` returns multiple outputs, set `which` to the index of the output corresponding to the amplitudes. For example, if `func` returns ``(frequencies, amplitudes)``, `which` would be 1 (and `freq` would be 0). .. note:: Setting `which` to None is not the same as setting it to 0. Using None means that the function only returns amplitudes, while a 0 indicates that the output of `func` must be indexed by 0 to get the amplitudes. For example: if the function has ``return amps``, use ``which=None``; if the function has ``return (amps,)``, use ``which=0``. freq : integer or vector If integer, it is the index of the output of `func` corresponding to the frequency vector and cannot be equal to `which`. Otherwise, if `freq` is a vector, it is the frequency vector directly. t0 : scalar; optional Start time of signal; defaults to 0.0. args : tuple or list; optional If provided, these are passed to `func`. kwargs : dict; optional If provided, these are passed to `func`. slicefunc : function or None; optional If a function, it is called for each time-slice before `func` is called. This could be for windowing or detrending, for example. The call is: ``sig_slice = slicefunc(sig[pv], *sliceargs, **slicekwargs)``. sliceargs : tuple or list; optional If provided, these are passed to `slicefunc`. Must be None or `()` if `slicefunc` is None. slicekwargs : dict; optional If provided, these are passed to `slicefunc`. Must be None or `{}` if `slicefunc` is None. Returns ------- mp : 2d ndarray The waterfall map; columns span time, rows span frequency. So, each column is a vector of frequency amplitudes as returned by `func` for a specific time-slice. Time increases going across the columns and frequency increases going down the rows. t : 1d ndarray Time vector of center times; corresponds to columns in `mp`. Signal is assumed to start at time = t0. f : 1d ndarray Frequency vector corresponding to rows in `mp`. Either equal to the input `freq` or as returned by `func`. Notes ----- Even though the example below shows the use of `args` and `sliceargs`, it is recommended to only use `kwargs` and `slicekwargs` only to pass arguments to the supplied functions. This makes for more readable code. Examples -------- Compute a shock response spectrum waterfall for a sine sweep signal. The sweep rate 4 octaves/min. Process in 2-second windows with 50% overlap; 2% windowends, compute equivalent sine. .. plot:: :context: close-figs >>> import numpy as np >>> import matplotlib.pyplot as plt >>> from pyyeti import srs, ytools, dsp >>> from matplotlib import cm, colors >>> sig, t, f = ytools.gensweep(10, 1, 50, 4) >>> sr = 1/t[1] >>> frq = np.arange(1., 50.1) >>> Q = 20 >>> mp, t, f = dsp.waterfall(sig, sr, 2, 0.5, srs.srs, ... which=None, freq=frq, ... args=(sr, frq, Q), ... kwargs=dict(eqsine=1), ... slicefunc=dsp.windowends, ... sliceargs=[.02], ... slicekwargs=dict(ends='front')) >>> _ = plt.figure('Example', clear=True, layout='constrained') >>> cs = plt.contour(t, f, mp, 40, cmap=cm.plasma_r) >>> # This doesn't work in matplotlib 3.5.0: >>> # cbar = plt.colorbar() >>> # cbar.filled = True >>> # cbar.draw_all() >>> # But this does: >>> norm = colors.Normalize( ... vmin=cs.cvalues.min(), vmax=cs.cvalues.max() ... ) >>> sm = plt.cm.ScalarMappable(norm=norm, cmap=cs.cmap) >>> cb = plt.colorbar(sm, ax=plt.gca()) # , ticks=cs.levels) >>> # >>> _ = plt.xlabel('Time (s)') >>> _ = plt.ylabel('Frequency (Hz)') >>> ttl = 'EQSINE Map of Sine-Sweep @ 4 oct/min, Q = 20' >>> _ = plt.title(ttl) .. plot:: :context: close-figs Also show results on a 3D surface plot: >>> fig = plt.figure("Example 2", clear=True, ... layout='constrained') >>> ax = fig.add_subplot(projection="3d") >>> x, y = np.meshgrid(t, f) >>> surf = ax.plot_surface(x, y, mp, rstride=1, cstride=1, ... linewidth=0, cmap=cm.plasma_r) >>> _ = fig.colorbar(surf, shrink=0.5, aspect=5) >>> ax.view_init(azim=-123, elev=48) >>> _ = ax.set_xlabel('Time (s)') >>> _ = ax.set_ylabel('Frequency (Hz)') >>> _ = ax.set_zlabel('Amplitude') >>> _ = plt.title(ttl) """ sig = np.atleast_1d(sig) if sig.ndim > 1: if max(sig.shape) < sig.size: raise ValueError("`sig` must be a vector") sig = sig.ravel() if args is None: args = () if kwargs is None: kwargs = {} if sliceargs is None: sliceargs = () if slicekwargs is None: slicekwargs = {} ntimeslice, timeslice = _proc_timeslice(timeslice, sr, sig.size) if isinstance(tsoverlap, str): ntsoverlap = int(tsoverlap) if not 0 <= ntsoverlap < ntimeslice: raise ValueError(f"`tsoverlap` must be in [0, {ntimeslice})") else: if not 0 <= tsoverlap < 1: raise ValueError("`tsoverlap` must be in [0, 1)") ntsoverlap = int(round(ntimeslice * tsoverlap)) # inc = max(1, int(round(ntimeslice * (1.0 - tsoverlap)))) inc = max(1, ntimeslice - ntsoverlap) non_overlap = inc / ntimeslice tlen = (sig.size - ntimeslice) // inc + 1 b = 0 # make time vector: t0_ = timeslice / 2.0 tf = t0_ + (tlen - 1) * timeslice * non_overlap t = np.linspace(t0_, tf, tlen) # print('tlen =', tlen, 'inc =', inc, 't[0], t[-1] =', t[0], t[-1]) if not slicefunc: def slicefunc(a): return a # do first iteration outside loop to get freq and allocate map: s = slicefunc(sig[b : b + ntimeslice], *sliceargs, **slicekwargs) b += inc res = func(s, *args, **kwargs) if isinstance(freq, numbers.Integral): if which is None: raise ValueError("`which` cannot be None when `freq` is an integer") freq = res[freq] flen = len(freq) res_dtype = res[which].dtype else: flen = len(freq) res_dtype = res.dtype if which is None else res[which].dtype mp = np.zeros((flen, tlen), res_dtype) mp[:, 0] = res[which] for j in range(1, tlen): s = slicefunc(sig[b : b + ntimeslice], *sliceargs, **slicekwargs) b += inc res = func(s, *args, **kwargs) if which is not None: mp[:, j] = res[which] else: mp[:, j] = res return mp, t + t0, freq
[docs] def get_turning_pts(y, x=None, getindex=True, tol=1e-6, atol=None): """ Find turning points (where slope changes) in a vector. Parameters ---------- y : array_like y-axis data vector x : array_like or None; optional If vector, x-axis data vector; must be monotonically ascending and same length as `y`. If None, the index is used. getindex : bool, optional If True, return the index of turning points; otherwise, return the y (and x) data values at the turning points. tol : scalar; optional A slope is considered different from a neighbor if the difference is more than ``tol*max(abs(all differences))``. atol : scalar or None; optional Alternative to `tol`. If input (and non-zero), `atol` specifies the absolute tolerance and `tol` is ignored. In this case, slope is considered different from a neighbor if the difference is more than `atol`. Returns ------- pv : ndarray; if `getindex` is True True/False vector with True for the turning points in `y`. yn, xn : ndarray, ndarray or None; if `getindex` is False The possibly shortened versions of `y` and `x`; if `x` is None, `xn` is None. Examples -------- >>> from pyyeti import dsp >>> dsp.get_turning_pts([1, 2, 3, 3, 3]) array([ True, False, True, False, True], dtype=bool) >>> y, x = dsp.get_turning_pts([1, 2, 3, 3, 3], ... [1, 2, 3, 4, 5], ... getindex=False) >>> y array([1, 3, 3]) >>> x array([1, 3, 5]) """ if x is not None: x, y = np.atleast_1d(x, y) dx = np.diff(x) if np.any(dx <= 0): raise ValueError("x must be monotonically ascending") if np.size(y) != np.size(x): raise ValueError("x and y must be the same length") m = np.diff(y) / dx else: y = np.atleast_1d(y) m = np.diff(y) m2 = np.diff(m) stol = atol if atol else abs(tol * abs(m2).max()) pv = np.hstack((True, abs(m2) > stol, True)) if getindex: return pv if x is not None: return y[pv], x[pv] return y[pv]
[docs] def calcenv( x, y, p=5, n=2000, method="max", base=0.0, makeplot="clear", polycolor=(1, 0.7, 0.7), label="data", ): """ Returns a curve that envelopes the y data that is allowed to shift in the x-direction by some percentage. Optionally plots original curve with shaded enveloping polygon. Parameters ---------- x : array_like x-axis data vector; must be monotonically ascending y : array_like y-axis data vector; must be same length as x p : scalar; optional Percentage to shift the y data left and right n : integer; optional Number of points to use for enveloping curve method : string; optional Specifies how to envelop data: ======== ============================================= `method` Description ======== ============================================= 'max' compute envelope over the maximum values of y 'min' compute envelope over the minimum values of y 'both' combine both 'max' and 'min' ======== ============================================= base : scalar or None; optional The base y-value (defines one side of the envelope); if None, no base y-value is used and `method` is automatically set to 'both' makeplot : string or axes object; optional Specifies if and how to plot envelope in current figure: =========== =============================== `makeplot` Description =========== =============================== 'no' do not plot 'clear' plot after clearing figure 'add' plot without clearing figure 'new' plot in new figure axes object plot in given axes (like 'add') =========== =============================== polycolor : color specification; optional Any valid matplotlib color specification for the color of the enveloping curve label : string; optional Label for the x-y data on plot (only used if `makeplot` is not 'no') Returns ------- xe_max : 1d ndarray x-axis data vector for enveloping curve on max side ye_max : 1d ndarray y-axis data vector for enveloping curve on max side xe_min : 1d ndarray x-axis data vector for enveloping curve on min side ye_min : 1d ndarray y-axis data vector for enveloping curve on min side h : None or list If `makeplot` is not 'no', `h` is a 2-element list of graphic handles: [line, patch]. Examples -------- .. plot:: :context: close-figs >>> import numpy as np >>> import matplotlib.pyplot as plt >>> from pyyeti import dsp >>> x = np.arange(1.0, 31.0, 1.0) >>> y = np.cos(x) >>> fig = plt.figure('Example', figsize=[10, 8], clear=True, ... layout='constrained') >>> >>> ax = plt.subplot(411) >>> env = dsp.calcenv(x, y, base=None, makeplot='add') >>> _ = plt.title('base=None (method="both")') >>> _ = ax.legend(handles=env[-1], loc='upper left', ... bbox_to_anchor=(1.02, 1.), ... borderaxespad=0.) >>> _ = ax.set_xticklabels([]) >>> >>> ax = plt.subplot(412) >>> env = dsp.calcenv(x, y, method='both', makeplot='add') >>> _ = plt.title('method="both"') >>> ax.legend().set_visible(False) >>> _ = ax.set_xticklabels([]) >>> >>> ax = plt.subplot(413) >>> env = dsp.calcenv(x, y, method='max', makeplot='add') >>> _ = plt.title('method="max"') >>> ax.legend().set_visible(False) >>> _ = ax.set_xticklabels([]) >>> >>> ax = plt.subplot(414) >>> env = dsp.calcenv(x, y, method='min', makeplot='add') >>> _ = plt.title('method="min"') >>> ax.legend().set_visible(False) """ x, y = np.atleast_1d(x, y) if np.any(np.diff(x) <= 0): raise ValueError("x must be monotonically ascending") if np.size(y) != np.size(x): raise ValueError("x and y must be the same length") if method not in ["max", "min", "both"]: raise ValueError("`method` must be one of 'max', 'min', or 'both") if base is None: method = "both" up = 1 + p / 100 dn = 1 - p / 100 xe = np.linspace(x[0], x[-1], n) xe_max = xe_min = xe y2 = np.interp(xe, x, y) ye_max = np.zeros(n) ye_min = np.zeros(n) for i in range(n): pv = np.logical_and(xe >= xe[i] / up, xe <= xe[i] / dn) ye_max[i] = np.max(y2[pv]) ye_min[i] = np.min(y2[pv]) pv = np.logical_and(x >= xe[i] / up, x <= xe[i] / dn) if np.any(pv): ye_max[i] = max(ye_max[i], np.max(y[pv])) ye_min[i] = min(ye_min[i], np.min(y[pv])) if method == "max": ye_max, xe_max = get_turning_pts(ye_max, xe, getindex=0) elif method == "min": ye_max, xe_max = get_turning_pts(ye_min, xe, getindex=0) elif base is not None: ye_max[ye_max < base] = base ye_min[ye_min > base] = base ye_max, xe_max = get_turning_pts(ye_max, xe, getindex=0) ye_min, xe_min = get_turning_pts(ye_min, xe, getindex=0) ax = _check_makeplot(makeplot) if ax: envlabel = rf"$\pm${p}% envelope" ln = ax.plot(x, y, label=label)[0] p = mpatches.Patch(color=polycolor, label=envlabel) if base is None: ax.fill_between(xe_max, ye_max, ye_min, facecolor=polycolor, lw=0) else: ax.fill_between(xe_max, ye_max, base, facecolor=polycolor, lw=0) if method == "both": ax.fill_between(xe_min, ye_min, base, facecolor=polycolor, lw=0) ax.grid(True) h = [ln, p] ax.legend(handles=h, loc="best") else: h = None return xe_max, ye_max, xe_min, ye_min, h
[docs] def fdscale(y, sr, scale, axis=-1): """ Scale a time signal in the frequency domain. Parameters ---------- y : nd array_like Signal(s) to be scaled. Scaling is done along axis `axis`. sr : scalar Sample rate. scale : 2d array_like A two column matrix of [freq scale]. It is automatically sized to the correct dimensions via linear interpolation (uses :func:`numpy.interp`). axis : int, optional Axis along which to operate. Returns ------- y_new : nd ndarray The scaled version of `y`. Notes ----- This routine uses FFT to convert `y` to the frequency domain, applies the scale factors, and does an inverse FFT to get back to the time domain. For example, using ``scale = [[1. 1.], [100., 1.]]`` would accomplish nothing. The function :func:`scipy.signal.firwin2` can accomplish a similar scaling via a digital filter. Examples -------- Generate a unit amplitude sine sweep signal and scale down the middle section: .. plot:: :context: close-figs >>> from pyyeti import ytools, dsp >>> import matplotlib.pyplot as plt >>> sig, t, f = ytools.gensweep(10, 1, 12, 8) >>> scale = np.array([[0., 1.0], ... [4., 1.0], ... [5., 0.5], ... [8., 0.5], ... [9., 1.0], ... [100., 1.0]]) >>> sig_scaled = dsp.fdscale(sig, 1/t[1], scale) >>> _ = plt.figure('Example', clear=True, layout='constrained') >>> _ = plt.subplot(211) >>> _ = plt.plot(f, sig) >>> _ = plt.title('Sine Sweep vs Frequency') >>> _ = plt.xlabel('Frequency (Hz)') >>> _ = plt.xlim([f[0], f[-1]]) >>> _ = plt.subplot(212) >>> _ = plt.plot(f, sig_scaled) >>> _ = plt.plot(*scale.T, 'k-', lw=2, label='Scale') >>> _ = plt.legend(loc='best') >>> _ = plt.title('Scaled Sine Sweep vs Frequency') >>> _ = plt.xlabel('Frequency (Hz)') >>> _ = plt.xlim([f[0], f[-1]]) >>> _ = plt.grid(True) """ y = np.atleast_1d(y) n = y.shape[axis] even = n % 2 == 0 m = n // 2 + 1 if even else (n + 1) // 2 freq = np.arange(m) * (sr / n) # positive 1/2 frequency scale F = np.fft.rfft(y, axis=axis) h = np.interp(freq, scale[:, 0], scale[:, 1]) h = _vector_to_axis(h, y.ndim, axis) Ynew = np.fft.irfft(F * h, n=n, axis=axis) return Ynew
[docs] def nextpow2(x): """ Return next power of two that is >= integer `x` Examples -------- >>> nextpow2(4) 4 >>> nextpow2(5) 8 """ return 1 << (x - 1).bit_length()
def _get_ramp(df, bw, on): # form gaussian ramp from 1.0 to near zero: # exp(-freq ** 2 / den) # - to determine 'den', need number of points and how # close to zero to get # - number of points: npts = int(bw / df) + 1 # - how close to zero: # nearzero = exp(-(df*(npts-1)) ** 2 / den) # -log(nearzero) = (df*(npts-1)) ** 2 / den # den = (df*(npts-1)) ** 2 / (-log(nearzero)) nearzero = 1 / npts / 4 den = -((df * (npts - 1)) ** 2) / np.log(nearzero) ramp = np.exp(-((np.arange(npts + 1) * df) ** 2) / den) ramp[-1] = 0.0 if not on: ramp = ramp[::-1] return ramp def _make_h(freq, w, bw, pass_zero, mag, nyq): df = freq[1] - freq[0] if bw is None: bw = 0.01 * nyq bw = np.atleast_1d(bw) if bw.shape[0] != w.shape[0]: if bw.shape[0] != 1: raise ValueError( "`bw` must be either a scalar or compatibly sized with `w`" ) _bw = np.empty(w.shape[0]) _bw[:] = bw bw = _bw H = np.empty(freq.shape[0]) on = pass_zero # position ramps; try to have "mag" point closest to each value # in w: I = 0 for _w, _bw in zip(w, bw): j = np.argmin(abs(freq - _w)) ramp = _get_ramp(df, _bw, on) n = ramp.shape[0] i = np.argmin(abs(ramp - mag)) if i > j or j - i < I or j - i + n > freq.shape[0]: # if ramp doesn't fit in range or if it conflicts with # previous, error out: raise ValueError( "filter function could not be formed to satisfy " f"requirements as defined; stopped on w={_w}. Using " "a narrower bandwidth might help (currently using" f" bw={_bw})" ) H[I : j - i] = ramp[0] I = j - i + n H[j - i : I] = ramp on = not on H[I:] = ramp[-1] return H
[docs] def fftfilt( sig, w, *, axis=-1, bw=None, pass_zero=None, nyq=1.0, mag=0.5, makeplot="no" ): """ Filter time-domain signals using FFT with Gaussian ramps. Parameters ---------- sig : nd array_like Signal(s) to filter. Filtering is done along axis `axis`. w : scalar or 1d array_like Edge (cutoff) frequencies where ``0.0 < w[i] < nyq`` for all ``i`` (`w` must not include 0.0 or `nyq`). Can be any length and filter will alternate between pass and stop (starting according to `pass_zero`). For example, if leaving `pass_zero` as default, a low pass filter is created for scalar `w` and a band-pass is created for a 2-element `w`. Units are relative to the `nyq` input; so, for example, if `nyq` is the Nyquist frequency in Hz, `w` would be in Hz. axis : int, optional Axis along which to operate. bw : scalar, 1d array_like, or None; optional Width of each transition region (each up or down ramp). If None, ``bw = 0.01 * nyq``. pass_zero : bool or None; optional Specifies whether or not the zero frequency will be in a pass band. If None, `pass_zero` is set to True unless `w` has two elements. (So the default is low pass and band pass for `w` with one or two elements.) nyq : scalar; optional Specifies the Nyquist frequency: sample_rate/2.0 mag : scalar; optional Specifies the target filter magnitude at each `w` makeplot : string or axes object; optional Specifies if and how to plot filter function: =========== =============================== `makeplot` Description =========== =============================== 'no' do not plot 'clear' plot after clearing figure 'add' plot without clearing figure 'new' plot in new figure axes object plot in given axes (like 'add') =========== =============================== Returns ------- fsig : nd ndarray Filtered version of `sig` freq : 1d ndarray Frequency vector from 0.0 to `nyq` h : 1d ndarray The frequency domain filter function Raises ------ ValueError When a ramp will not fit in space as required. Examples -------- Make a signal composed of 4 sinusoids, then use :func:`fftfilt` to try and recover those sinusoids: .. plot:: :context: close-figs >>> from pyyeti import dsp >>> import numpy as np >>> import matplotlib.pyplot as plt >>> h = 0.001 >>> t = np.arange(0, 3.0, h) >>> y1 = 10 + 3.1*np.sin(2*np.pi*3*t) >>> y2 = 5*np.sin(2*np.pi*10*t) >>> y3 = 2*np.sin(2*np.pi*30*t) >>> y4 = 3*np.sin(2*np.pi*60*t) >>> y = y1 + y2 + y3 + y4 >>> _ = plt.plot(t, y) >>> _ = plt.title('Signal of 4 sinusoids') .. plot:: :context: close-figs >>> sr = 1/h >>> nyq = sr/2 >>> _ = plt.figure('Ex1', clear=True, layout='constrained') >>> _ = plt.figure('Ex2', clear=True, layout='constrained') >>> for j, (w, pz, yj) in enumerate(((7, None, y1), ... ([7, 18], None, y2), ... ([18, 45], None, y3), ... (45, False, y4))): ... _ = plt.figure('Ex1') ... _ = plt.subplot(4, 1, j+1) ... yf = dsp.fftfilt(y, w, pass_zero=pz, nyq=nyq, ... makeplot='add')[0] ... _ = plt.xlim(0, 75) ... _ = plt.figure('Ex2') ... _ = plt.subplot(4, 1, j+1) ... _ = plt.plot(t, yj, t, yf) """ # main routine: sig, w = np.atleast_1d(sig, w) if pass_zero is None: pass_zero = True if len(w) != 2 else False if np.any(w > nyq): raise ValueError("value(s) in `w` exceed `nyq`") if not (axis == -1 or axis == sig.ndim - 1): # Move the axis containing the data to the end sig = np.swapaxes(sig, axis, sig.ndim - 1) n = sig.shape[-1] n2 = nextpow2(n) freq = np.fft.rfftfreq(n2, 0.5 / nyq) h = _make_h(freq, w, bw, pass_zero, mag, nyq) t = np.arange(n) ylines = interp.interp1d(t[[0, -1]], sig[..., [0, -1]], axis=-1)(t) y2 = sig - ylines Y = np.fft.rfft(y2, n2, axis=-1) h_nd = _vector_to_axis(h, sig.ndim, -1) y_h = np.fft.irfft(Y * h_nd, n2, axis=-1)[..., :n] if pass_zero: y_h += ylines if not (axis == -1 or axis == sig.ndim - 1): # Move the axis back to where it was y_h = np.swapaxes(y_h, axis, sig.ndim - 1) ax = _check_makeplot(makeplot) if ax: ax.plot(freq, h) style = dict(color="k", lw=2, ls="--") for x in w: ax.axvline(x, **style) ax.axhline(mag, **style) return y_h, freq, h
def _fftsize(n, sr, maxdf): if maxdf and sr / n > maxdf: N = nextpow2(int(sr / maxdf)) else: N = n return N
[docs] def fftcoef( x, sr, *, axis=-1, coef="mag", window="boxcar", dodetrend=False, fold=True, maxdf=None, ): r""" FFT sine/cosine or magnitude/phase coefficients of a real signal This routine returns the positive frequency coefficients only. Parameters ---------- x : nd array_like The (real) signal(s) to FFT. The FFT is carried out along axis `axis`. sr : scalar The sample rate (samples/sec) axis : int, optional Axis along which to operate. coef : string; optional Specifies how to return the coefficients: ========== ======================== `coef` Return ========== ======================== "mag" (magnitude, phase, freq) "ab" (a, b, freq) "complex" (a + i b, None, freq) ========== ======================== See below for more details. window : string, tuple, or 1d array_like; optional Specifies window function. If a string or tuple, it is passed to :func:`scipy.signal.get_window` to get the window. If 1d array_like, it must be length ``len(x)`` and is used directly. dodetrend : bool; optional If True, remove a linear fit from `x`; otherwise, no detrending is done. fold : bool; optional If true, "fold" negative frequencies on top of positive frequencies such that the coefficients at frequencies that have a negative counterpart are doubled (magnitude is also doubled). maxdf : scalar or None; optional If scalar, this is the maximum allowed frequency step; zero padding will be done if necessary to enforce this. Note that this is for providing more points between peaks only. If None, the delta frequency is simply ``sr/len(x)``. Returns ------- 3-tuple depending on `coef`: ========== ======================== `coef` Return value ========== ======================== "mag" (magnitude, phase, freq) "ab" (a, b, freq) "complex" (a + i b, None, freq) ========== ======================== All values are for the positive side frequencies only. The dimensions of the nd arrays `magnitude`, `phase`, `a` and `b` are similar to input `x` except that along the axis `axis`; the dimension of that axis corresponds to `freq`. `freq` is a 1d array of the positive side frequencies only. Definitions of `magnitude`, `phase`, and the `a` and `b` coefficients are shown below. `freq` is in Hz. Notes ----- The FFT results are scaled according to the 'coherent gain' of the window function. For the "boxcar" window (which is just all 1.0's), the coherent gain is 1.0. The coherent gain is defined by:: scale = 1/coherent_gain coherent_gain = sum(window)/len(window) The coefficients are related to the original signal by either of these two equivalent summations (if `fold` is True): .. math:: x(t_n) = \sum\limits^{len(x)-1}_{k=0} A_k \cos(k \omega t_n) + B_k \sin(k \omega t_n) .. math:: x(t_n) = \sum\limits^{len(x)-1}_{k=0} M_k\sin(k \omega t_n - \phi_k) where :math:`\omega = 2 \pi \Delta freq`, :math:`M` is the magnitude, and :math:`\phi` is the phase. The magnitude and phase are computed by: .. math:: \begin{aligned} M_k &= \sqrt {A_k^2 + B_k^2} \phi_k &= \arctan2(-A_k, B_k) \end{aligned} Normally, the frequency step is defined by: .. math:: \Delta freq = sr / \text{len}(x) A finer frequency step can be achieved by specifying the `maxdf` parameter. If `maxdf` is specified *and* the frequency step computed from the above equation is greater than `maxdf`, the frequency step is computed by: .. math:: \begin{aligned} N &= \text{nextpow2}(\text{int}(sr / maxdf)) \Delta freq &= sr / N \end{aligned} The function :func:`nextpow2` finds the next power of 2 integer. This approach makes efficient use of the FFT while ensuring the final frequency step is less than or equal to `maxdf`. The example below uses these formulas directly to upsample a signal. This is for demonstration only; to truly upsample a signal based on FFT in a more efficient manner, see :func:`scipy.signal.resample`. (See also :func:`resample`.) See also -------- :func:`fftmap`, :func:`transmissibility` Examples -------- .. plot:: :context: close-figs >>> import numpy as np >>> import matplotlib.pyplot as plt >>> from pyyeti import dsp >>> n = 23 >>> rng = np.random.default_rng() >>> x = rng.normal(size=n) >>> t = np.arange(n) >>> mag, phase, frq = dsp.fftcoef(x, 1.0) >>> _ = plt.figure('Example', clear=True, layout='constrained') >>> _ = plt.subplot(211) >>> _ = plt.plot(frq, mag) >>> _ = plt.ylabel('Magnitude') >>> _ = plt.subplot(212) >>> _ = plt.plot(frq, np.rad2deg(np.unwrap(phase))) >>> _ = plt.ylabel('Phase (deg)') >>> _ = plt.xlabel('Frequency (Hz)') >>> >>> w = 2*np.pi*frq[1] >>> >>> # use a finer time vector for reconstructions: >>> t2 = np.arange(0., n, .05) >>> >>> # reconstruct with magnitude and phase: >>> x2 = 0.0 >>> for k, (m, p, f) in enumerate(zip(mag, phase, frq)): ... x2 = x2 + m*np.sin(k*w*t2 - p) >>> >>> # reconstruct with A and B: >>> A, B, frq = dsp.fftcoef(x, 1.0, coef='ab') >>> x3 = 0.0 >>> for k, (a, b, f) in enumerate(zip(A, B, frq)): ... x3 = x3 + a*np.cos(k*w*t2) + b*np.sin(k*w*t2) >>> >>> _ = plt.figure('Example 2', clear=True, ... layout='constrained') >>> _ = plt.plot(t, x, 'o', label='Original') >>> _ = plt.plot(t2, x2, label='FFT fit w/ Mag & Phase') >>> _ = plt.plot(t2, x3, '--', label='FFT fit w/ A & B') >>> _ = plt.legend(loc='best') >>> _ = plt.title('Using `fftcoef` for FFT curve fit') >>> _ = plt.xlabel('Time (s)') """ if coef not in ("mag", "ab", "complex"): raise ValueError( f"invalid `coef` ({coef!r}). Must be one of 'mag', 'ab', or 'complex'." ) x = np.atleast_1d(x) n = x.shape[axis] if isinstance(window, (str, tuple)): window = signal.get_window(window, n) else: window = np.atleast_1d(window) if len(window) != n: raise ValueError( f"window size is {len(window)}; expected {n} to match signal" ) window *= n / window.sum() if not (axis == -1 or axis == x.ndim - 1): # Move the axis containing the data to the end x = np.swapaxes(x, axis, x.ndim - 1) window = _vector_to_axis(window, x.ndim, -1) if dodetrend: x = signal.detrend(x) * window else: x = x * window N = _fftsize(n, sr, maxdf) if N > n: shape = [*x.shape] shape[-1] = N X = np.empty(shape) X[..., :n] = x X[..., n:] = 0.0 else: X = x F = np.fft.rfft(X) f = np.fft.rfftfreq(N, 1.0 / sr) # or, could do this to get same result: # F = np.fft.fft(X) # m = N // 2 + 1 # f = np.arange(0.0, m) * (sr / N) # F = F[:m] if fold: a = 2.0 * F.real / n a[..., 0] /= 2.0 if not N & 1: # if N is an even number a[..., -1] /= 2.0 b = -2.0 * F.imag / n else: a = F.real / n b = -F.imag / n if not (axis == -1 or axis == x.ndim - 1): # Move the axis containing the data to the end a = np.swapaxes(a, axis, x.ndim - 1) b = np.swapaxes(b, axis, x.ndim - 1) if coef == "mag": return np.sqrt(a**2 + b**2), np.arctan2(-a, b), f elif coef == "complex": return a + 1j * b, None, f return a, b, f
[docs] def fftmap( timeslice, tsoverlap, sig, sr, window="hann", dodetrend=False, fold=True, maxdf=None ): """ Make an FFT map ('waterfall') over time and frequency. Parameters ---------- timeslice : scalar or string-integer If scalar, it is the length in seconds for each slice. If string, it contains the integer number of points for each slice. For example, if `sr` is 1000 samples/second, ``timeslice=0.75`` is equivalent to ``timeslice="750"``. tsoverlap : scalar in [0, 1) or string-integer If scalar, is the fraction of each time-slice to overlap. If string, it contains the integer number of points to overlap. For example, if `sr` is 1000 samples/second, ``tsoverlap=0.5`` and ``tsoverlap="500"`` each specify 50% overlap. sig : 1d array_like Signal to compute FFT map of. sr : scalar The sample rate (samples/sec) window : string, tuple, or 1d array_like; optional Specifies window function. If a string or tuple, it is passed to :func:`scipy.signal.get_window` to get the window. If 1d array_like, it must be length ``len(x)`` and is used directly. dodetrend : bool; optional If True, remove a linear fit from `x`; otherwise, no detrending is done. fold : bool; optional If true, "fold" negative frequencies on top of positive frequencies such that the coefficients at frequencies that have a negative counterpart are doubled (magnitude is also doubled). maxdf : scalar or None; optional If scalar, this is the maximum allowed frequency step; zero padding will be done if necessary to enforce this. Note that this is for providing more points between peaks only. If None, the delta frequency is simply ``sr/len(x)``. Returns ------- mp : 2d ndarray The FFT map; columns span time, rows span frequency (so each column is an FFT curve). Time increases going across the columns and frequency increases going down the rows. t : 1d ndarray Time vector of center times; corresponds to columns in map. Signal is assumed to start at time = 0. f : 1d ndarray Frequency vector; corresponds to rows in map. Notes ----- This routine calls :func:`fftcoef` for each time slice. `mp` is a matrix where each column is the FFT magnitude at all discrete frequencies for a certain time-slice. That is, time increases going across the columns and frequency increases going down the rows. See also -------- :func:`fftcoef` Examples -------- .. plot:: :context: close-figs >>> import numpy as np >>> import matplotlib.pyplot as plt >>> from matplotlib import cm, colors >>> from pyyeti import dsp, ytools >>> sig, t, f = ytools.gensweep(10, 1, 50, 4) >>> sr = 1/t[1] >>> mp, t, f = dsp.fftmap(2, .1, sig, sr) >>> pv = f <= 50.0 >>> cs = plt.contour(t, f[pv], mp[pv], 40, cmap=cm.plasma) >>> # This doesn't work in matplotlib 3.5.0: >>> # cbar = plt.colorbar() >>> # cbar.filled = True >>> # cbar.draw_all() >>> # But this does: >>> norm = colors.Normalize( ... vmin=cs.cvalues.min(), vmax=cs.cvalues.max() ... ) >>> sm = plt.cm.ScalarMappable(norm=norm, cmap=cs.cmap) >>> cb = plt.colorbar(sm, ax=plt.gca()) # , ticks=cs.levels) >>> # >>> _ = plt.xlabel('Time (s)') >>> _ = plt.ylabel('Frequency (Hz)') >>> ttl = 'FFT Map of Sine-Sweep @ 4 oct/min' >>> _ = plt.title(ttl) """ return waterfall( sig, sr, timeslice, tsoverlap, fftcoef, which=0, freq=2, kwargs=dict(sr=sr, window=window, dodetrend=dodetrend, fold=fold, maxdf=maxdf), )
[docs] def transmissibility( in_data, out_data, sr, timeslice=1.0, tsoverlap=0.5, window="hann", getmap=False, **kwargs, ): r""" Compute transmissibility transfer function using the FFT Transmissibility is a common transfer function measurement of ``output / input``. It is a type of frequency response function where the gain (magnitude) vs frequency is typically of primary interest. Note that the phase can be computed from the output of this routine as well. Parameters ---------- in_data : 1d array_like Time series of measurement values for the input data out_data : 1d array_like Time series of measurement values for the output data sr : scalar Sample rate. timeslice : scalar or string-integer If scalar, it is the length in seconds for each slice. If string, it contains the integer number of points for each slice. For example, if `sr` is 1000 samples/second, ``timeslice=0.75`` is equivalent to ``timeslice="750"``. tsoverlap : scalar in [0, 1) or string-integer If scalar, is the fraction of each time-slice to overlap. If string, it contains the integer number of points to overlap. For example, if `sr` is 1000 samples/second, ``tsoverlap=0.5`` and ``tsoverlap="500"`` each specify 50% overlap. window : string, tuple, or 1d array_like; optional Specifies window function. If a string or tuple, it is passed to :func:`scipy.signal.get_window` to get the window. If 1d array_like, it must be length ``len(x)`` and is used directly. getmap : bool, optional If True, get the transfer function map outputs (see below). *kwargs : optional Named arguments to pass to :func:`fftcoef`. Note that `x`, `sr`, `coef` and `window` arguments are passed automatically, and that `fold` is irrelevant (due to computing a ratio). Therefore, at the time of this writing, only `dodetrend`, and `maxdf` are really valid entries in `kwargs`. Returns ------- A SimpleNamespace with the members: f : 1d ndarray Array of sample frequencies. mag : 1d ndarray Average magnitude of transmissibility transfer function across all time slices of ``out_data / in_data``; length is ``len(f)``:: mag = abs(tr_map).mean(axis=1) phase : 1d ndarray Average phase in degrees of transmissibility transfer function across all time slices of ``out_data / in_data``; length is ``len(f)``. Computing the average of angles is tricky; for example, the average of 15 degrees and 355 degrees is 5 degrees. To get this result, the approach used here is to compute the average of cartesian coordinates of points on a unit circle at each angle, and then compute the angle to that average location:: phase = np.angle( (tr_map / abs(tr_map)).mean(axis=1), deg=True ) This definition of phase follows the negative sign convention of phase (as in :func:`fftcoef`): ``sin(theta - phase)``. tr_map : complex 2d ndarray; optional The complex transmissibility transfer function map. Each column is the transmissibility of ``out_data / in_data`` computed from the FFT ratio (from :func:`fftcoef`) for the corresponding time slice. Rows correspond to frequency `f` and columns correspond to time `t`. Only output if `getmap` is True. mag_map : 2d ndarray; optional The magnitude of the transmissibility map. Only output if `getmap` is True. It is computed by:: mag_map = abs(tr_map) phase_map : 2d ndarray; optional The phase in degrees of the transmissibility map. Only output if `getmap` is True. It is computed by:: phase_map = np.angle(tr_map, deg=True) t : 1d ndarray; optional The time vector for the columns of `tr_map`, `mag_map` and `phase_map`. Only output if `getmap` is True. Notes ----- This routine calls :func:`waterfall` for handling the timeslices and preparing the output and :func:`fftcoef` (with `coef` set to "complex") to process each time slice of both `in_data` and `out_data`. The frequency step size is determined by `timeslice` in seconds:: freq_step_size = 1 / timeslice Examples -------- We'll make up a pseudo-random signal with content from 5 to 60 Hz, and use that as a base-excitation (acceleration) input to a spring mass system. This is the same system as used in the :func:`pyyeti.srs.srs` routine:: _____ ^ | | | | M | --- SDOF response (x) |_____| / | K \ |_| C ^ / | | [=======] --- input base acceleration (sig) We'll then compute the transmissibility of the acceleration of the mass relative to the base-excitation. We should see a nice peak (roughly equal to the dynamic amplification factor Q) at the natural frequency of the single degree of freedom system. For the example, the frequency of the SDOF system is set to 35 Hz and Q is set to 25. To check the results, the exact magnitude and phase from a closed-form frequency-domain solution (via :func:`pyyeti.ode.SolveUnc.fsolve`) will be plotted as well. .. plot:: :context: close-figs >>> import numpy as np >>> import matplotlib.pyplot as plt >>> from matplotlib import cm, colors >>> import matplotlib.gridspec as gridspec >>> from pyyeti import dsp, psd, ode >>> >>> # make up a flat (5-60 Hz) spectrum signal as input: >>> psd_ = 1.0 >>> fstart = 5.0 >>> fstop = 60.0 >>> spec = np.array([[0.1, psd_], [100.0, psd_]]) >>> in_acce, sr, t = psd.psd2time( ... spec, fstart, fstop, ppc=10, df=0.05, gettime=True, ... winends={"ends": "both"} ... ) >>> >>> # define a 35 hz system with 2% damping (Q = 25): >>> frq = 35.0 >>> omega = frq * np.pi * 2 >>> Q = 25 >>> zeta = 1 / (2 * Q) >>> ts = ode.SolveUnc(1, 2 * zeta * omega, omega ** 2, 1 / sr) >>> >>> # base-drive system like SRS ... in_acce is base input >>> # (see pyyeti.srs.srs): >>> # zddot = input >>> # x = absolute displacement of mass >>> # u = x - z >>> sol = ts.tsolve(-in_acce) >>> out_acce = sol.a.ravel() + in_acce # xddot >>> >>> tr = dsp.transmissibility( ... in_acce, out_acce, sr, getmap=True) >>> >>> fig = plt.figure('Example', figsize=(8, 11), clear=True, ... layout='tight') >>> >>> # use GridSpec to make a nice layout with colorbars: >>> gs = gridspec.GridSpec(5, 2, width_ratios=[30, 1]) >>> >>> ax = ax_time = plt.subplot(gs[0, 0]) >>> _ = ax.plot(t, in_acce, label="input") >>> _ = ax.plot(t, out_acce, alpha=0.75, label="output") >>> _ = ax.set_xlabel("Time (s)") >>> _ = ax.set_ylabel("Acceleration") >>> _ = ax.legend(loc="upper right") >>> >>> # plot only frequency range where input has content: >>> pv = (tr.f >= fstart) & (tr.f <= fstop) >>> fm = tr.f[pv] >>> >>> # compute exact solution for comparison: >>> in_acce_exact = np.ones(len(fm)) >>> sol = ts.fsolve(-in_acce_exact, fm) >>> acce_exact = in_acce_exact + sol.a[0] >>> mag_exact = abs(acce_exact) >>> phase_exact = -np.angle(acce_exact, deg=True) >>> >>> # plot magnitude: >>> ax = ax_freq = plt.subplot(gs[1, 0]) >>> _ = ax.plot(fm, tr.mag[pv], label="Estimate") >>> _ = ax.plot(fm, mag_exact, label="Exact") >>> _ = ax.set_xlabel("Frequency (Hz)") >>> _ = ax.set_ylabel("TR Magnitude") >>> _ = ax.set_title("Average Transmissibility Magnitude") >>> _ = ax.legend(loc="upper right") >>> >>> ax = plt.subplot(gs[2, 0], sharex=ax_time) >>> c = ax.contour(tr.t, fm, tr.mag_map[pv], 40, ... cmap=cm.plasma) >>> _ = ax.set_xlabel("Time (s)") >>> _ = ax.set_ylabel("Frequency (Hz)") >>> _ = ax.set_title("Transmissibility Magnitude Map") >>> >>> ax = plt.subplot(gs[2, 1]) >>> >>> # This doesn't work in matplotlib 3.5.0: >>> # cb = fig.colorbar(c, cax=ax) >>> # cb.filled = True >>> # cb.draw_all() >>> # But this does: >>> norm = colors.Normalize( ... vmin=c.cvalues.min(), vmax=c.cvalues.max() ... ) >>> sm = plt.cm.ScalarMappable(norm=norm, cmap=c.cmap) >>> cb = plt.colorbar(sm, cax=ax) # , ticks=c.levels) >>> >>> _ = ax.set_title("TR Magnitude") >>> >>> # plot phase: >>> ax = plt.subplot(gs[3, 0], sharex=ax_freq) >>> _ = ax.plot(fm, tr.phase[pv], label="Estimate") >>> _ = ax.plot(fm, phase_exact, label="Exact") >>> _ = ax.set_xlabel("Frequency (Hz)") >>> _ = ax.set_ylabel("TR Phase (deg)") >>> _ = ax.set_title("Average Transmissibility Phase") >>> _ = ax.legend(loc="lower right") >>> >>> ax = plt.subplot(gs[4, 0], sharex=ax_time) >>> c = ax.contour(tr.t, fm, tr.phase_map[pv], 40, ... cmap=cm.plasma) >>> _ = ax.set_xlabel("Time (s)") >>> _ = ax.set_ylabel("Frequency (Hz)") >>> _ = ax.set_title("Transmissibility Phase Map") >>> >>> ax = plt.subplot(gs[4, 1]) >>> >>> # This doesn't work in matplotlib 3.5.0: >>> # cb = fig.colorbar(c, cax=ax) >>> # cb.filled = True >>> # cb.draw_all() >>> # But this does: >>> norm = colors.Normalize( ... vmin=c.cvalues.min(), vmax=c.cvalues.max() ... ) >>> sm = plt.cm.ScalarMappable(norm=norm, cmap=c.cmap) >>> cb = plt.colorbar(sm, cax=ax) # , ticks=c.levels) >>> >>> _ = ax.set_title("TR Phase (deg)") As an aside and for fun, compare the actual root-mean-square response to the Miles' equation estimate. Miles' should be a little higher on average ... it assumes infinitely wide, flat input spectrum. We'll also compare the actual peak vs both: - a ``3 * sigma`` Miles' peak - a ``peak_factor * sigma`` Miles' peak, where ``peak_factor`` is determined from the Rayleigh distribution The Rayleigh peak factor is ``sqrt(2*log(duration*f))``. See :func:`pyyeti.fdepsd.fdepsd` for the derivation of this factor. In this example, since the number of cycles is quite high, 3 sigma will generally be below the peak. The Rayleigh peak factor allows for a fairly good estimate of the actual peak. >>> actual = np.sqrt((out_acce ** 2).mean()) >>> miles = np.sqrt(np.pi / 2 * Q * frq * psd_) >>> if True: # doctest: +SKIP ... print("rms comparison:") ... print(f"\tactual = {actual:.2f}") ... print(f"\tMiles = {miles:.2f}") ... print(f"\tratio = {actual/miles:.2f}") rms comparison: actual = 36.50 Miles = 37.07 ratio = 0.98 >>> >>> # Compare actual peak to 3 sigma peak from Miles': >>> actual_pk = abs(out_acce).max() >>> miles_3s = 3 * miles >>> if True: # doctest: +SKIP ... print("peak comparison #1:") ... print(f"\tactual = {actual_pk:.2f}") ... print(f"\t3 * Miles = {miles_3s:.2f}") ... print(f"\tratio = {actual_pk/miles_3s:.2f}") peak comparison #1: actual = 134.00 3 * Miles = 111.22 ratio = 1.20 >>> >>> rpf = np.sqrt(2 * np.log(frq * t[-1])) # rayleigh peak factor >>> miles_rayleigh = rpf * miles >>> if True: # doctest: +SKIP ... print("peak comparison #2:") ... print(f"\tactual = {actual_pk:.2f}") ... print(f"\t{rpf:.2f} * Miles = {miles_rayleigh:.2f}") ... print(f"\tratio = {actual_pk/miles_rayleigh:.2f}") peak comparison #2: actual = 134.00 3.62 * Miles = 134.19 ratio = 1.00 """ in_data, out_data = np.atleast_1d(in_data, out_data) if in_data.shape != out_data.shape or in_data.ndim != 1: raise ValueError("`in_data` and `out_data` must be 1d arrays of the same size") kwargs["sr"] = sr kwargs["window"] = window kwargs["coef"] = "complex" fftmap_in, t, f = waterfall( in_data, sr, timeslice, tsoverlap, fftcoef, which=0, freq=2, kwargs=kwargs ) fftmap_out, t, f = waterfall( out_data, sr, timeslice, tsoverlap, fftcoef, which=0, freq=2, kwargs=kwargs ) tr_map = fftmap_out / fftmap_in mag = abs(tr_map).mean(axis=1) phase = np.angle((tr_map / abs(tr_map)).mean(axis=1), deg=True) if getmap: mag_map = abs(tr_map) phase_map = np.angle(tr_map, deg=True) return SimpleNamespace( f=f, mag=mag, phase=phase, tr_map=tr_map, mag_map=mag_map, phase_map=phase_map, t=t, ) return SimpleNamespace(f=f, mag=mag, phase=phase)