Source code for pyyeti.psd

# -*- coding: utf-8 -*-
"""
Power spectral density tools.
"""

from warnings import warn
import numpy as np
import scipy.signal as signal
from scipy.interpolate import interp1d
from pyyeti import dsp


# temporary patch for numpy < 2.0
try:
    np.trapezoid
except AttributeError:
    np.trapezoid = np.trapz


# FIXME: We need the str/repr formatting used in Numpy < 1.14.
try:
    np.set_printoptions(legacy="1.13")
except TypeError:
    pass


def _set_frange(frange, low, high):
    s = frange[0] if frange[0] > 0.0 else low
    e = frange[-1] if frange[-1] <= high else high
    return s, e


[docs] def get_freq_oct(n, frange=(1.0, 10000.0), exact=False, trim="outside", anchor=None): r""" Get frequency vector on an octave scale. Parameters ---------- n : scalar Specify octave band: 1 for full octave, 3 for 1/3 octave, 6 for 1/6, etc. frange : 1d array_like; optional Specifies bounds for the frequencies according to input `trim`. Only the first and last elements are used. If the first element <= 0.0, 1.0 is used instead. See also the `trim` input. trim : string; optional Specify how to trim frequency vector to `frange`: ========= ================================================ `trim` Description ========= ================================================ 'inside' All frequencies inside `frange`: ``F_lower[0] >= frange[0]``; ``F_upper[-1] <= frange[-1]`` 'center' Center frequencies inside `frange`: ``F[0] >= frange[0]``; ``F[-1] <= frange[-1]`` 'outside' First band includes ``frange[0]`` and last band includes ``frange[-1]`` 'band' Same as 'outside' ========= ================================================ exact : bool; optional If False, return an approximate octave scale so that it hits the power of 10s, achored at 1 Hz by default (see `anchor`). If True, return an exact octave scale, anchored at 1000 Hz by default. anchor : scalar or None; optional If scalar, it specifies the anchor. If None, the anchor used is specified under `exact` above (1 or 1000). Returns ------- F : 1d ndarray Contains the band center frequencies on an octave scale. F_lower : 1d ndarray Same size as `F`, band lower frequencies. F_upper : 1d ndarray Same size as `F`, band upper frequencies. Notes ----- If `exact` is False, the center, lower, and upper frequencies are computed from (where :math:`i` and :math:`j` are integers according to `frange` and `trim`): .. math:: \begin{aligned} F &= anchor \cdot 10^{3 [i, i+1, i+2, ..., j] / (10 n)} \\ F_{lower} &= F/10^{3/(20 n)} \\ F_{upper} &= F \cdot 10^{3/(20 n)} \end{aligned} If `exact` is True: .. math:: \begin{aligned} F &= anchor \cdot 2^{[i, i+1, i+2, ..., j] / n} \\ F_{lower} &= F/2^{1/(2 n)} \\ F_{upper} &= F \cdot 2^{1/(2 n)} \end{aligned} Examples -------- >>> import numpy as np >>> from pyyeti import psd >>> np.set_printoptions(precision=4, linewidth=75) >>> np.array(psd.get_freq_oct(3, [505, 900])) array([[ 501.1872, 630.9573, 794.3282, 1000. ], [ 446.6836, 562.3413, 707.9458, 891.2509], [ 562.3413, 707.9458, 891.2509, 1122.0185]]) >>> np.array(psd.get_freq_oct(3, [505, 900], trim='center')) array([[ 630.9573, 794.3282], [ 562.3413, 707.9458], [ 707.9458, 891.2509]]) >>> np.array(psd.get_freq_oct(3, [505, 900], exact=True)) array([[ 500. , 629.9605, 793.7005, 1000. ], [ 445.4494, 561.231 , 707.1068, 890.8987], [ 561.231 , 707.1068, 890.8987, 1122.462 ]]) >>> psd.get_freq_oct(6, [.8, 2.6])[0] array([ 0.7943, 0.8913, 1. , 1.122 , 1.2589, 1.4125, 1.5849, 1.7783, 1.9953, 2.2387, 2.5119]) >>> psd.get_freq_oct(6, [.8, 2.6], anchor=2)[0] array([ 0.7962, 0.8934, 1.0024, 1.1247, 1.2619, 1.4159, 1.5887, 1.7825, 2. , 2.244 , 2.5179]) >>> psd.get_freq_oct(6, [.8, 2.6], exact=True)[0] array([ 0.7751, 0.87 , 0.9766, 1.0962, 1.2304, 1.3811, 1.5502, 1.74 , 1.9531, 2.1923, 2.4608]) >>> psd.get_freq_oct(6, [.8, 2.6], exact=True, anchor=2)[0] array([ 0.7937, 0.8909, 1. , 1.1225, 1.2599, 1.4142, 1.5874, 1.7818, 2. , 2.2449, 2.5198]) """ s = frange[0] if frange[0] > 0.0 else 1.0 e = frange[-1] if exact: if not anchor: anchor = 1000.0 var1 = np.floor(np.log2(s / anchor) * n) var2 = np.log2(e / anchor) * n + 1 bands = np.arange(var1, var2) F = anchor * 2 ** (bands / n) factor = 2 ** (1 / (2 * n)) else: if not anchor: anchor = 1.0 var1 = np.floor(np.log10(s / anchor) * 10 * n / 3) var2 = np.log10(e / anchor) * 10 * n / 3 + 1 bands = np.arange(var1, var2) F = anchor * 10 ** (3 * bands / (10 * n)) factor = 10 ** (3 / (20 * n)) FL, FU = F / factor, F * factor if trim in ("outside", "band"): Nmax = np.max(np.nonzero(FL <= e)[0]) + 1 Nmin = np.min(np.nonzero(FU >= s)[0]) elif trim == "center": Nmax = np.max(np.nonzero(F <= e)[0]) + 1 Nmin = np.min(np.nonzero(F >= s)[0]) elif trim == "inside": Nmax = np.max(np.nonzero(FU <= e)[0]) + 1 Nmin = np.min(np.nonzero(FL >= s)[0]) else: raise ValueError("input `trim` has invalid value") F = F[Nmin:Nmax] FL = FL[Nmin:Nmax] FU = FU[Nmin:Nmax] return F, FL, FU
[docs] def proc_psd_spec(spec): """ Process PSD specification for other routines Parameters ---------- spec : 2d ndarray or 2-element tuple/list If ndarray, its columns are ``[Freq, PSD1, PSD2, ... PSDn]``. Otherwise, it must be a 2-element tuple or list, eg: ``(Freq, PSD)`` where PSD is: ``[PSD1, PSD2, ... PSDn]``. In the second usage, PSD can be 1d; in the first usage, PSD is always considered 2d. Returns ------- freq : 1d ndarray The frequency vector in `spec`. PSD : 1d or 2d ndarray The PSD matrix or vector as noted above. Will be 1d only if the second usage of `spec` was used and `PSD` is 1d. Shape is either ``(len(freq), n)`` or ``(len(freq),)``. npsds : integer Number of PSDs in `PSD`. Notes ----- Any NaNs in the specification frequency are deleted (along with the corresponding PSD values). """ if isinstance(spec, np.ndarray): if spec.ndim != 2 or spec.shape[1] <= 1: raise ValueError( "incorrectly sized ndarray for " "`spec` input (must be 2d with more" " than 1 column)" ) Freq = spec[:, 0] PSD = spec[:, 1:] npsds = PSD.shape[1] else: if len(spec) != 2: msg = ( "incorrectly sized `spec` input: for tuple/" f"list input, must have length 2, not {len(spec)}" ) raise ValueError(msg) Freq, PSD = np.atleast_1d(*spec) if len(Freq) != PSD.shape[0]: raise ValueError("Freq and PSD inputs in `spec` are incompatibly sized") if PSD.ndim > 2: raise ValueError("the PSD input in `spec` has more than 2 dimensions.") npsds = 1 if PSD.ndim == 1 else PSD.shape[1] # check for nans in Freq: pv = np.isnan(Freq) if pv.any(): pv = ~pv Freq = Freq[pv] PSD = PSD[pv] return Freq, PSD, npsds
[docs] def area(spec): r""" Compute the area under a PSD random specification. Parameters ---------- spec : 2d ndarray or 2-element tuple/list If ndarray, its columns are ``[Freq, PSD1, PSD2, ... PSDn]``. Otherwise, it must be a 2-element tuple or list, eg: ``(Freq, PSD)`` where PSD is: ``[PSD1, PSD2, ... PSDn]``. In the second usage, PSD can be 1d; in the first usage, PSD is always considered 2d. The frequency vector must be monotonically increasing. Returns ------- area : 1d array Area(s) under the PSD specification. Notes ----- This routine assumes a constant db/octave slope for all segments. With this assumption, it computes the area for each segment with the following formula (derivation below). The segment goes from (f1, p1) to (f2, p2):: s = log(p2/p1)/log(f2/f1) if s == -1: area_segment = p1*f1*log(f2/f1) else: area_segment = (f2*p2 - f1*p1)/(s + 1) .. warning:: This routine is only for specifications with all constant db/octave segments. Do not use for general freq vs. psd curves, such as output from analysis; use something like :func:`numpy.trapezoid` (or :func:`trapz` for NumPy versions < 2.0). The following derives the equations for computing the area under the curve. Each segment is assumed to have a constant db/octave slope. We'll start by computing the slope (:math:`m`) of each segment. To get the slope, we'll need the number of dbs :math:`d` and the number of octaves :math:`n`. First, the number of dbs: .. math:: d = 10 \log_{10} (p_2 / p_1) The number of octaves (:math:`n`) is defined by :math:`f_2 = 2^n f_1`. Solving for :math:`n` gives: .. math:: n = \frac{\log_{10} (f_2 / f_1)}{\log_{10} (2)} Therefore, the slope :math:`m = d / n` for the segment is: .. math:: m = 10 \log_{10} (2) \frac{\log_{10} (p_2 / p_1)} {\log_{10} (f_2 / f_1)} To simplify the equations, we'll define the variable :math:`s` as: .. math:: s = \frac{m}{10 \log_{10} (2)} = \frac{\log_{10} (p_2 / p_1)} {\log_{10} (f_2 / f_1)} Solving for :math:`p_2` gives: .. math:: p_2 = p_1 ( f_2 / f_1 )^s Since the segment has a constant db/octave slope, that equation can be generalized for any frequency value :math:`x` from :math:`f_1` to :math:`f_2`: .. math:: p(x) = p_1 ( x / f_1 )^s The area under the segment is simply the integral of that equation: .. math:: area_{segment} = \int_{f_1}^{f_2} p_1 ( x / f_1 )^s dx If :math:`s = -1`: .. math:: area_{segment} = p_1 f_1 \ln ( f_2 / f_1 ) Otherwise, if :math:`s \neq -1`: .. math:: \begin{aligned} area_{segment} &= \frac{p_1 ( f_2^{s+1} - f_1^{s+1} )} {f_1^s (s+1)} \\ &= ( p_2 f_2 - p_1 f_1 ) / (s + 1) \end{aligned} Examples -------- Compute a 3-sigma peak value given a test spec: >>> import numpy as np >>> from pyyeti import psd >>> spec = np.array([[20, .0053], ... [150, .04], ... [600, .04], ... [2000, .0036]]) >>> 3*np.sqrt(psd.area(spec)) # doctest: +ELLIPSIS array([ 18.43...]) For comparison, use a brute-force technique: >>> f = np.arange(20, 2000.1, 0.1) >>> p = psd.interp(spec, f, linear=False) >>> 3*np.sqrt(np.trapezoid(p, f, axis=0)) # doctest: +ELLIPSIS array([ 18.43...]) """ Freq, PSD, _ = proc_psd_spec(spec) if PSD.ndim == 1: PSD = PSD[:, None] _area = np.zeros(PSD.shape[1]) # accumulate the areas of all segments for each curve: for i in range(Freq.size - 1): f1 = Freq[i] f2 = Freq[i + 1] for j in range(PSD.shape[1]): p1 = PSD[i, j] p2 = PSD[i + 1, j] s = np.log(p2 / p1) / np.log(f2 / f1) if abs(s + 1.0) < 1e-5: # happens when p2/p1 = f1/f2 # slope = -10*log10(2) db/octave intarea = p1 * f1 * np.log(f2 / f1) else: intarea = (f2 * p2 - f1 * p1) / (s + 1.0) _area[j] += intarea return _area
[docs] def interp(spec, freq, linear=False): """ Interpolate values on a PSD specification (or analysis curve). Parameters ---------- spec : 2d ndarray or 2-element tuple/list If ndarray, its columns are ``[Freq, PSD1, PSD2, ... PSDn]``. Otherwise, it must be a 2-element tuple or list, eg: ``(Freq, PSD)`` where PSD is: ``[PSD1, PSD2, ... PSDn]``. In the second usage, PSD can be 1d; in the first usage, PSD is always considered 2d. The frequency vector must be monotonically increasing. freq : 1d array Vector of frequencies to interpolate the specification to. linear : bool If True, use linear interpolation to expand `spec` to the frequencies in `freq`. If False, `spec` is expanded via interpolation in log space. In other words: ================ ========================================== Use: When: ================ ========================================== ``linear=False`` `spec` is an actual PSD test specification -- that is, it uses constant db/octave slopes ``linear=True`` `spec` doesn't use constant db/octave slopes (eg, an analysis curve) ================ ========================================== Returns ------- psdfull : 1d or 2d array Matrix of the interpolated PSD values. If 2d, has "n" columns (which will be one less than `spec` in the first usage above because the frequency column is not included). Will be 1d if PSD was 1d on input. Notes ----- For PSD data that is defined on a center-band frequency scale, use :func:`rescale` instead. Zeros are used to fill in PSD values for frequencies outside the specification(s). Any NaNs in the specification frequency are deleted (along with the corresponding PSD values) before interpolation. This is done by the routine :func:`proc_psd_spec`. See also -------- :func:`rescale` Examples -------- >>> import numpy as np >>> from pyyeti import psd >>> spec = np.array([[20, .0053], ... [150, .04], ... [600, .04], ... [2000, .0036]]) >>> freq = [100, 200, 600, 1200] >>> np.set_printoptions(precision=3) >>> psd.interp(spec, freq).ravel() array([ 0.027, 0.04 , 0.04 , 0.01 ]) >>> psd.interp(spec, freq, linear=True).ravel() array([ 0.027, 0.04 , 0.04 , 0.024]) Using the second form of the spec input: >>> spec = ([ 20, 150, 600, 2000], ... [.0053, .04, .04, .0036]) >>> psd.interp(spec, freq, linear=True) array([ 0.027, 0.04 , 0.04 , 0.024]) """ Freq, PSD, npsds = proc_psd_spec(spec) # spec = np.atleast_2d(spec) freq = np.atleast_1d(freq) if linear: ifunc = interp1d( Freq, PSD, axis=0, bounds_error=False, fill_value=0, assume_sorted=True ) psdfull = ifunc(freq) else: ifunc = interp1d( np.log(Freq), np.log(PSD), axis=0, bounds_error=False, fill_value=0, assume_sorted=True, ) psdfull = ifunc(np.log(freq)) pv = (freq >= Freq[0]) & (freq <= Freq[-1]) psdfull[pv] = np.exp(psdfull[pv]) return psdfull
[docs] def rescale(P, F, n_oct=3, freq=None, extendends=True, frange=None): """ Convert PSD from one frequency scale to another. Parameters ---------- P : array_like Vector or matrix; PSD(s) to convert. Works columnwise if matrix. F : array_like Vector; center frequencies for `P`. If steps are not linear, logarithmic spacing is assumed. n_oct : scalar; optional Specifies the desired octave scale. 1 means full octave, 3 means 1/3rd octave, 6 means 1/6th octave, etc. The routine :func:`get_freq_oct` is used to calculate the frequency vector with the default options. To change options, call :func:`get_freq_oct` directly and provide that input via `freq`. freq : array_like or None; optional Alternative to `n_oct` and takes precedence. Specifies desired output frequencies directly. If steps are not linear, logarithmic spacing is assumed for computing the band boundaries. extendends : bool; optional If True and the first and/or last frequency band extends beyond the original data, the area of the new band is adjusted up by the ratio of the new bandwidth over the portion that is covered by the original data. This will cause the corresponding PSD value to be higher than it would otherwise be, meaning the overall mean-square value will also be a little higher. frange : 1d array_like or None; optional This option can be used to limit the frequency range of the output frequencies. If None, this option is ignored for cases where `freq` is used, but is set internally to ``(1.0, np.inf))`` for cases where `n_oct` is used. Only the first and last elements of `frange` are used. Note that the output frequencies will be trimmed further if needed by the first and last values of `F`. .. note:: When the `n_oct` option is used, if the first value of `frange` is <= 0.0, it is internally reset to 1.0. Returns ------- Pout : ndarray Vector or matrix; converted PSD(s). Rows correspond to the new frequency scale; columns correspond with `P`. Fctr : ndarray Vector; center frequencies of output. Equal to `freq` if that was input. msv : scalar or 1d ndarray Mean square value estimate for each PSD. ms : ndarray Vector or matrix; converted mean square values directly (instead of density). For constant frequency step df, ``ms = df*Pout``. Same size as `Pout`. Notes ----- The input PSD is assumed to be on a center-band frequency scale. For PSD specifications that use constant dB/octave slopes, use :func:`interp` instead. This algorithm works by interpolating on cummulative area such that original contributions to total mean-square per band is preserved. .. note:: Note that if the area of the first and/or last band is extended (see `extendends` above), the overall mean-square value will be higher than the original. See :func:`get_freq_oct` for more information on how the octave scales are calculated. See also -------- :func:`interp` Examples -------- Compute a PSD with :func:`scipy.signal.welch` and then rescale it to 3rd, 6th, and 12th octave scales starting at 1.0 Hz. Compare mean square values from all 4 PSDs. .. plot:: :context: close-figs >>> import matplotlib.pyplot as plt >>> import numpy as np >>> import scipy.signal as signal >>> from pyyeti import psd >>> >>> frange = (1.0, np.inf) >>> rng = np.random.default_rng() >>> g = rng.normal(size=10000) >>> sr = 400 >>> f, p = signal.welch(g, sr, nperseg=sr) >>> p3, f3, msv3, ms3 = psd.rescale(p, f, frange=frange) >>> p6, f6, msv6, ms6 = psd.rescale( ... p, f, n_oct=6, frange=frange) >>> p12, f12, msv12, ms12 = psd.rescale( ... p, f, n_oct=12, frange=frange) >>> >>> fig = plt.figure('Example', clear=True, ... layout='constrained') >>> line = plt.semilogx(f, p, label='Linear') >>> line = plt.semilogx(f3, p3, label='1/3 Octave') >>> line = plt.semilogx(f6, p6, label='1/6 Octave') >>> line = plt.semilogx(f12, p12, label='1/12 Octave') >>> _ = plt.legend(ncol=2, loc='best') >>> _ = plt.xlim([1, 200]) >>> msv1 = np.sum(p[1:]*(f[1]-f[0])) >>> abs(msv1/msv3 - 1) < .12 True >>> abs(msv1/msv6 - 1) < .06 True >>> abs(msv1/msv12 - 1) < .03 True Demonstrate ``extendends=False``. Ends will be less than input because there is a region of zero area averaged in: >>> in_freq = np.arange(0, 10.1, .25) >>> out_freq = np.arange(0, 10.1, 5) >>> in_p = np.ones_like(in_freq) >>> p, f, ms, mvs = psd.rescale(in_p, in_freq, freq=out_freq) >>> p array([ 1., 1., 1.]) >>> p, f, ms, mvs = psd.rescale(in_p, in_freq, freq=out_freq, ... extendends=False) >>> p array([ 0.525, 1. , 0.525]) The 0.525 value is from: ``area/width = 1*(2.5-(-0.125))/5 = 0.525`` """ F = np.atleast_1d(F) P = np.atleast_1d(P) def _get_fl_fu(fcenter): Df = np.diff(fcenter) # if np.all(Df == Df[0]): if (abs(Df / Df[0] - 1.0) < 1e-12).all(): # linear scale Df = Df[0] FL = fcenter - Df / 2 FU = fcenter + Df / 2 else: # output is not linear, assume log mid = np.sqrt(fcenter[:-1] * fcenter[1:]) fact1 = mid[0] / fcenter[1] fact2 = fcenter[-1] / mid[-1] FL = np.hstack((fact1 * fcenter[0], mid)) FU = np.hstack((mid, fact2 * fcenter[-1])) return FL, FU if freq is None: if frange is None: frange = (1.0, np.inf) frange = _set_frange(frange, 1.0, F[-1]) Wctr, FL, FU = get_freq_oct(n_oct, exact=True, frange=frange) else: freq = np.atleast_1d(freq) if frange is not None: freq = freq[(freq >= frange[0]) & (freq <= frange[-1])] FL, FU = _get_fl_fu(freq) Nmax = np.max(np.nonzero(FL <= F[-1])[0]) + 1 Nmin = np.min(np.nonzero(FU >= F[0])[0]) Wctr = freq[Nmin:Nmax] FL = FL[Nmin:Nmax] FU = FU[Nmin:Nmax] oned = False if P.ndim == 1 or (P.ndim == 2 and P.shape[0] == 1): oned = True P = P.reshape(-1, 1) cols = 1 else: cols = P.shape[1] # calculate cumulative area: Df = np.diff(F) if np.all(Df == Df[0]): # input uses linear frequency scale FLin = F - Df[0] / 2 FUin = F + Df[0] / 2 else: # not linear, assume log FLin, FUin = _get_fl_fu(F) Df = (FUin - FLin).reshape(-1, 1) ca = np.vstack((np.zeros((1, cols)), np.cumsum(Df * P, axis=0))) Fa = np.hstack((FLin[0], FUin)) if extendends: fl = FL[0] fu = FU[-1] if FL[0] < FLin[0]: FL[0] = FLin[0] if FU[-1] > FUin[-1]: FU[-1] = FUin[-1] cal = np.zeros((len(FL), cols)) cau = np.zeros((len(FL), cols)) for i in range(cols): # with np.interp, interpolating cumulative area beyond end # points will take the end value -- that's perfect here: 0's # on the front, total area on the back cal[:, i] = np.interp(FL, Fa, ca[:, i]) cau[:, i] = np.interp(FU, Fa, ca[:, i]) # Compute new values ms = cau - cal psdoct = ms * (1 / (FU - FL).reshape(-1, 1)) if extendends: FL[0] = fl FU[-1] = fu ms = psdoct * (FU - FL).reshape(-1, 1) msv = np.sum(ms, axis=0) if oned: psdoct = psdoct.ravel() ms = ms.ravel() msv = msv[0] return psdoct, Wctr, msv, ms
[docs] def spl( x, sr, nperseg=None, overlap=0.5, window="hann", timeslice=1.0, tsoverlap=0.5, fs=3, pref=2.9e-9, frange=(25.0, np.inf), extendends=True, ): r""" Sound pressure level estimation using PSD. Parameters ---------- x : 1d array like Vector of pressure values. sr : scalar Sample rate. nperseg : int, optional Length of each segment for the FFT. Defaults to ``int(sr / 5)`` for 5 Hz frequency step in PSD. Note: frequency step in Hz = ``sr/nperseg``. overlap : scalar; optional Amount of overlap in windows, eg 0.5 would be 50% overlap. window : str or tuple or array like; optional Passed to :func:`scipy.signal.welch`; see that routine for more information. timeslice : scalar or string-integer If scalar, it is the length in seconds for each slice. If string, it contains the integer number of points for each slice. For example, if `sr` is 1000 samples/second, ``timeslice=0.75`` is equivalent to ``timeslice="750"``. tsoverlap : scalar in [0, 1) or string-integer If scalar, is the fraction of each time-slice to overlap. If string, it contains the integer number of points to overlap. For example, if `sr` is 1000 samples/second, ``tsoverlap=0.5`` and ``tsoverlap="500"`` each specify 50% overlap. fs : integer; optional Specifies output frequency scale. Zero means linear scale, anything else is passed to :func:`rescale`. Example: === ====================================================== 0 linear scale as computed by :func:`scipy.signal.welch` 1 full octave scale 3 3rd octave scale 6 6th octave scale === ====================================================== pref : scalar; optional Reference pressure. 2.9e-9 psi is considered to be the threshhold for human hearing. In Pascals, that value is 2e-5. frange : 1d array_like; optional Specifies bounds for the frequencies. Only the first and last elements are used. If the first value is < 0.0, it is reset to 0.0. If the last value is > ``sr/2``, it is reset to ``sr/2``. Note that for octave scales, :func:`rescale` is used which enforces a minimum of 1.0 Hz. extendends : bool; optional Passed to :func:`rescale` if an octave scale output is requested. See that routine for more information. Returns ------- f : ndarray The output frequency vector (Hz). spl : ndarray The sound pressure level vector in dB. oaspl : scalar The overall sound pressure level. Notes ----- This routine ultimately calls :func:`scipy.signal.welch` (via :func:`psdmod`) to calculate the PSD. It then converts that to mean-square values per band (for linear frequency steps, this is just ``PSD * delta_freq``). Loosely, the math is: .. math:: \begin{aligned} V &= \frac{PSD \cdot \Delta f}{P_{ref}^2} \\ SPL &= 10 \; \log_{10} ( V ) \\ OASPL &= 10 \; \log_{10} \left ( \sum V \right ) \end{aligned} Examples -------- .. plot:: :context: close-figs >>> import numpy as np >>> from pyyeti import psd >>> rng = np.random.default_rng() >>> x = rng.normal(size=100000) >>> sr = 4000 >>> f, spl, oaspl = psd.spl(x, sr, sr, timeslice=len(x)/sr) >>> # oaspl should be around 170.75 (since variance = 1): >>> shouldbe = 10*np.log10(1/(2.9e-9)**2) >>> abs(oaspl/shouldbe - 1) < .01 True Plot the 1/3 octave SPL from above with a full octave SPL. The OASPL comes out a little higher for the full octave SPL because of the ``extendends=True`` option: >>> import matplotlib.pyplot as plt >>> full = psd.spl(x, sr, sr, timeslice=len(x)/sr, fs=1) >>> lbl = f"1/3 Octave SPL; OASPL={oaspl:.2f}" >>> _ = plt.plot(f, spl, "-o", label=lbl) >>> lbl = f"Full Octave SPL; OASPL={full[2]:.2f}" >>> _ = plt.plot(full[0], full[1], "-o", label=lbl) >>> _ = plt.legend() """ if nperseg is None: nperseg = int(sr / 5) # compute psd F, P = psdmod( x, sr, nperseg=nperseg, window=window, timeslice=timeslice, tsoverlap=tsoverlap, noverlap=int(overlap * nperseg), ) s, e = _set_frange(frange, 0.0, sr / 2) if fs != 0: _, F, _, P = rescale(P, F, n_oct=fs, frange=(s, e), extendends=extendends) else: P = P * F[1] pv = (F >= s) & (F <= e) F = F[pv] P = P[pv] v = P / pref**2 return F, 10 * np.log10(v), 10 * np.log10(np.sum(v))
[docs] def psd2time( spec, fstart, fstop, *, ppc=10, df=None, winends=None, gettime=False, expand_method="interp", rng=None, ): r""" Generate a 'random' time domain signal given a PSD specification. Parameters ---------- spec : 2d ndarray or 2-element tuple/list If ndarray, it has two columns: ``[Freq, PSD]``. Otherwise, it must be a 2-element tuple or list, eg: ``(Freq, PSD)``. fstart : scalar Starting frequency in Hz fstop : scalar Stopping frequency in Hz ppc : scalar; optional Points per cycle at highest (`fstop`) frequency; if < 2, it is internally reset to 2. With `fstop`, determines the sample rate: ``sr = ppc * fstop``. df : scalar or None; optional Serves two purposes: it is the frequency step between sinusoids included in the time signal, and it also determines the duration of the signal. If None, it is set to ``fstart / 100`` in order to have 100 cycles at the lowest frequency. If input as a scalar value, it is taken as a hint and will be internally adjusted lower as needed; see Notes section. If routine gives poor results, try refining `df`. If `df` is greater than `fstart`, it is reset internally to `fstart` (in that case though, you'll only get 1 cycle at the lowest frequency). Total duration of the signal: ``T = 1 / df`` (see equations below for more details). .. note:: `df` can be used to indirectly specify the number of cycles desired at the lowest frequency (`fstart`). For example, if ``fstart=5.0`` and you want to have 500 cycles of the 5.0 Hz content, then use ``df=0.01``: ``fstart / df == 500``. winends : None or dictionary; optional If None, :func:`pyyeti.dsp.windowends` is not called. Otherwise, `winends` must be a dictionary of arguments that will be passed to :func:`pyyeti.dsp.windowends` (not including `signal`). gettime : bool; optional If True, a time vector is output. expand_method : str; optional Either 'interp' or 'rescale', referring to which function in this module will be used to expand input `spec` to all needed frequencies. Use 'interp' if `spec` is a specification with constant dB/octave slopes. Use 'rescale' if `spec` provides the PSD levels on a center-band scale. See :func:`interp` and :func:`rescale` for more information. rng : :class:`numpy.random.Generator` object or None; optional Random number generator. If None, a new generator is created via :func:`numpy.random.default_rng`. Uniform deviates are generated via :func:`rng.random`. Supplying your own `rng` can be handy for parallel applications, for example, when you need repeatability. For illustration, the following creates a PCG-64 DXSM generator and initializes it with a seed of 1:: from numpy.random import Generator, PCG64DXSM rng = Generator(PCG64DXSM(seed=1)) Returns ------- sig : 1d ndarray The time domain signal with properties set to match input PSD spectrum. Duration of signal: ``1 / df``. sr : scalar The sample rate of the signal (``ppc * fstop``) time : 1d ndarray; optional Time vector for the signal starting at zero with step of ``1.0 / sr``: ``time = np.arange(len(sig)) / sr``. Only returned if `gettime` is True. Notes ----- The following outlines the equations used in this routine. If :math:`df` is None: :math:`df = fstart / 100`. If :math:`df` is greater than :math:`fstart`, it is reset to :math:`fstart`: .. math:: df = \min(df, fstart) The total time of the signal :math:`T` is determined by the lowest frequency cycle (at frequency :math:`df`): .. math:: T = 1 / df The number of points needed is determined by the number of cycles at the highest frequency multiplied by the points/cycle :math:`ppc`: .. math:: N = \lceil {fstop \cdot ppc \cdot T} \rceil The frequency step :math:`df` is reset to match :math:`N` (because of the possible round-up in the last equation): .. math:: df = fstop \cdot ppc / N The final downward adjustment to :math:`df` is to make sure we hit the :math:`fstart` frequency exactly: .. math:: df = fstart / \lfloor fstart / df \rfloor The frequency vector is defined by: .. math:: f = [fstart, fstart+df, ..., \lceil fstop / df \rceil \cdot df] A sinusoid is to be defined at each of those frequencies, so we need to compute an amplitude and phase for each. The amplitude is determined by the magnitude of the PSD. Since a "power spectral density" is really mean-square spectral density, and since the mean-square value of a sinusoid is the amplitude squared over 2, the amplitude of each sinusoid is computed by: .. math:: amp(f) = \sqrt { 2 \cdot PSD(f) \cdot df } The phase is determined by using pseudo-random deviates from a uniform distribution: .. math:: phase(f) = U(0, 2\pi) The amplitude and phase are assembled together in a complex array so that an inverse-FFT will give a real time signal matching the desired mean-square profile defined by the input PSD. Raises ------ ValueError If more than one PSD specification is input. ValueError On invalid setting for `expand_method`. See also -------- :func:`interp`, :func:`rescale`, :func:`pyyeti.dsp.windowends` Examples -------- .. plot:: :context: close-figs >>> import numpy as np >>> from pyyeti import psd >>> spec = np.array([[20, .0768], ... [50, .48], ... [100, .48]]) >>> we = dict(portion=0.01) >>> seed = 1 >>> rng = np.random.default_rng(seed) >>> sig, sr = psd.psd2time( ... spec, 35, 70, ppc=10, df=.01, winends=we, rng=rng, ... ) >>> sr # the sample rate should be 70*10 = 700 700.0 Compare the resulting psd to the spec from 37 to 68: >>> import matplotlib.pyplot as plt >>> import scipy.signal as signal >>> f, p = signal.welch(sig, sr, nperseg=sr) >>> pv = np.logical_and(f >= 37, f <= 68) >>> fi = f[pv] >>> psdi = p[pv] >>> speci = psd.interp(spec, fi).ravel() >>> abs(speci - psdi).max() < .05 True >>> abs(np.trapezoid(psdi, fi) - np.trapezoid(speci, fi)) < .25 True >>> fig = plt.figure('Example', clear=True, ... layout='constrained') >>> a = plt.subplot(211) >>> line = plt.plot(np.arange(len(sig))/sr, sig) >>> plt.grid(True) >>> a = plt.subplot(212) >>> line = plt.loglog(spec[:, 0], spec[:, 1], label='spec') >>> line = plt.loglog(f, p, label='PSD of time signal') >>> leg = plt.legend(loc='best') >>> x = plt.xlim(20, 100) >>> y = plt.ylim(.05, 1) >>> plt.grid(True) >>> xticks = np.arange(20, 105, 10) >>> x = plt.xticks(xticks, xticks) >>> yticks = (.05, .1, .2, .5, 1) >>> y = plt.yticks(yticks, yticks) >>> v = plt.axvline(35, color='black', linestyle='--') >>> v = plt.axvline(70, color='black', linestyle='--') """ _freq, _psd, npsds = proc_psd_spec(spec) if npsds > 1: raise ValueError( "only a single PSD specification is currently allowed in " f"`psd2time`, but {npsds} were provided" ) if df is None: df = fstart / 100 if df > fstart: df = fstart if ppc < 2: ppc = 2 # compute parameters # 1 cycle of lowest frequency defines length of signal: T = 1 / df # number of seconds for lowest frequency cycle N = int(np.ceil(fstop * ppc * T)) # total number of pts df = fstop * ppc / N # adjust df to make sure fstart is an even multiple of df df = fstart / np.floor(fstart / df) sr = N * df # sr = N/T = N/(1/df) odd = N & 1 # define constants # freq = np.arange(fstart, fstop + df / 2, df) freq = np.arange(fstart, fstop + df, df) # 4/9/22 # generate amp(f) vector if expand_method == "interp": speclevel = interp(spec, freq).ravel() elif expand_method == "rescale": speclevel, *_ = rescale(_psd.ravel(), _freq, freq=freq) else: raise ValueError( '`expand_method` must be either "interp" or "rescale", ' f"not {expand_method!r}" ) amp = np.sqrt(2 * speclevel * df) m = (N + 1) // 2 if odd else N // 2 + 1 # build up amp to include zero frequency to fstart and from fstop # to fhighest: ntop = int(np.floor((fstart - df / 2) / df) + 1) nbot = m - ntop - len(amp) if nbot > 0: amp = np.hstack((np.zeros(ntop), amp, np.zeros(nbot))) else: amp = np.hstack((np.zeros(ntop), amp)) # generate F(t) # - use uniformly distributed random phase angle: if rng is None: rng = np.random.default_rng() phi = rng.random(m) * np.pi * 2 # force these terms to be pure cosine phi[0] = 0.0 if not odd: phi[m - 1] = 0 # coefficients: a = amp * np.cos(phi) b = -amp * np.sin(phi) # form matrix ready for ifft: if odd: a2 = a[1:m] / 2 b2 = b[1:m] / 2 r = N * np.hstack((a[0], a2, a2[::-1])) # real part i = N * np.hstack((0, b2, -b2[::-1])) # imag part else: a2 = a[1 : m - 1] / 2 b2 = b[1 : m - 1] / 2 r = N * np.hstack((a[0], a2, a[m - 1], a2[::-1])) # real part i = N * np.hstack((0, b2, 0, -b2[::-1])) # imag part F_time = np.fft.ifft(r + 1j * i) mxi = abs(F_time.imag).max() mxr = abs(F_time.real).max() if mxi > 1e-7 * mxr: # pragma: no cover # bug in the code if this ever happens warn( f"method failed accuracy test; (max imag)/(max real) = {mxi / mxr}", RuntimeWarning, ) F_time = F_time.real if winends is not None: F_time = dsp.windowends(F_time, **winends) if gettime: return F_time, sr, np.arange(N) / sr return F_time, sr
[docs] def psdmod(sig, sr, nperseg=None, timeslice=1.0, tsoverlap=0.5, getmap=False, **kwargs): """ Modified method for PSD estimation via FFT. Parameters ---------- sig : 1d array_like Time series of measurement values. sr : scalar Sample rate. nperseg : int, optional Length of each segment for the FFT. Defaults to ``int(sr / 5)`` for 5 Hz frequency step in PSD. Note: frequency step in Hz = ``sr/nperseg``. timeslice : scalar or string-integer If scalar, it is the length in seconds for each slice. If string, it contains the integer number of points for each slice. For example, if `sr` is 1000 samples/second, ``timeslice=0.75`` is equivalent to ``timeslice="750"``. tsoverlap : scalar in [0, 1) or string-integer If scalar, is the fraction of each time-slice to overlap. If string, it contains the integer number of points to overlap. For example, if `sr` is 1000 samples/second, ``tsoverlap=0.5`` and ``tsoverlap="500"`` each specify 50% overlap. getmap : bool, optional If True, get the PSD map output (the `Pmap` and `t` variables described below). *kwargs : optional Named arguments to pass to :func:`scipy.signal.welch`. Returns ------- f : 1d ndarray Array of sample frequencies. Pxx : 1d ndarray Power spectral density or power spectrum of `sig`. Pmap : 2d ndarray; optional The PSD map; each column is an output of :func:`scipy.signal.welch`. Rows correspond to frequency `f` and columns correspond to time `t`. Only output if `getmap` is True. t : 1d ndarray; optional The time vector for the columns of `Pmap`. Only output if `getmap` is True. Notes ----- This routine calls :func:`pyyeti.dsp.waterfall` for handling the timeslices and preparing the output and :func:`scipy.signal.welch` to process each time slice. So, the "modified" method is to use the PSD averaging (via welch) for each time slice but then take the peaks over all these averages. For a pure 'maximax' PSD, just set `timeslice` to ``nperseg/sr`` and `tsoverlap` to 0.5 (assuming 50% overlap is desired). Conversely, for a pure Welch periodogram, just set the `timeslice` equal to the entire signal (or just use :func:`scipy.signal.welch` of course). Usually the desired behavior for :func:`psdmod` is somewhere between these two extremes. Examples -------- .. plot:: :context: close-figs >>> import numpy as np >>> import matplotlib.pyplot as plt >>> from pyyeti import psd >>> from scipy import signal >>> TF = 30 # make a 30 second signal >>> spec = np.array([[20, 1], [50, 1]]) >>> sig, sr, t = psd.psd2time(spec, ppc=10, fstart=20, ... fstop=50, df=1/TF, ... winends=dict(portion=10), ... gettime=True) >>> f, p = signal.welch(sig, sr, nperseg=sr) >>> f2, p2 = psd.psdmod(sig, sr, nperseg=sr, timeslice=4, ... tsoverlap=0.5) >>> f3, p3 = psd.psdmod(sig, sr, nperseg=sr) >>> spec = spec.T >>> fig = plt.figure('Example', clear=True, ... layout='constrained') >>> _ = plt.subplot(211) >>> _ = plt.plot(t, sig) >>> _ = plt.title(r'Input Signal - Specification Level = ' ... '1.0 $g^{2}$/Hz') >>> _ = plt.xlabel('Time (sec)') >>> _ = plt.ylabel('Acceleration (g)') >>> _ = plt.subplot(212) >>> _ = plt.plot(*spec, 'k-', lw=1.5, label='Spec') >>> _ = plt.plot(f, p, label='Welch PSD') >>> _ = plt.plot(f2, p2, label='PSDmod') >>> _ = plt.plot(f3, p3, label='Maximax') >>> _ = plt.legend(loc='best') >>> _ = plt.xlim(20, 50) >>> _ = plt.title('PSD') >>> _ = plt.xlabel('Frequency (Hz)') >>> _ = plt.ylabel(r'PSD ($g^2$/Hz)') """ if nperseg is None: nperseg = int(sr / 5) ntimeslice = dsp._proc_timeslice(timeslice, sr, sig.size)[0] if nperseg > ntimeslice: raise ValueError( "`nperseg` too big for current `timeslice` setting;" " either decrease `nperseg` or increase `timeslice`" ) welch_inputs = dict(fs=sr, nperseg=nperseg, **kwargs) pmap, t, f = dsp.waterfall( sig, sr, timeslice, tsoverlap, signal.welch, which=1, freq=0, kwargs=welch_inputs, ) p = pmap.max(axis=1) if getmap: return f, p, pmap, t return f, p