Source code for pyyeti.fdepsd

# -*- coding: utf-8 -*-
"""
Tools for calculating the fatigue damage equivalent PSD. Adapted and
enhanced from the CAM versions.
"""

from types import SimpleNamespace
import itertools as it
import multiprocessing as mp
from joblib import Parallel, delayed
import numpy as np
import scipy.signal as signal
import pandas as pd
from pyyeti import cyclecount, srs, dsp


WN_ = None
SIG_ = None
ASV_ = None
BinAmps_ = None
Count_ = None


def _to_np_array(sh_arr):
    return np.frombuffer(sh_arr[0]).reshape(sh_arr[1])


def _mk_par_globals(wn, sig, asv, binamps, count):
    global WN_, SIG_, ASV_, BinAmps_, Count_
    WN_ = _to_np_array(wn)
    SIG_ = _to_np_array(sig)
    ASV_ = _to_np_array(asv)
    BinAmps_ = _to_np_array(binamps)
    Count_ = _to_np_array(count)


def _dofde(args):
    """Utility routine for parallel processing"""
    (j, (coeffunc, Q, dT, verbose)) = args
    if verbose:
        print(f"Processing frequency {WN_[j] / 2 / np.pi:8.2f} Hz", end="\r")
    b, a = coeffunc(Q, dT, WN_[j])
    resphist = signal.lfilter(b, a, SIG_)
    ASV_[1, j] = abs(resphist).max()
    ASV_[2, j] = np.var(resphist, ddof=1)

    # use rainflow to count cycles:
    ind = cyclecount.findap(resphist)
    rf = cyclecount.rainflow(resphist[ind])

    amp = rf["amp"]
    count = rf["count"]
    ASV_[0, j] = amp.max()
    BinAmps_[j] *= ASV_[0, j]

    # cumulative bin count:
    for jj in range(BinAmps_.shape[1]):
        pv = amp >= BinAmps_[j, jj]
        Count_[j, jj] = np.sum(count[pv])


def _dofde_joblib(j, coeffunc, Q, dT, wn, sig, nbins, binfracs, verbose):
    """Utility routine for parallel processing"""
    if verbose:
        print(f"Processing frequency {wn[j] / 2 / np.pi:8.2f} Hz")
    b, a = coeffunc(Q, dT, wn[j])
    resphist = signal.lfilter(b, a, sig)
    srsmax = abs(resphist).max()
    var = np.var(resphist, ddof=1)

    # use rainflow to count cycles:
    ind = cyclecount.findap(resphist)
    rf = cyclecount.rainflow(resphist[ind])

    amp = rf["amp"]
    count = rf["count"]
    amax = amp.max()
    binamps = binfracs * amax

    # cumulative bin count:
    cumcounts = np.zeros(nbins)
    for jj in range(nbins):
        pv = amp >= binamps[jj]
        cumcounts[jj] = np.sum(count[pv])
    return amax, srsmax, var, cumcounts


