# %% IMPORT PACKAGES
import numpy as np
from numpy.linalg import inv as inv # Used in kalman filter
from scipy.spatial.distance import pdist
from scipy.spatial.distance import squareform
from scipy.stats import norm
from scipy.spatial.distance import cdist
import math
from sklearn.pipeline import Pipeline
from sklearn import linear_model # For Wiener Filter and Wiener Cascade
from sklearn.svm import SVR # For support vector regression (SVR)
from sklearn.svm import SVC # For support vector classification (SVM)
from sklearn.decomposition import PCA # For PCA decomposition (PCA - LDA)
from sklearn import \
discriminant_analysis as da # For LDA decomposition (PCA - LDA)
from sklearn.base import BaseEstimator
from sklearn.metrics import accuracy_score
# Used for naive bayes decoder
try:
import statsmodels.api as sm
except ImportError:
print(
"\nWARNING: statsmodels is not installed. You will be unable to use "
"the Naive Bayes Decoder")
pass
try:
from sklearnex import patch_sklearn
# The names match scikit-learn estimators
patch_sklearn(["PCA"])
except ImportError:
print(
"\nWARNING: sklearnex is not installed. You will be unable to use the"
"PCA decoder acceleration")
pass
# Import XGBoost if the package is installed
try:
import xgboost as xgb # For xgboost
except ImportError:
print(
"\nWARNING: Xgboost package is not installed. You will be unable to"
"use the xgboost decoder")
pass
# Import functions for Keras if Keras is installed
# Note that Keras has many more built-in functions that I have not imported
# because I have not used them but if you want to modify the decoders with
# other functions (e.g. regularization), import them here
try:
import keras
keras_v1 = int(keras.__version__[0]) <= 1
from keras.models import Sequential
from keras.layers import Dense, LSTM, SimpleRNN, GRU, Activation, Dropout
from keras.utils import np_utils
except ImportError:
print(
"\nWARNING: Keras package is not installed. You will be unable to use"
"all neural net decoders")
pass
# %% DECODER FUNCTIONS
# %% WIENER FILTER
[docs]
class WienerFilterRegression(object):
"""Class for the Wiener Filter Decoder
There are no parameters to set.
This simply leverages the scikit-learn linear regression.
"""
def __init__(self):
return
[docs]
def fit(self, X_flat_train, y_train):
"""Train Wiener Filter Decoder
Parameters
----------
X_flat_train: numpy 2d array of shape [n_samples,n_features]
This is the neural data.
See example file for an example of how to format the neural data
correctly
y_train: numpy 2d array of shape [n_samples, n_outputs]
This is the outputs that are being predicted
"""
# Initialize linear regression model
self.model = linear_model.LinearRegression()
# Train the model
self.model.fit(X_flat_train, y_train)
[docs]
def predict(self, X_flat_test):
"""Predict outcomes using trained Wiener Cascade Decoder
Parameters
----------
X_flat_test: numpy 2d array of shape [n_samples,n_features]
This is the neural data being used to predict outputs.
Returns
-------
y_test_predicted: numpy 2d array of shape [n_samples,n_outputs]
The predicted outputs
"""
y_test_predicted = self.model.predict(X_flat_test) # Make predictions
return y_test_predicted
# %% WIENER CASCADE
[docs]
class WienerCascadeRegression(object):
"""Class for the Wiener Cascade Decoder
Parameters
----------
degree: integer, optional, default 3
The degree of the polynomial used for the static nonlinearity
"""
def __init__(self, degree=3):
self.degree = degree
[docs]
def fit(self, X_flat_train, y_train):
"""Train Wiener Cascade Decoder
Parameters
----------
X_flat_train: numpy 2d array of shape [n_samples,n_features]
This is the neural data.
See example file for an example of how to format the neural data
correctly
y_train: numpy 2d array of shape [n_samples, n_outputs]
This is the outputs that are being predicted
"""
num_outputs = y_train.shape[1] # Number of outputs
models = [] # Initialize list of models (there will be a separate
# model for each output)
for i in range(num_outputs): # Loop through outputs
# Fit linear portion of model
regr = linear_model.LinearRegression() # Call the linear portion
# of the model "regr"
regr.fit(X_flat_train, y_train[:, i]) # Fit linear
y_train_predicted_linear = regr.predict(
X_flat_train) # Get outputs of linear portion of model
# Fit nonlinear portion of model
p = np.polyfit(y_train_predicted_linear, y_train[:, i],
self.degree)
# Add model for this output (both linear and nonlinear parts)
# to the list "models"
models.append([regr, p])
self.model = models
[docs]
def predict(self, X_flat_test):
"""Predict outcomes using trained Wiener Cascade Decoder
Parameters
----------
X_flat_test: numpy 2d array of shape [n_samples,n_features]
This is the neural data being used to predict outputs.
Returns
-------
y_test_predicted: numpy 2d array of shape [n_samples,n_outputs]
The predicted outputs
"""
num_outputs = len(
self.model) # Number of outputs being predicted. Recall from the
# "fit" function that self.model is a list of models
y_test_predicted = np.empty([X_flat_test.shape[0],
num_outputs]) # Initialize matrix that
# contains predicted outputs
for i in range(num_outputs): # Loop through outputs
[regr, p] = self.model[
i] # Get the linear (regr) and nonlinear (p) portions of the
# trained model
# Predictions on test set
y_test_predicted_linear = regr.predict(
X_flat_test) # Get predictions on the linear portion of
# the model
y_test_predicted[:, i] = np.polyval(p,
y_test_predicted_linear)
# Run the linear predictions through the nonlinearity to get
# the final predictions
return y_test_predicted
# %% KALMAN FILTER
[docs]
class KalmanFilterRegression(object):
"""Class for the Kalman Filter Decoder
Parameters
-----------
C - float, optional, default 1
This parameter scales the noise matrix associated with the transition in
kinematic states. It effectively allows changing the weight of the new
neural evidence in the current update.
Our implementation of the Kalman filter for neural decoding is based on
that of Wu et al 2003 (https://papers.nips.cc/paper/2178-neural-decoding-of
-cursor-motion-using-a-kalman-filter.pdf) with the exception of the
addition of the parameter C. The original implementation has previously
been coded in Matlab by Dan Morris
(http://dmorris.net/projects/neural_decoding.html#code)
"""
def __init__(self, C=1):
self.C = C
[docs]
def fit(self, X_kf_train, y_train):
"""Train Kalman Filter Decoder
Parameters
----------
X_kf_train: numpy 2d array of shape [n_samples(i.e. timebins) ,
n_neurons]
This is the neural data in Kalman filter format.
See example file for an example of how to format the neural data
correctly
y_train: numpy 2d array of shape [n_samples(i.e. timebins), n_outputs]
This is the outputs that are being predicted
"""
# First we'll rename and reformat the variables to be in a more
# standard kalman filter nomenclature (specifically that from Wu et
# al, 2003):
# xs are the state (here, the variable we're predicting, i.e. y_train)
# zs are the observed variable (neural data here, i.e. X_kf_train)
X = np.matrix(y_train.T)
Z = np.matrix(X_kf_train.T)
# number of time bins
nt = X.shape[1]
# Calculate the transition matrix (from x_t to x_t+1) using
# least-squares, and compute its covariance
# In our case, this is the transition from one kinematic state to
# the next
X2 = X[:, 1:]
X1 = X[:, 0:nt - 1]
A = X2 * X1.T * inv(X1 * X1.T) # Transition matrix
W = (X2 - A * X1) * (X2 - A * X1).T / (
nt - 1) / self.C # Covariance of transition matrix. Note we
# divide by nt-1 since only nt-1 points were used in the computation
# (that's the length of X1 and X2). We also introduce the extra
# parameter C here.
# Calculate the measurement matrix (from x_t to z_t) using
# least-squares, and compute its covariance
# In our case, this is the transformation from kinematics to spikes
H = Z * X.T * (inv(X * X.T)) # Measurement matrix
Q = ((Z - H * X) * (
(Z - H * X).T)) / nt # Covariance of measurement matrix
params = [A, W, H, Q]
self.model = params
[docs]
def predict(self, X_kf_test, y_test):
"""Predict outcomes using trained Kalman Filter Decoder
Parameters
----------
X_kf_test: numpy 2d array of shape [n_samples(i.e. timebins) ,
n_neurons]
This is the neural data in Kalman filter format.
y_test: numpy 2d array of shape [n_samples(i.e. timebins),n_outputs]
The actual outputs
This parameter is necesary for the Kalman filter (unlike other
decoders)
because the first value is nececessary for initialization
Returns
-------
y_test_predicted: numpy 2d array of shape [n_samples(i.e. timebins),
n_outputs]
The predicted outputs
"""
# Extract parameters
A, W, H, Q = self.model
# First we'll rename and reformat the variables to be in a more
# standard kalman filter nomenclature (specifically that from Wu et
# al):
# xs are the state (here, the variable we're predicting, i.e. y_train)
# zs are the observed variable (neural data here, i.e. X_kf_train)
X = np.matrix(y_test.T)
Z = np.matrix(X_kf_test.T)
# Initializations
num_states = X.shape[0] # Dimensionality of the state
states = np.empty(
X.shape) # Keep track of states over time (states is what will
# be returned as y_test_predicted)
P_m = np.matrix(np.zeros([num_states, num_states]))
P = np.matrix(np.zeros([num_states, num_states]))
state = X[:, 0] # Initial state
states[:, 0] = np.copy(np.squeeze(state))
# Get predicted state for every time bin
for t in range(X.shape[1] - 1):
# Do first part of state update - based on transition matrix
P_m = A * P * A.T + W
state_m = A * state
# Do second part of state update - based on measurement matrix
K = P_m * H.T * inv(H * P_m * H.T + Q) # Calculate Kalman gain
P = (np.matrix(np.eye(num_states)) - K * H) * P_m
state = state_m + K * (Z[:, t + 1] - H * state_m)
states[:, t + 1] = np.squeeze(
state) # Record state at the timestep
y_test_predicted = states.T
return y_test_predicted
# %% DENSE (FULLY-CONNECTED) NEURAL NETWORK
[docs]
class DenseNNRegression(object):
"""Class for the dense (fully-connected) neural network decoder
Parameters
----------
units: integer or vector of integers, optional, default 400
This is the number of hidden units in each layer
If you want a single layer, input an integer (e.g. units=400 will give
you a single hidden layer with 400 units) If you want multiple layers,
input a vector (e.g. units=[400,200]) will give you 2 hidden layers
with 400 and 200 units, repsectively.
The vector can either be a list or an array
dropout: decimal, optional, default 0
Proportion of units that get dropped out
num_epochs: integer, optional, default 10
Number of epochs used for training
verbose: binary, optional, default=0
Whether to show progress of the fit after each epoch
"""
def __init__(self, units=400, dropout=0, num_epochs=10, verbose=0):
self.dropout = dropout
self.num_epochs = num_epochs
self.verbose = verbose
# If "units" is an integer, put it in the form of a vector
try: # Check if it's a vector
units[0]
except IndexError:
# If it's not a vector, create a vector of the number of
# units for each layer
units = [units]
self.units = units
# Determine the number of hidden layers (based on "units" that the
# user entered)
self.num_layers = len(units)
[docs]
def fit(self, X_flat_train, y_train):
"""Train DenseNN Decoder
Parameters
----------
X_flat_train: numpy 2d array of shape [n_samples,n_features]
This is the neural data.
See example file for an example of how to format the neural data
correctly
y_train: numpy 2d array of shape [n_samples, n_outputs]
This is the outputs that are being predicted
"""
model = Sequential() # Declare model
# Add first hidden layer
model.add(Dense(self.units[0],
input_dim=X_flat_train.shape[1])) # Add dense layer
model.add(Activation('relu')) # Add nonlinear (tanh) activation
# if self.dropout!=0:
if self.dropout != 0:
# Dropout some units if proportion of dropout != 0
model.add(Dropout(self.dropout))
# Add any additional hidden layers (beyond the 1st)
for layer in range(
self.num_layers - 1): # Loop through additional layers
model.add(Dense(self.units[layer + 1])) # Add dense layer
model.add(Activation('relu')) # Add nonlinear (tanh) activation
if self.dropout != 0:
# Dropout some units if proportion of dropout != 0
model.add(Dropout(self.dropout))
# Add dense connections to all outputs
model.add(Dense(
y_train.shape[1])) # Add final dense layer (connected to outputs)
# Fit model (and set fitting parameters)
model.compile(loss='mse', optimizer='adam',
metrics=['accuracy']) # Set loss function and optimizer
if keras_v1:
model.fit(X_flat_train, y_train, nb_epoch=self.num_epochs,
verbose=self.verbose) # Fit the model
else:
model.fit(X_flat_train, y_train, epochs=self.num_epochs,
verbose=self.verbose) # Fit the model
self.model = model
[docs]
def predict(self, X_flat_test):
"""Predict outcomes using trained DenseNN Decoder
Parameters
----------
X_flat_test: numpy 2d array of shape [n_samples,n_features]
This is the neural data being used to predict outputs.
Returns
-------
y_test_predicted: numpy 2d array of shape [n_samples,n_outputs]
The predicted outputs
"""
y_test_predicted = self.model.predict(X_flat_test) # Make predictions
return y_test_predicted
# %% SIMPLE RECURRENT NEURAL NETWORK
[docs]
class SimpleRNNRegression(object):
"""Class for the simple recurrent neural network decoder
Parameters
----------
units: integer, optional, default 400
Number of hidden units in each layer
dropout: decimal, optional, default 0
Proportion of units that get dropped out
num_epochs: integer, optional, default 10
Number of epochs used for training
verbose: binary, optional, default=0
Whether to show progress of the fit after each epoch
"""
def __init__(self, units=400, dropout=0, num_epochs=10, verbose=0):
self.units = units
self.dropout = dropout
self.num_epochs = num_epochs
self.verbose = verbose
[docs]
def fit(self, X_train, y_train):
"""Train SimpleRNN Decoder
Parameters
----------
X_train: numpy 3d array of shape [n_samples,n_time_bins,n_neurons]
This is the neural data.
See example file for an example of how to format the neural data
correctly
y_train: numpy 2d array of shape [n_samples, n_outputs]
This is the outputs that are being predicted
"""
model = Sequential() # Declare model
# Add recurrent layer
if keras_v1:
model.add(SimpleRNN(self.units, input_shape=(
X_train.shape[1], X_train.shape[2]), dropout_W=self.dropout,
dropout_U=self.dropout,
activation='relu')) # Within recurrent
# layer, include dropout
else:
model.add(SimpleRNN(self.units, input_shape=(
X_train.shape[1], X_train.shape[2]), dropout=self.dropout,
recurrent_dropout=self.dropout,
activation='relu')) # Within recurrent
# layer, include dropout
if self.dropout != 0:
# Dropout some units (recurrent layer output units)
model.add(Dropout(self.dropout))
# Add dense connections to output layer
model.add(Dense(y_train.shape[1]))
# Fit model (and set fitting parameters)
model.compile(loss='mse', optimizer='rmsprop',
metrics=['accuracy']) # Set loss function and optimizer
if keras_v1:
model.fit(X_train, y_train, nb_epoch=self.num_epochs,
verbose=self.verbose) # Fit the model
else:
model.fit(X_train, y_train, epochs=self.num_epochs,
verbose=self.verbose) # Fit the model
self.model = model
[docs]
def predict(self, X_test):
"""Predict outcomes using trained SimpleRNN Decoder
Parameters
----------
X_test: numpy 3d array of shape [n_samples,n_time_bins,n_neurons]
This is the neural data being used to predict outputs.
Returns
-------
y_test_predicted: numpy 2d array of shape [n_samples,n_outputs]
The predicted outputs
"""
y_test_predicted = self.model.predict(X_test) # Make predictions
return y_test_predicted
# %% GATED RECURRENT UNIT (GRU) DECODER
[docs]
class GRURegression(object):
"""Class for the gated recurrent unit (GRU) decoder
Parameters
----------
units: integer, optional, default 400
Number of hidden units in each layer
dropout: decimal, optional, default 0
Proportion of units that get dropped out
num_epochs: integer, optional, default 10
Number of epochs used for training
verbose: binary, optional, default=0
Whether to show progress of the fit after each epoch
"""
def __init__(self, units=400, dropout=0, num_epochs=10, verbose=0):
self.units = units
self.dropout = dropout
self.num_epochs = num_epochs
self.verbose = verbose
[docs]
def fit(self, X_train, y_train):
"""Train GRU Decoder
Parameters
----------
X_train: numpy 3d array of shape [n_samples,n_time_bins,n_neurons]
This is the neural data.
See example file for an example of how to format the neural data
correctly
y_train: numpy 2d array of shape [n_samples, n_outputs]
This is the outputs that are being predicted
"""
model = Sequential() # Declare model
# Add recurrent layer
if keras_v1:
model.add(GRU(self.units,
input_shape=(X_train.shape[1], X_train.shape[2]),
dropout_W=self.dropout,
dropout_U=self.dropout)) # Within recurrent layer,
# include dropout
else:
model.add(GRU(self.units,
input_shape=(X_train.shape[1], X_train.shape[2]),
dropout=self.dropout,
recurrent_dropout=self.dropout))
if self.dropout != 0:
# Dropout some units (recurrent layer output units)
model.add(Dropout(self.dropout))
# Add dense connections to output layer
model.add(Dense(y_train.shape[1]))
# Fit model (and set fitting parameters)
model.compile(loss='mse', optimizer='rmsprop',
metrics=['accuracy']) # Set loss function and optimizer
if keras_v1:
model.fit(X_train, y_train, nb_epoch=self.num_epochs,
verbose=self.verbose) # Fit the model
else:
model.fit(X_train, y_train, epochs=self.num_epochs,
verbose=self.verbose) # Fit the model
self.model = model
[docs]
def predict(self, X_test):
"""Predict outcomes using trained GRU Decoder
Parameters
----------
X_test: numpy 3d array of shape [n_samples,n_time_bins,n_neurons]
This is the neural data being used to predict outputs.
Returns
-------
y_test_predicted: numpy 2d array of shape [n_samples,n_outputs]
The predicted outputs
"""
y_test_predicted = self.model.predict(X_test) # Make predictions
return y_test_predicted
# %% LONG SHORT TERM MEMORY (LSTM) DECODER
[docs]
class LSTMRegression(object):
"""Class for the gated recurrent unit (GRU) decoder
Parameters
----------
units: integer, optional, default 400
Number of hidden units in each layer
dropout: decimal, optional, default 0
Proportion of units that get dropped out
num_epochs: integer, optional, default 10
Number of epochs used for training
verbose: binary, optional, default=0
Whether to show progress of the fit after each epoch
"""
def __init__(self, units=400, dropout=0, num_epochs=10, verbose=0):
self.units = units
self.dropout = dropout
self.num_epochs = num_epochs
self.verbose = verbose
[docs]
def fit(self, X_train, y_train):
"""Train LSTM Decoder
Parameters
----------
X_train: numpy 3d array of shape [n_samples,n_time_bins,n_neurons]
This is the neural data.
See example file for an example of how to format the neural data
correctly
y_train: numpy 2d array of shape [n_samples, n_outputs]
This is the outputs that are being predicted
"""
model = Sequential() # Declare model
# Add recurrent layer
if keras_v1:
model.add(LSTM(self.units,
input_shape=(X_train.shape[1], X_train.shape[2]),
dropout_W=self.dropout,
dropout_U=self.dropout)) # Within recurrent layer,
# include dropout
else:
model.add(LSTM(self.units,
input_shape=(X_train.shape[1], X_train.shape[2]),
dropout=self.dropout,
recurrent_dropout=self.dropout)) # Within recurrent
# layer, include dropout
if self.dropout != 0:
# Dropout some units (recurrent layer output units)
model.add(Dropout(self.dropout))
# Add dense connections to output layer
model.add(Dense(y_train.shape[1]))
# Fit model (and set fitting parameters)
model.compile(loss='mse', optimizer='rmsprop',
metrics=['accuracy']) # Set loss function and optimizer
if keras_v1:
model.fit(X_train, y_train, nb_epoch=self.num_epochs,
verbose=self.verbose) # Fit the model
else:
model.fit(X_train, y_train, epochs=self.num_epochs,
verbose=self.verbose) # Fit the model
self.model = model
[docs]
def predict(self, X_test):
"""Predict outcomes using trained LSTM Decoder
Parameters
----------
X_test: numpy 3d array of shape [n_samples,n_time_bins,n_neurons]
This is the neural data being used to predict outputs.
Returns
-------
y_test_predicted: numpy 2d array of shape [n_samples,n_outputs]
The predicted outputs
"""
y_test_predicted = self.model.predict(X_test) # Make predictions
return y_test_predicted
# %% EXTREME GRADIENT BOOSTING (XGBOOST)
[docs]
class XGBoostRegression(object):
"""Class for the XGBoost Decoder
Parameters
----------
max_depth: integer, optional, default=3
the maximum depth of the trees
num_round: integer, optional, default=300
the number of trees that are fit
eta: float, optional, default=0.3
the learning rate
gpu: integer, optional, default=-1
if the gpu version of xgboost is installed, this can be used to select
which gpu to use
for negative values (default), the gpu is not used
"""
def __init__(self, max_depth=3, num_round=300, eta=0.3, gpu=-1):
self.max_depth = max_depth
self.num_round = num_round
self.eta = eta
self.gpu = gpu
[docs]
def fit(self, X_flat_train, y_train):
"""Train XGBoost Decoder
Parameters
----------
X_flat_train: numpy 2d array of shape [n_samples,n_features]
This is the neural data.
See example file for an example of how to format the neural data
correctly
y_train: numpy 2d array of shape [n_samples, n_outputs]
This is the outputs that are being predicted
"""
num_outputs = y_train.shape[1] # Number of outputs
# Set parameters for XGBoost
param = {'objective': "reg:linear", # for linear output
'eval_metric': "logloss", # loglikelihood loss
'max_depth': self.max_depth,
# this is the only parameter we have set, it's one of the way
# or regularizing
'eta': self.eta,
'seed': 2925, # for reproducibility
'silent': 1}
if self.gpu < 0:
param['nthread'] = -1 # with -1 it will use all available threads
else:
param['gpu_id'] = self.gpu
param['updater'] = 'grow_gpu'
models = [] # Initialize list of models (there will be a separate
# model for each output)
for y_idx in range(num_outputs): # Loop through outputs
dtrain = xgb.DMatrix(X_flat_train, label=y_train[:, y_idx])
# Put in correct format for XGB
bst = xgb.train(param, dtrain, self.num_round) # Train model
models.append(bst) # Add fit model to list of models
self.model = models
[docs]
def predict(self, X_flat_test):
"""Predict outcomes using trained XGBoost Decoder
Parameters
----------
X_flat_test: numpy 2d array of shape [n_samples,n_features]
This is the neural data being used to predict outputs.
Returns
-------
y_test_predicted: numpy 2d array of shape [n_samples,n_outputs]
The predicted outputs
"""
dtest = xgb.DMatrix(X_flat_test) # Put in XGB format
num_outputs = len(self.model) # Number of outputs
y_test_predicted = np.empty([X_flat_test.shape[0],
num_outputs]) # Initialize matrix of
# predicted outputs
for y_idx in range(num_outputs): # Loop through outputs
bst = self.model[y_idx] # Get fit model for this output
y_test_predicted[:, y_idx] = bst.predict(dtest) # Make prediction
return y_test_predicted
# %% SUPPORT VECTOR REGRESSION
[docs]
class SVRegression(object):
"""Class for the Support Vector Regression (SVR) Decoder
This simply leverages the scikit-learn SVR
Parameters
----------
C: float, default=3.0
Penalty parameter of the error term
max_iter: integer, default=-1
the maximum number of iteraations to run (to save time)
max_iter=-1 means no limit
Typically in the 1000s takes a short amount of time on a laptop
"""
def __init__(self, max_iter=-1, C=3.0):
self.max_iter = max_iter
self.C = C
return
[docs]
def fit(self, X_flat_train, y_train):
"""Train SVR Decoder
Parameters
----------
X_flat_train: numpy 2d array of shape [n_samples,n_features]
This is the neural data.
See example file for an example of how to format the neural data
correctly
y_train: numpy 2d array of shape [n_samples, n_outputs]
This is the outputs that are being predicted
"""
num_outputs = y_train.shape[1] # Number of outputs
models = [] # Initialize list of models (there will be a separate
# model for each output)
for y_idx in range(num_outputs): # Loop through outputs
model = SVR(C=self.C,
max_iter=self.max_iter) # Initialize SVR model
model.fit(X_flat_train, y_train[:, y_idx]) # Train the model
models.append(model) # Add fit model to list of models
self.model = models
[docs]
def predict(self, X_flat_test):
"""Predict outcomes using trained SVR Decoder
Parameters
----------
X_flat_test: numpy 2d array of shape [n_samples,n_features]
This is the neural data being used to predict outputs.
Returns
-------
y_test_predicted: numpy 2d array of shape [n_samples,n_outputs]
The predicted outputs
"""
num_outputs = len(self.model) # Number of outputs
y_test_predicted = np.empty([X_flat_test.shape[0],
num_outputs]) # Initialize matrix of
# predicted outputs
for y_idx in range(num_outputs): # Loop through outputs
model = self.model[y_idx] # Get fit model for that output
y_test_predicted[:, y_idx] = model.predict(
X_flat_test) # Make predictions
return y_test_predicted
# GLM helper function for the NaiveBayesDecoder
[docs]
def glm_run(Xr, Yr, X_range):
X2 = sm.add_constant(Xr)
poiss_model = sm.GLM(Yr, X2, family=sm.families.Poisson())
try:
glm_results = poiss_model.fit()
Y_range = glm_results.predict(sm.add_constant(X_range))
except np.linalg.LinAlgError:
print("\nWARNING: LinAlgError")
Y_range = np.mean(Yr) * np.ones([X_range.shape[0], 1])
return Y_range
[docs]
class NaiveBayesRegression(object):
"""Class for the Naive Bayes Decoder
Parameters
----------
encoding_model: string, default='quadratic'
what encoding model is used
res:int, default=100
resolution of predicted values
This is the number of bins to divide the outputs into (going from
minimum to maximum) larger values will make decoding slower
"""
def __init__(self, encoding_model='quadratic', res=100):
self.encoding_model = encoding_model
self.res = res
return
[docs]
def fit(self, X_b_train, y_train):
"""Train Naive Bayes Decoder
Parameters
----------
X_b_train: numpy 2d array of shape [n_samples,n_neurons]
This is the neural training data.
See example file for an example of how to format the neural data
correctly
y_train: numpy 2d array of shape [n_samples, n_outputs]
This is the outputs that are being predicted (training data)
"""
# %% FIT TUNING CURVE
# First, get the output values (x/y position or velocity) that we will
# be creating tuning curves over
# Create the range for x and y (position/velocity) values
input_x_range = np.arange(np.min(y_train[:, 0]),
np.max(y_train[:, 0]) + .01, np.round(
(np.max(y_train[:, 0]) - np.min(y_train[:, 0])) / self.res))
input_y_range = np.arange(np.min(y_train[:, 1]),
np.max(y_train[:, 1]) + .01, np.round(
(np.max(y_train[:, 1]) - np.min(y_train[:, 1])) / self.res))
# Get all combinations of x/y values
input_mat = np.meshgrid(input_x_range, input_y_range)
# Format so that all combinations of x/y values are in 2 columns (first
# column x, second column y). This is called "input_xy"
xs = np.reshape(input_mat[0],
[input_x_range.shape[0] * input_y_range.shape[0], 1])
ys = np.reshape(input_mat[1],
[input_x_range.shape[0] * input_y_range.shape[0], 1])
input_xy = np.concatenate((xs, ys), axis=1)
# If quadratic model:
# -make covariates have squared components and mixture of x and y
# -do same thing for "input_xy", which are the values for creating
# the tuning curves
if self.encoding_model == 'quadratic':
input_xy_modified = np.empty([input_xy.shape[0], 5])
input_xy_modified[:, 0] = input_xy[:, 0] ** 2
input_xy_modified[:, 1] = input_xy[:, 0]
input_xy_modified[:, 2] = input_xy[:, 1] ** 2
input_xy_modified[:, 3] = input_xy[:, 1]
input_xy_modified[:, 4] = input_xy[:, 0] * input_xy[:, 1]
y_train_modified = np.empty([y_train.shape[0], 5])
y_train_modified[:, 0] = y_train[:, 0] ** 2
y_train_modified[:, 1] = y_train[:, 0]
y_train_modified[:, 2] = y_train[:, 1] ** 2
y_train_modified[:, 3] = y_train[:, 1]
y_train_modified[:, 4] = y_train[:, 0] * y_train[:, 1]
# Create tuning curves
num_nrns = X_b_train.shape[
1] # Number of neurons to fit tuning curves for
tuning_all = np.zeros([num_nrns, input_xy.shape[
0]]) # Matrix that stores tuning curves for all neurons
# Loop through neurons and fit tuning curves
for j in range(num_nrns): # Neuron number
if self.encoding_model == 'linear':
tuning = glm_run(y_train, X_b_train[:, j:j + 1], input_xy)
if self.encoding_model == 'quadratic':
tuning = glm_run(y_train_modified, X_b_train[:, j:j + 1],
input_xy_modified)
# Enter tuning curves into matrix
tuning_all[j, :] = np.squeeze(tuning)
# Save tuning curves to be used in "predict" function
self.tuning_all = tuning_all
self.input_xy = input_xy
# Get information about the probability of being in one state
# (position/velocity) based on the previous state. Here we're
# calculating the standard deviation of the change in state
# (velocity/acceleration) in the training set
n = y_train.shape[0]
dx = np.zeros([n - 1, 1])
for i in range(n - 1):
# Change in state across time steps
dx[i] = np.sqrt((y_train[i + 1, 0] - y_train[i, 0]) ** 2 + (
y_train[i + 1, 1] - y_train[i, 1]) ** 2)
std = np.sqrt(np.mean(
dx ** 2)) # dx is only positive. this gets approximate stdev of
# distribution (if it was positive and negative)
self.std = std # Save for use in "predict" function
# Get probability of being in each state - we are not using this since
# it did not help decoding performance
# n_x=np.empty([input_xy.shape[0]])
# for i in range(n):
# loc_idx=np.argmin(cdist(y_train[0:1,:],input_xy))
# n_x[loc_idx]=n_x[loc_idx]+1
# p_x=n_x/n
# self.p_x=p_x
[docs]
def predict(self, X_b_test, y_test):
"""Predict outcomes using trained tuning curves
Parameters
----------
X_b_test: numpy 2d array of shape [n_samples,n_features]
This is the neural data being used to predict outputs.
y_test: numpy 2d array of shape [n_samples,n_outputs]
The actual outputs
This parameter is necesary for the NaiveBayesDecoder (unlike most
other decoders) because the first value is nececessary for
initialization
Returns
-------
y_test_predicted: numpy 2d array of shape [n_samples,n_outputs]
The predicted outputs
"""
# Get values saved in "fit" function
tuning_all = self.tuning_all
input_xy = self.input_xy
std = self.std
# Get probability of going from one state to the next
dists = squareform(pdist(input_xy,
'euclidean'))
# Distance between all states in "input_xy" Probability of going from
# one state to the next, based on the above calculated distances
# The probability is calculated based on the distances coming from a
# Gaussian with standard deviation of std
prob_dists = norm.pdf(dists, 0, std)
# Initializations
loc_idx = np.argmin(
cdist(y_test[0:1, :], input_xy)) # The index of the first location
num_nrns = tuning_all.shape[0] # Number of neurons
y_test_predicted = np.empty(
[X_b_test.shape[0], 2]) # Initialize matrix of predicted outputs
num_ts = X_b_test.shape[0] # Number of time steps we are predicting
# Loop across time and decode
for t in range(num_ts):
rs = X_b_test[t, :]
# Number of spikes at this time point (in the interval
# we've specified including bins_before and bins_after)
probs_total = np.ones([tuning_all[0, :].shape[
0]])
# Vector that stores the probabilities of being in any state based
# on the neural activity (does not include probabilities of going
# from one state to the next)
for j in range(num_nrns): # Loop across neurons
lam = np.copy(tuning_all[j, :])
# Expected spike counts given the tuning curve
r = rs[j] # Actual spike count
probs = np.exp(-lam) * lam ** r / math.factorial(r)
# Probability of the given neuron's spike count given tuning
# curve (assuming poisson distribution)
probs_total = np.copy(probs_total * probs)
# Update the probability across neurons (probabilities are
# multiplied across neurons due to the independence assumption)
prob_dists_vec = np.copy(prob_dists[loc_idx, :])
# Probability of going to all states from the previous state
probs_final = probs_total * prob_dists_vec
# Get final probability (multiply probabilities based on spike
# count and previous state)
# probs_final=probs_total*prob_dists_vec*self.p_x
# #Get final probability when including p(x), i.e. prior about
# being in states, which we're not using
loc_idx = np.argmax(probs_final)
# Get the index of the current state (that w/ the highest
# probability)
y_test_predicted[t, :] = input_xy[loc_idx, :]
# The current predicted output
return y_test_predicted # Return predictions
# %% ALIASES for Regression
WienerFilterDecoder = WienerFilterRegression
WienerCascadeDecoder = WienerCascadeRegression
KalmanFilterDecoder = KalmanFilterRegression
DenseNNDecoder = DenseNNRegression
SimpleRNNDecoder = SimpleRNNRegression
GRUDecoder = GRURegression
LSTMDecoder = LSTMRegression
XGBoostDecoder = XGBoostRegression
SVRDecoder = SVRegression
NaiveBayesDecoder = NaiveBayesRegression
# %% CLASSIFICATION
[docs]
class WienerFilterClassification(object):
"""Class for the Wiener Filter Decoder
There are no parameters to set.
This simply leverages the scikit-learn logistic regression.
"""
def __init__(self, C=1):
self.C = C
return
[docs]
def fit(self, X_flat_train, y_train):
"""Train Wiener Filter Decoder
Parameters
----------
X_flat_train: numpy 2d array of shape [n_samples,n_features]
This is the neural data.
See example file for an example of how to format the neural data
correctly
y_train: numpy 2d array of shape [n_samples, n_outputs]
This is the outputs that are being predicted
"""
# if self.C>0:
self.model = linear_model.LogisticRegression(C=self.C,
multi_class='auto')
# Initialize linear regression model
# else:
# self.model=linear_model.LogisticRegression(penalty='none',
# solver='newton-cg') #Initialize linear regression model
self.model.fit(X_flat_train, y_train) # Train the model
[docs]
def predict(self, X_flat_test):
"""Predict outcomes using trained Wiener Cascade Decoder
Parameters
----------
X_flat_test: numpy 2d array of shape [n_samples,n_features]
This is the neural data being used to predict outputs.
Returns
-------
y_test_predicted: numpy 2d array of shape [n_samples,n_outputs]
The predicted outputs
"""
y_test_predicted = self.model.predict(X_flat_test) # Make predictions
return y_test_predicted
# %% SUPPORT VECTOR REGRESSION
[docs]
class SVClassification(object):
"""Class for the Support Vector Classification Decoder
This simply leverages the scikit-learn SVM
Parameters
----------
C: float, default=3.0
Penalty parameter of the error term
max_iter: integer, default=-1
the maximum number of iteraations to run (to save time)
max_iter=-1 means no limit
Typically in the 1000s takes a short amount of time on a laptop
"""
def __init__(self, max_iter=-1, C=3.0):
self.max_iter = max_iter
self.C = C
return
[docs]
def fit(self, X_flat_train, y_train):
"""Train SVR Decoder
Parameters
----------
X_flat_train: numpy 2d array of shape [n_samples,n_features]
This is the neural data.
See example file for an example of how to format the neural data
correctly
y_train: numpy 2d array of shape [n_samples, n_outputs]
This is the outputs that are being predicted
"""
model = SVC(C=self.C, max_iter=self.max_iter) # Initialize model
model.fit(X_flat_train, y_train) # Train the model
self.model = model
[docs]
def predict(self, X_flat_test):
"""Predict outcomes using trained SV Decoder
Parameters
----------
X_flat_test: numpy 2d array of shape [n_samples,n_features]
This is the neural data being used to predict outputs.
Returns
-------
y_test_predicted: numpy 2d array of shape [n_samples,n_outputs]
The predicted outputs
"""
model = self.model # Get fit model for that output
y_test_predicted = model.predict(X_flat_test) # Make predictions
return y_test_predicted
# %% DENSE (FULLY-CONNECTED) NEURAL NETWORK
[docs]
class DenseNNClassification(object):
"""Class for the dense (fully-connected) neural network decoder
Parameters
----------
units: integer or vector of integers, optional, default 400
This is the number of hidden units in each layer
If you want a single layer, input an integer (e.g. units=400 will give
you a single hidden layer with 400 units). If you want multiple layers,
input a vector (e.g. units=[400,200]) will give you 2 hidden layers
with 400 and 200 units, repsectively. The vector can either be a list
or an array
dropout: decimal, optional, default 0
Proportion of units that get dropped out
num_epochs: integer, optional, default 10
Number of epochs used for training
verbose: binary, optional, default=0
Whether to show progress of the fit after each epoch
"""
def __init__(self, units=400, dropout=0, num_epochs=10, verbose=0):
self.dropout = dropout
self.num_epochs = num_epochs
self.verbose = verbose
# If "units" is an integer, put it in the form of a vector
try: # Check if it's a vector
units[0]
except IndexError:
# If it's not a vector, create a vector of the number of
# units for each layer
units = [units]
self.units = units
# Determine the number of hidden layers (based on "units" that the
# user entered)
self.num_layers = len(units)
[docs]
def fit(self, X_flat_train, y_train):
"""Train DenseNN Decoder
Parameters
----------
X_flat_train: numpy 2d array of shape [n_samples,n_features]
This is the neural data.
See example file for an example of how to format the neural data
correctly
y_train: numpy 2d array of shape [n_samples, n_outputs]
This is the outputs that are being predicted
"""
# Use one-hot coding for y
if y_train.ndim == 1:
y_train = np_utils.to_categorical(y_train.astype(int))
elif y_train.shape[1] == 1:
y_train = np_utils.to_categorical(y_train.astype(int))
model = Sequential() # Declare model
# Add first hidden layer
model.add(Dense(self.units[0],
input_dim=X_flat_train.shape[1])) # Add dense layer
model.add(Activation('relu')) # Add nonlinear (tanh) activation
# if self.dropout!=0:
if self.dropout != 0:
# Dropout some units if proportion of dropout != 0
model.add(Dropout(self.dropout))
# Add any additional hidden layers (beyond the 1st)
for layer in range(
self.num_layers - 1): # Loop through additional layers
model.add(Dense(self.units[layer + 1])) # Add dense layer
# Add nonlinear (tanh) activation - can also make
model.add(Activation('tanh'))
# relu
if self.dropout != 0:
# Dropout some units if proportion of dropout != 0
model.add(Dropout(self.dropout))
# Add dense connections to all outputs
model.add(Dense(
y_train.shape[1])) # Add final dense layer (connected to outputs)
model.add(Activation('softplus'))
# Fit model (and set fitting parameters)
model.compile(loss='categorical_crossentropy', optimizer='adam',
metrics=['accuracy']) # Set loss function and optimizer
if keras_v1:
model.fit(X_flat_train, y_train, nb_epoch=self.num_epochs,
verbose=self.verbose) # Fit the model
else:
model.fit(X_flat_train, y_train, epochs=self.num_epochs,
verbose=self.verbose) # Fit the model
self.model = model
[docs]
def predict(self, X_flat_test):
"""Predict outcomes using trained DenseNN Decoder
Parameters
----------
X_flat_test: numpy 2d array of shape [n_samples,n_features]
This is the neural data being used to predict outputs.
Returns
-------
y_test_predicted: numpy 2d array of shape [n_samples,n_outputs]
The predicted outputs
"""
y_test_predicted_raw = self.model.predict(
X_flat_test) # Make predictions
y_test_predicted = np.argmax(y_test_predicted_raw, axis=1)
return y_test_predicted
# %% SIMPLE RNN DECODER
[docs]
class SimpleRNNClassification(object):
"""Class for the RNN decoder
Parameters
----------
units: integer, optional, default 400
Number of hidden units in each layer
dropout: decimal, optional, default 0
Proportion of units that get dropped out
num_epochs: integer, optional, default 10
Number of epochs used for training
verbose: binary, optional, default=0
Whether to show progress of the fit after each epoch
"""
def __init__(self, units=400, dropout=0, num_epochs=10, verbose=0):
self.units = units
self.dropout = dropout
self.num_epochs = num_epochs
self.verbose = verbose
[docs]
def fit(self, X_train, y_train):
"""Train GRU Decoder
Parameters
----------
X_train: numpy 3d array of shape [n_samples,n_time_bins,n_neurons]
This is the neural data.
See example file for an example of how to format the neural data
correctly
y_train: numpy 2d array of shape [n_samples, n_outputs]
This is the outputs that are being predicted
"""
# Use one-hot coding for y
if y_train.ndim == 1:
y_train = np_utils.to_categorical(y_train.astype(int))
elif y_train.shape[1] == 1:
y_train = np_utils.to_categorical(y_train.astype(int))
model = Sequential() # Declare model
# Add recurrent layer
# %% MAKE RELU ACTIVATION BELOW LIKE IN REGRESSION?????
if keras_v1:
model.add(SimpleRNN(self.units, input_shape=(
X_train.shape[1], X_train.shape[2]), dropout_W=self.dropout,
dropout_U=self.dropout))
# Within recurrent layer, include dropout
else:
model.add(SimpleRNN(self.units, input_shape=(
X_train.shape[1], X_train.shape[2]), dropout=self.dropout,
recurrent_dropout=self.dropout))
# Within recurrent layer, include dropout
if self.dropout != 0:
# Dropout some units (recurrent layer output units)
model.add(Dropout(self.dropout))
# Add dense connections to output layer
model.add(Dense(y_train.shape[1]))
model.add(Activation('softplus'))
# Fit model (and set fitting parameters)
model.compile(loss='categorical_crossentropy', optimizer='rmsprop',
metrics=['accuracy']) # Set loss function and optimizer
if keras_v1:
model.fit(X_train, y_train, nb_epoch=self.num_epochs,
verbose=self.verbose) # Fit the model
else:
model.fit(X_train, y_train, epochs=self.num_epochs,
verbose=self.verbose) # Fit the model
self.model = model
[docs]
def predict(self, X_test):
"""Predict outcomes using trained LSTM Decoder
Parameters
----------
X_test: numpy 3d array of shape [n_samples,n_time_bins,n_neurons]
This is the neural data being used to predict outputs.
Returns
-------
y_test_predicted: numpy 2d array of shape [n_samples,n_outputs]
The predicted outputs
"""
y_test_predicted_raw = self.model.predict(X_test) # Make predictions
y_test_predicted = np.argmax(y_test_predicted_raw, axis=1)
return y_test_predicted
# %% GATED RECURRENT UNIT (GRU) DECODER
[docs]
class GRUClassification(object):
"""Class for the gated recurrent unit (GRU) decoder
Parameters
----------
units: integer, optional, default 400
Number of hidden units in each layer
dropout: decimal, optional, default 0
Proportion of units that get dropped out
num_epochs: integer, optional, default 10
Number of epochs used for training
verbose: binary, optional, default=0
Whether to show progress of the fit after each epoch
"""
def __init__(self, units=400, dropout=0, num_epochs=10, verbose=0):
self.units = units
self.dropout = dropout
self.num_epochs = num_epochs
self.verbose = verbose
[docs]
def fit(self, X_train, y_train):
"""Train GRU Decoder
Parameters
----------
X_train: numpy 3d array of shape [n_samples,n_time_bins,n_neurons]
This is the neural data.
See example file for an example of how to format the neural data
correctly
y_train: numpy 2d array of shape [n_samples, n_outputs]
This is the outputs that are being predicted
"""
# Use one-hot coding for y
if y_train.ndim == 1:
y_train = np_utils.to_categorical(y_train.astype(int))
elif y_train.shape[1] == 1:
y_train = np_utils.to_categorical(y_train.astype(int))
model = Sequential() # Declare model
# Add recurrent layer
if keras_v1:
model.add(GRU(self.units,
input_shape=(X_train.shape[1], X_train.shape[2]),
dropout_W=self.dropout,
dropout_U=self.dropout))
# Within recurrent layer, include dropout
else:
model.add(GRU(self.units,
input_shape=(X_train.shape[1], X_train.shape[2]),
dropout=self.dropout,
recurrent_dropout=self.dropout))
# Within recurrent layer, include dropout
if self.dropout != 0:
# Dropout some units (recurrent layer output units)
model.add(Dropout(self.dropout))
# Add dense connections to output layer
model.add(Dense(y_train.shape[1]))
model.add(Activation('softplus'))
# Fit model (and set fitting parameters)
model.compile(loss='categorical_crossentropy', optimizer='rmsprop',
metrics=['accuracy']) # Set loss function and optimizer
if keras_v1:
model.fit(X_train, y_train, nb_epoch=self.num_epochs,
verbose=self.verbose) # Fit the model
else:
model.fit(X_train, y_train, epochs=self.num_epochs,
verbose=self.verbose) # Fit the model
self.model = model
[docs]
def predict(self, X_test):
"""Predict outcomes using trained LSTM Decoder
Parameters
----------
X_test: numpy 3d array of shape [n_samples,n_time_bins,n_neurons]
This is the neural data being used to predict outputs.
Returns
-------
y_test_predicted: numpy 2d array of shape [n_samples,n_outputs]
The predicted outputs
"""
y_test_predicted_raw = self.model.predict(X_test) # Make predictions
y_test_predicted = np.argmax(y_test_predicted_raw, axis=1)
return y_test_predicted
# %% LONG SHORT TERM MEMORY (LSTM) DECODER
[docs]
class LSTMClassification(object):
"""Class for the LSTM decoder
Parameters
----------
units: integer, optional, default 400
Number of hidden units in each layer
dropout: decimal, optional, default 0
Proportion of units that get dropped out
num_epochs: integer, optional, default 10
Number of epochs used for training
verbose: binary, optional, default=0
Whether to show progress of the fit after each epoch
"""
def __init__(self, units=400, dropout=0, num_epochs=10, verbose=0):
self.units = units
self.dropout = dropout
self.num_epochs = num_epochs
self.verbose = verbose
[docs]
def fit(self, X_train, y_train):
"""Train LSTM Decoder
Parameters
----------
X_train: numpy 3d array of shape [n_samples,n_time_bins,n_neurons]
This is the neural data.
See example file for an example of how to format the neural data
correctly
y_train: numpy 2d array of shape [n_samples, n_outputs]
This is the outputs that are being predicted
"""
# Use one-hot coding for y
if y_train.ndim == 1:
y_train = np_utils.to_categorical(y_train.astype(int))
elif y_train.shape[1] == 1:
y_train = np_utils.to_categorical(y_train.astype(int))
model = Sequential() # Declare model
# Add recurrent layer
if keras_v1:
model.add(LSTM(self.units,
input_shape=(X_train.shape[1], X_train.shape[2]),
dropout_W=self.dropout,
dropout_U=self.dropout))
# Within recurrent layer, include dropout
else:
model.add(LSTM(self.units,
input_shape=(X_train.shape[1], X_train.shape[2]),
dropout=self.dropout,
recurrent_dropout=self.dropout))
# Within recurrent layer, include dropout
if self.dropout != 0:
# Dropout some units (recurrent layer output units)
model.add(Dropout(self.dropout))
# Add dense connections to output layer
model.add(Dense(y_train.shape[1]))
model.add(Activation('softplus'))
# Fit model (and set fitting parameters)
model.compile(loss='categorical_crossentropy', optimizer='rmsprop',
metrics=['accuracy']) # Set loss function and optimizer
if keras_v1:
model.fit(X_train, y_train, nb_epoch=self.num_epochs,
verbose=self.verbose) # Fit the model
else:
model.fit(X_train, y_train, epochs=self.num_epochs,
verbose=self.verbose) # Fit the model
self.model = model
[docs]
def predict(self, X_test):
"""Predict outcomes using trained LSTM Decoder
Parameters
----------
X_test: numpy 3d array of shape [n_samples,n_time_bins,n_neurons]
This is the neural data being used to predict outputs.
Returns
-------
y_test_predicted: numpy 2d array of shape [n_samples,n_outputs]
The predicted outputs
"""
y_test_predicted_raw = self.model.predict(X_test) # Make predictions
y_test_predicted = np.argmax(y_test_predicted_raw, axis=1)
return y_test_predicted
# %% EXTREME GRADIENT BOOSTING (XGBOOST)
[docs]
class XGBoostClassification(object):
"""Class for the XGBoost Decoder
Parameters
----------
max_depth: integer, optional, default=3
the maximum depth of the trees
num_round: integer, optional, default=300
the number of trees that are fit
eta: float, optional, default=0.3
the learning rate
gpu: integer, optional, default=-1
if the gpu version of xgboost is installed, this can be used to select
which gpu to use
for negative values (default), the gpu is not used
"""
def __init__(self, max_depth=3, num_round=300, eta=0.3, gpu=-1):
self.max_depth = max_depth
self.num_round = num_round
self.eta = eta
self.gpu = gpu
[docs]
def fit(self, X_flat_train, y_train):
"""Train XGBoost Decoder
Parameters
----------
X_flat_train: numpy 2d array of shape [n_samples,n_features]
This is the neural data.
See example file for an example of how to format the neural data
correctly
y_train: numpy 1d array of shape (n_samples), with integers
representing classes or 2d array of shape [n_samples, n_outputs] in
1-hot form. This is the outputs that are being predicted
"""
# turn to categorial (not 1-hat)
if (y_train.ndim == 2):
if (y_train.shape[1] == 1):
y_train = np.reshape(y_train, -1)
else:
y_train = np.argmax(y_train, axis=1, out=None)
# Get number of classes
n_classes = len(np.unique(y_train))
# Set parameters for XGBoost
param = {'objective': "multi:softmax", # or softprob
'eval_metric': "mlogloss", # loglikelihood loss
# 'eval_metric': "merror",
'max_depth': self.max_depth,
# this is the only parameter we have set, it's one of the way
# or regularizing
'eta': self.eta,
'num_class': n_classes, # y_train.shape[1],
'seed': 2925, # for reproducibility
'silent': 1}
if self.gpu < 0:
param['nthread'] = -1 # with -1 it will use all available threads
else:
param['gpu_id'] = self.gpu
param['updater'] = 'grow_gpu'
dtrain = xgb.DMatrix(X_flat_train,
label=y_train) # Put in correct format for XGB
bst = xgb.train(param, dtrain, self.num_round) # Train model
self.model = bst
[docs]
def predict(self, X_flat_test):
"""Predict outcomes using trained XGBoost Decoder
Parameters
----------
X_flat_test: numpy 2d array of shape [n_samples,n_features]
This is the neural data being used to predict outputs.
Returns
-------
y_test_predicted: numpy 1d array with integers as classes
The predicted outputs
"""
dtest = xgb.DMatrix(X_flat_test) # Put in XGB format
bst = self.model # Get fit model
y_test_predicted = bst.predict(dtest) # Make prediction
return y_test_predicted
# %% PRINCIPAL COMPONENT ANALYSIS - LINEAR DISCRIMINANT CLASSIFIER
[docs]
class PcaLdaClassification(BaseEstimator):
"""Class for the PCA - LDA Classifier
Parameters
----------
explained variance: integer, optional, default=80
the number of modes that explain the cumulative variance of the dataset
da_type: string, optional, default=lda
type of discriminant analysis; lda or qda
"""
def __init__(self, explained_variance=0.8, da_type='lda', PCA_kwargs={},
DA_kwargs={}):
# choose discriminant type
if (da_type == 'lda'):
# linear discriminant analysis
da_model = da.LinearDiscriminantAnalysis(**DA_kwargs)
else:
# Quadratic discriminant analysis
da_model = da.QuadraticDiscriminantAnalysis(**DA_kwargs)
PCA_kwargs['n_components'] = explained_variance
# Create a pipeline classifier
self.model = Pipeline(steps=[
('pca', PCA(**PCA_kwargs)),
('discriminant', da_model)])
# set default outputs to numpy
self.model.set_output(transform="default")
[docs]
def fit(self, X_flat_train, y_train):
"""Train PCA - LDA classifier
Parameters
----------
X_flat_train: numpy 2d array of shape [n_samples,n_features]
This is the neural data.
See example file for an example of how to format the neural data
correctly
y_train: numpy 1d array of shape (n_samples), with integers
representing classes
This is the outputs that are being predicted
"""
# Fit the model
self.model.fit(X_flat_train, y_train)
[docs]
def predict(self, X_flat_test):
"""Predict outcomes using trained PCA LDA Decoder
Parameters
----------
X_flat_test: numpy 2d array of shape [n_samples,n_features]
This is the neural data being used to predict outputs.
Returns
-------
y_test_predicted: numpy 1d array with integers as classes
The predicted outputs
"""
pca_lda_fit = self.model # Get fit model
y_test_predicted = pca_lda_fit.predict(X_flat_test) # Make prediction
return y_test_predicted
[docs]
def get_scores(self, deep=True):
"""Get scores of pca and lda model
Args:
deep (bool, optional): Defaults to True.
Returns:
scores: dict
Returns fitted scores of PCA and LDA
"""
pca = self.model['pca']
da = self.model['discriminant']
scores = dict()
scores['pca'] = pca.componens_
scores['discriminant'] = da.coef_
return scores
[docs]
def score(self, X, y, sample_weight=None):
"""Returns the mean accuracy on the given test data and labels.
In multi-label classification, this is the subset accuracy
which is a harsh metric since you require for each sample that
each label set be correctly predicted.
Parameters
----------
X : array-like, shape = (n_samples, n_features)
Test samples.
y : array-like, shape = (n_samples) or (n_samples, n_outputs)
True labels for X.
sample_weight : array-like, shape = [n_samples], optional
Sample weights.
Returns
-------
score : float
Mean accuracy of self.predict(X) wrt. y.
"""
from sklearn.metrics import accuracy_score
return accuracy_score(y, self.predict(X), sample_weight=sample_weight)
# %% PRINCIPAL COMPONENT ANALYSIS Wrapper for classification function
[docs]
class PcaEstimateDecoder(BaseEstimator):
"""Class for the PCA - SVM Classifier
Parameters
----------
explained_variance: float, optional, default=0.8
the cumulative explained variance ratio required for PCA
clf: object, optional, default=SVC()
the classifier object to use for classification
clf_params: dict, optional
Additional parameters to be passed to the classifier (e.g.,
{'param_name': value})
"""
def __init__(self, explained_variance=0.8, clf=SVC(), clf_params=None):
self.explained_variance = explained_variance
self.clf = clf
self.clf_params = clf_params
self._initialize_model()
def _initialize_model(self):
self.pca = PCA(n_components=self.explained_variance)
if self.clf_params is not None:
self.model = Pipeline(steps=[('pca', self.pca), (
'model', self.clf.set_params(**self.clf_params))])
else:
print('No initial parameters')
self.model = Pipeline(
steps=[('pca', self.pca), ('model', self.clf)])
[docs]
def fit(self, X_flat_train, y_train):
"""Train PCA - SVM classifier
Parameters
----------
X_flat_train: numpy 2d array of shape [n_samples, n_features]
This is the neural data.
See example file for an example of how to format the neural data
correctly
y_train: numpy 1d array of shape (n_samples), with integers
representing classes
This is the outputs that are being predicted
"""
self._initialize_model()
self.model.fit(X_flat_train, y_train)
[docs]
def predict(self, X_flat_test):
"""Predict outcomes using trained PCA - SVM Decoder
Parameters
----------
X_flat_test: numpy 2d array of shape [n_samples, n_features]
This is the neural data being used to predict outputs.
Returns
-------
y_test_predicted: numpy 1d array with integers as classes
The predicted outputs
"""
return self.model.predict(X_flat_test)
[docs]
def score(self, X, y, sample_weight=None):
"""Returns the mean accuracy on the given test data and labels.
Parameters
----------
X : array-like, shape = (n_samples, n_features)
Test samples.
y : array-like, shape = (n_samples,) or (n_samples, n_outputs)
True labels for X.
sample_weight : array-like, shape = [n_samples], optional
Sample weights.
Returns
-------
score : float
Mean accuracy of self.predict(X) wrt. y.
"""
return accuracy_score(y, self.predict(X), sample_weight=sample_weight)