Source code for pyyeti.rainflow.py_rain

# -*- coding: utf-8 -*-
"""Plain Python version of the rainflow algorithm. Numba
(``numba.jit(nopython=True)``) is used if available to achieve speeds
comparable to the compiled-c version.
"""
import numpy as np


[docs] def rainflow(peaks, getoffsets=False): """ Rainflow cycle counting in plain Python (slow). Parameters ---------- peaks : 1d array-like Vector of alternating peaks (as returned by :func:`pyyeti.cyclecount.findap`, for example) getoffsets : bool; optional If True, the tuple ``(rf, os)`` is returned; otherwise, only `rf` is returned. Returns ------- rf : 2d ndarray n x 3 matrix with the rainflow cycle count information ``[amp, mean, count]``: - amp is the cycle amplitude (half the peak-to-peak range) - mean is mean of the cycle - count is either 0.5 or 1.0 depending on whether it's half or full cycle os : 2d ndarray; optional n x 2 matrix of cycle offsets ``[start, stop]``. Only returned if `getoffsets` is True. The start and stop values are: - start is the offset into `peaks` for start of cycle - stop is the offset into `peaks` for end of cycle Notes ----- This algorithm is derived from reference [#rain2]_. The compiled C version is preferred over this one and is very fast (the logic is the same). References ---------- .. [#rain2] "Standard Practices for Cycle Counting in Fatigue Analysis", ASTM E 1049 - 85 (Reapproved 2005). Examples -------- Run the example from the ASTM paper: >>> from pyyeti.rainflow.py_rain import rainflow >>> rainflow([-2, 1, -3, 5, -1, 3, -4, 4, -2]) array([[ 1.5, -0.5, 0.5], [ 2. , -1. , 0.5], [ 2. , 1. , 1. ], [ 4. , 1. , 0.5], [ 4.5, 0.5, 0.5], [ 4. , 0. , 0.5], [ 3. , 1. , 0.5]]) With offsets: >>> rf, os = rainflow([-2, 1, -3, 5, -1, 3, -4, 4, -2], ... getoffsets=True) >>> rf array([[ 1.5, -0.5, 0.5], [ 2. , -1. , 0.5], [ 2. , 1. , 1. ], [ 4. , 1. , 0.5], [ 4.5, 0.5, 0.5], [ 4. , 0. , 0.5], [ 3. , 1. , 0.5]]) >>> os # doctest: +ELLIPSIS array([[0, 1], [1, 2], [4, 5], [2, 3], [3, 6], [6, 7], [7, 8]]...) """ peaks = np.atleast_1d(peaks) L = peaks.size if peaks.ndim == 1 else 0 if L < 2: raise ValueError("`peaks` must be a real vector with length >= 2") if getoffsets: return _rainflow2(peaks, L) return _rainflow1(peaks, L)
def _rainflow1(peaks, L): # not getting offsets: pts = np.empty(L) rf = np.empty((L - 1, 3)) j = -1 fullcyclesp1 = 1 # full cycles plus 1 n = -1 for k in range(L): # /* step 1 from [1]: */ j += 1 pts[j] = peaks[k] # /* step 2 from [1]: */ while j > 1: # /* step 3 from [1]: */ Y = abs(pts[j - 2] - pts[j - 1]) X = abs(pts[j - 1] - pts[j]) if X < Y: break if j == 2: # /* step 5 from [1]: */ # /* [count Y as half cycle] */ n += 1 rf[n, 0] = Y / 2 rf[n, 1] = (pts[0] + pts[1]) / 2 rf[n, 2] = 0.5 pts[0] = pts[1] # /* discard j-2 pt */ pts[1] = pts[2] j = 1 else: # /* step 4 from [1]: */ # /* [count Y as full cycle] */ fullcyclesp1 += 1 n += 1 rf[n, 0] = Y / 2 rf[n, 1] = (pts[j - 2] + pts[j - 1]) / 2 rf[n, 2] = 1.0 pts[j - 2] = pts[j] # /* discard j-2, j-1 pts */ j -= 2 # /* step 6 from [1]: */ # /* [count all ranges in pts as half cycles] */ A = pts[0] for k in range(j): B = pts[k + 1] n += 1 rf[n, 0] = abs(A - B) / 2 rf[n, 1] = (A + B) / 2 rf[n, 2] = 0.5 A = B return rf[: L - fullcyclesp1] def _rainflow2(peaks, L): """Utility routine for :func:`rainflow`; returns (rf, os).""" pts = np.empty(L) rf = np.empty((L - 1, 3)) j = -1 fullcyclesp1 = 1 # full cycles plus 1 n = -1 cycle_index = np.empty(L, np.int64) os = np.empty((L - 1, 2), np.int64) for k in range(L): # /* step 1 from [1]: */ j += 1 pts[j] = peaks[k] cycle_index[j] = k # /* step 2 from [1]: */ while j > 1: # /* step 3 from [1]: */ Y = abs(pts[j - 2] - pts[j - 1]) X = abs(pts[j - 1] - pts[j]) if X < Y: break if j == 2: # /* step 5 from [1]: */ # /* [count Y as half cycle] */ n += 1 rf[n, 0] = Y / 2 rf[n, 1] = (pts[0] + pts[1]) / 2 rf[n, 2] = 0.5 os[n, 0] = cycle_index[0] os[n, 1] = cycle_index[1] pts[0] = pts[1] # /* discard j-2 pt */ pts[1] = pts[2] cycle_index[0] = cycle_index[1] cycle_index[1] = cycle_index[2] j = 1 else: # /* step 4 from [1]: */ # /* [count Y as full cycle] */ fullcyclesp1 += 1 n += 1 rf[n, 0] = Y / 2 rf[n, 1] = (pts[j - 2] + pts[j - 1]) / 2 rf[n, 2] = 1.0 os[n, 0] = cycle_index[j - 2] os[n, 1] = cycle_index[j - 1] pts[j - 2] = pts[j] # /* discard j-2, j-1 pts */ cycle_index[j - 2] = cycle_index[j] j -= 2 # /* step 6 from [1]: */ # /* [count all ranges in pts as half cycles] */ A = pts[0] for k in range(j): B = pts[k + 1] n += 1 rf[n, 0] = abs(A - B) / 2 rf[n, 1] = (A + B) / 2 rf[n, 2] = 0.5 os[n, 0] = cycle_index[k] os[n, 1] = cycle_index[k + 1] A = B return rf[: L - fullcyclesp1], os[: L - fullcyclesp1] try: import numba except ImportError: pass else: _rainflow1 = numba.jit(nopython=True, cache=True)(_rainflow1) _rainflow2 = numba.jit(nopython=True, cache=True)(_rainflow2)