Source code for ieeg.navigate

import mne
import numpy as np
from bids import BIDSLayout
from mne.utils import fill_doc, verbose
from scipy.signal import detrend
from sklearn.neighbors import LocalOutlierFactor

from ieeg import Doubles, Signal
from ieeg.calc import stats
from ieeg.io import update
from ieeg.timefreq.utils import to_samples


[docs] def crop_empty_data(raw: mne.io.Raw, bound: str = 'boundary', start_pad: str = "10s", end_pad: str = "10s" ) -> mne.io.Raw: """Crops out long stretches of data with no events. Takes raw instance with annotated events and crops the instance so that the raw file starts at start_pad before the first event and stops an amount of time in seconds given by end_pad after the last event. Parameters ---------- raw : mne.io.Raw The raw file to crop. bound : str, optional The annotation description to use as a boundary, by default 'boundary' start_pad : str, optional The amount of time to pad the start of the file, by default "10s" end_pad : str, optional The amount of time to pad the end of the file, by default "10s" Returns ------- mne.io.Raw The cropped raw file. Examples -------- >>> import mne >>> from ieeg.io import raw_from_layout >>> bids_root = mne.datasets.epilepsy_ecog.data_path(verbose=False) >>> layout = BIDSLayout(bids_root) >>> raw = raw_from_layout(layout, subject="pt1", preload=True, ... extension=".vhdr", verbose=False) Reading 0 ... 269079 = 0.000 ... 269.079 secs... >>> cropped = crop_empty_data(raw, 'onset') >>> cropped.times[0], cropped.times[-1] (0.0, 104.94) """ crop_list = [] start_pad = to_samples(start_pad, raw.info['sfreq']) / raw.info['sfreq'] end_pad = to_samples(end_pad, raw.info['sfreq']) / raw.info['sfreq'] # split annotations into blocks annot = raw.annotations.copy() block_idx = [idx + 1 for idx, val in enumerate(annot) if bound in val['description']] block_annot = [annot[i: j] for i, j in zip([0] + block_idx, block_idx + ([len(annot)] if block_idx[-1] != len(annot) else []))] for block_an in block_annot: # remove boundary events from annotations no_bound = None for an in block_an: if bound not in an['description']: if no_bound is None: no_bound = mne.Annotations(**an) else: an.pop('orig_time') no_bound.append(**an) # Skip if block is all boundary events if no_bound is None: continue # get start and stop time from raw.annotations onset attribute t_min = max(0, no_bound.onset[0] - start_pad) t_max = no_bound.onset[-1] + end_pad # create new cropped raw file crop_list.append(raw.copy().crop(tmin=t_min, tmax=t_max)) return mne.concatenate_raws(crop_list)
[docs] @fill_doc @verbose def channel_outlier_marker(input_raw: Signal, outlier_sd: float = 3, max_rounds: int = np.inf, axis: int = 0, save: bool = False, verbose: bool = True ) -> list[str]: """Identify bad channels by variance. Parameters ---------- input_raw : Signal Raw data to be analyzed. outlier_sd : int, optional Number of standard deviations above the mean to be considered an outlier, by default 3 max_rounds : int, optional Maximum number of variance estimations, by default runs until no more bad channels are found. axis : int, optional Axis to calculate variance over, by default 0 save : bool, optional Whether to save bad channels to raw.info['bads'], by default False %(verbose)s Returns ------- list[str] List of bad channel names. Examples -------- >>> import mne >>> from ieeg.io import raw_from_layout >>> bids_root = mne.datasets.epilepsy_ecog.data_path(verbose=False) >>> layout = BIDSLayout(bids_root) >>> raw = raw_from_layout(layout, subject="pt1", preload=True, ... extension=".vhdr", verbose=False) Reading 0 ... 269079 = 0.000 ... 269.079 secs... >>> bads = channel_outlier_marker(raw, 3, 2) outlier round 1 channels: ['AST2'] outlier round 1 channels: ['AST2', 'RQ2'] outlier round 1 channels: ['AST2', 'RQ2', 'N/A'] outlier round 2 channels: ['AST2', 'RQ2', 'N/A', 'G32'] outlier round 2 channels: ['AST2', 'RQ2', 'N/A', 'G32', 'AD3'] outlier round 2 channels: ['AST2', 'RQ2', 'N/A', 'G32', 'AD3', 'PD4'] """ names = input_raw.copy().pick('data').ch_names data = detrend(input_raw.get_data('data')) # channels X time bads = [] # output for bad channel names desc = [] # output for bad channel descriptions # Pop out names to bads output using comprehension list for ind, i in stats.outlier_repeat(data, outlier_sd, max_rounds, axis): bads.append(names[ind]) desc.append(f'outlier round {i} more than {outlier_sd} SDs above mean') # log channels excluded per round if verbose: mne.utils.logger.info(f'outlier round {i} channels: {bads}') if save: if not hasattr(input_raw, 'filenames'): raise ValueError("Raw instance must have filenames attribute to " "save bad channels") for file in input_raw.filenames: update(file, bads, desc) return bads
[docs] def find_bad_channels_lof( raw, *, picks=None, metric="seuclidean", threshold=1.5, return_scores=False, **kwargs ): """Find bad channels using Local Outlier Factor (LOF) algorithm. Parameters ---------- raw : instance of Raw Raw data to process. n_neighbors : int Number of neighbors defining the local neighborhood (default is 20). Smaller values will lead to higher LOF scores. %(picks_good_data)s metric : str Metric to use for distance computation. Default is “euclidean”, see :func:`sklearn.metrics.pairwise.distance_metrics` for details. threshold : float Threshold to define outliers. Theoretical threshold ranges anywhere between 1.0 and any positive integer. Default: 1.5 It is recommended to consider this as an hyperparameter to optimize. return_scores : bool If ``True``, return a dictionary with LOF scores for each evaluated channel. Default is ``False``. %(verbose)s Returns ------- noisy_chs : list List of bad M/EEG channels that were automatically detected. scores : ndarray, shape (n_picks,) Only returned when ``return_scores`` is ``True``. It contains the LOF outlier score for each channel in ``picks``. See Also -------- maxwell_filter annotate_amplitude Notes ----- See :footcite:`KumaravelEtAl2022` and :footcite:`BreunigEtAl2000` for background on choosing ``threshold``. .. versionadded:: 1.7 References ---------- .. footbibliography:: """ # noqa: E501 if metric == "seuclidean": kwargs.setdefault("metric_params", {"V": np.var(raw.get_data(), axis=0)}) # Get the channel types picks = list(range(len(raw.ch_names))) if picks is None else picks ch_names = [raw.ch_names[pick] for pick in picks] data = raw.get_data(picks=picks) clf = LocalOutlierFactor(n_neighbors=len(raw.ch_names) // 5, metric=metric, **kwargs) bad_channel_indices = picks clf.fit_predict(data) scores_lof = clf.negative_outlier_factor_ while len(bad_channel_indices) / len(picks) > 0.2: bad_channel_indices = [ i for i, v in enumerate(np.abs(scores_lof)) if v >= threshold ] threshold += 1 bads = [ch_names[idx] for idx in bad_channel_indices] if return_scores: return bads, scores_lof else: return bads
[docs] @verbose def outliers_to_nan(trials: mne.epochs.BaseEpochs, outliers: float, copy: bool = False, picks: list = 'data', deviation: callable = np.nanstd, center: callable = np.nanmean, tmin: int | float = None, tmax: int | float = None, verbose=None ) -> mne.epochs.BaseEpochs: """Set outliers to nan. Parameters ---------- trials : mne.epochs.BaseEpochs The trials to remove outliers from. outliers : float The number of deviations above the mean to be considered an outlier. copy : bool, optional Whether to copy the data, by default False picks : list, optional The channels to remove outliers from, by default 'data' deviation: callable, optional Metric function to determine the deviation from the center. Default is median absolute deviation. center : callable, optional Metric function to determine the center of the data. Default is median. Returns ------- mne.epochs.BaseEpochs The trials with outliers set to nan. Examples -------- >>> import mne >>> from ieeg.io import raw_from_layout >>> bids_root = mne.datasets.epilepsy_ecog.data_path(verbose=50) >>> layout = BIDSLayout(bids_root) >>> raw = raw_from_layout(layout, subject="pt1", preload=True, ... extension=".vhdr", verbose=False) Reading 0 ... 269079 = 0.000 ... 269.079 secs... >>> epochs = trial_ieeg(raw, ['AD1-4, ATT1,2', 'AST1,3', 'G16', 'PD'], ... (-1, 2), preload=True, verbose=False) >>> outliers_to_nan(epochs, 1, True, [0], verbose=False, ... ).get_data()[1] # doctest: +ELLIPSIS array([[ nan, nan, nan, ..., nan, nan, nan], [-4.63276969e-04, -4.67964469e-04, -4.72261344e-04, ..., 1.41019078e-04, 1.22269102e-04, 9.92222578e-05], [-2.84374563e-04, -3.03515188e-04, -3.08593313e-04, ..., 9.57034922e-05, 5.19535000e-05, 1.40628818e-05], ..., [-4.69516375e-04, -5.09750688e-04, -5.69906813e-04, ..., 3.45716687e-04, 3.10951125e-04, 3.25794844e-04], [-1.67187703e-04, -1.95703313e-04, -2.23047047e-04, ..., -2.52734531e-04, -2.89062656e-04, -2.57422031e-04], [-1.98796781e-04, -2.79265281e-04, -3.31218250e-04, ..., -2.73129219e-05, -1.52703172e-04, -2.52702875e-04]]) >>> outliers_to_nan(epochs, .1, verbose=False, copy=True, ... deviation=None).get_data()[0] # doctest: +SKIP """ if copy: trials = trials.copy() picks = mne.io.pick._picks_to_idx(trials.info, picks) if isinstance(trials, mne.time_frequency.BaseTFR): data = trials.get_data(picks, tmin=tmin, tmax=tmax) out_data = trials.get_data(picks) else: if not isinstance(trials, mne.epochs.EpochsArray): trials.load_data() data = trials.get_data(picks, tmin=tmin, tmax=tmax, verbose=verbose, copy=False) out_data = trials.get_data(picks, verbose=False, copy=False) # bool array of where to keep data trials X channels # if deviation is None or center is None: # keep = stats.find_outliers_lof(data, outliers) # else: keep = stats.find_outliers(data, outliers, deviation, center) # set outliers to nan if not keep data = np.where(keep[..., None], out_data, np.nan) trials._data[:, picks] = data return trials
[docs] @fill_doc @verbose def trial_ieeg(raw: mne.io.Raw, event: str | list[str, ...], times: Doubles, verbose=None, **kwargs) -> mne.Epochs: """Epochs data from a mne Raw iEEG instance. Takes a mne Raw instance and epochs the data around a specified event. If baseline is specified, the data is also epoched around the baseline event and the baseline is subtracted from the data epochs. Parameters ---------- raw : mne.io.Raw The raw data to epoch. event : str The event to epoch around. times : tuple[float, float] The time window to epoch around the event. %(picks_all)s %(reject_epochs)s %(flat)s %(decim)s %(epochs_reject_tmin_tmax)s %(detrend_epochs)s %(proj_epochs)s %(on_missing_epochs)s %(verbose)s Returns ------- mne.Epochs The epoched data. Examples -------- >>> import mne >>> from ieeg.io import raw_from_layout >>> bids_root = mne.datasets.epilepsy_ecog.data_path(verbose=False) >>> layout = BIDSLayout(bids_root) >>> raw = raw_from_layout(layout, subject="pt1", preload=True, ... extension=".vhdr", verbose=False) Reading 0 ... 269079 = 0.000 ... 269.079 secs... >>> epochs = trial_ieeg(raw, "AD1-4, ATT1,2", (-1, 2), verbose=True ... ) # doctest: +ELLIPSIS Used Annotations descriptions: ['AD1-4, ATT1,2', 'AST1,3', 'G16', 'PD',... Not setting metadata 1 matching events found No baseline correction applied 0 projection items activated >>> epochs = trial_ieeg(raw, ['AST1,3', 'G16'], (-1, 2), verbose=True ... ) # doctest: +ELLIPSIS Used Annotations descriptions: ['AD1-4, ATT1,2', 'AST1,3', 'G16', 'PD', ... Not setting metadata 2 matching events found No baseline correction applied 0 projection items activated """ # determine the events events, ids = mne.events_from_annotations(raw) dat_ids = [ids[i] for i in mne.event.match_event_names(ids, event)] rev = {k: v for k, v in ids.items() if v in dat_ids} # epoch the data return mne.Epochs(raw, events, event_id=rev, tmin=times[0], tmax=times[1], baseline=None, verbose=verbose, **kwargs)
if __name__ == "__main__": from os import path from ieeg.io import raw_from_layout # %% Set up logging log_filename = "output.log" # op.join(LAB_root, "Aaron_test", "Information.log") mne.set_log_file(log_filename, "%(levelname)s: %(message)s - %(asctime)s", overwrite=True) mne.set_log_level("INFO") HOME = path.expanduser("~") LAB_root = path.join(HOME, "Box", "CoganLab") TASK = "SentenceRep" sub_num = 29 subj = "D" + str(sub_num).zfill(4) # layout, raw, D_dat_raw, D_dat_filt = get_data(sub_num, TASK) bids_root = LAB_root + "/BIDS-1.4_SentenceRep/BIDS" layout = BIDSLayout(bids_root, derivatives=True) filt = raw_from_layout(layout.derivatives['clean'], subject=subj, extension='.edf', desc='clean', preload=True) raw = raw_from_layout(layout, subject=subj, extension='.edf', desc=None, preload=True) events, event_id = mne.events_from_annotations(filt) auds = mne.Epochs(filt, events, event_id['Audio'], baseline=None, tmin=-2, tmax=5, preload=True, detrend=1) # bads = channel_outlier_marker(auds) # auds.info['bads'] = bads