Source code for ieeg.calc.fast

import numpy as np
from ieeg.calc._fast.ufuncs import mean_diff as _md, t_test as _ttest
from ieeg.calc._fast.mixup import mixupnd as cmixup, normnd as cnorm
from ieeg.calc._fast.permgt import permgtnd as permgt
from ieeg.arrays.api import array_namespace, is_numpy, is_torch, Array, is_cupy
from scipy.stats import rankdata
from functools import partial

__all__ = ["mean_diff", "mixup", "permgt", "norm", "concatenate_arrays",
           "ttest", "brunnermunzel"]


[docs] def brunnermunzel(x: np.ndarray, y: np.ndarray, axis=None, nan_policy='omit'): """ Compute the Brunner-Munzel test statistic for two independent samples. The Brunner-Munzel test is used to compare the stochastic dominance of two independent samples and does not assume equal variances. It is a nonparametric statistical test that operates using ranked data. This implementation allows handling NaN values based on the specified policy. Parameters ---------- x : np.ndarray The first input array representing sample data. y : np.ndarray The second input array representing sample data. axis : int or None, optional The axis along which to compute the test statistic. If None, the arrays are flattened before computation. Default is None. nan_policy : {'propagate', 'raise', 'omit'}, optional Defines how to handle NaN values in the inputs: - 'propagate': Returns NaN in the result if NaN is present in the input - 'raise': Raises an error if NaN is detected in the input. - 'omit': Omits NaN values during the computation. Default is 'omit'. Returns ------- np.ndarray or float The computed Brunner-Munzel statistic, returned as a scalar if the input arrays are 1D and as an array otherwise. """ if axis is None: nx, ny = x.size, y.size idxx = slice(0, nx) idxy = slice(nx, nx+ny) x, y = x.flat, y.flat concat = np.concatenate((x, y), axis=0) else: while axis < 0: axis += x.ndim nx, ny = x.shape[axis], y.shape[axis] idxx = tuple(slice(None) if i != axis else slice(0, nx) for i in range(x.ndim)) idxy = tuple(slice(None) if i != axis else slice(nx, nx+ny) for i in range(x.ndim)) concat = np.concatenate((x, y), axis=axis) where = ~np.isnan(concat) if nan_policy == 'omit': rank = partial(rankdata, nan_policy=nan_policy) wherex, wherey = where[idxx], where[idxy] else: rank = rankdata wherex = wherey = None if np.any(~where) and nan_policy == 'raise': raise ValueError("The input contains NaN.") kwargsx = dict(axis=axis, where=wherex, keepdims=True) kwargsy = dict(axis=axis, where=wherey, keepdims=True) rankc = rank(concat, axis=axis) rankcx, rankcy = rankc[idxx], rankc[idxy] rankcx_mean, rankcy_mean = rankcx.mean(**kwargsx), rankcy.mean(**kwargsy) rankx, ranky = rank(x, axis=axis), rank(y, axis=axis) rankx_mean, ranky_mean = rankx.mean(**kwargsx), ranky.mean(**kwargsy) Sx = np.sum(np.power(rankcx - rankx - rankcx_mean + rankx_mean, 2.0), **kwargsx) / (nx - 1) Sy = np.sum(np.power(rankcy - ranky - rankcy_mean + ranky_mean, 2.0), **kwargsy) / (ny - 1) wbfn = nx * ny * (rankcy_mean - rankcx_mean) wbfn /= (nx + ny) * np.sqrt(nx * Sx + ny * Sy) return np.squeeze(wbfn)
def _compute_mean_and_variance_np(group, axis, ddof=1): """Helper function to compute mean and variance with NaN handling.""" mask = ~np.isnan(group) n = np.sum(mask, axis=axis, keepdims=True) # copy the group to avoid modifying the original data group = np.where(mask, group, 0.) # calculate mean mean = np.sum(group, axis=axis, keepdims=True) / n # calculate variance indices = ''.join(chr(105 + i) for i in range(group.ndim)) formula = f"{indices},{indices}->{indices[:axis]}{indices[axis+1:]}" ind = tuple(slice(None) if i != axis else None for i in range(group.ndim)) variance = np.einsum(formula, group, group)[ind] variance -= np.square(mean) * n variance[n == ddof] = 0 # Set variance to 0 where n == 1 variance[n < ddof] = np.nan # Set variance to NaN where n == 0 variance[n > ddof] /= ((n - ddof) * n)[n > ddof] mean[n == 0] = np.nan return mean, variance
[docs] def ttest(group1: np.ndarray, group2: np.ndarray, axis: int, xp=None) -> np.ndarray: """Calculate the t-statistic between two groups. This function is the default statistic function for time_perm_cluster. It calculates the t-statistic between two groups along the specified axis. Parameters ---------- group1 : array, shape (..., time) The first group of observations. group2 : array, shape (..., time) The second group of observations. axis : int or tuple of ints, optional The axis or axes along which to compute the t-statistic. If None, compute the t-statistic over all axes. Returns ------- t : array The t-statistic between the two groups. Examples -------- >>> import numpy as np >>> group1 = np.array([[1, 1, 1, 1, 1], [0, 60, 0, 10, 0]]) >>> group2 = np.array([[1, 1, 1, 1, 1], [0, 0, 0, 0, 0]]) >>> ttest(group1, group2, 1) array([ nan, 1.2004901]) >>> ttest(group1, group2, 0) array([0. , 1.01680311, 0. , 1.10431526, 0. ]) >>> import cupy as cp # doctest: +SKIP >>> group1 = cp.array([[1, 1, 1, 1, 1], [0, 60, 0, 10, 0]] ... ) # doctest: +SKIP >>> group2 = cp.array([[1, 1, 1, 1, 1], [0, 0, 0, 0, 0]]) # doctest: +SKIP >>> ttest(group1, group2, 1) # doctest: +SKIP array([ nan, 1.2004901]) """ if xp is None: xp = array_namespace(group1, group2) while axis < 0: axis += group1.ndim if is_numpy(xp): # return _ttest(group1, group2, axes=[axis, axis]) with np.errstate(divide='ignore'): mean1, var1 = _compute_mean_and_variance_np(group1, axis) mean2, var2 = _compute_mean_and_variance_np(group2, axis) denom = np.sqrt(var1 + var2) # Handle cases where the denominator is zero denom[denom == 0] = np.nan t_stat = (mean1 - mean2) / denom return np.squeeze(t_stat) elif is_cupy(xp): n1 = xp.sum(~xp.isnan(group1), axis=axis) n2 = xp.sum(~xp.isnan(group2), axis=axis) mean1 = xp.nansum(group1, axis=axis) / n1 mean2 = xp.nansum(group2, axis=axis) / n2 var1 = xp.nanvar(group1, axis=axis, ddof=1) var2 = xp.nanvar(group2, axis=axis, ddof=1) return (mean1 - mean2) / xp.sqrt(var1 / n1 + var2 / n2) else: raise NotImplementedError("T-test is not implemented for this array" " type.")
[docs] def concatenate_arrays(arrays: tuple[np.ndarray, ...], axis: int = 0 ) -> np.ndarray: """Concatenate arrays along a specified axis, filling in empty arrays with nan values. Parameters ---------- arrays A list of arrays to concatenate axis The axis along which to concatenate the arrays Returns ------- result The concatenated arrays Examples -------- >>> concatenate_arrays((np.array([[1, 2, 3]]), np.array([[4, 5]])), axis=0) array([[ 1., 2., 3.], [ 4., 5., nan]]) >>> concatenate_arrays((np.array([1, 2, 3]), np.array([4, 5])), axis=0) array([1., 2., 3., 4., 5.]) >>> arr1 = np.arange(60, dtype=float).reshape(10, 2, 3) >>> arr2 = np.arange(240, dtype=float).reshape(20, 3, 4) >>> concatenate_arrays((arr1, arr2), axis=0)[0] array([[ 0., 1., 2., nan], [ 3., 4., 5., nan], [nan, nan, nan, nan]]) >>> concatenate_arrays((arr2[0], arr1[0]), axis=1) array([[ 0., 1., 2., 3., 0., 1., 2.], [ 4., 5., 6., 7., 3., 4., 5.], [ 8., 9., 10., 11., nan, nan, nan]]) >>> arr = concatenate_arrays((arr1[0], arr2[0]), axis=None) >>> arr array([[[ 0., 1., 2., nan], [ 3., 4., 5., nan], [nan, nan, nan, nan]], <BLANKLINE> [[ 0., 1., 2., 3.], [ 4., 5., 6., 7.], [ 8., 9., 10., 11.]]]) >>> concatenate_arrays((arr2[0].astype('f2'), arr1[0].astype('f2')), ... axis=1) array([[ 0., 1., 2., 3., 0., 1., 2.], [ 4., 5., 6., 7., 3., 4., 5.], [ 8., 9., 10., 11., nan, nan, nan]], dtype=float16) """ if axis is None: axis = 0 arrays = [np.expand_dims(ar, axis) for ar in arrays] arrays = [ar.astype(float) if ar.dtype.kind in 'iu' else ar for ar in arrays if ar.size > 0] if len(arrays) == 0: return np.array([]) while axis < 0: axis += max(a.ndim for a in arrays) max_shape = [max(a.shape[ax] for a in arrays) if ax != axis else sum(a.shape[ax] for a in arrays) for ax in range(arrays[0].ndim)] out = np.full(max_shape, np.nan, dtype=arrays[0].dtype) start = 0 for i, ar in enumerate(arrays): slices = tuple(slice(start, start + ar.shape[ax]) if ax == axis else slice(ar.shape[ax]) for ax in range(ar.ndim)) out[slices] = ar start += ar.shape[axis] return out
def _mixup_np(arr: np.ndarray, obs_axis: int, alpha: float = 1., rng: int = None) -> None: """Oversample by mixing two random non-NaN observations Parameters ---------- arr : array The data to oversample. obs_axis : int The axis along which to apply func. alpha : float The alpha parameter for the beta distribution. If alpha is 0, then the distribution is uniform. If alpha is 1, then the distribution is symmetric. If alpha is greater than 1, then the distribution is skewed towards the first observation. If alpha is less than 1, then the distribution is skewed towards the second observation. Examples -------- >>> arr = np.array([[1, 2], [4, 5], [7, 8], ... [float("nan"), float("nan")]]) >>> _mixup_np(arr, 0, rng=42) >>> arr # doctest: +NORMALIZE_WHITESPACE +SKIP array([[1. , 2. ], [4. , 5. ], [7. , 8. ], [5.24946679, 6.24946679]]) >>> arr2 = np.arange(24, dtype=float).reshape(2, 3, 4) >>> arr2[0, 2, :] = [float("nan")] * 4 >>> _mixup_np(arr2, 1, rng=42) >>> arr2 # doctest: +NORMALIZE_WHITESPACE +SKIP array([[[ 0. , 1. , 2. , 3. ], [ 4. , 5. , 6. , 7. ], [ 2.33404428, 3.33404428, 4.33404428, 5.33404428]], <BLANKLINE> [[12. , 13. , 14. , 15. ], [16. , 17. , 18. , 19. ], [20. , 21. , 22. , 23. ]]]) >>> arr3 = np.arange(24).reshape(3, 2, 4).astype("f2") >>> arr3[0, :, :] = float("nan") >>> _mixup_np(arr3, 0, rng=42) >>> arr3 # doctest: +NORMALIZE_WHITESPACE +SKIP array([[[12.67, 13.67, 14.67, 15.67], [17.31, 18.31, 19.31, 20.31]], <BLANKLINE> [[ 8. , 9. , 10. , 11. ], [12. , 13. , 14. , 15. ]], <BLANKLINE> [[16. , 17. , 18. , 19. ], [20. , 21. , 22. , 23. ]]], dtype=float16) """ if obs_axis == 0: arr = arr.swapaxes(1, obs_axis) if arr.ndim > 3: for i in range(arr.shape[0]): _mixup_np(arr[i], obs_axis - 1, alpha, rng) elif arr.ndim == 1: raise ValueError("Array must have at least 2 dimensions") else: if rng is None: rng = np.random.randint(0, 2 ** 16 - 1) if arr.dtype != np.float64: temp = arr.astype('f8', copy=True) cmixup(temp, 1, alpha, rng) arr[...] = temp else: cmixup(arr, 1, alpha, rng)
[docs] def mixup(arr: Array, obs_axis: int, alpha: float = 1., rng=None) -> None: """Replace rows along the observation axis that are “missing” (i.e. contain any NaNs) with a random convex combination of two non‐missing rows (the “mixup”). This function works for arrays of arbitrary dimension so long as the observation axis (obs_axis) contains the “rows” to mix up and the last axis holds features. In higher dimensions the axes other than obs_axis and the last axis are treated as independent batch indices. (Every such batch is assumed to have at least one non-NaN row.) The mixup coefficient for each missing row is drawn from a beta distribution with parameters (alpha, alpha) and then “flipped” if it is less than 0.5 (so that the coefficient is always >=0.5). Parameters ---------- arr : np.ndarray Array of data. In the 2D case it should have shape (n_obs, n_features). For higher dimensions, the last axis is taken as features and obs_axis (which must not be the last axis) is the observation axis. obs_axis : int The axis along which to look for rows that contain any NaN. alpha : float, default=1. The alpha parameter for the beta distribution. rng : np.random.RandomState or similar, optional A random number generator (if None, one is created using np.random.RandomState()). Returns ------- None; arr is modified in-place. Examples -------- >>> arr = np.array([[1, 2], ... [4, 5], ... [7, 8], ... [float("nan"), float("nan")]]) >>> mixup(arr, 0, rng=42) >>> arr # doctest: +SKIP array([[1. , 2. ], [4. , 5. ], [7. , 8. ], [5.24946679, 6.24946679]]) For a 3D example (here we mix along axis 1): >>> arr3 = np.arange(24, dtype=float).reshape(2, 3, 4) >>> arr3[0, 2, :] = [float("nan")] * 4 >>> mixup(arr3, 1, rng=42) >>> arr3 # doctest: +SKIP array([[[ 0. , 1. , 2. , 3. ], [ 4. , 5. , 6. , 7. ], [ 2.33404428, 3.33404428, 4.33404428, 5.33404428]], <BLANKLINE> [[12. , 13. , 14. , 15. ], [16. , 17. , 18. , 19. ], [20. , 21. , 22. , 23. ]]]) >>> np.random.seed(0) >>> group2 = np.random.rand(500, 10, 10, 100).astype("float16") >>> group2[::2, 0, 0, :] = np.nan >>> mixup(group2, 0) >>> group2[:10, 0, 0, :5] # doctest: +SKIP array([[0.3274 , 0.2805 , 0.1257 , 0.1256 , 0.3027 ], [0.748 , 0.1802 , 0.389 , 0.0376 , 0.01179], [0.6484 , 0.829 , 0.8213 , 0.2578 , 0.5327 ], [0.7583 , 0.5034 , 0.177 , 0.8325 , 0.5166 ], [0.7397 , 0.857 , 0.449 , 0.5913 , 0.714 ], [0.3076 , 0.062 , 0.989 , 0.719 , 0.758 ], [0.571 , 0.176 , 0.679 , 0.6924 , 0.636 ], [0.6323 , 0.07513, 0.722 , 0.4668 , 0.7417 ], [0.6987 , 0.3787 , 0.4668 , 0.04987, 0.915 ], [0.1912 , 0.05853, 0.4368 , 0.72 , 0.824 ]], dtype=float16) >>> import cupy as cp # doctest: +SKIP >>> group3 = cp.random.randn(100, 10, 10, 100) # doctest: +SKIP >>> group3[0::2, 0, 0, :] = float("nan") # doctest: +SKIP >>> mixup(group3, 0) # doctest: +SKIP >>> group3[0, 0, :, :5] # doctest: +SKIP array([[0.3274 , 0.2805 , 0.1257 , 0.1256 , 0.3027 ], [0.748 , 0.1802 , 0.389 , 0.0376 , 0.01179], [0.6484 , 0.829 , 0.8213 , 0.2578 , 0.5327 ], [0.7583 , 0.5034 , 0.177 , 0.8325 , 0.5166 ], [0.7397 , 0.857 , 0.449 , 0.5913 , 0.714 ], [0.3076 , 0.062 , 0.989 , 0.719 , 0.758 ], [0.571 , 0.176 , 0.679 , 0.6924 , 0.636 ], [0.6323 , 0.07513, 0.722 , 0.4668 , 0.7417 ], [0.6987 , 0.3787 , 0.4668 , 0.04987, 0.915 ], [0.1912 , 0.05853, 0.4368 , 0.72 , 0.824 ]], dtype=float16) """ xp = array_namespace(arr) if is_numpy(xp): _mixup_np(arr, obs_axis, alpha, rng) return elif is_torch(xp): # TODO: remove this crutch to keep data on the GPU temp = arr.numpy(force=True).astype(float) _mixup_np(temp, obs_axis, alpha, rng) arr.copy_(xp.from_numpy(temp)) return if rng is None: if is_torch(xp): xp.random.manual_seed(xp.random.seed()) rng = xp xp.beta = xp.distributions.beta.Beta(alpha, alpha) else: rng = xp.random.RandomState() # Bring the observation axis to the front; this is a view. arr_view = xp.moveaxis(arr, obs_axis, 0) # For ndim >= 3, assume that the last axis holds features. # Flatten all intermediate (batch) dimensions into one. n_obs = arr_view.shape[0] n_features = arr_view.shape[-1] # if is_torch(xp) and not arr_view.is_contiguous(): # arr_view = arr_view.contiguous() arr_flat = arr_view.reshape(n_obs, -1, n_features) # Compute a mask over the observation axis for each batch: mask = xp.isnan(arr_view).any(axis=-1).reshape(n_obs, -1) # For each batch (i.e. each column in the flattened batch dimension) we # want to know the available (non-NaN) indices. We do this by sorting the # boolean mask along axis 0: since False sorts before True, the first few # indices are the non-missing ones. order = xp.argsort(mask, axis=0) counts = xp.sum(~mask, axis=0) # number of non-missing rows per batch # Get all indices where the observation is missing. missing_rows, batch_idx = xp.nonzero(mask) if missing_rows.size: L = missing_rows.shape[0] # For each missing observation, generate a random index into the # available (non-missing) rows in its batch. idx1 = xp.astype(rng.rand(L) * counts[batch_idx], int) idx2 = xp.astype(rng.rand(L) * counts[batch_idx], int) donor1 = order[idx1, batch_idx] donor2 = order[idx2, batch_idx] if is_torch(xp): lams = xp.beta.sample((L,)) else: lams = rng.beta(alpha, alpha, size=L) lams = xp.where(lams < 0.5, 1 - lams, lams) # Instead of direct advanced indexing assignment, use index_put_ for # torch: value = ( lams[:, None] * arr_flat[donor1, batch_idx, :] + (1 - lams)[:, None] * arr_flat[donor2, batch_idx, :] ) if is_torch(xp): arr_flat.masked_scatter_(mask[..., None], value) else: arr_flat[missing_rows, batch_idx, :] = value
[docs] def norm(arr: np.ndarray, obs_axis: int = -1) -> None: """Oversample by obtaining the distribution and randomly selecting Parameters ---------- arr : array The data to oversample. obs_axis : int The axis along which to apply func. Examples -------- >>> np.random.seed(0) >>> arr = np.array([1, 2, 4, 5, 7, 8, ... float("nan"), float("nan")]) >>> norm(arr) >>> arr array([1. , 2. , 4. , 5. , 7. , 8. , 8.91013086, 5.50039302]) """ cnorm(arr, obs_axis)
[docs] def mean_diff(group1: Array, group2: Array, axis: int = -1, xp=None) -> np.ndarray[float] | float: """Calculate the mean difference between two groups. This function is the default statistic function for time_perm_cluster. It calculates the mean difference between two groups along the specified axis. Parameters ---------- group1 : array, shape (..., time) The first group of observations. group2 : array, shape (..., time) The second group of observations. axis : int or tuple of ints, optional The axis or axes along which to compute the mean difference. If None, compute the mean difference over all axes. Returns ------- avg1 - avg2 : array or float The mean difference between the two groups. Examples -------- >>> import numpy as np >>> group1 = np.array([[1, 1, 1, 1, 1], [0, 60, 0, 10, 0]]) >>> group2 = np.array([[1, 1, 1, 1, 1], [0, 0, 0, 0, 0]]) >>> mean_diff(group1, group2, axis=1) array([ 0., 14.]) >>> mean_diff(group1, group2, axis=0) array([ 0., 30., 0., 5., 0.]) >>> group3 = np.arange(100000, dtype=float).reshape(20000, 5) >>> mean_diff(group3, group1, axis=0) array([49997., 49968., 49999., 49995., 50001.]) """ if xp is None: xp = array_namespace(group1, group2) if is_numpy(xp): return _md(group1, group2, axes=[axis, axis]) else: return group1.mean(axis=axis) - group2.mean(axis=axis)
if __name__ == "__main__": import numpy as np from timeit import timeit from scipy import stats from ieeg.calc.fast import ttest, _ttest # np.random.seed(0) rng = np.random.default_rng() rvs1 = np.array([ stats.norm.rvs(loc=5, scale=10, size=500, random_state=rng) for _ in range(100)]) / 10000 # group2[::2] = np.nan rvs3 = np.array([ stats.norm.rvs(loc=8, scale=5, size=2000, random_state=rng) for _ in range(100)]) / 10000 res1 = stats.ttest_ind(rvs1, rvs3, axis=1) res2 = stats.ttest_ind(rvs1, rvs3, axis=1, equal_var=False) res3 = ttest(rvs1, rvs3, 1) n = 10000 kwargs = dict(globals=globals(), number=n) time1 = timeit('ttest(rvs1, rvs3, 1)', **kwargs) print(f"ttest: {time1 / n:.3g} per run") # time2 = timeit('stats.ttest_ind(rvs1, rvs3, axis=1, equal_var=False)', # **kwargs) # print(f"scipy: {time2 / n:.3g} per run") time3 = timeit('_ttest(rvs1, rvs3, axes=[1,1])', **kwargs) print(f"_ttest: {time3 / n:.3g} per run")