# Checked
import numpy as np
from numpy.lib.stride_tricks import as_strided
from ieeg.arrays.api import (array_namespace, xp_assert_equal, ArrayLike,
is_numpy, is_cupy)
try:
from numpy.lib.array_utils import normalize_axis_tuple
except ImportError:
import operator
def _normalize_axis_tuple(axis, ndim, argname=None, allow_duplicate=False):
if type(axis) not in (tuple, list):
try:
axis = [operator.index(axis)]
except TypeError:
pass
# Going via an iterator directly is slower than via list comprehension.
axis = tuple([_normalize_index(ax, ndim) for ax in axis])
if not allow_duplicate and len(set(axis)) != len(axis):
if argname:
raise ValueError('repeated axis in `{}` argument'
''.format(argname))
else:
raise ValueError('repeated axis')
return axis
def _normalize_index(index: int, ndim: int):
if ndim < 1:
raise ValueError("ndim must be at least 1")
while index < 0:
index += ndim
if index >= ndim:
raise IndexError("index {} is out of bounds for axis with size {}"
"".format(index, ndim))
return index
normalize_axis_tuple = _normalize_axis_tuple
try:
import cupy as cp
from cupy.lib.stride_tricks import as_strided as as_strided_cp
no_cupy = False
except ImportError:
no_cupy = True
[docs]
def stitch_mats(mats: list[ArrayLike], overlaps: list[int], axis: int = 0
) -> ArrayLike:
"""break up the matrices into their overlapping and non-overlapping parts
then stitch them back together
Parameters
----------
mats : list
The matrices to stitch together
overlaps : list
The number of overlapping rows between each matrix
axis : int, optional
The axis to stitch along, by default 0
Returns
-------
np.ndarray
The stitched matrix
Examples
--------
>>> mat1 = np.array([[1, 2, 3], [4, 5, 6]])
>>> mat2 = np.array([[7, 8, 9], [10, 11, 12]])
>>> mat3 = np.array([[13, 14, 15], [16, 17, 18]])
>>> stitch_mats([mat1, mat2, mat3], [1, 1])
array([[ 1, 2, 3],
[10, 11, 12],
[16, 17, 18]])
>>> stitch_mats([mat1, mat2, mat3], [0, 0], axis=1)
array([[ 1, 2, 3, 7, 8, 9, 13, 14, 15],
[ 4, 5, 6, 10, 11, 12, 16, 17, 18]])
>>> mat4 = np.array([[19, 20, 21], [22, 23, float("nan")]])
>>> stitch_mats([mat3, mat4], [0], axis=1)
array([[13., 14., 15., 19., 20., 21.],
[16., 17., 18., 22., 23., nan]])
"""
xp = array_namespace(*mats)
stitches = [mats[0]]
if len(mats) != len(overlaps) + 1:
raise ValueError("The number of matrices must be one more than the"
" number of overlaps")
for i, over in enumerate(overlaps):
stitches = stitches[:-2] + merge(stitches[-1], mats[i + 1], over, axis)
out = xp.concatenate(stitches, axis=axis)
try:
xp_assert_equal(out.astype(int), out, check_dtype=False, xp=xp)
return out.astype(int)
except AssertionError:
return out
[docs]
def merge(mat1: ArrayLike, mat2: ArrayLike, overlap: int, axis: int = 0
) -> list[ArrayLike]:
"""Take two arrays and merge them over the overlap gradually"""
xp = array_namespace(mat1, mat2)
sl = [slice(None)] * mat1.ndim
sl[axis] = slice(0, mat1.shape[axis] - overlap)
start = mat1[tuple(sl)]
sl[axis] = slice(mat1.shape[axis] - overlap, mat1.shape[axis])
middle1 = xp.multiply(xp.linspace(1, 0, overlap), mat1[tuple(sl)])
sl[axis] = slice(0, overlap)
middle2 = xp.multiply(xp.linspace(0, 1, overlap), mat2[tuple(sl)])
middle = xp.add(middle1, middle2)
sl[axis] = slice(overlap, mat2.shape[axis])
last = mat2[tuple(sl)]
return [start, middle, last]
[docs]
def make_data_same(data_fix: ArrayLike, shape: tuple | list, stack_ax: int = 0,
pad_ax: int = -1, make_stacks_same: bool = True,
rng: np.random.Generator = None) -> ArrayLike:
"""Force the last dimension of data_fix to match the last dimension of
shape.
Takes the two arrays and checks if the last dimension of data_fix is
smaller than the last dimension of shape. If there's more than two
dimensions, it will rearrange the data to match the shape. If there's only
two dimensions, it will repeat the signal to match the shape of data_like.
If the last dimension of data_fix is larger than the last dimension of
shape, it will return a subset of data_fix.
Parameters
----------
data_fix : ArrayLike
The data to reshape.
shape : list | tuple
The shape of data to match.
Returns
-------
data_fix : ArrayLike
The reshaped data.
Examples
--------
>>> np.random.seed(0)
>>> data_fix = np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]])
>>> make_data_same(data_fix, (2, 8))
array([[ 1, 2, 3, 4, 5, 4, 3, 2],
[ 6, 7, 8, 9, 10, 9, 8, 7]])
>>> (newarr := make_data_same(data_fix, (2, 2), make_stacks_same=False))
array([[1, 2],
[3, 4],
[6, 7],
[8, 9]])
>>> make_data_same(newarr, (3, 2), stack_ax=1, pad_ax=0)
array([[1, 2],
[3, 4],
[6, 7]])
>>> import cupy as cp # doctest: +SKIP
>>> make_data_same(cp.asarray(data_fix), (2, 2),
... make_stacks_same=False) # doctest: +SKIP
array([[1, 2],
[3, 4],
[6, 7],
[8, 9]])
"""
xp = array_namespace(data_fix)
if is_cupy(xp):
rng = None
elif not isinstance(rng, np.random.Generator):
rng = xp.random.default_rng(rng)
stack_ax = list(range(len(shape)))[stack_ax]
pad_ax = list(range(len(shape)))[pad_ax]
# Check if the pad dimension of data_fix is smaller than the pad
# dimension of shape
if data_fix.shape[pad_ax] <= shape[pad_ax]:
out = pad_to_match(xp.zeros(shape), data_fix, stack_ax)
# When the pad dimension of data_fix is larger than the pad dimension of
# shape, take subsets of data_fix and stack them together on the stack
# dimension
else:
out = rand_offset_reshape(data_fix, shape, stack_ax, pad_ax, rng)
if not make_stacks_same:
return out
elif out.shape[stack_ax] > shape[stack_ax]: # subsample stacks if too many
idx = rng.choice(out.shape[stack_ax], (shape[stack_ax],), False)
out = xp.take(out, idx, axis=stack_ax)
elif out.shape[stack_ax] < shape[stack_ax]: # oversample stacks if too few
n = shape[stack_ax] - out.shape[stack_ax]
idx = rng.choice(out.shape[stack_ax], (n,), True)
new_data = xp.take(out, idx, axis=stack_ax)
out = xp.concatenate((out, new_data), axis=stack_ax)
return out
[docs]
def pad_to_match(sig1: ArrayLike, sig2: ArrayLike,
axis: int | tuple[int, ...] = ()) -> ArrayLike:
"""Pad the second signal to match the first signal along all axes not
specified.
Takes the two arrays and checks if the shape of sig2 is smaller than the
shape of sig1. For each axis not specified, it will pad the second signal
to match the first signal along that axis.
Parameters
----------
sig1 : ArrayLike
The data to match.
sig2 : ArrayLike
The data to pad.
axis : int | tuple
The axes along which to pad the data.
Returns
-------
sig2 : ArrayLike
The padded data.
Examples
--------
>>> sig1 = np.arange(48).reshape(2, 3, 8)
>>> sig2 = np.arange(24).reshape(2, 3, 4)
>>> pad_to_match(sig1, sig2)
array([[[ 0, 1, 2, 3, 2, 1, 0, 1],
[ 4, 5, 6, 7, 6, 5, 4, 5],
[ 8, 9, 10, 11, 10, 9, 8, 9]],
<BLANKLINE>
[[12, 13, 14, 15, 14, 13, 12, 13],
[16, 17, 18, 19, 18, 17, 16, 17],
[20, 21, 22, 23, 22, 21, 20, 21]]])
"""
xp = array_namespace(sig1, sig2)
# Make sure the data is the same shape
if xp.isscalar(axis):
axis = (axis,)
axis = list(axis)
for i, ax in enumerate(axis):
axis[i] = xp.arange(sig1.ndim)[ax]
eq = [
e for i, e in enumerate(
xp.equal(xp.asarray(sig1.shape), xp.asarray(sig2.shape))
) if i not in axis
]
if not all(eq):
for ax in axis:
eq.insert(ax, True)
pad_shape = [
(0, 0) if eq[i] else (0, sig1.shape[i] - sig2.shape[i])
for i in range(sig1.ndim)
]
sig2 = xp.pad(sig2, pad_shape, mode='reflect')
return sig2
[docs]
def rand_offset_reshape(data_fix: ArrayLike, shape: tuple, stack_ax: int,
pad_ax: int, rng: np.random.Generator | int = None
) -> ArrayLike:
"""Take subsets of data_fix and stack them together on the stack dimension
This function takes the data and reshapes it to match the shape by taking
subsets of data_fix and stacking them together on the stack dimension,
randomly offsetting the start of the first subset. It is assumed that the
padding axis 'pad_ax' is larger in data_fix.shape than in shape.
Parameters
----------
data_fix : ArrayLike
The data to reshape.
shape : list | tuple
The shape of data to match.
stack_ax : int
The axis along which to stack the subsets.
pad_ax : int
The axis along which to slice the subsets.
rng : np.random.Generator | int, optional
The random number generator to use. If None, a default random number
generator will be used.
Returns
-------
data_fix : ArrayLike
The reshaped data.
Examples
--------
>>> data_fix = np.arange(50).reshape((5, 10))
>>> data_fix
array([[ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
[20, 21, 22, 23, 24, 25, 26, 27, 28, 29],
[30, 31, 32, 33, 34, 35, 36, 37, 38, 39],
[40, 41, 42, 43, 44, 45, 46, 47, 48, 49]])
>>> rand_offset_reshape(data_fix, (2, 4), 0, 1, 0)
array([[ 1, 2, 3, 4],
[ 5, 6, 7, 8],
[11, 12, 13, 14],
[15, 16, 17, 18],
[21, 22, 23, 24],
[25, 26, 27, 28],
[31, 32, 33, 34],
[35, 36, 37, 38],
[41, 42, 43, 44],
[45, 46, 47, 48]])
>>> rand_offset_reshape(data_fix, (2, 4), 1, 0, 0) # doctest: +ELLIPSIS
array([[ 0, 20, 1, 21, 2, 22, 3, 23, 4, 24, 5, 25, 6, 26, 7, 27,
8, 28, 9, 29],
[10, 30, 11, 31, 12, 32, 13, 33, 14, 34, 15, 35, 16, 36, 17, 37,
18, 38, 19, 39]])
"""
xp = array_namespace(data_fix)
if not isinstance(rng, np.random.Generator):
rng = xp.random.default_rng(rng)
# Randomly offset the start of the first subset
num_stack = data_fix.shape[pad_ax] // shape[pad_ax]
if data_fix.shape[pad_ax] % shape[pad_ax] == 0:
num_stack -= 1
offset = rng.integers(0,
data_fix.shape[pad_ax] - shape[pad_ax] * num_stack)
# Create an array to store the output
out_shape = [shape[i] if i == pad_ax else data_fix.shape[i] for i in
range(data_fix.ndim)]
out_shape[stack_ax] *= num_stack
out = xp.zeros(tuple(out_shape), dtype=data_fix.dtype)
# Iterate over the subsets
sl_in = [slice(None)] * data_fix.ndim
sl_out = [slice(None)] * data_fix.ndim
for i in range(num_stack):
# Get the start and end indices of the subset
start = i * shape[pad_ax] + offset
end = start + shape[pad_ax]
# Create a slice object for the subset
sl_in[pad_ax] = slice(start, end)
sl_out[stack_ax] = slice(i, None, num_stack)
# Fill in the subset
out[tuple(sl_out)] = data_fix[tuple(sl_in)]
return out
[docs]
def sliding_window_view(x, window_shape, axis=None, *, subok=False,
writeable=False):
"""
Create a sliding window view into the array with the given window shape.
Also known as rolling or moving window, the window slides across all
dimensions of the array and extracts subsets of the array at all window
positions.
.. Copied from numpy.lib.stride_tricks.sliding_window_view
Parameters
----------
x : ArrayLike
Array to create the sliding window view from.
window_shape : int or tuple of int
Size of window over each axis that takes part in the sliding window.
If `axis` is not present, must have same length as the number of input
array dimensions. Single integers `i` are treated as if they were the
tuple `(i,)`.
axis : int or tuple of int, optional
Axis or axes along which the sliding window is applied.
By default, the sliding window is applied to all axes and
`window_shape[i]` will refer to axis `i` of `x`.
If `axis` is given as a `tuple of int`, `window_shape[i]` will refer to
the axis `axis[i]` of `x`.
Single integers `i` are treated as if they were the tuple `(i,)`.
subok : bool, optional
If True, sub-classes will be passed-through, otherwise the returned
array will be forced to be a base-class array (default).
writeable : bool, optional
When true, allow writing to the returned view. The default is false,
as this should be used with caution: the returned view contains the
same memory location multiple times, so writing to one location will
cause others to change.
Returns
-------
view : ndarray
Sliding window view of the array. The sliding window dimensions are
inserted at the end, and the original dimensions are trimmed as
required by the size of the sliding window.
That is, ``view.shape = x_shape_trimmed + window_shape``, where
``x_shape_trimmed`` is ``x.shape`` with every entry reduced by one less
than the corresponding window size.
See Also
--------
lib.stride_tricks.as_strided: A lower-level and less safe routine for
creating arbitrary views from custom shape and strides.
broadcast_to: broadcast an array to a given shape.
Notes
-----
For many applications using a sliding window view can be convenient, but
potentially very slow. Often specialized solutions exist, for example:
- `scipy.signal.fftconvolve`
- Filtering functions in `scipy.ndimage`
- Moving window functions provided by
`bottleneck <https://github.com/pydata/bottleneck>`_.
As a rough estimate, a sliding window approach with an input size of `N`
and a window size of `W` will scale as `O(N*W)` where frequently a special
algorithm can achieve `O(N)`. That means that the sliding window variant
for a window size of 100 can be a 100 times slower than a more specialized
version.
Nevertheless, for small window sizes, when no custom algorithm exists, or
as a prototyping and developing tool, this function can be a good solution.
Examples
--------
>>> import numpy as np
>>> from ieeg.arrays.reshape import sliding_window_view
>>> x = np.arange(6)
>>> x.shape
(6,)
>>> v = sliding_window_view(x, 3)
>>> v.shape
(4, 3)
>>> v
array([[0, 1, 2],
[1, 2, 3],
[2, 3, 4],
[3, 4, 5]])
This also works in more dimensions, e.g.
>>> i, j = np.ogrid[:3, :4]
>>> x = 10*i + j
>>> x.shape
(3, 4)
>>> x
array([[ 0, 1, 2, 3],
[10, 11, 12, 13],
[20, 21, 22, 23]])
>>> shape = (2,2)
>>> v = sliding_window_view(x, shape)
>>> v.shape
(2, 3, 2, 2)
>>> v
array([[[[ 0, 1],
[10, 11]],
<BLANKLINE>
[[ 1, 2],
[11, 12]],
<BLANKLINE>
[[ 2, 3],
[12, 13]]],
<BLANKLINE>
<BLANKLINE>
[[[10, 11],
[20, 21]],
<BLANKLINE>
[[11, 12],
[21, 22]],
<BLANKLINE>
[[12, 13],
[22, 23]]]])
The axis can be specified explicitly:
>>> v = sliding_window_view(x, 3, 0)
>>> v.shape
(1, 4, 3)
>>> v
array([[[ 0, 10, 20],
[ 1, 11, 21],
[ 2, 12, 22],
[ 3, 13, 23]]])
The same axis can be used several times. In that case, every use reduces
the corresponding original dimension:
>>> v = sliding_window_view(x, (2, 3), (1, 1))
>>> v.shape
(3, 1, 2, 3)
>>> v
array([[[[ 0, 1, 2],
[ 1, 2, 3]]],
<BLANKLINE>
<BLANKLINE>
[[[10, 11, 12],
[11, 12, 13]]],
<BLANKLINE>
<BLANKLINE>
[[[20, 21, 22],
[21, 22, 23]]]])
Combining with stepped slicing (`::step`), this can be used to take sliding
views which skip elements:
>>> x = np.arange(7)
>>> sliding_window_view(x, 5)[:, ::2]
array([[0, 2, 4],
[1, 3, 5],
[2, 4, 6]])
Or views which move by multiple elements
>>> x = np.arange(7)
>>> sliding_window_view(x, 3)[::2, :]
array([[0, 1, 2],
[2, 3, 4],
[4, 5, 6]])
A common application of `sliding_window_view` is the calculation of running
statistics. The simplest example is the
`moving average <https://en.wikipedia.org/wiki/Moving_average>`_:
>>> x = np.arange(6)
>>> x.shape
(6,)
>>> v = sliding_window_view(x, 3)
>>> v.shape
(4, 3)
>>> v
array([[0, 1, 2],
[1, 2, 3],
[2, 3, 4],
[3, 4, 5]])
>>> moving_average = v.mean(axis=-1)
>>> moving_average
array([1., 2., 3., 4.])
Note that a sliding window approach is often **not** optimal (see Notes).
"""
xp = array_namespace(x)
window_shape = (tuple(window_shape)
if np.iterable(window_shape)
else (window_shape,))
# # first convert input to array, possibly keeping subclass
# x = xp.asarray(x)
window_shape_array = np.array(window_shape)
if np.any(window_shape_array < 0):
raise ValueError('`window_shape` cannot contain negative values')
if axis is None:
axis = tuple(range(x.ndim))
if len(window_shape) != len(axis):
raise ValueError(f'Since axis is `None`, must provide '
f'window_shape for all dimensions of `x`; '
f'got {len(window_shape)} window_shape elements '
f'and `x.ndim` is {x.ndim}.')
else:
axis = normalize_axis_tuple(axis, x.ndim, allow_duplicate=True)
if len(window_shape) != len(axis):
raise ValueError(f'Must provide matching length window_shape and '
f'axis; got {len(window_shape)} window_shape '
f'elements and {len(axis)} axes elements.')
out_strides = x.strides + tuple(x.strides[ax] for ax in axis)
# note: same axis can be windowed repeatedly
x_shape_trimmed = list(x.shape)
for ax, dim in zip(axis, window_shape):
if x_shape_trimmed[ax] < dim:
raise ValueError(
'window shape cannot be larger than input array shape')
x_shape_trimmed[ax] -= dim - 1
out_shape = tuple(x_shape_trimmed) + window_shape
if is_numpy(xp):
return as_strided(x, strides=out_strides, shape=out_shape,
subok=subok, writeable=writeable)
elif is_cupy(xp) and no_cupy:
raise ModuleNotFoundError("cupy not available")
elif is_cupy(xp):
return as_strided_cp(x, strides=out_strides, shape=out_shape)
else:
try:
return as_strided(x, strides=out_strides, shape=out_shape,
subok=subok, writeable=writeable)
except Exception:
raise NotImplementedError(f"Only numpy and cupy are supported,"
f" not {xp.__name__}")