[docs] def fdepsd( sig, sr, freq, Q, *, resp="absacce", detrend=True, winends="auto", hpfilter=5.0, nbins=300, T0=60.0, rolloff="lanczos", ppc=12, parallel="auto", maxcpu=14, verbose=False, ): r""" Compute a fatigue damage equivalent PSD from a signal. Parameters ---------- sig : 1d array_like Base acceleration signal. sr : scalar Sample rate. freq : array_like Frequency vector in Hz. This defines the single DOF (SDOF) systems to use. Q : scalar > 0.5 Dynamic amplification factor :math:`Q = 1/(2\zeta)` where :math:`\zeta` is the fraction of critical damping. resp : string; optional The type of response to base the damage calculations on: ========= ======================================= `resp` Damage is based on ========= ======================================= 'absacce' absolute acceleration [#fde1]_ 'pvelo' pseudo velocity [#fde2]_ ========= ======================================= detrend : bool; optional If True, `sig` is detrended via :func:`scipy.signal.detrend`. Option is ignored and treated as True if at least one of the `winends` or `hpfilter` options are used. Detrending is done before either of those options. winends : None or 'auto' or dictionary; optional If None, :func:`pyyeti.dsp.windowends` is not called. If 'auto', :func:`pyyeti.dsp.windowends` is called to apply a 0.25 second window or a 50 point window (whichever is smaller) to the front. Otherwise, `winends` must be a dictionary of arguments that will be passed to :func:`pyyeti.dsp.windowends` (not including `signal`). The signal is detrended prior to calling :func:`pyyeti.dsp.windowends`. hpfilter : scalar or None; optional High pass filter frequency; if None, no filtering is done. If filtering is done, it is after detrending and after any `winends` action was taken. The signal is filtered via :func:`scipy.signal.lfilter` using a 3rd order butterworth filter (:func:`scipy.signal.butter`). nbins : integer; optional The number of amplitude levels at which to count cycles T0 : scalar; optional Specifies test duration in seconds rolloff : string or function or None; optional Indicate which method to use to account for the SRS roll off when the minimum `ppc` value is not met. Either 'fft' or 'lanczos' seem the best. If a string, it must be one of these values: =========== ========================================== `rolloff` Notes =========== ========================================== 'fft' Use FFT to upsample data as needed. See :func:`scipy.signal.resample`. 'lanczos' Use Lanczos resampling to upsample as needed. See :func:`pyyeti.dsp.resample`. 'prefilter' Apply a high freq. gain filter to account for the SRS roll-off. See :func:`pyyeti.srs.preroll` for more information. This option ignores `ppc`. 'linear' Use linear interpolation to increase the points per cycle (this is not recommended; method; it's only here as a test case). 'none' Don't do anything to enforce the minimum `ppc`. Note error bounds listed above. None Same as 'none'. =========== ========================================== If a function, the call signature is: ``sig_new, sr_new = rollfunc(sig, sr, ppc, frq)``. Here, `sig` is 1d, len(time). The last three inputs are scalars. For example, the 'fft' function is (trimmed of documentation):: def fftroll(sig, sr, ppc, frq): N = sig.shape[0] if N > 1: curppc = sr/frq factor = int(np.ceil(ppc/curppc)) sig = signal.resample(sig, factor*N, axis=0) sr *= factor return sig, sr ppc : scalar; optional Specifies the minimum points per cycle for SRS calculations. See also `rolloff`. ====== ================================== `ppc` Maximum error at highest frequency ====== ================================== 3 81.61% 4 48.23% 5 31.58% 10 8.14% (minimum recommended `ppc`) 12 5.67% 15 3.64% 20 2.05% 25 1.31% 50 0.33% ====== ================================== parallel : string; optional Controls the parallelization of the calculations: ========== ============================================ `parallel` Notes ========== ============================================ 'auto' Routine determines whether or not to run parallel. 'no' Do not use parallel processing. 'yes' Use parallel processing. Beware, depending on the particular problem, using parallel processing can be slower than not using it. On Windows, be sure the :func:`fdepsd` call is contained within: ``if __name__ == "__main__":`` 'joblib' Like 'yes' except use ``joblib`` for parallelization (see :class:`joblib.Parallel`). ========== ============================================ maxcpu : integer or None; optional Specifies maximum number of CPUs to use. If None, it is internally set to 4/5 of available CPUs (as determined from :func:`multiprocessing.cpu_count`). verbose : bool or integer; optional If True (or > 0), routine will print some status information. If using ``joblib`` for parallelization, this value is passed as an integer to :class:`joblib.Parallel` as well. Returns ------- A SimpleNamespace with the members: freq : 1d ndarray Same as input `freq`. psd : pandas DataFrame; ``len(freq) x 5`` The amplitude and damage based PSDs. The index is `freq` and the five columns are: [G1, G2, G4, G8, G12] =========== =============================================== Name Description =========== =============================================== G1 The "G1" PSD (Mile's or similar equivalent from SRS); uses the maximum cycle amplitude instead of the raw SRS peak for each frequency. G1 is not a damage-based PSD. G2 The "G2" PSD of reference [#fde1]_; G2 >= G1 by bounding lower amplitude counts down to 1/3 of the maximum cycle amplitude. G2 is not a damage-based PSD. G4, G8, G12 The damage-based PSDs with fatigue exponents of 4, 8, and 12 =========== =============================================== peakamp : pandas DataFrame; ``len(freq) x 5`` The peak response of SDOFs (single DOF oscillators) using each PSD as a base input. The index and the five columns are the same as for `psd`. The peaks are computed from the Mile's equation (or similar if using ``resp='pvelo'``). The peak factor used is ``sqrt(2*log(f*T0))``. Note that the first column is, by definition, the maximum cycle amplitude for each SDOF from the rainflow count (G1 was calculated from this). Typically, this should be very close to the raw SRS peaks contained in the `srs` output but a little lower since SRS just grabs peaks without consideration of the opposite peak. binamps : pandas DataFrame; ``len(freq) x nbins`` A DataFrame of linearly spaced amplitude values defining the cycle counting bins. The index is `freq` and the columns are integers 0 to ``nbins - 1``. The values in each row (for a specific frequency SDOF), range from 0.0 up to ``peakamp.loc[freq, "G1"] * (nbins - 1) / nbins``. In other words, each value is the left-side amplitude boundary for that bin. The next column for this matrix would be ``peakamp.loc[:, "G1"]``. count : pandas DataFrame; ``len(freq) x nbins`` Summary matrix of the rainflow cycle counts. Size corresponds with `binamps` and the count is cumulative; that is, the count in each entry includes cycles at the `binamps` amplitude and above. Therefore, first column has total cycles for the SDOF. bincount : pandas DataFrame; ``len(freq) x nbins`` Non-cumulative version of `count`. In other words, the values are the number of cycles in the bin, left-side inclusive. The last bin includes the count of maximum amplitude cycles. di_sig : pandas DataFrame; ``len(freq) x 3`` Damage indicators computed from SDOF responses to the `sig` signal. Index is `freq` and columns are ['b=4', 'b=8', 'b=12']. The value for each frequency is the sum of the cycle count for a bin times its amplitude to the b power. That is, for the j-th frequency, the indicator is:: amps = binamps.loc[freq[j]] counts = bincount.loc[freq[j]] di = (amps ** b) @ counts # dot product of two vectors Note that this definition is slightly different than equation 14 from [#fde1]_ (would have to divide by the frequency), but the same as equation 10 of [#fde2]_ without the constant. di_test_part : pandas DataFrame; ``len(freq) x 3`` Test damage indicator without including the variance factor (see note). Same size as `di_sig`. Each value depends only on the frequency, `T0`, and the fatigue exponent ``b``. The ratio of a signal damage indicator to the corresponding partial test damage indicator is equal to the variance of the single DOF response to the test raised to the ``b / 2`` power:: var_test ** (b / 2) = di_sig / di_test_part .. note:: If the variance factor (`var_test`) were included, then the test damage indicator would be the same as `di_sig`. This relationship is the basis of determining the amplitude of the test signal. var_test : pandas DataFrame; ``len(freq) x 3`` The required SDOF test response variances (see `di_test_part` description). Same size as `di_sig`. The amplitude of the G4, G8, and G12 columns of `psd` are computed from Mile's equation (or similar) and `var_test`. sig : 1d ndarray The version of the input `sig` that is fed into the fatique damage algorithm. This would be after any filtering, windowing, and upsampling. sr : scalar The sample rate of the output `sig`. srs : pandas Series; length = ``len(freq)`` The raw SRS peaks version of the first column in `amp`. See `amp`. Index is `freq`. var : pandas Series; length = ``len(freq)`` Vector of the SDOF response variances. Index is `freq`. parallel : string Either 'yes' or 'no' depending on whether parallel processing was used or not. ncpu : integer Specifies the number of CPUs used. resp : string Same as the input `resp`. use_joblib : bool True if ``joblib`` was used for parallelization. Notes ----- Steps (see [#fde1]_, [#fde2]_): 1. Resample signal to higher rate if highest frequency would have less than `ppc` points-per-cycle. Method of increasing the sample rate is controlled by the `rolloff` input. 2. For each frequency: a. Compute the SDOF base-drive response b. Calculate `srs` and `var` outputs c. Use :func:`pyyeti.cyclecount.findap` to find cycle peaks d. Use :func:`pyyeti.cyclecount.rainflow` to count cycles and amplitudes e. Put counts into amplitude bins 3. Calculate `g1` based on cycle amplitudes from maximum amplitude (step 2d) and Mile's (or similar) equation. 4. Calculate `g2` to bound `g1` & lower amplitude cycles with high counts. Ignore amplitudes < ``Amax/3``. 5. Calculate damage indicators from data with b = 4, 8, 12 where b is the fatigue exponent. 6. By equating the theoretical damage from a `T0` second random vibration test to the damage from the input signal (step 5), solve for the required test response variances for b = 4, 8, 12. 7. Solve for `g4`, `g8`, `g12` from the results of step 6 using the Mile's equation (or similar); equations are shown below. No checks are done regarding the suitability of this method for the input data. It is recommended to read the references [#fde1]_ [#fde2]_ and do those checks (such as plotting Count or Time vs. Amp**2 and comparing to theoretical). The Mile's equation (or similar) is used in this methodology to relate acceleration PSDs to peak responses. If `resp` is 'absacce', it is the Mile's equation: .. math:: \sigma_{absacce}(f) = \sqrt{\frac{\pi}{2} \cdot f \cdot Q \cdot PSD(f)} If `resp` is 'pvelo', the similar equation is: .. math:: \sigma_{pvelo}(f) = \sqrt{\frac{Q \cdot PSD(f)}{8 \pi f}} Those two equations assume a flat acceleration PSD. Therefore, it is recommended to compare SDOF responses from flight data (SRS) to SDOF VRS responses from the developed specification (see :func:`pyyeti.srs.vrs` to compute the VRS response in the absolute-acceleration case). This is to check for conservatism. Instead of using 3 for peak factor (for 3-rms or 3-sigma), use :math:`\sqrt{2 \ln(f \cdot T_0)}` for the peak factor (derived below). Also, enveloping multiple specifications from multiple Q's is worth considering. Note that this analysis can be time consuming; the time is proportional to the number of frequencies multiplied by the number of time steps in the signal. The derivation of the peak factor is as follows. For the special case of narrow band noise where the instantaneous amplitudes follow the Gaussian distribution, the resulting probability density function for the peak amplitudes follow the Rayleigh distribution [#fde3]_. The single DOF response to Gaussian input is reasonably estimated as Gaussian narrow band. Let this response have the standard deviation :math:`\sigma`. From the Rayleigh distribution, the probability of a peak being greater than :math:`A` is: .. math:: Prob[peak > A] = e ^ {\frac{-A^2}{2 \sigma^2}} To estimate the maximum peak for the response of a single DOF system with frequency :math:`f`, find the amplitude that would be expected to occur once within the allotted time (:math:`T_0`). That is, set the product of the probability of a cycle amplitude being greater than :math:`A` and the number of cycles equal to 1.0, and then solve for :math:`A`. The number of cycles of :math:`f` Hz is :math:`N = f \cdot T_0`. Therefore: .. math:: \begin{aligned} Prob[peak > A] \cdot N &= 1.0 e ^ {\frac{-A^2}{2 \sigma^2}} f \cdot T_0 &= 1.0 \frac{-A^2}{2 \sigma^2} &= \ln(1.0) - \ln(f \cdot T_0) \frac{A^2}{2 \sigma^2} &= \ln(f \cdot T_0) A &= \sqrt{2 \ln(f \cdot T_0)} \sigma \end{aligned} .. note:: In addition to the example shown below, this routine is demonstrated in the pyYeti :ref:`tutorial`: :doc:`/tutorials/fatigue`. There is also a link to the source Jupyter notebook at the top of the tutorial. References ---------- .. [#fde1] "Analysis of Nonstationary Vibroacoustic Flight Data Using a Damage-Potential Basis"; S. J. DiMaggio, B. H. Sako, S. Rubin; Journal of Spacecraft and Rockets, Vol 40, No. 5, September-October 2003. .. [#fde2] "Implementing the Fatigue Damage Spectrum and Fatigue Damage Equivalent Vibration Testing"; Scot I. McNeill; 79th Shock and Vibration Symposium, October 26 – 30, 2008. .. [#fde3] Bendat, Julius S., "Probability Functions for Random Responses: Prediction of Peaks, Fatigue Damage, and Catastrophic Failures", NASA Contractor Report 33 (NASA CR-33), 1964. See also -------- :func:`scipy.signal.welch`, :func:`pyyeti.psd.psdmod`, :func:`pyyeti.cyclecount.rainflow`, :func:`pyyeti.srs.srs`. Examples -------- Generate 60 second random signal to a pre-defined spec level, compute the PSD several different ways and compare. Since it's 60 seconds, the damage-based PSDs should be fairly close to the input spec level. The damage-based PSDs will be calculated with several Qs and enveloped. In this example, G2 envelopes G1, G4, G8, G12. This is not always the case. For example, try TF=120; the damage-based curves go up in order to fit equal damage in 60s. One Count vs. Amp**2 plot is done for illustration. .. plot:: :context: close-figs >>> import numpy as np >>> import matplotlib.pyplot as plt >>> from pyyeti import psd, fdepsd >>> import scipy.signal as signal >>> >>> TF = 60 # make a 60 second signal >>> spec = np.array([[20, 1], [50, 1]]) >>> sig, sr, t = psd.psd2time( ... spec, ppc=10, fstart=20, fstop=50, df=1 / TF, ... winends=dict(portion=10), gettime=True) >>> >>> fig = plt.figure('Example', figsize=[9, 6], clear=True, ... layout='constrained') >>> _ = plt.subplot(211) >>> _ = plt.plot(t, sig) >>> _ = plt.title(r'Input Signal - Specification Level = ' ... '1.0 $g^{2}$/Hz') >>> _ = plt.xlabel('Time (sec)') >>> _ = plt.ylabel('Acceleration (g)') >>> ax = plt.subplot(212) >>> f, p = signal.welch(sig, sr, nperseg=sr) >>> f2, p2 = psd.psdmod(sig, sr, nperseg=sr, timeslice=4, ... tsoverlap=0.5) Calculate G1, G2, and the damage potential PSDs: >>> psd_ = 0 >>> freq = np.arange(20., 50.1) >>> for q in (10, 25, 50): ... fde = fdepsd.fdepsd(sig, sr, freq, q) ... psd_ = np.fmax(psd_, fde.psd) >>> # >>> _ = plt.plot(*spec.T, 'k--', lw=2.5, label='Spec') >>> _ = plt.plot(f, p, label='Welch PSD') >>> _ = plt.plot(f2, p2, label='PSDmod') >>> >>> # For plot, rename columns in DataFrame to include "Env": >>> psd_ = (psd_ ... .rename(columns={i: i + ' Env' ... for i in psd_.columns})) >>> _ = psd_.plot.line(ax=ax) >>> _ = plt.xlim(20, 50) >>> _ = plt.title('PSD Comparison') >>> _ = plt.xlabel('Freq (Hz)') >>> _ = plt.ylabel(r'PSD ($g^{2}$/Hz)') >>> _ = plt.legend(loc='upper left', ... bbox_to_anchor=(1.02, 1.), ... borderaxespad=0.) .. plot:: :context: close-figs Compare to theoretical bin counts @ 30 Hz: >>> _ = plt.figure('Example 2', clear=True, ... layout='constrained') >>> Frq = freq[np.searchsorted(freq, 30)] >>> _ = plt.semilogy(fde.binamps.loc[Frq]**2, ... fde.count.loc[Frq], ... label='Data') >>> # use flight time here (TF), not test time (T0) >>> Amax2 = 2 * fde.var.loc[Frq] * np.log(Frq * TF) >>> _ = plt.plot([0, Amax2], [Frq * TF, 1], label='Theory') >>> y1 = fde.count.loc[Frq, 0] >>> peakamp = fde.peakamp.loc[Frq] >>> for j, lbl in enumerate(fde.peakamp.columns): ... _ = plt.plot( ... [0, peakamp.iloc[j]**2], [y1, 1], label=lbl ... ) >>> _ = plt.title('Bin Count Check for Q=50, Freq=30 Hz') >>> _ = plt.xlabel(r'$Amp^2$') >>> _ = plt.ylabel('Count') >>> _ = plt.legend(loc='best') """ sig, freq = np.atleast_1d(sig, freq) if sig.ndim > 1 or freq.ndim > 1: raise ValueError("`sig` and `freq` must both be 1d arrays") if resp not in ("absacce", "pvelo"): raise ValueError("`resp` must be 'absacce' or 'pvelo'") (coeffunc, methfunc, rollfunc, ptr) = srs._process_inputs( resp, "abs", rolloff, "primary" ) if winends == "auto": winends = {"portion": min(int(0.25 * sr), 50, len(sig))} if detrend or winends is not None or hpfilter is not None: sig = signal.detrend(sig) if winends is not None: sig = dsp.windowends(sig, **winends) if hpfilter is not None: if verbose: print(f"High pass filtering @ {hpfilter} Hz") b, a = signal.butter(3, hpfilter / (sr / 2), "high") sig = signal.lfilter(b, a, sig) mxfrq = freq.max() curppc = sr / mxfrq if rolloff == "prefilter": sig, sr = rollfunc(sig, sr, ppc, mxfrq) rollfunc = None if curppc < ppc and rollfunc: if verbose: print( f"Using {rolloff} method to increase sample rate (have " f"only {curppc} pts/cycle @ {mxfrq} Hz" ) sig, sr = rollfunc(sig, sr, ppc, mxfrq) ppc = sr / mxfrq if verbose: print(f"After interpolation, have {ppc} pts/cycle @ {mxfrq} Hz\n") LF = freq.size dT = 1 / sr pi = np.pi Wn = 2 * pi * freq if parallel == "joblib": use_joblib = True parallel = "yes" binfracs = np.arange(nbins, dtype=float) / nbins else: use_joblib = False parallel, ncpu = srs._process_parallel( parallel, LF, sig.size, maxcpu, getresp=False ) # allocate RAM: if parallel == "yes" and use_joblib: func = _dofde_joblib Amax, SRSmax, Var, Count = ( np.zeros(LF), np.zeros(LF), np.zeros(LF), np.zeros((LF, nbins)), ) results = Parallel(n_jobs=ncpu, verbose=int(verbose))( delayed(func)(j, coeffunc, Q, dT, Wn, sig, nbins, binfracs, verbose) for j in range(LF) ) for j, (amax, srsmax, var, cumcounts) in enumerate(results): Amax[j], SRSmax[j], Var[j], Count[j] = amax, srsmax, var, cumcounts BinAmps = np.outer(Amax, binfracs) elif parallel == "yes": # global shared vars will be: WN, SIG, ASV, BinAmps, Count WN = (srs.copyToSharedArray(Wn), Wn.shape) SIG = (srs.copyToSharedArray(sig), sig.shape) ASV = (srs.createSharedArray((3, LF)), (3, LF)) BinAmps = (srs.createSharedArray((LF, nbins)), (LF, nbins)) a = _to_np_array(BinAmps) a += np.arange(nbins, dtype=float) / nbins Count = (srs.createSharedArray((LF, nbins)), (LF, nbins)) args = (coeffunc, Q, dT, verbose) gvars = (WN, SIG, ASV, BinAmps, Count) func = _dofde with mp.Pool( processes=ncpu, initializer=_mk_par_globals, initargs=gvars ) as pool: for _ in pool.imap_unordered(func, zip(range(LF), it.repeat(args, LF))): pass ASV = _to_np_array(ASV) Amax = ASV[0] SRSmax = ASV[1] Var = ASV[2] Count = _to_np_array(Count) BinAmps = a else: Amax = np.zeros(LF) SRSmax = np.zeros(LF) Var = np.zeros(LF) BinAmps = np.zeros((LF, nbins)) BinAmps += np.arange(nbins, dtype=float) / nbins Count = np.zeros((LF, nbins)) # loop over frequencies, calculating responses & counting # cycles for j, wn in enumerate(Wn): if verbose: print(f"Processing frequency {wn / 2 / pi:8.2f} Hz", end="\r") b, a = coeffunc(Q, dT, wn) resphist = signal.lfilter(b, a, sig) SRSmax[j] = abs(resphist).max() Var[j] = np.var(resphist, ddof=1) # use rainflow to count cycles: ind = cyclecount.findap(resphist) rf = cyclecount.rainflow(resphist[ind]) amp = rf["amp"] count = rf["count"] Amax[j] = amp.max() BinAmps[j] *= Amax[j] # cumulative bin count: for jj in range(nbins): pv = amp >= BinAmps[j, jj] Count[j, jj] = np.sum(count[pv]) if verbose: print() print("Computing outputs G1, G2, etc.") # calculate non-cumulative counts per bin: BinCount = np.hstack((Count[:, :-1] - Count[:, 1:], Count[:, -1:])) # for calculating G2: G2max = Amax**2 for j in range(LF): pv = BinAmps[j] >= Amax[j] / 3 # ignore small amp cycles if np.any(pv): x = BinAmps[j, pv] ** 2 x2 = G2max[j] y = np.log(Count[j, pv]) y1 = np.log(Count[j, 0]) g1y = np.interp(x, [0, x2], [y1, 0]) tantheta = (y - g1y) / x k = np.argmax(tantheta) if tantheta[k] > 0: # g2 line is higher than g1 line, so find BinAmps**2 # where log(count) = 0; ie, solve for x-intercept in # y = m x + b; (x, y) pts are: (0, y1), (x[k], y[k]): G2max[j] = x[k] * y1 / (y1 - y[k]) # calculate flight-damage indicators for b = 4, 8 and 12: b4 = 4 b8 = 8 b12 = 12 Df4 = np.zeros(LF) Df8 = np.zeros(LF) Df12 = np.zeros(LF) for j in range(LF): Df4[j] = (BinAmps[j] ** b4).dot(BinCount[j]) Df8[j] = (BinAmps[j] ** b8).dot(BinCount[j]) Df12[j] = (BinAmps[j] ** b12).dot(BinCount[j]) N0 = freq * T0 lnN0 = np.log(N0) if resp == "absacce": G1 = Amax**2 / (Q * pi * freq * lnN0) G2 = G2max / (Q * pi * freq * lnN0) # calculate test-damage indicators for b = 4, 8 and 12: Abar = 2 * lnN0 Abar2 = Abar**2 Dt4 = N0 * 8 - (Abar2 + 4 * Abar + 8) sig2_4 = np.sqrt(Df4 / Dt4) G4 = sig2_4 / ((Q * pi / 2) * freq) Abar3 = Abar2 * Abar Abar4 = Abar2 * Abar2 Dt8 = N0 * 384 - (Abar4 + 8 * Abar3 + 48 * Abar2 + 192 * Abar + 384) sig2_8 = (Df8 / Dt8) ** (1 / 4) G8 = sig2_8 / ((Q * pi / 2) * freq) Abar5 = Abar4 * Abar Abar6 = Abar4 * Abar2 Dt12 = N0 * 46080 - ( Abar6 + 12 * Abar5 + 120 * Abar4 + 960 * Abar3 + 5760 * Abar2 + 23040 * Abar + 46080 ) sig2_12 = (Df12 / Dt12) ** (1 / 6) G12 = sig2_12 / ((Q * pi / 2) * freq) Gmax = np.sqrt(np.vstack((G4, G8, G12)) * (Q * pi * freq * lnN0)) else: G1 = (Amax**2 * 4 * pi * freq) / (Q * lnN0) G2 = (G2max * 4 * pi * freq) / (Q * lnN0) Dt4 = 2 * N0 sig2_4 = np.sqrt(Df4 / Dt4) G4 = sig2_4 * ((4 * pi / Q) * freq) Dt8 = 24 * N0 sig2_8 = (Df8 / Dt8) ** (1 / 4) G8 = sig2_8 * ((4 * pi / Q) * freq) Dt12 = 720 * N0 sig2_12 = (Df12 / Dt12) ** (1 / 6) G12 = sig2_12 * ((4 * pi / Q) * freq) Gmax = np.sqrt(np.vstack((G4, G8, G12)) * (Q * lnN0) / (4 * pi * freq)) # for output, scale the damage indicators: Dt4 *= 4 # 2 ** (b/2) Dt8 *= 16 Dt12 *= 64 # assemble outputs: columns = ["G1", "G2", "G4", "G8", "G12"] lcls = locals() dct = {k: lcls[k] for k in columns} Gpsd = pd.DataFrame(dct, columns=columns, index=freq) Gpsd.index.name = "Frequency" index = Gpsd.index G2max = np.sqrt(G2max) Gmax = pd.DataFrame(np.vstack((Amax, G2max, Gmax)).T, columns=columns, index=index) BinAmps = pd.DataFrame(BinAmps, index=index) Count = pd.DataFrame(Count, index=index) BinCount = pd.DataFrame(BinCount, index=index) Var = pd.Series(Var, index=index) SRSmax = pd.Series(SRSmax, index=index) di_sig = pd.DataFrame( np.column_stack((Df4, Df8, Df12)), columns=["b=4", "b=8", "b=12"], index=index ) di_test = pd.DataFrame( np.column_stack((Dt4, Dt8, Dt12)), columns=["b=4", "b=8", "b=12"], index=index ) var_test = pd.DataFrame( np.column_stack((sig2_4, sig2_8, sig2_12)), columns=["b=4", "b=8", "b=12"], index=index, ) return SimpleNamespace( freq=freq, psd=Gpsd, peakamp=Gmax, binamps=BinAmps, count=Count, bincount=BinCount, var=Var, srs=SRSmax, parallel=parallel, ncpu=ncpu, di_sig=di_sig, di_test=di_test, var_test=var_test, resp=resp, sig=sig, sr=sr, use_joblib=use_joblib, )