diff --git a/opls.py b/opls.py new file mode 100644 index 0000000..62201af --- /dev/null +++ b/opls.py @@ -0,0 +1,194 @@ +# Copyright (c) 2019 Wright State University +# Author: Daniel Foose +# License: MIT + +import numpy as np +from sklearn.base import BaseEstimator, TransformerMixin +from sklearn.utils import check_array +from sklearn.utils.validation import check_consistent_length + + +def _center_scale_xy(X, Y, scale=True): + """ Center X, Y and scale if the scale parameter==True + + Returns + ------- + X, Y, x_mean, y_mean, x_std, y_std + """ + # center + x_mean = X.mean(axis=0) + X -= x_mean + y_mean = Y.mean(axis=0) + Y -= y_mean + # scale + if scale: + x_std = X.std(axis=0, ddof=1) + x_std[x_std == 0.0] = 1.0 + X /= x_std + y_std = Y.std(axis=0, ddof=1) + y_std[y_std == 0.0] = 1.0 + Y /= y_std + else: + x_std = np.ones(X.shape[1]) + y_std = np.ones(Y.shape[1]) + return X, Y, x_mean, y_mean, x_std, y_std + + +class OPLS(BaseEstimator, TransformerMixin): + """Orthogonal Projection to Latent Structures (O-PLS) + + This class implements the O-PLS algorithm for one (and only one) response as described by [Trygg 2002]. + This is equivalent to the implementation of the libPLS MATLAB library (http://libpls.net/) + + Parameters + ---------- + n_components: int, number of orthogonal components to filter. (default 5). + + scale: boolean, scale data? (default True) + + Attributes + ---------- + W_ortho_ : weights orthogonal to y + + P_ortho_ : loadings orthogonal to y + + T_ortho_ : scores orthogonal to y + + x_mean_ : mean of the X provided to fit() + y_mean_ : mean of the Y provided to fit() + x_std_ : std deviation of the X provided to fit() + y_std_ : std deviation of the Y provided to fit() + + References + ---------- + Johan Trygg and Svante Wold. Orthogonal projections to latent structures (O-PLS). + J. Chemometrics 2002; 16: 119-128. DOI: 10.1002/cem.695 + """ + def __init__(self, n_components=5, scale=True): + self.n_components = n_components + self.scale = scale + + self.W_ortho_ = None + self.P_ortho_ = None + self.T_ortho_ = None + + self.x_mean_ = None + self.y_mean_ = None + self.x_std_ = None + self.y_std_ = None + + def fit(self, X, Y): + """Fit model to data + + Parameters + ---------- + X : array-like, shape = [n_samples, n_features] + Training vectors, where n_samples is the number of samples and + n_features is the number of predictors. + + Y : array-like, shape = [n_samples, 1] + Target vector, where n_samples is the number of samples. + This implementation only supports a single response (target) variable. + + """ + + # copy since this will contains the residuals (deflated) matrices + check_consistent_length(X, Y) + X = check_array(X, dtype=np.float64, copy=True, ensure_min_samples=2) + Y = check_array(Y, dtype=np.float64, copy=True, ensure_2d=False) + if Y.ndim == 1: + Y = Y.reshape(-1, 1) + + X, Y, self.x_mean_, self.y_mean_, self.x_std_, self.y_std_ = _center_scale_xy(X, Y, self.scale) + + Z = X.copy() + w = np.dot(X.T, Y) # calculate weight vector + w /= np.linalg.norm(w) # normalize weight vector + + W_ortho = [] + T_ortho = [] + P_ortho = [] + + for i in range(self.n_components): + t = np.dot(Z, w) # scores vector + p = np.dot(Z.T, t) / np.dot(t.T, t).item() # loadings of X + w_ortho = p - np.dot(w.T, p).item() / np.dot(w.T, w).item() * w # orthogonal weight + w_ortho = w_ortho / np.linalg.norm(w_ortho) # normalize orthogonal weight + t_ortho = np.dot(Z, w_ortho) # orthogonal components + p_ortho = np.dot(Z.T, t_ortho) / np.dot(t_ortho.T, t_ortho).item() + Z -= np.dot(t_ortho, p_ortho.T) + W_ortho.append(w_ortho) + T_ortho.append(t_ortho) + P_ortho.append(p_ortho) + + self.W_ortho_ = np.hstack(W_ortho) + self.T_ortho_ = np.hstack(T_ortho) + self.P_ortho_ = np.hstack(P_ortho) + + return self + + def transform(self, X): + """Get the non-orthogonal components of X (which are considered in prediction). + + Parameters + ---------- + X : array-like, shape = [n_samples, n_features] + Training or test vectors, where n_samples is the number of samples and + n_features is the number of predictors (which should be the same predictors the model was trained on). + + Returns + ------- + X_res, X with the orthogonal data filtered out + """ + Z = check_array(X, copy=True) + + Z -= self.x_mean_ + if self.scale: + Z /= self.x_std_ + + # filter out orthogonal components of X + for i in range(self.n_components): + t = np.dot(Z, self.W_ortho_[:, i]).reshape(-1, 1) + Z -= np.dot(t, self.P_ortho_[:, i].T.reshape(1, -1)) + + return Z + + def fit_transform(self, X, y=None, **fit_params): + """ Learn and apply the filtering on the training data and get the filtered X + + Parameters + ---------- + X : array-like, shape=[n_samples, n_features] + Training vectors, where n_samples is the number of samples and + n_features is the number of predictors. + + y : array-like, shape = [n_samples, 1] + Target vector, where n_samples is the number of samples. + This O-PLS implementation only supports a single response (target) variable. + Y=None will raise ValueError from fit(). + + Returns + ------- + X_filtered + """ + return self.fit(X, y).transform(X) + + def score(self, X): + """ Return the coefficient of determination R^2X of the transformation. + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + Test samples. For some estimators this may be a + precomputed kernel matrix or a list of generic objects instead, + shape = (n_samples, n_samples_fitted), + where n_samples_fitted is the number of + samples used in the fitting for the estimator. + Returns + ------- + score : float + The amount of variation in X explained by the transformed X. A lower number indicates more orthogonal + variation has been removed. + """ + X = check_array(X) + Z = self.transform(X) + return np.sum(np.square(Z)) / np.sum(np.square(X - self.x_mean_)) # Z is already properly centered diff --git a/permutation_test.py b/permutation_test.py new file mode 100644 index 0000000..635a689 --- /dev/null +++ b/permutation_test.py @@ -0,0 +1,551 @@ +import warnings +from sys import stderr + +import numpy as np +from joblib import Parallel, delayed +from sklearn.base import is_classifier, clone, ClassifierMixin +from sklearn.exceptions import DataConversionWarning +from sklearn.metrics import r2_score, accuracy_score +from sklearn.model_selection import check_cv, cross_val_predict +from sklearn.utils import indexable, check_random_state + + +def passthrough_scorer(estimator, *args, **kwargs): + """Function that wraps estimator.score""" + return estimator.score(*args, **kwargs) + + +def non_cv_permutation_test_score(estimator, X, y, groups=None, + n_permutations=100, n_jobs=None, random_state=0, + verbose=0, pre_dispatch='2*n_jobs', scorers=None): + """Evaluate the significance of several non-cross-validated scores with permutations + + Read more in the :ref:`User Guide `. + + Parameters + ---------- + estimator : estimator object implementing 'fit' + The object to use to fit the data. + + X : array-like of shape at least 2D + The data to fit. + + y : array-like + The target variable to try to predict in the case of + supervised learning. + + groups : array-like, with shape (n_samples,), optional + Labels to constrain permutation within groups, i.e. ``y`` values + are permuted among samples with the same group identifier. + When not specified, ``y`` values are permuted among all samples. + + When a grouped cross-validator is used, the group labels are + also passed on to the ``split`` method of the cross-validator. The + cross-validator uses them for grouping the samples while splitting + the dataset into train/test set. + + scorers : string, callable or None, optional, default: None + a list of scoring functions + + + n_permutations : integer, optional + Number of times to permute ``y``. + + n_jobs : int or None, optional (default=None) + The number of CPUs to use to do the computation. + ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. + ``-1`` means using all processors. See :term:`Glossary ` + for more details. + + random_state : int, RandomState instance or None, optional (default=0) + If int, random_state is the seed used by the random number generator; + If RandomState instance, random_state is the random number generator; + If None, the random number generator is the RandomState instance used + by `np.random`. + + verbose : integer, optional + The verbosity level. + + + pre_dispatch : int, or string, optional + Controls the number of jobs that get dispatched during parallel + execution. Reducing this number can be useful to avoid an + explosion of memory consumption when more jobs get dispatched + than CPUs can process. This parameter can be: + + - None, in which case all the jobs are immediately + created and spawned. Use this for lightweight and + fast-running jobs, to avoid delays due to on-demand + spawning of the jobs + + - An int, giving the exact number of total jobs that are + spawned + + - A string, giving an expression as a function of n_jobs, + as in '2*n_jobs' + + Returns + ------- + score : float + The true score without permuting targets. + + permutation_scores : array, shape (n_permutations,) + The scores obtained for each permutations. + + pvalue : float + The p-value, which approximates the probability that the score would + be obtained by chance. This is calculated as: + + `(C + 1) / (n_permutations + 1)` + + Where C is the number of permutations whose score >= the true score. + + The best possible p-value is 1/(n_permutations + 1), the worst is 1.0. + + Notes + ----- + This function implements Test 1 in: + + Ojala and Garriga. Permutation Tests for Studying Classifier + Performance. The Journal of Machine Learning Research (2010) + vol. 11 + + """ + X, y, groups = indexable(X, y, groups) + + random_state = check_random_state(random_state) + if scorers is None or not len(scorers): + if hasattr(estimator, 'score'): + scorers = [passthrough_scorer] + else: + raise TypeError( + "If no scoring is specified, the estimator passed should " + "have a 'score' method. The estimator %r does not." + % estimator) + + # We clone the estimator to make sure that all the folds are + # independent, and that it is pickle-able. + score = _non_cv_permutation_test_score(clone(estimator), X, y, groups, scorers) + permutation_scores = Parallel(n_jobs=n_jobs, verbose=verbose, pre_dispatch=pre_dispatch)( + delayed(_non_cv_permutation_test_score)( + clone(estimator), X, _shuffle(y, groups, random_state), + groups, scorers) + for _ in range(n_permutations)) + permutation_scores = np.array(permutation_scores) + pvalue = (np.sum(permutation_scores >= score, axis=0) + 1.0) / (n_permutations + 1) + return [(score[i], permutation_scores[:, i], pvalue[i]) for i in range(len(scorers))] + + +def _non_cv_permutation_test_score(estimator, X, y, groups, scorers): + """Auxiliary function for permutation_test_score""" + estimator.fit(X, y) + return [scorer(estimator, X, y) for scorer in scorers] + + +def permutation_test_score(estimator, X, y, groups=None, cv='warn', + n_permutations=100, n_jobs=None, random_state=0, + verbose=0, pre_dispatch='2*n_jobs', cv_score_functions=None, + fit_params=None, method='predict', parallel_by='permutation'): + """Evaluate the significance of several cross-validated scores with permutations + + Note: this is different from sklearn.model_selection.permutation_test_score in two ways. + 1. The scikit-learn method calculates the metrics for each CV split, this makes using metrics like r-squared with + LeaveOneOut impossible. This method uses sklearn.model_selection.cross_val_predict to predict the left-out labels, + then calculates the metrics for that prediction. + 2. The scikit-learn method only evaluates one metric at a time, this one evaluates an arbitrary number of metrics + + Parameters + ---------- + estimator : estimator object implementing 'fit' + The object to use to fit the data. + + X : array-like of shape at least 2D + The data to fit. + + y : array-like + The target variable to try to predict in the case of + supervised learning. + + groups : array-like, with shape (n_samples,), optional + Labels to constrain permutation within groups, i.e. ``y`` values + are permuted among samples with the same group identifier. + When not specified, ``y`` values are permuted among all samples. + + When a grouped cross-validator is used, the group labels are + also passed on to the ``split`` method of the cross-validator. The + cross-validator uses them for grouping the samples while splitting + the dataset into train/test set. + + cv_score_functions : list of callables or None, optional, default: None + a list of score functions of form score(y_true, y_pred) (like r2_score, accuracy_score). + If you have special arguments for your score function you should create another function with + the required prototype that wraps that function. + + cv : int, cross-validation generator or an iterable, optional + Determines the cross-validation splitting strategy. + Possible inputs for cv are: + + - None, to use the default 3-fold cross validation, + - integer, to specify the number of folds in a `(Stratified)KFold`, + - :term:`CV splitter`, + - An iterable yielding (train, test) splits as arrays of indices. + + For integer/None inputs, if the estimator is a classifier and ``y`` is + either binary or multiclass, :class:`StratifiedKFold` is used. In all + other cases, :class:`KFold` is used. + + n_permutations : integer, optional + Number of times to permute ``y``. + + n_jobs : int or None, optional (default=None) + The number of CPUs to use to do the computation. + ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. + ``-1`` means using all processors. See :term:`Glossary ` + for more details. + + random_state : int, RandomState instance or None, optional (default=0) + If int, random_state is the seed used by the random number generator; + If RandomState instance, random_state is the random number generator; + If None, the random number generator is the RandomState instance used + by `np.random`. + + verbose : integer, optional + The verbosity level. + + pre_dispatch : int, or string, optional + Controls the number of jobs that get dispatched during parallel + execution. Reducing this number can be useful to avoid an + explosion of memory consumption when more jobs get dispatched + than CPUs can process. This parameter can be: + + - None, in which case all the jobs are immediately + created and spawned. Use this for lightweight and + fast-running jobs, to avoid delays due to on-demand + spawning of the jobs + + - An int, giving the exact number of total jobs that are + spawned + + - A string, giving an expression as a function of n_jobs, + as in '2*n_jobs' + + fit_params : dict, optional + Parameters to pass to the fit method of the estimator. + + method : string, optional, default: 'predict' + Invokes the passed method name of the passed estimator. For + method='predict_proba', the columns correspond to the classes + in sorted order. + + parallel_by : string, optional, default: 'permutation' + Whether to parallelize the estimation step or the permuation step. + Either 'estimation' or 'permutation'. If 'estimation', the training of each cross-validation + fold gets its own job. If 'permutation', each permutation of the target gets its own job. + + Returns + ------- + score : float + The true score without permuting targets. + + permutation_scores : array, shape (n_permutations,) + The scores obtained for each permutations. + + pvalue : float + The p-value, which approximates the probability that the score would + be obtained by chance. This is calculated as: + + `(C + 1) / (n_permutations + 1)` + + Where C is the number of permutations whose score >= the true score. + + The best possible p-value is 1/(n_permutations + 1), the worst is 1.0. + + Notes + ----- + This function implements Test 1 in: + + Ojala and Garriga. Permutation Tests for Studying Classifier + Performance. The Journal of Machine Learning Research (2010) + vol. 11 + + """ + X, y, groups = indexable(X, y, groups) + + cv = check_cv(cv, y, classifier=is_classifier(estimator)) + random_state = check_random_state(random_state) + if cv_score_functions is None: + if isinstance(estimator, ClassifierMixin): + cv_score_functions = [accuracy_score] + else: + cv_score_functions = [r2_score] + # We clone the estimator to make sure that all the folds are + # independent, and that it is pickle-able. + score = _permutation_test_score(clone(estimator), X, y, groups, cv, + n_jobs, verbose, fit_params, pre_dispatch, + method, cv_score_functions) + if parallel_by == 'estimation': + permutation_scores = np.vstack([ + _permutation_test_score( + clone(estimator), X, _shuffle(y, groups, random_state), + groups, cv, n_jobs, verbose, fit_params, pre_dispatch, + method, cv_score_functions + ) for _ in range(n_permutations) + ]) + elif parallel_by == 'permutation': + permutation_scores = np.vstack( + Parallel(n_jobs=n_jobs, verbose=verbose, pre_dispatch=pre_dispatch)( + delayed(_permutation_test_score)( + clone(estimator), X, _shuffle(y, groups, random_state), + groups, cv, fit_params=fit_params, method=method, score_functions=cv_score_functions + ) for _ in range(n_permutations) + ) + ) + else: + raise ValueError(f'Invalid option for parallel_by {parallel_by}') + pvalue = (np.sum(permutation_scores >= score, axis=0) + 1.0) / (n_permutations + 1) + return [(score[i], permutation_scores[:, i], pvalue[i]) for i in range(len(score))] + # return score, permutation_scores, pvalue + + +def _permutation_test_score(estimator, X, y, groups=None, cv='warn', + n_jobs=None, verbose=0, fit_params=None, + pre_dispatch='2*n_jobs', method='predict', + score_functions=None): + """Auxiliary function for permutation_test_score""" + if score_functions is None: + score_functions = [r2_score] + + ## Changed arguments in cross_val_predict from positional to keyword arguments + y_pred = cross_val_predict(estimator=estimator, X=X, y=y, + groups=groups, cv=cv, n_jobs=n_jobs, + verbose=verbose, params=fit_params, + pre_dispatch=pre_dispatch, method=method) + cv_scores = [score_function(y, y_pred) for score_function in score_functions] + return np.array(cv_scores) + + +def _shuffle(y, groups, random_state): + """Return a shuffled copy of y eventually shuffle among same groups.""" + if groups is None: + indices = random_state.permutation(len(y)) + else: + indices = np.arange(len(groups)) + for group in np.unique(groups): + this_mask = (groups == group) + indices[this_mask] = random_state.permutation(indices[this_mask]) + return safe_indexing(y, indices) + + +def feature_permutation_loading(estimator, X, y, initial_permutations=100, alpha=0.2, final_permutations=500, + random_state=0, n_jobs=None, verbose=0, pre_dispatch='2*n_jobs'): + """Determine the significance of each feature + + This is done by permuting each feature in X and measuring the loading. + The feature is considered significant if the loadings are significantly different. + + This is always done with a regular PLS regressor + PLS-DA should be binarized first. + + Parameters + ---------- + estimator : estimator object implementing 'fit' with x_loadings_ + The object to use to fit the data. This should have an [n_features, 1] x_loadings_ array. This can be a + one-component PLS or OPLS model. + + X : array-like, shape = [n_samples, n_features] + Training vectors, where n_samples is the number of samples and + n_features is the number of predictors. + + y : array-like, shape = [n_samples, 1] + Target vector, where n_samples is the number of samples. + This implementation only supports a single response (target) variable. + + initial_permutations : int + The number of permutations to perform for all features. + + alpha : float, in range (0, 1) + The threshold for significance. If a feature is found significant in the first round, it will be retested with + final_permutations in the second round. + + final_permutations : int + The number of permutations to perform during the second round to retest points found significant in the first + round. + + n_jobs : int or None, optional (default=None) + The number of CPUs to use to do the computation. + ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. + ``-1`` means using all processors. See :term:`Glossary ` + for more details. + + verbose : integer, optional + The verbosity level. + + pre_dispatch : int, or string, optional + Controls the number of jobs that get dispatched during parallel + execution. Reducing this number can be useful to avoid an + explosion of memory consumption when more jobs get dispatched + than CPUs can process. This parameter can be: + + - None, in which case all the jobs are immediately + created and spawned. Use this for lightweight and + fast-running jobs, to avoid delays due to on-demand + spawning of the jobs + + - An int, giving the exact number of total jobs that are + spawned + + - A string, giving an expression as a function of n_jobs, + as in '2*n_jobs' + + random_state : int, RandomState instance or None, optional (default=0) + If int, random_state is the seed used by the random number generator; + If RandomState instance, random_state is the random number generator; + If None, the random number generator is the RandomState instance used + by `np.random`. + + Returns + ------- + x_loadings : array [n_features] + The x_loadings found from non-permuted data. + + permutation_x_loadings: array [n_inner_permutations, n_features] + The one-component PLS loadings for each permutation in the first round. + + p_values: array [n_features] + The p-values for each feature. The null hypothesis is that permuting the feature does not change it's weight + in the one-component PLS model. + """ + + def feature_ind_generator(n_permutations_, feature_inds): + """ + Repeats each value in feature_inds n_permutations_ times. + """ + i = 0 + count = 0 + while count < (n_permutations_ * len(feature_inds)): + yield feature_inds[i] + count += 1 + if (count % n_permutations_) == 0: + i += 1 + + def _log(txt): + if verbose in range(1, 51): + stderr.write(txt + '\n') + if verbose > 50: + print(txt) + + random_state = check_random_state(random_state) + n_features = X.shape[1] + x_loadings = np.ravel(estimator.fit(X, y).x_loadings_) + loading_max = np.max((x_loadings, -1 * x_loadings), axis=0) + loading_min = np.min((x_loadings, -1 * x_loadings), axis=0) + + _log('Performing initial permutation tests.') + permutation_x_loadings = Parallel(n_jobs=n_jobs, verbose=verbose, pre_dispatch=pre_dispatch)( + delayed(_feature_permutation_loading)( + clone(estimator), _feature_shuffle(X, feature_ind, random_state), y, x_loadings, feature_ind) + for feature_ind in feature_ind_generator(initial_permutations, [i for i in range(n_features)])) + permutation_x_loadings = np.array(permutation_x_loadings).reshape(n_features, initial_permutations).T + + _log('Calculating p values.') + p_values = Parallel(n_jobs=n_jobs, verbose=verbose, pre_dispatch=pre_dispatch)( + delayed(_loading_p_value)(permutation_x_loading, upper, lower, initial_permutations) + for permutation_x_loading, upper, lower in zip(np.hsplit(permutation_x_loadings, n_features), + loading_max, loading_min) + ) + + # Retest values found significant in first round + retest_columns = [i for i in range(n_features) if p_values[i] < (alpha / 2.0)] # remember, this is two-tailed + retest_loading_max = np.max((x_loadings[retest_columns], -1 * x_loadings[retest_columns]), axis=0) + retest_loading_min = np.min((x_loadings[retest_columns], -1 * x_loadings[retest_columns]), axis=0) + + _log(f'Re-testing {len(retest_columns)} features') + retest_loadings = Parallel(n_jobs=n_jobs, verbose=verbose, pre_dispatch=pre_dispatch)( + delayed(_feature_permutation_loading)( + clone(estimator), _feature_shuffle(X, feature_ind, random_state), y, x_loadings, feature_ind) + for feature_ind in feature_ind_generator(final_permutations, retest_columns)) + retest_loadings = np.array(retest_loadings).reshape(len(retest_columns), final_permutations).T + + # replace p-values with the more accurate ones + if len(retest_columns): + _log(f'Calculating p values for {len(retest_columns)} features.') + p_values = np.array(p_values) + p_values[retest_columns] = Parallel(n_jobs=n_jobs, verbose=verbose, pre_dispatch=pre_dispatch)( + delayed(_loading_p_value)(retest_loading, upper, lower, initial_permutations) + for retest_loading, upper, lower in zip(np.hsplit(retest_loadings, len(retest_columns)), + retest_loading_max, retest_loading_min) + ) + else: + _log('No significant features after first round of tests.') + p_values = np.array(p_values) + p_values[p_values > 1] = 1 # if feature_min=feature_max=loading=0 values will be greater than 1 + return x_loadings, permutation_x_loadings, p_values + + +def _feature_permutation_loading(estimator, X, y, reference_loadings, feature_ind): + """Auxiliary function for feature_permutation_loading""" + """Not that since loading only depends on training data, we dont use cross-validation""" + test_loadings = np.ravel(estimator.fit(X, y).x_loadings_) + # make directions the same + err1 = (np.sum(np.square(test_loadings[:feature_ind] - reference_loadings[:feature_ind])) + + np.sum(np.square(test_loadings[feature_ind:] - reference_loadings[feature_ind:]))) + err2 = (np.sum(np.square(test_loadings[:feature_ind] + reference_loadings[:feature_ind])) + + np.sum(np.square(test_loadings[feature_ind:] + reference_loadings[feature_ind:]))) + sign = -1 if err2 < err1 else 1 + return sign * test_loadings[feature_ind] + + +def _feature_shuffle(X, feature_ind, random_state): + X = X.copy() + random_state.shuffle(X[:, feature_ind]) + return X + + +def _loading_p_value(permutation_loadings, upper, lower, n_permutations): + return (np.sum(permutation_loadings >= upper) + np.sum(permutation_loadings <= lower) + 1) / (n_permutations + 1) + + +def safe_indexing(X, indices): + """Return items or rows from X using indices. + + Allows simple indexing of lists or arrays. + This is copied from the deprecated sklearn.utils.safe_indexing + + Parameters + ---------- + X : array-like, sparse-matrix, list, pandas.DataFrame, pandas.Series. + Data from which to sample rows or items. + indices : array-like of int + Indices according to which X will be subsampled. + + Returns + ------- + subset + Subset of X on first axis + + Notes + ----- + CSR, CSC, and LIL sparse matrices are supported. COO sparse matrices are + not supported. + """ + if hasattr(X, "iloc"): + # Work-around for indexing with read-only indices in pandas + indices = indices if indices.flags.writeable else indices.copy() + # Pandas Dataframes and Series + try: + return X.iloc[indices] + except ValueError: + # Cython typed memoryviews internally used in pandas do not support + # readonly buffers. + warnings.warn("Copying input dataframe for slicing.", + DataConversionWarning) + return X.copy().iloc[indices] + elif hasattr(X, "shape"): + if hasattr(X, 'take') and (hasattr(indices, 'dtype') and + indices.dtype.kind == 'i'): + # This is often substantially faster than X[indices] + return X.take(indices, axis=0) + else: + return X[indices] + else: + return [X[idx] for idx in indices] diff --git a/validation.py b/validation.py new file mode 100644 index 0000000..4ec27b5 --- /dev/null +++ b/validation.py @@ -0,0 +1,549 @@ +import warnings +from sys import stderr + +import numpy as np +from sklearn.base import BaseEstimator, TransformerMixin, RegressorMixin, ClassifierMixin +from sklearn.cross_decomposition import PLSRegression ## Changed all PLSRegression object creation calls: positional arguments were changed to keyword arguments to fix a recurring +from sklearn.metrics import accuracy_score, roc_auc_score, roc_curve, r2_score +from sklearn.model_selection import KFold, StratifiedKFold, LeaveOneOut, cross_val_predict +from sklearn.preprocessing import LabelBinarizer +from sklearn.utils import check_array +from sklearn.utils.multiclass import type_of_target +from sklearn.utils.validation import check_is_fitted + +from .opls import OPLS +from .permutation_test import permutation_test_score, feature_permutation_loading + + +def discriminator_accuracy(y_true, y_pred): + try: + return accuracy_score(y_true.astype(int), np.sign(y_pred).astype(int)) + except ValueError as e: + warnings.warn(str(e), UserWarning) + return float('nan') + + +def discriminator_roc_auc(y_true, y_pred): + try: + return roc_auc_score(y_true, np.clip(y_pred, -1, 1)) + except ValueError as e: + warnings.warn(str(e), UserWarning) + return float('nan') + + +def discriminator_r2_score(y_true, y_pred): + return r2_score(y_true, np.clip(y_pred, -1, 1)) + + +def neg_press(y_true, y_pred): + return -1 * np.sum(np.square(y_true - y_pred)) + + +def neg_pressd(y_true, y_pred): + return -1 * np.sum(np.square(y_true - np.clip(y_pred, -1, 1))) + + +class OPLSValidator(BaseEstimator, TransformerMixin, RegressorMixin): + """Cross Validation and Diagnostics of Orthogonal Projection to Latent Structures (O-PLS) + + Parameters + ---------- + min_n_components : int, minimum number of orthogonal components to remove + + k : int + number of folds for k-fold cross-validation (default 5). If set to -1, leave-one out cross-validation is used. + + scale : boolean, scale data? (default True) + + n_permutations : int, number of permutations to perform on X + + Attributes + ---------- + q_squared_: float, overall Q-squared metric for the regression, the R-squared value of the left-out data. + + permutation_q_squared_: array [n_splits*n_permutations] + The R-squared metric for the left-out data for each permutation + + q_squared_p_value_ : float + The p-value for the permutation test on Q-squared + + + r_squared_Y_: float, overall R-squared metric for the regression + + r_squared_X_: float, overall R-squared X metric ( + + discriminant_q_squared_: float + Discriminant Q-squared, if this is an OPLSDA problem. Discriminant Q-squared disregards the error of class + predictions whose values are beyond the class labels (e.g. it treats predictions of -1.5 as -1 and 1.5 as 1). + + permutation_discriminant_q_squared_: array [n_splits*n_permutations] + The discriminant R-squared metric for the left-out data for each permutation. + + discriminant_q_squared_p_value_ : float + The p-value for the permutation test on DQ-squared + + accuracy_ : float, accuracy for discrimination + + discriminant_r_squared_: float + Discriminant R-squared, if this is an OPLSDA problem. Discriminant R-squared disregards the error of class + predictions whose values are beyond the class labels (e.g. it treats a predictions of -1.5 as -1 and 1.5 as 1). + + permutation_accuracy_: array [n_splits*n_permutations] + The accuracy of the left-out data for each permutation + + accuracy_p_value_: float + The p-value for the permutation test on accuracy + + roc_auc_ : float, area under ROC curve for discrimination + + permutation_roc_auc_: array [n_splits*n_permutations] + The area under the ROC curve of the left-out data for each permutation. + + roc_auc_p_value_: float + The p-value for the permutation test on the are under the ROC curve. + + n_components_ : float + The optimal number of orthogonal components to remove + + feature_significance_ : array [n_features], type bool + Whether permuting the feature results in a significantly different loading for that feature in the model. + Defined as the loading for the non-permuted data being outside the "middle" of the distribution of loadings + for the permuted data, where the boundaries are a percentile range defined by outer_alpha. + + feature_p_values_ : array [n_features] + An estimated p-value for the significance of the feature, defined as the ratio of loading values inside (-p,p) + where p is the loading for non-permuted data. + + permutation_loadings_ : array [n_inner_permutations, n_features] + Values for the loadings for the permuted data. + + loadings_ : array [n_features] + Loadings for the non-permuted data + + opls_ : OPLS + The OPLS transformer + + pls_ : PLSRegression + A 1-component PLS regressor used to evaluate the OPLS transform + + + References + ---------- + Johan Trygg and Svante Wold. Orthogonal projections to latent structures (O-PLS). + J. Chemometrics 2002; 16: 119-128. DOI: 10.1002/cem.695 + + Johan A. Westerhuis, Ewoud J. J. van Velzen, Huub C. J. Hoefsloot and Age K. Smilde. + Discriminant Q-squared (DQ-squared) for improved discrimination in PLSDA models. + Metabolomics (2008) 4: 293. https://doi.org/10.1007/s11306-008-0126-2 + """ + + def __init__(self, + min_n_components=1, + k=10, + scale=True, + force_regression=False, + n_permutations=100, + n_inner_permutations=100, + n_outer_permutations=500, + inner_alpha=0.2, + outer_alpha=0.05): + self.min_n_components = min_n_components + self.k = k + self.scale = scale + self.n_permutations = n_permutations + self.n_inner_permutations = n_inner_permutations + self.n_outer_permutations = n_outer_permutations + self.inner_alpha = inner_alpha + self.outer_alpha = outer_alpha + self.force_regression = force_regression + self.n_components_ = None + self.feature_significance_ = None + self.feature_p_values_ = None + + self.r_squared_Y_ = None + self.discriminant_r_squared_ = None + self.r_squared_X_ = None + + self.q_squared_ = None + self.permutation_q_squared_ = None + self.q_squared_p_value_ = None + + self.accuracy_ = None + self.permutation_accuracy_ = None + self.accuracy_p_value_ = None + + self.roc_auc_ = None + self.permutation_roc_auc_ = None + self.roc_auc_p_value_ = None + + self.discriminant_q_squared_ = None + self.discriminant_q_squared_p_value_ = None + self.permutation_discriminant_q_squared_ = None + + self.permutation_loadings_ = None + self.pls_ = None # a 1-component PLSRegression + self.opls_ = None # OPLS transform + self.loadings_ = None + self.binarizer_ = None + + @staticmethod + def _get_validator(Y, k): + if k == -1: + return LeaveOneOut() + else: + if type_of_target(Y) in ('binary', 'multiclass'): + return StratifiedKFold(k) + else: + return KFold(k) + + def is_discrimination(self, Y): + return type_of_target(Y).startswith('binary') and not self.force_regression + + def _get_score_function(self, Y): + return neg_pressd if self.is_discrimination(Y) else neg_press + + def _validate(self, X, Y, n_components, score_function, cv=None, n_jobs=None, verbose=0, pre_dispatch='2*n_jobs'): + cv = cv or self._get_validator(Y, self.k) + Z = OPLS(n_components, self.scale).fit_transform(X, Y) + y_pred = cross_val_predict(PLSRegression(n_components=1, scale=self.scale), Z, Y, cv=cv, n_jobs=n_jobs, verbose=verbose, + pre_dispatch=pre_dispatch) + return score_function(Y, y_pred) + + def _process_binary_target(self, y, pos_label=None): + self.binarizer_ = LabelBinarizer(-1, 1) + self.binarizer_.fit(y) + if pos_label is not None and self.binarizer_.transform([pos_label])[0] == -1: + self.binarizer_.classes_ = np.flip(self.binarizer_.classes_) + return self.binarizer_.transform(y).astype(float) + + def _check_target(self, y, pos_label=None): + y = check_array(y, dtype=None, copy=True, ensure_2d=False).reshape(-1, 1) + if type_of_target(y).startswith('multiclass') and not self.force_regression: + raise ValueError('Multiclass input not directly supported. ' + 'Try binarizing with sklearn.preprocessing.LabelBinarizer.') + if self.is_discrimination(y): + y = self._process_binary_target(y, pos_label) + else: + self.binarizer_ = None + return y + + def _determine_n_components(self, X, y, cv=None, scoring=None, n_jobs=None, verbose=0, pre_dispatch='2*n_jobs'): + """Determine number of orthogonal components to remove. + + Orthogonal components are removed until removing a component does not improve the performance + of the k-fold cross-validated OPLS estimator, as measured by the residual sum of squares of the left-out + data. + + Parameters + ---------- + X : array-like, shape = [n_samples, n_features] + Training vectors, where n_samples is the number of samples and + n_features is the number of predictors. + + y : array-like, shape = [n_samples, 1] + Target vector, where n_samples is the number of samples. + This implementation only supports a single response (target) variable. + + cv : sklearn.model_selection.BaseCrossValidator + A cross validator. If None, _get_validator() is used to determine the validator. If target is binary or + multiclass, sklearn.model_selection.StratifiedKFold is used, otherwise sklearn.model_selection.KFold + is used unless k=-1, then sklearn.model_selection.LeaveOneOut is used. + + scoring : + Scoring method to use. Will default to 'accuracy' for OPLS-DA and 'neg_mean_squared_error' for OPLS regression. + + n_jobs : int or None, optional (default=None) + The number of CPUs to use to do the computation. + ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. + ``-1`` means using all processors. See :term:`Glossary ` + for more details. + + verbose : integer, optional + The verbosity level. + + pre_dispatch : int, or string, optional + Controls the number of jobs that get dispatched during parallel + execution. Reducing this number can be useful to avoid an + explosion of memory consumption when more jobs get dispatched + than CPUs can process. This parameter can be: + + - None, in which case all the jobs are immediately + created and spawned. Use this for lightweight and + fast-running jobs, to avoid delays due to on-demand + spawning of the jobs + + - An int, giving the exact number of total jobs that are + spawned + + - A string, giving an expression as a function of n_jobs, + as in '2*n_jobs' + Returns + ------- + n_components: int + The number of components to remove to maximize q-squared + + """ + cv = cv or self._get_validator(y, self.k) + scoring = scoring or self._get_score_function(y) + n_components = self.min_n_components + + score = self._validate(X, y, n_components, scoring, cv, n_jobs, verbose, pre_dispatch) + while n_components < X.shape[1]: + next_score = self._validate(X, y, n_components + 1, scoring, cv, n_jobs, verbose, pre_dispatch) + if next_score <= score: + break + else: + score = next_score + n_components += 1 + return n_components + + def _determine_significant_features(self, + X, + y, + n_components, + random_state=0, + n_jobs=None, + verbose=0, + pre_dispatch='2*n_jobs'): + """Determine the significance of each feature + + This is done by permuting each feature in X and measuring the loading. + The feature is considered significant if the loadings are significantly different. + + This is always done with a regular PLS regressor + PLS-DA should be binarized first. + + Parameters + ---------- + X : array-like, shape = [n_samples, n_features] + Training vectors, where n_samples is the number of samples and + n_features is the number of predictors. + + y : array-like, shape = [n_samples, 1] + Target vector, where n_samples is the number of samples. + This implementation only supports a single response (target) variable. + + n_components : int + The number of orthogonal components to remove + + n_jobs : int or None, optional (default=None) + The number of CPUs to use to do the computation. + ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. + ``-1`` means using all processors. See :term:`Glossary ` + for more details. + + verbose : integer, optional + The verbosity level. + + + pre_dispatch : int, or string, optional + Controls the number of jobs that get dispatched during parallel + execution. Reducing this number can be useful to avoid an + explosion of memory consumption when more jobs get dispatched + than CPUs can process. This parameter can be: + + - None, in which case all the jobs are immediately + created and spawned. Use this for lightweight and + fast-running jobs, to avoid delays due to on-demand + spawning of the jobs + + - An int, giving the exact number of total jobs that are + spawned + + - A string, giving an expression as a function of n_jobs, + as in '2*n_jobs' + + Returns + ------- + significance: array [n_features], type bool + Whether a particular feature is significant. + + p_values: array [n_features] + The p-values for each feature. The null hypothesis is that permuting the feature does not change it's weight + in the one-component PLS model. + + permuted_loadings: array [n_inner_permutations, n_features] + The one-component PLS loadings for each permutation + """ + Z = OPLS(n_components, self.scale).fit_transform(X, y) + x_loadings, permutation_x_loadings, p_values = feature_permutation_loading( + PLSRegression(n_components=1, scale=self.scale), Z, y, self.n_inner_permutations, self.inner_alpha, + self.n_outer_permutations, random_state, n_jobs, verbose, pre_dispatch + ) + return p_values < self.outer_alpha, p_values, permutation_x_loadings + + def cross_val_roc_curve(self, X, y, cv=None, n_jobs=None, verbose=0, pre_dispatch='2*n_jobs'): + Z = self.opls_.transform(X) + cv = cv or self._get_validator(y, self.k) + check_is_fitted(self, ['opls_', 'pls_', 'binarizer_']) + y_pred = cross_val_predict(PLSRegression(n_components=1, scale=self.scale), Z, y, cv=cv, n_jobs=n_jobs, verbose=verbose, + pre_dispatch=pre_dispatch) + return roc_curve(y, y_pred) + + def fit(self, X, y, n_components=None, cv=None, pos_label=None, + random_state=0, n_jobs=None, verbose=0, pre_dispatch='2*n_jobs'): + """Evaluate the quality of the OPLS regressor + + The q-squared value and a p-value for each feature's significance is determined. The final regressor can be + accessed as estimator_. + + Parameters + ---------- + X : array-like, shape = [n_samples, n_features] + Training vectors, where n_samples is the number of samples and + n_features is the number of predictors. + + y : array-like, shape = [n_samples, 1] + Target vector, where n_samples is the number of samples. + This implementation only supports a single response (target) variable. + + n_components : int + The number of orthogonal components to remove. Will be determined by determine_n_components if None + + cv : sklearn.model_selection.BaseCrossValidator + A cross-validator to use for the determination of the number of components and the q-squared value. + If None, a cross validator will be selected based on the value of k and the values of the target variable. + If target is binary or multiclass, sklearn.model_selection.StratifiedKFold is used, otherwise + sklearn.model_selection.KFold is used unless k=-1, then sklearn.model_selection.LeaveOneOut is used. + + pos_label : string + If this is a discrimination problem, the value of the target corresponding to "1". + + random_state : int, RandomState instance or None, optional (default=0) + If int, random_state is the seed used by the random number generator; + If RandomState instance, random_state is the random number generator; + If None, the random number generator is the RandomState instance used + by `np.random`. + + n_jobs : int or None, optional (default=None) + The number of CPUs to use to do the computation. + ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. + ``-1`` means using all processors. See :term:`Glossary ` + for more details. + + verbose : integer, optional + The verbosity level. + + + pre_dispatch : int, or string, optional + Controls the number of jobs that get dispatched during parallel + execution. Reducing this number can be useful to avoid an + explosion of memory consumption when more jobs get dispatched + than CPUs can process. This parameter can be: + + - None, in which case all the jobs are immediately + created and spawned. Use this for lightweight and + fast-running jobs, to avoid delays due to on-demand + spawning of the jobs + + - An int, giving the exact number of total jobs that are + spawned + + - A string, giving an expression as a function of n_jobs, + as in '2*n_jobs' + + """ + + def _log(txt): + if verbose in range(1, 51): + stderr.write(txt + '\n') + if verbose > 50: + print(txt) + + X = check_array(X, dtype=float, copy=True) + y = self._check_target(y, pos_label) + + if not n_components: + _log('Determining number of components to remove.') + n_components = self._determine_n_components(X, y, cv, n_jobs=n_jobs, verbose=verbose, + pre_dispatch=pre_dispatch) + _log(f'Removing {n_components} orthogonal components.') + self.n_components_ = n_components or self._determine_n_components(X, y) + + self.opls_ = OPLS(self.n_components_, self.scale).fit(X, y) + Z = self.opls_.transform(X) + self.pls_ = PLSRegression(n_components=1, scale=self.scale).fit(Z, y) + self.r_squared_X_ = self.opls_.score(X) + y_pred = self.pls_.predict(Z) + self.r_squared_Y_ = r2_score(y, y_pred) + if self.is_discrimination(y): + self.discriminant_r_squared_ = r2_score(y, np.clip(y_pred, -1, 1)) + + cv = cv or self._get_validator(y, self.k) + + score_functions = [r2_score] + if self.is_discrimination(y): + score_functions += [discriminator_r2_score, discriminator_accuracy, discriminator_roc_auc] + + _log('Performing cross-validated metric permutation tests.') + + cv_results = permutation_test_score(PLSRegression(n_components=1, scale=self.scale), Z, y, cv=cv, + n_permutations=self.n_permutations, cv_score_functions=score_functions, + n_jobs=n_jobs, verbose=verbose, pre_dispatch=pre_dispatch) + if self.is_discrimination(y): + [ + (self.q_squared_, self.permutation_q_squared_, self.q_squared_p_value_), + (self.discriminant_q_squared_, self.permutation_discriminant_q_squared_, + self.discriminant_q_squared_p_value_), + (self.accuracy_, self.permutation_accuracy_, self.accuracy_p_value_), + (self.roc_auc_, self.permutation_roc_auc_, self.roc_auc_p_value_) + ] = cv_results + else: + [ + (self.q_squared_, self.permutation_q_squared_, self.q_squared_p_value_) + ] = cv_results + + _log('Estimating feature significance.') + + (self.feature_significance_, + self.feature_p_values_, + self.permutation_loadings_) = self._determine_significant_features(X, y, self.n_components_, random_state, + n_jobs, verbose, pre_dispatch) + return self + + def transform(self, X): + return self.opls_.transform(X) + + def predict(self, X): + Z = self.transform(X) + return self.pls_.predict(Z) + + def score(self, X, y, sample_weight=None): + Z = self.transform(X) + return r2_score(y, self.pls_.predict(Z)) + + def discriminator_roc(self, X, y): + Z = self.transform(X) + return roc_curve(y, self.pls_.predict(Z)) + + +class OPLSDAValidator(OPLSValidator, ClassifierMixin): + def __init__(self, + min_n_components=1, + k=10, + scale=True, + force_regression=False, + n_permutations=100, + n_inner_permutations=100, + n_outer_permutations=500, + inner_alpha=0.2, + outer_alpha=0.01): + super().__init__(min_n_components, + k, + scale, + force_regression, + n_permutations, + n_inner_permutations, + n_outer_permutations, + inner_alpha, + outer_alpha) + + def score(self, X, y, sample_weight=None): + Z = self.transform(X) + y_pred = self.pls_.predict(Z) + return r2_score(y, np.clip(y_pred, -1, 1)) + + def predict(self, X): + Z = self.opls_.transform(X) + values = np.sign(self.pls_.predict(Z)) + return self.binarizer_.inverse_transform(values).reshape(-1, 1)