# -*- coding: utf-8 -*-
"""
Tools for cycle counting. Adapted and enhanced from the Yeti version.
"""
import warnings
import numpy as np
import pandas as pd
from pyyeti import locate
try:
import numba
except ImportError:
HAVE_NUMBA = False
numba_bool = bool
else:
HAVE_NUMBA = True
numba_bool = numba.types.bool_
try:
import pyyeti.rainflow.c_rain as rain
except ImportError:
if not HAVE_NUMBA:
warnings.warn(
"Compiled C version of rainflow algorithm failed to "
"import. Using plain Python version, which can be very "
"slow. Recommend installing 'numba': pyYeti will use that "
"automatically to speed up the Python version to be on "
"par with the C version.",
RuntimeWarning,
)
import pyyeti.rainflow.py_rain as rain
# FIXME: We need the str/repr formatting used in Numpy < 1.14.
try:
np.set_printoptions(legacy="1.13")
except TypeError:
pass
[docs]
def rainflow(peaks, getoffsets=False, use_pandas=True):
"""
Rainflow cycle counting.
Parameters
----------
peaks : 1d array_like
Vector of alternating peaks (as returned by :func:`findap`,
for example)
getoffsets : bool; optional
If True, the tuple ``(rf, os)`` is returned; otherwise, only
`rf` is returned.
use_pandas : bool; optional
If True, this routine will return pandas DataFrames
Returns
-------
rf : pandas DataFrame or 2d ndarray
n x 3 matrix with the rainflow cycle count information with
the index going from 0 to n-1 and the columns being ['amp',
'mean', 'count']:
- amp is the cycle amplitude (half the peak-to-peak range)
- mean is mean of the cycle
- count is either 0.5 or 1.0 depending on whether it's
half or full cycle
os : pandas DataFrame or 2d ndarray
Only returned if `getoffsets` is True. n x 2 matrix of cycle
offsets with index going from 0 to n-1 and the columns being
['start', 'stop']:
- start is the offset into `peaks` for start of cycle
- stop is the offset into `peaks` for end of cycle
Notes
-----
This algorithm is derived from reference [#cc1]_. This routine is
a wrapper for either the C version
(:func:`pyyeti.rainflow.c_rain`) or the Python version
(:func:`pyyeti.rainflow.py_rain`) of the algorithm. Both versions
give identical results. They also run in comparable times if
:func:`numba.jit` is available (if ``import numba`` works). If
:func:`numba.jit` is not available, the C version is *much*
faster.
References
----------
.. [#cc1] "Standard Practices for Cycle Counting in Fatigue
Analysis", ASTM E 1049 - 85 (Reapproved 2005).
Examples
--------
Run the example from the ASTM paper:
>>> from pyyeti.cyclecount import rainflow
>>> rainflow([-2, 1, -3, 5, -1, 3, -4, 4, -2])
amp mean count
0 1.5 -0.5 0.5
1 2.0 -1.0 0.5
2 2.0 1.0 1.0
3 4.0 1.0 0.5
4 4.5 0.5 0.5
5 4.0 0.0 0.5
6 3.0 1.0 0.5
With offsets:
>>> rf, os = rainflow([-2, 1, -3, 5, -1, 3, -4, 4, -2],
... getoffsets=True)
>>> rf
amp mean count
0 1.5 -0.5 0.5
1 2.0 -1.0 0.5
2 2.0 1.0 1.0
3 4.0 1.0 0.5
4 4.5 0.5 0.5
5 4.0 0.0 0.5
6 3.0 1.0 0.5
>>> os
start stop
0 0 1
1 1 2
2 4 5
3 2 3
4 3 6
5 6 7
6 7 8
If not using pandas:
>>> rf, os = rainflow([-2, 1, -3, 5, -1, 3, -4, 4, -2],
... getoffsets=True, use_pandas=False)
>>> rf
array([[ 1.5, -0.5, 0.5],
[ 2. , -1. , 0.5],
[ 2. , 1. , 1. ],
[ 4. , 1. , 0.5],
[ 4.5, 0.5, 0.5],
[ 4. , 0. , 0.5],
[ 3. , 1. , 0.5]])
>>> os # doctest: +ELLIPSIS
array([[0, 1],
[1, 2],
[4, 5],
[2, 3],
[3, 6],
[6, 7],
[7, 8]]...)
"""
if getoffsets:
rf, os = rain.rainflow(peaks, getoffsets)
if use_pandas:
rf = pd.DataFrame(rf, columns=["amp", "mean", "count"])
os = pd.DataFrame(os, columns=["start", "stop"])
return rf, os
rf = rain.rainflow(peaks, getoffsets)
if use_pandas:
rf = pd.DataFrame(rf, columns=["amp", "mean", "count"])
return rf
if not HAVE_NUMBA:
def _findap_worker(y, tol):
# example: [1, 2, 3, 4, 4, -2, -2, -2]
if y.size == 1:
return np.array([True])
# first, find unique values (1st of series is unique)
u = locate.find_unique(y, tol)
# [ True, True, True, True, False, True, False, False]
# work with unique values only:
if np.all(u):
yu = y
allu = True
else:
yu = y[u]
# [ 1, 2, 3, 4, -2]
allu = False
# find signs of slopes to locate local max/mins:
s = np.sign(np.diff(yu))
# [ 1, 1, 1, -1]
# locate local max/mins:
pv = np.ones(yu.size, bool)
pv[1:-1] = np.abs(np.diff(s)) == 2
if yu.size > 2:
pv[-1] = yu[-1] != yu[-2]
# [ True, False, False, True, True]
if allu:
return pv
# expand to full size:
PV = np.zeros(y.size, bool) # non-uniques are not peaks
PV[u] = pv
# [ True, False, False, True, False, True, False, False]
return PV
else:
def _findap_worker(y, tol):
if y.size == 1:
return np.array([True])
if y.size == 2:
if y[1] == y[0]:
return np.array([True, False])
return np.array([True, True])
stol = np.abs(tol * np.abs(np.diff(y)).max())
PV = np.zeros(y.size, numba_bool)
PV[0] = True
prv = y[0]
i = 1
while i < y.size:
if np.abs(y[i] - prv) > stol:
break
i += 1
if i == y.size:
return PV
j = i
cur = y[j]
if cur > prv:
mountain = True # find mountain peak
else:
mountain = False # find valley floor
for i in range(i + 1, y.size):
nxt = y[i]
if np.abs(nxt - cur) > stol:
if mountain:
if nxt < cur:
PV[j] = True
mountain = False
else:
if nxt > cur:
PV[j] = True
mountain = True
cur = nxt
j = i
if np.abs(nxt - y[-2]) > stol:
PV[-1] = True
else:
PV[j] = True
return PV
[docs]
def findap(y, tol=1e-6, nan_check=True):
"""
Find alternating local maximum and minimum points in a vector.
Parameters
----------
y : ndarray
y-axis data vector
tol : scalar; optional
Tolerance value for detecting unique values; see
:func:`pyyeti.locate.find_unique`
nan_check : bool; optional
Whether to check for NaNs. Set to ``False`` for a performance
gain if you know there are no NaNs.
Returns
-------
pv : ndarray
Boolean vector for the alternating peaks in `y`.
Notes
-----
`y` is flattened to one dimension before operations.
When `y` has a series of equal points, the first of the series is
considered the peak. The first value in `y` is always considered a
peak. The last point is a peak if and only if it is not equal to
the point before it.
This routine is typically used to prepare a signal for cycle
counting.
NaNs are not considered to be peaks and treated as if they are not
present when `nan_check` is True (the default).
Examples
--------
>>> from pyyeti import cyclecount
>>> import numpy as np
>>> cyclecount.findap(np.array([1]))
array([ True], dtype=bool)
>>> cyclecount.findap(np.array([1, 1, 1, 1]))
array([ True, False, False, False], dtype=bool)
>>> tf = cyclecount.findap(np.array([1, 2, 3, 4, 4, -2, -2, 0]))
>>> np.nonzero(tf)[0] # doctest: +ELLIPSIS
array([0, 3, 5, 7]...)
>>> tf = cyclecount.findap(np.array([1, 2, 3, 4, 4, -2, -2, -2]))
>>> np.nonzero(tf)[0] # doctest: +ELLIPSIS
array([0, 3, 5]...)
>>> tf = cyclecount.findap(np.array([1, 2, 3, 4, -2]))
>>> np.nonzero(tf)[0] # doctest: +ELLIPSIS
array([0, 3, 4]...)
>>> cyclecount.findap(np.array([np.nan, 1, 2, np.nan]))
array([False, True, True, False], dtype=bool)
"""
y = y.ravel()
if nan_check:
nans = np.isnan(y)
if np.any(nans):
full_pv = np.zeros(y.size, bool)
non_nans = ~nans
y = y[non_nans]
if y.size == 0:
return full_pv
pv = _findap_worker(y, tol)
full_pv[non_nans] = pv
return full_pv
return _findap_worker(y, tol)
[docs]
def getbins(bins, mx, mn, right=True, check_bounds=False):
"""
Utility routine used by :func:`sigcount` to get bin boundaries.
Parameters
----------
bins : scalar integer or 1d array_like
Used to define bin boundaries. If array_like, must be
monotonically increasing. See description below.
mx, mn : scalar
Maximum and minimum values of data to be put in bins. `mx` and
`mn` may be input in either order. If `bins` is a 1d
array_like, `mx` and `mn` are ignored unless `check_bounds`
is True; see below.
right : bool; optional
Indicates whether the bins include the rightmost edge or the
leftmost edge. If right == True (the default), then the bins
[1,2,3,4] indicate (1,2], (2,3], (3,4].
check_bounds : bool; optional
If True, routine will check to see if the final bin boundaries
cover the `mn` to `mx` range. Note that the range is guaranteed to be
covered if `bins` is a scalar.
Returns
-------
bb : 1d ndarray
The bin boundaries; ``length = bins + 1``
out_of_bounds : bool; optional
True if either `mx` or `mn` will fall out of range of the bins
(according to `right`). Only returned if `check_bounds` is
True.
Notes
-----
If `mx` and `mn` are equal, they are reset::
mx = mx + 0.5
mn = mn - 0.5
If `bins` is a scalar, this routine tries to mimic the behavior of
the :func:`pandas.cut` routine::
bb = np.linspace(mn, mx, bins+1)
p = 0.001 * (mx - mn)
if right:
bb[0] -= p
else:
bb[-1] += p
If `bins` is a vector, it defines the boundaries directly and is
returned as is::
bb = np.array(bins)
Raises
------
ValueError
If `bins` is a vector but is not monotonically increasing
Examples
--------
>>> from pyyeti import cyclecount
>>> cyclecount.getbins(4, 12, 4)
array([ 3.992, 6. , 8. , 10. , 12. ])
>>> cyclecount.getbins(4, 12, 4, right=False)
array([ 4. , 6. , 8. , 10. , 12.008])
>>> cyclecount.getbins([1, 2, 3, 4], 12, 4, check_bounds=True)
(array([1, 2, 3, 4]), True)
"""
bins = np.atleast_1d(bins)
if mx < mn:
mx, mn = mn, mx
elif mx == mn:
mx = mx + 0.5
mn = mn - 0.5
# raise ValueError("`mx` and `mn` must not be equal")
if bins.size == 1:
bins = int(bins[0])
bb = np.linspace(mn, mx, bins + 1)
p = 0.001 * (mx - mn)
if right:
bb[0] -= p
else:
bb[-1] += p
out_of_bounds = False
elif bins.ndim == 1:
if np.any(np.diff(bins) <= 0):
raise ValueError(
"when `bins` is input as a vector, it "
"must be monotonically increasing"
)
bb = np.array(bins)
if check_bounds:
if right:
if mn <= bb[0] or mx > bb[-1]:
out_of_bounds = True
else:
out_of_bounds = False
else:
if mn < bb[0] or mx >= bb[-1]:
out_of_bounds = True
else:
out_of_bounds = False
if check_bounds:
return bb, out_of_bounds
return bb
def _getlabels(form, bins):
return [form.format(i, j) for i, j in zip(bins[:-1], bins[1:])]
# @njit(cache=True)
def _binify(cycles, bins_range, bins_mean, right, ensure_boundaries=True):
# Many thanks to Daniel Kowollik for contributing the original
# implementation of this routine! :)
num_bins_mean = len(bins_mean) - 1
num_bins_range = len(bins_range) - 1
bin_indices_range = np.digitize(cycles[:, 0], bins_range, right=right) - 1
bin_indices_mean = np.digitize(cycles[:, 1], bins_mean, right=right) - 1
markov_matrix = np.zeros((num_bins_mean, num_bins_range), np.float64)
if ensure_boundaries:
for i in range(len(cycles)):
bim = bin_indices_mean[i]
bir = bin_indices_range[i]
if (0 <= bim < num_bins_mean) and (0 <= bir < num_bins_range):
markov_matrix[bim, bir] += cycles[i, 2]
else:
for i in range(len(cycles)):
markov_matrix[bin_indices_mean[i], bin_indices_range[i]] += cycles[i, 2]
return markov_matrix
if HAVE_NUMBA:
_findap_worker = numba.njit(cache=True)(_findap_worker)
_binify = numba.njit(cache=True)(_binify)
@numba.njit(cache=True)
def maxmin(x):
mn = mx = x[0]
for v in x[1:]:
if v > mx:
mx = v
elif v < mn:
mn = v
return mx, mn
else:
def maxmin(x):
return x.max(), x.min()
[docs]
def binify(
rf,
ampbins=10,
meanbins=1,
right=True,
precision=3,
retbins=False,
use_pandas=True,
check_bounds=True,
):
"""
Summarize cycle count results (as from :func:`rainflow`) into
bins.
Parameters
----------
rf : 2d array_like
2d matrix with (at least) 3 columns: [amp, mean, count]. See
:func:`rainflow`.
ampbins : scalar or 1d array_like; optional
Defines the cycle amplitude bins; see :func:`getbins` for
more complete description on how bins are defined.
============== =============================================
`ampbins` type Brief description (see also :func:`getbins`)
============== =============================================
scalar Defines number of bins to use
vector Defines lower bin boundaries
============== =============================================
meanbins : scalar or 1d array_like; optional
Defines the cycle mean-value bins; see `ampbins` description
and :func:`getbins` for more complete description on how bins
are defined.
right : bool; optional
Indicates whether the bins include the rightmost edge or the
leftmost edge. If right == True (the default), then the bins
[1,2,3,4] indicate (1,2], (2,3], (3,4].
precision : integer; optional
Precision to use for DataFrame labels; only used if
`use_pandas` is True.
retbins : bool; optional
If True, return the `ampbins` and `meanbins` vectors.
use_pandas : bool; optional
If True, this routine will return `table` as a pandas
DataFrame.
check_bounds : bool; optional
If True, routine will check to see if the final bin boundaries
for both the amplitude and mean cover the range of the
data. Note that the range is guaranteed to be covered if the
`ampbins` and `meanbins` inputs are scalars.
Returns
-------
table : pandas DataFrame or 2d ndarray
A cycle count table (len(`meanbins`) x len(`ampbins`)). Each
value in `table` entry is the number of cycles in the bin
(see below).
ampbins : 1d ndarray; optional
Boundaries of amplitude bins used; length = # of bins + 1.
Only returned if `retbins` is True.
meanbins : 1d ndarray; optional
Boundaries of mean-value bins used; length = # of bins + 1.
Only returned if `retbins` is True.
Notes
-----
Algorithm works as follows:
For each mean value bin (i'th):
1. `rf` is filtered down to only those rows where the mean value
falls in the mean-bin (call this subset of `rf` `rffilt`).
2. For each amplitude bin (j'th): count number of cycles in
`rffilt` with amplitude that falls in the amp-bin and store in
``table[i, j]``
A value falls in bin `i` if::
bin[i] < value <= bin[i+1] } if right is True
bin[i] <= value < bin[i+1] } if right is not True
Examples
--------
>>> from pyyeti import cyclecount
>>> rf = cyclecount.rainflow([-2, 1, -3, 5, -1, 3, -4, 4, -2])
>>> rf
amp mean count
0 1.5 -0.5 0.5
1 2.0 -1.0 0.5
2 2.0 1.0 1.0
3 4.0 1.0 0.5
4 4.5 0.5 0.5
5 4.0 0.0 0.5
6 3.0 1.0 0.5
>>> format = lambda x: f"{x:.1f}"
>>> df = cyclecount.binify(rf, 3, 2)
>>> df.map(format) # doctest: +ELLIPSIS
Amp (1.497, 2.500] (2.500, 3.500] (3.500, 4.500]
Mean...
(-1.002, 0.000] 1.0 0.0 0.5
(0.000, 1.000] 1.0 0.5 1.0
>>> df = cyclecount.binify(rf, 3, 2, right=0)
>>> df.map(format) # doctest: +ELLIPSIS
Amp [1.500, 2.500) [2.500, 3.500) [3.500, 4.503)
Mean...
[-1.000, 0.000) 1.0 0.0 0.0
[0.000, 1.002) 1.0 0.5 1.5
If not using pandas:
>>> cyclecount.binify(rf, 3, 2, use_pandas=False)
array([[ 1. , 0. , 0.5],
[ 1. , 0.5, 1. ]])
"""
rf = np.atleast_2d(rf)
# rf = [amp, mean, count] columns
ampb = getbins(ampbins, *maxmin(rf[:, 0]), right, check_bounds)
aveb = getbins(meanbins, *maxmin(rf[:, 1]), right, check_bounds)
if check_bounds:
ampb, out_amp = ampb
aveb, out_ave = aveb
out = out_amp or out_ave
else:
out = False
table = _binify(rf, ampb, aveb, right, out)
if use_pandas:
f = "{:." + str(precision) + "f}"
f = f + ", " + f
if right:
form = "(" + f + "]"
else:
form = "[" + f + ")"
index = _getlabels(form, aveb)
columns = _getlabels(form, ampb)
table = pd.DataFrame(table, index=index, columns=columns)
table.columns.name = "Amp"
table.index.name = "Mean"
if retbins:
return table, ampb, aveb
return table
[docs]
def sigcount(
sig, ampbins=10, meanbins=1, right=True, precision=3, retbins=False, use_pandas=True
):
"""
Do rainflow cycle counting on a signal.
Parameters
----------
sig : 1d array_like
Signal (vector) to do cycle counting on.
ampbins : scalar or 1d array_like
Defines the cycle amplitude bins; see :func:`getbins` for
more complete description on how bins are defined.
============== =============================================
`ampbins` type Brief description (see also :func:`getbins`)
============== =============================================
scalar Defines number of bins to use
vector Defines lower bin boundaries
============== =============================================
meanbins : scalar or 1d array_like
Defines the cycle mean-value bins; see `ampbins` description
and :func:`getbins` for more complete description on how bins
are defined.
right : bool; optional
Indicates whether the bins include the rightmost edge or the
leftmost edge. If right == True (the default), then the bins
[1,2,3,4] indicate (1,2], (2,3], (3,4].
precision : integer; optional
Precision to use for DataFrame labels.
retbins : bool; optional
If True, return the `ampbins` and `meanbins` vectors.
use_pandas : bool; optional
If True, this routine will return `table` as a pandas
DataFrame.
Returns
-------
table : pandas DataFrame or 2d ndarray
A cycle count table (len(`meanbins`) x len(`ampbins`)). Each
value in `table` entry is the number of cycles in the bin
(see below).
ampbins : 1d ndarray
Boundaries of amplitude bins used; length = # of bins + 1.
Only returned if `retbins` is True.
meanbins : 1d ndarray
Boundaries of mean-value bins used; length = # of bins + 1.
Only returned if `retbins` is True.
Notes
-----
Steps:
1. Calls :func:`findap` to find all local minima and maxima:
``peaks = sig[findap(sig)]``
2. Calls :func:`rainflow` to do the cycle counting:
``rf = rainflow(peaks)``
3. Summarizes the rainflow counting by putting the counts in
bins. For each mean value bin (i'th):
a. `rf` is filtered down to only those rows where the mean
value falls in the mean-bin (call this subset of `rf`
`rffilt`).
b. For each amplitude bin (j'th): count number of cycles in
`rffilt` with amplitude that falls in the amp-bin and
store in ``table[i, j]``
A value falls in bin `i` if::
bin[i] < value <= bin[i+1] } if right is True
bin[i] <= value < bin[i+1] } if right is not True
Examples
--------
>>> from pyyeti import cyclecount
>>> import numpy as np
>>> sig = np.arange(100)
>>> sig[::2] *= -1 # [0, 1, -2, 3, -4, ..., 99]
>>> # `sig` has 99 half-cycles; amplitude grows from 0.5 up to
>>> # 98.5; mean of each is either 0.5 or -0.5
>>> table = cyclecount.sigcount(sig, 2, 2)
>>> table # doctest: +ELLIPSIS
Amp (0.402, 49.500] (49.500, 98.500]
Mean...
(-0.501, 0.000] 12.5 12.0
(0.000, 0.500] 12.5 12.5
We can focus on amplitudes only (ignoring the mean value bins):
>>> table.sum(axis=0)
Amp
(0.402, 49.500] 25.0
(49.500, 98.500] 24.5
dtype: float64
Focus on only the mean value bins:
>>> table.sum(axis=1)
Mean
(-0.501, 0.000] 24.5
(0.000, 0.500] 25.0
dtype: float64
If not using pandas:
>>> cyclecount.sigcount(sig, 2, 2, use_pandas=False)
array([[ 12.5, 12. ],
[ 12.5, 12.5]])
"""
rf = rainflow(sig[findap(sig)], use_pandas=False)
return binify(rf, ampbins, meanbins, right, precision, retbins, use_pandas)