diff --git a/AUTHORS.rst b/AUTHORS.rst index c0899dcd3..4a084ba80 100644 --- a/AUTHORS.rst +++ b/AUTHORS.rst @@ -54,6 +54,7 @@ Contributors * Faustin Pulvéric * Chaoqi Zhang * Leena Kamran Qidwai +* Omid Gheibi * Aman Vishnoi * Hannes Körner To be continued ... diff --git a/HISTORY.rst b/HISTORY.rst index bae4b16ed..a0bc03265 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -4,6 +4,8 @@ History 1.x.x (2025-xx-xx) ------------------ +* Introduce VennAbers calibrator both for binary and multiclass classification + * Add an example of risk control with LLM as a judge * Add comparison with naive threshold in risk control quick start example * Configure self hosted runner for minimal requirements tests diff --git a/mapie/_venn_abers.py b/mapie/_venn_abers.py new file mode 100644 index 000000000..930e092ab --- /dev/null +++ b/mapie/_venn_abers.py @@ -0,0 +1,1022 @@ +import numpy as np +from sklearn.model_selection import StratifiedKFold, train_test_split +from sklearn.multiclass import OneVsOneClassifier +from sklearn.utils.validation import check_is_fitted +from sklearn.exceptions import NotFittedError +from sklearn.base import clone + + +""" +Private module containing core Venn-ABERS implementation classes. + +This module contains the internal implementation details for Venn-ABERS +calibration. Users should use VennAbersCalibrator from mapie.calibration instead. +""" + + +def _geo_mean(a): + """Geometric mean calculation for Venn-ABERS.""" + return a.prod(axis=1) ** (1.0 / a.shape[1]) + + +def calc_p0p1(p_cal, y_cal, precision=None): + """ + Function that calculates isotonic calibration vectors + required for Venn-ABERS calibration + + This function relies on the geometric representation of isotonic + regression as the slope of the GCM (greatest convex minorant) of the CSD + (cumulative sum diagram) as decribed in [1] pages 9–13 (especially Theorem 1.1). 
+ In particular, the function implements + algorithms 1-4 as described in Chapter 2 in [2] + + + References + ---------- + [1] Richard E. Barlow, D. J. Bartholomew, J. M. Bremner, and H. Daniel + Brunk. Statistical Inference under Order Restrictions: The Theory and + Application of Isotonic Regression. Wiley, London, 1972. + + [2] Vovk, Vladimir, Ivan Petej, and Valentina Fedorova. + "Large-scale probabilistic predictors with and without guarantees of validity." + Advances in Neural Information Processing Systems 28 (2015). + (arxiv version https://arxiv.org/pdf/1511.00213.pdf) + + + Parameters + ---------- + p_cal : {array-like}, shape (n_samples, 2) + Input data for calibration consisting of calibration set probabilities + + y_cal : {array-like}, shape (n_samples,) + Associated binary class labels. + + precision: int, default = None + Optional number of decimal points to which + Venn-Abers calibration probabilities p_cal are rounded to. + Yields significantly faster computation time for larger calibration datasets. + If None no rounding is applied. 
+ + + Returns + ---------- + p_0 : {array-like}, shape (n_samples, ) + Precomputed vector storing values of the isotonic regression + fitted to a sequence that contains binary class label 0 + + p_1 : {array-like}, shape (n_samples, ) + Precomputed vector storing values of the isotonic regression + fitted to a sequence + that contains binary class label 1 + + c : {array-like}, shape (n_samples, ) + Ordered set of unique calibration probabilities + """ + if precision is not None: + cal = np.hstack( + (np.round(p_cal[:, 1], precision).reshape(-1, 1), y_cal.reshape(-1, 1)) + ) + else: + cal = np.hstack((p_cal[:, 1].reshape(-1, 1), y_cal.reshape(-1, 1))) + ix = np.argsort(cal[:, 0]) + k_sort = cal[ix, 0] + k_label_sort = cal[ix, 1] + + c = np.unique(k_sort) + ia = np.searchsorted(k_sort, c) + + w = np.zeros(len(c)) + + w[:-1] = np.diff(ia) + w[-1] = len(k_sort) - ia[-1] + + k_dash = len(c) + P = np.zeros((k_dash + 2, 2)) + + P[0, :] = -1 + + P[2:, 0] = np.cumsum(w) + P[2:-1, 1] = np.cumsum(k_label_sort)[(ia - 1)[1:]] + P[-1, 1] = np.cumsum(k_label_sort)[-1] + + p1 = np.zeros((len(c) + 1, 2)) + p1[1:, 0] = c + + P1 = P[1:] + 1 + + for i in range(len(p1)): + P1[i, :] = P1[i, :] - 1 + + if i == 0: + grads = np.divide(P1[:, 1], P1[:, 0]) + grad = np.nanmin(grads) + p1[i, 1] = grad + c_point = 0 + else: + imp_point = P1[c_point, 1] + (P1[i, 0] - P1[c_point, 0]) * grad + + if P1[i, 1] < imp_point: + grads = np.divide((P1[i:, 1] - P1[i, 1]), (P1[i:, 0] - P1[i, 0])) + if np.sum(np.isnan(np.nanmin(grads))) == 0: + grad = np.nanmin(grads) + c_point = i + p1[i, 1] = grad + else: + p1[i, 1] = grad + + p0 = np.zeros((len(c) + 1, 2)) + p0[1:, 0] = c + + P0 = P[1:] + + for i in range(len(p1) - 1, -1, -1): + P0[i, 0] = P0[i, 0] + 1 + + if i == len(p1) - 1: + grads = np.divide((P0[:, 1] - P0[i, 1]), (P0[:, 0] - P0[i, 0])) + grad = np.nanmax(grads) + p0[i, 1] = grad + c_point = i + else: + imp_point = P0[c_point, 1] + (P0[i, 0] - P0[c_point, 0]) * grad + + if P0[i, 1] < imp_point: + 
def calc_probs(p0, p1, c, p_test):
    """
    Compute Venn-Abers multiprobability outputs and calibrated probabilities.

    In particular, the function implements algorithms 5-6
    as described in Chapter 2 in [1].

    References
    ----------
    [1] Vovk, Vladimir, Ivan Petej, and Valentina Fedorova.
    "Large-scale probabilistic predictors with and without guarantees of
    validity." Advances in Neural Information Processing Systems 28 (2015).
    (arxiv version https://arxiv.org/pdf/1511.00213.pdf)

    Parameters
    ----------
    p0 : ndarray of shape (n_unique + 1, 2)
        Lookup table whose column 1 stores the isotonic regression values
        for a sequence extended with class label 0.

    p1 : ndarray of shape (n_unique + 1, 2)
        Lookup table whose column 1 stores the isotonic regression values
        for a sequence extended with class label 1.

    c : ndarray of shape (n_unique,)
        Sorted unique calibration scores used to index ``p0`` and ``p1``.

    p_test : array-like of shape (n_samples, 2)
        Probability outputs to calibrate; only column 1 is used.

    Returns
    -------
    p_prime : ndarray of shape (n_samples, 2)
        Calibrated probabilities, ``p_prime[:, 1] = p1 / (1 - p0 + p1)``.

    p0_p1 : ndarray of shape (n_samples, 2)
        Associated multiprobability outputs ``(p0, p1)``
        (as described in Section 4 in https://arxiv.org/pdf/1511.00213.pdf)
    """
    out = np.asarray(p_test)[:, 1]
    p0_p1 = np.hstack(
        (
            p0[np.searchsorted(c, out, "right"), 1].reshape(-1, 1),
            p1[np.searchsorted(c, out, "left"), 1].reshape(-1, 1),
        )
    )

    p_prime = np.zeros((len(out), 2))
    p_prime[:, 1] = p0_p1[:, 1] / (1 - p0_p1[:, 0] + p0_p1[:, 1])
    p_prime[:, 0] = 1 - p_prime[:, 1]

    return p_prime, p0_p1


def predict_proba_prefitted_va(
    p_cal, y_cal, p_test, precision=None, va_tpe="one_vs_one"
):
    """
    Generate Venn-ABERS calibrated probabilities for multiclass problems
    using pre-computed calibration probabilities.

    The multiclass problem is decomposed into binary problems with either a
    one-vs-one or a one-vs-all strategy; each binary problem is calibrated
    with ``VennAbers`` and the results are recombined and normalised.

    Parameters
    ----------
    p_cal : array-like of shape (n_cal_samples, n_classes)
        Base-classifier probabilities on the calibration set. Column ``k``
        is paired with the ``k``-th sorted unique label found in ``y_cal``,
        so every class must appear in ``y_cal`` for the columns to line up.

    y_cal : array-like of shape (n_cal_samples,)
        True class labels for the calibration set.

    p_test : array-like of shape (n_test_samples, n_classes)
        Base-classifier probabilities on the test set, same column order
        as ``p_cal``.

    precision : int, optional, default=None
        Number of decimal places to round calibration probabilities to for
        faster computation. If None, no rounding is applied.

    va_tpe : {'one_vs_one', 'one_vs_all'}, default='one_vs_one'
        Strategy for decomposing the multiclass problem:
        - 'one_vs_one': one binary problem per pair of classes
        - 'one_vs_all': one binary problem per class versus the rest

    Returns
    -------
    p_prime : ndarray of shape (n_test_samples, n_classes)
        Calibrated probabilities, normalised to sum to 1 for each sample.

    multiclass_p0p1 : list of ndarray
        Multiprobability outputs ``(p0, p1)`` of shape (n_test_samples, 2),
        one entry per binary problem: C(n_classes, 2) entries for
        'one_vs_one', n_classes entries for 'one_vs_all'.

    Raises
    ------
    ValueError
        If ``va_tpe`` is not one of the two supported strategies.

    Notes
    -----
    Binary calibration follows [1]_. For 'one_vs_one' the pairwise
    probabilities are combined as
    ``p_k = 1 / (sum_{j != k} 1 / p_kj - (n_classes - 2))`` before the final
    normalisation; see [2]_ for background on pairwise coupling.

    References
    ----------
    .. [1] Vovk, Vladimir, Ivan Petej, and Valentina Fedorova. "Large-scale
           probabilistic predictors with and without guarantees of validity."
           Advances in Neural Information Processing Systems 28 (2015).

    .. [2] Hastie, Trevor, and Robert Tibshirani. "Classification by pairwise
           coupling." Advances in Neural Information Processing Systems 10
           (1997).

    Examples
    --------
    >>> import numpy as np
    >>> # Calibration data
    >>> p_cal = np.array([[0.7, 0.2, 0.1], [0.3, 0.6, 0.1], [0.1, 0.1, 0.8]])
    >>> y_cal = np.array([0, 1, 2])
    >>> # Test data
    >>> p_test = np.array([[0.6, 0.3, 0.1], [0.2, 0.7, 0.1]])
    >>>
    >>> p_calibrated, p0p1 = predict_proba_prefitted_va(p_cal, y_cal, p_test)
    >>> print(p_calibrated.shape)
    (2, 3)
    """
    if va_tpe not in ["one_vs_one", "one_vs_all"]:
        raise ValueError(
            f"Invalid va_tpe '{va_tpe}'. "
            f"Allowed values are ['one_vs_one', 'one_vs_all']."
        )

    # Accept lists / pandas objects: fancy indexing below needs ndarrays.
    p_cal = np.asarray(p_cal)
    y_cal = np.asarray(y_cal)
    p_test = np.asarray(p_test)

    classes = np.unique(y_cal)
    n_classes = len(classes)

    multiclass_probs = []
    multiclass_p0p1 = []
    p_prime = np.zeros((len(p_test), n_classes))

    if va_tpe == "one_vs_one":
        # (first, second) are both label positions and probability columns.
        pairs = [
            (first, second)
            for first in range(n_classes - 1)
            for second in range(first + 1, n_classes)
        ]

        for first, second in pairs:
            columns = [first, second]
            in_pair = (y_cal == classes[first]) | (y_cal == classes[second])

            # Renormalise the two columns so each row is a binary
            # distribution over the pair.
            cal_pair = p_cal[:, columns][in_pair]
            binary_cal_probs = cal_pair / np.sum(cal_pair, axis=1).reshape(-1, 1)
            test_pair = p_test[:, columns]
            binary_test_probs = test_pair / np.sum(test_pair, axis=1).reshape(
                -1, 1
            )
            binary_classes = y_cal[in_pair] == classes[second]

            va = VennAbers()
            va.fit(binary_cal_probs, binary_classes, precision=precision)
            p_pr, p0_p1 = va.predict_proba(binary_test_probs)
            multiclass_probs.append(p_pr)
            multiclass_p0p1.append(p0_p1)

        for k in range(n_classes):
            # Column 0 of a pair is the probability of its first class,
            # column 1 the probability of its second class.
            as_first = [
                probs[:, 0].reshape(-1, 1)
                for (first, _), probs in zip(pairs, multiclass_probs)
                if first == k
            ]
            as_second = [
                probs[:, 1].reshape(-1, 1)
                for (_, second), probs in zip(pairs, multiclass_probs)
                if second == k
            ]
            p_stack = as_first + as_second

            p_prime[:, k] = 1 / (
                np.sum(np.hstack([(1 / p) for p in p_stack]), axis=1)
                - (n_classes - 2)
            )

    else:
        for k in range(n_classes):
            binary_classes = y_cal == classes[k]

            binary_cal_probs = np.zeros((len(p_cal), 2))
            binary_cal_probs[:, 1] = p_cal[:, k]
            binary_cal_probs[:, 0] = 1 - binary_cal_probs[:, 1]

            binary_test_probs = np.zeros((len(p_test), 2))
            binary_test_probs[:, 1] = p_test[:, k]
            binary_test_probs[:, 0] = 1 - binary_test_probs[:, 1]

            va = VennAbers()
            va.fit(binary_cal_probs, binary_classes, precision=precision)
            p_pr, p0_p1 = va.predict_proba(binary_test_probs)
            multiclass_probs.append(p_pr)
            multiclass_p0p1.append(p0_p1)

            p_prime[:, k] = p_pr[:, 1]

    p_prime = p_prime / np.sum(p_prime, axis=1).reshape(-1, 1)

    return p_prime, multiclass_p0p1
class VennAbers:
    """
    Implementation of the Venn-ABERS calibration for binary classification
    problems.

    Venn-ABERS calibration is a method of turning machine learning
    classification algorithms into probabilistic predictors that
    automatically enjoys a property of validity (perfect calibration) and
    is computationally efficient. The algorithm is described in [1].

    References
    ----------
    [1] Vovk, Vladimir, Ivan Petej, and Valentina Fedorova.
    "Large-scale probabilistic predictors with and without guarantees of
    validity." Advances in Neural Information Processing Systems 28 (2015).
    (arxiv version https://arxiv.org/pdf/1511.00213.pdf)

    .. versionadded:: 1.0

    Examples
    --------
    >>> import numpy as np
    >>> from sklearn.datasets import make_classification
    >>> from sklearn.model_selection import train_test_split
    >>> from sklearn.naive_bayes import GaussianNB
    >>> from mapie._venn_abers import VennAbers
    >>>
    >>> # Generate data and split into train/test
    >>> X, y = make_classification(n_samples=1000, n_classes=2, n_informative=10)
    >>> X_train, X_test, y_train, y_test = train_test_split(X, y)
    >>>
    >>> # Further split training data into proper training and calibration sets
    >>> X_train_proper, X_cal, y_train_proper, y_cal = train_test_split(
    ...     X_train, y_train, test_size=0.2, shuffle=False
    ... )
    >>>
    >>> # Train classifier on proper training set
    >>> clf = GaussianNB()
    >>> _ = clf.fit(X_train_proper, y_train_proper)
    >>>
    >>> # Get probability predictions for calibration and test sets
    >>> p_cal = clf.predict_proba(X_cal)
    >>> p_test = clf.predict_proba(X_test)
    >>>
    >>> # Apply Venn-ABERS calibration
    >>> va = VennAbers()
    >>> va.fit(p_cal, y_cal)
    >>> p_prime, p0_p1 = va.predict_proba(p_test)
    >>>
    >>> # p_prime contains calibrated probabilities
    >>> print(p_prime.shape)
    (250, 2)
    """

    def __init__(self):
        # Lookup tables produced by ``calc_p0p1``; None until ``fit`` runs.
        self.p0 = None
        self.p1 = None
        self.c = None

    def fit(self, p_cal, y_cal, precision=None):
        """Fit the VennAbers calibrator to the calibration dataset.

        Parameters
        ----------
        p_cal : {array-like}, shape (n_samples, 2)
            Input data for calibration consisting of calibration set
            probabilities

        y_cal : {array-like}, shape (n_samples,)
            Associated binary class labels.

        precision : int, default = None
            Optional number of decimal points to which Venn-Abers
            calibration probabilities p_cal are rounded to.
            Yields significantly faster computation time for larger
            calibration datasets
        """
        self.p0, self.p1, self.c = calc_p0p1(p_cal, y_cal, precision)

    def predict_proba(self, p_test):
        """Generate Venn-Abers probability estimates.

        Parameters
        ----------
        p_test : {array-like}, shape (n_samples, 2)
            An array of probability outputs which are to be calibrated

        Returns
        -------
        p_prime : {array-like}, shape (n_samples, 2)
            Calibrated probability outputs

        p0_p1 : {array-like}, shape (n_samples, 2)
            Associated multiprobability outputs
            (as described in Section 4 in https://arxiv.org/pdf/1511.00213.pdf)

        Raises
        ------
        NotFittedError
            If ``fit`` has not been called yet.
        """
        # Fail early with a clear message instead of an opaque numpy error
        # raised from indexing ``None``.
        if self.p0 is None or self.p1 is None or self.c is None:
            raise NotFittedError(
                "This VennAbers instance is not fitted yet. "
                "Call 'fit' before using 'predict_proba'."
            )
        p_prime, p0_p1 = calc_probs(self.p0, self.p1, self.c, p_test)
        return p_prime, p0_p1
class VennAbersCV:
    """
    Inductive (IVAP) or Cross (CVAP) Venn-ABERS prediction method
    for binary classification problems.

    Implements the Inductive or Cross Venn-Abers calibration method
    as described in Sections 2-4 in [1].

    References
    ----------
    [1] Vovk, Vladimir, Ivan Petej, and Valentina Fedorova.
    "Large-scale probabilistic predictors with and without guarantees of
    validity." Advances in Neural Information Processing Systems 28 (2015).
    (arxiv version https://arxiv.org/pdf/1511.00213.pdf)

    Parameters
    ----------
    estimator : scikit-learn estimator instance
        The classifier whose output needs to be calibrated to provide more
        accurate `predict_proba` outputs. It is cloned, never fitted in
        place.

    inductive : bool
        True to run the Inductive (IVAP) or False for Cross (CVAP)
        Venn-ABERS calibration.

    n_splits : int, default=None
        For CVAP only, number of folds passed to
        sklearn.model_selection.StratifiedKFold. Must be at least 2.
        Overwritten with 1 by ``fit`` when ``inductive`` is True.

    cal_size : float or int, default=None
        For IVAP only, forwarded as ``test_size`` to
        sklearn.model_selection.train_test_split: a float is the proportion
        of the data used for calibration, an int the absolute number of
        calibration samples.

    train_proper_size : float or int, default=None
        For IVAP only, forwarded as ``train_size`` to ``train_test_split``:
        a float is the proportion of the data used for proper training, an
        int the absolute number of samples.

    random_state : int, RandomState instance or None, default=None
        Controls the shuffling applied to the data before applying the
        split. Pass an int for reproducible output across multiple calls.

    shuffle : bool, default=True
        Whether to shuffle the data before splitting. For IVAP, if
        shuffle=False then stratify must be None. For CVAP, whether to
        shuffle each class's samples before splitting into batches.

    stratify : array-like, default=None
        For IVAP only. If not None, data is split in a stratified fashion,
        using this as the class labels.

    precision : int, default=None
        Optional number of decimal points to which Venn-Abers calibration
        probabilities p_cal are rounded to. Yields significantly faster
        computation time for larger calibration datasets.

    cv_ensemble : bool, default=True
        If False, test-set predictions come from the classifier trained on
        the whole training set instead of the one trained on the split
        (IVAP) or on each fold (CVAP).
    """

    def __init__(
        self,
        estimator,
        inductive,
        n_splits=None,
        cal_size=None,
        train_proper_size=None,
        random_state=None,
        shuffle=True,
        stratify=None,
        precision=None,
        cv_ensemble=True,
    ):
        self.estimator = estimator
        self.n_splits = n_splits
        self.clf_p_cal = []
        self.clf_y_cal = []
        self.inductive = inductive
        self.cal_size = cal_size
        self.train_proper_size = train_proper_size
        self.random_state = random_state
        self.shuffle = shuffle
        self.stratify = stratify
        self.precision = precision
        self.estimators = []
        self.cv_ensemble = cv_ensemble

    def _fit_clone(self, x, y, sample_weight=None):
        """Fit a fresh clone of ``self.estimator`` on ``(x, y)`` and return it."""
        fitted = clone(self.estimator)
        if sample_weight is not None:
            fitted.fit(x, y.flatten(), sample_weight=sample_weight)
        else:
            fitted.fit(x, y.flatten())
        return fitted

    def fit(self, _x_train, _y_train, sample_weight=None):
        """Fit the IVAP or CVAP calibrator to the training set.

        ``self.estimators[0]`` is always the estimator trained on the whole
        training set; the following entries are the estimators trained on
        the proper training set (IVAP) or on each fold (CVAP), aligned with
        ``self.clf_p_cal`` / ``self.clf_y_cal``.

        Parameters
        ----------
        _x_train : ndarray of shape (n_samples, n_features)
            Training set features.

        _y_train : ndarray of shape (n_samples,) or (n_samples, 1)
            Associated binary class labels.

        sample_weight : array-like of shape (n_samples,), optional
            Sample weights for fitting the estimators.
            If None, then samples are equally weighted.
        """
        # Start from a clean slate so that calling ``fit`` again does not
        # leave the results of a previous fit in front of the new ones.
        self.estimators = []
        self.clf_p_cal = []
        self.clf_y_cal = []

        if sample_weight is not None:
            # Needed for the integer-array indexing in the CVAP branch.
            sample_weight = np.asarray(sample_weight)

        self.estimators.append(
            self._fit_clone(_x_train, _y_train, sample_weight)
        )

        if self.inductive:
            self.n_splits = 1

            # Split sample_weight along with the data if provided.
            arrays = [_x_train, _y_train]
            if sample_weight is not None:
                arrays.append(sample_weight)
            split = train_test_split(
                *arrays,
                test_size=self.cal_size,
                train_size=self.train_proper_size,
                random_state=self.random_state,
                shuffle=self.shuffle,
                stratify=self.stratify,
            )
            x_train_proper, x_cal, y_train_proper, y_cal = split[:4]
            sw_train = split[4] if sample_weight is not None else None

            estimator_proper = self._fit_clone(
                x_train_proper, y_train_proper, sw_train
            )
            self.estimators.append(estimator_proper)
            self.clf_p_cal.append(estimator_proper.predict_proba(x_cal))
            self.clf_y_cal.append(y_cal)
        else:
            kf = StratifiedKFold(
                n_splits=self.n_splits,
                shuffle=self.shuffle,
                random_state=self.random_state,
            )
            for train_index, test_index in kf.split(_x_train, _y_train):
                fold_sample_weight = (
                    None if sample_weight is None else sample_weight[train_index]
                )
                estimator_fold = self._fit_clone(
                    _x_train[train_index],
                    _y_train[train_index],
                    fold_sample_weight,
                )
                self.estimators.append(estimator_fold)
                self.clf_p_cal.append(
                    estimator_fold.predict_proba(_x_train[test_index])
                )
                self.clf_y_cal.append(_y_train[test_index])

    def predict_proba(self, _x_test, loss="log", p0_p1_output=False):
        """Generate Venn-ABERS calibrated probabilities.

        Parameters
        ----------
        _x_test : ndarray of shape (n_samples, n_features)
            Test set features.

        loss : str, default='log'
            ``'log'`` merges the folds with the log-loss formula; any other
            value selects the Brier-loss formula. For further details see
            Section 4 in https://arxiv.org/pdf/1511.00213.pdf

        p0_p1_output : bool, default=False
            If True, the function also returns the p0/p1 outputs.

        Returns
        -------
        p_prime : ndarray of shape (n_samples, 2)
            Venn-ABERS calibrated probabilities.

        p0_p1 : ndarray of shape (n_samples, 2 * n_folds)
            Only returned if ``p0_p1_output`` is True: the p0 outputs of all
            folds followed by the p1 outputs of all folds.

        Raises
        ------
        NotFittedError
            If ``fit`` has not been called yet.
        """
        if not self.clf_p_cal:
            raise NotFittedError(
                "This VennAbersCV instance is not fitted yet. "
                "Call 'fit' before using 'predict_proba'."
            )

        p0p1_test = []
        for fold, (p_cal, y_cal) in enumerate(
            zip(self.clf_p_cal, self.clf_y_cal)
        ):
            va = VennAbers()
            va.fit(p_cal=p_cal, y_cal=y_cal, precision=self.precision)
            # Slot 0 is the full-data estimator, slots 1.. are per fold.
            source = (
                self.estimators[fold + 1]
                if self.cv_ensemble
                else self.estimators[0]
            )
            _, probs = va.predict_proba(p_test=source.predict_proba(_x_test))
            p0p1_test.append(probs)

        n_folds = len(p0p1_test)
        p0_stack = np.hstack([prob[:, 0].reshape(-1, 1) for prob in p0p1_test])
        p1_stack = np.hstack([prob[:, 1].reshape(-1, 1) for prob in p0p1_test])

        p_prime = np.zeros((len(_x_test), 2))

        if loss == "log":
            p_prime[:, 1] = _geo_mean(p1_stack) / (
                _geo_mean(1 - p0_stack) + _geo_mean(p1_stack)
            )
        else:
            p_prime[:, 1] = (
                1
                / n_folds
                * (
                    np.sum(p1_stack, axis=1)
                    + 0.5 * np.sum(p0_stack**2, axis=1)
                    - 0.5 * np.sum(p1_stack**2, axis=1)
                )
            )
        p_prime[:, 0] = 1 - p_prime[:, 1]

        if p0_p1_output:
            p0_p1 = np.hstack((p0_stack, p1_stack))
            return p_prime, p0_p1
        return p_prime
class VennAbersMultiClass:
    """
    Inductive (IVAP) or Cross (CVAP) Venn-ABERS prediction method
    for multi-class classification problems.

    Implements the Inductive or Cross Venn-Abers calibration method
    as described in [1]. One binary ``VennAbersCV`` calibrator is fitted
    per pair of classes and the pairwise outputs are recombined.

    References
    ----------
    [1] Manokhin, Valery. "Multi-class probabilistic classification using
    inductive and cross Venn-Abers predictors." In Conformal and
    Probabilistic Prediction and Applications, pp. 228-240. PMLR, 2017.

    Parameters
    ----------
    estimator : scikit-learn estimator instance
        The classifier whose output needs to be calibrated to provide more
        accurate `predict_proba` outputs.

    inductive : bool
        True to run the Inductive (IVAP) or False for Cross (CVAP)
        Venn-ABERS calibration.

    n_splits : int, default=None
        For CVAP only (required in that case), number of folds passed to
        sklearn.model_selection.StratifiedKFold. Must be at least 2.

    cal_size : float or int, default=None
        For IVAP only, forwarded as ``test_size`` to
        sklearn.model_selection.train_test_split: a float is the proportion
        of the data used for calibration, an int the absolute number of
        calibration samples.

    train_proper_size : float or int, default=None
        For IVAP only, forwarded as ``train_size`` to ``train_test_split``:
        a float is the proportion of the data used for proper training, an
        int the absolute number of samples.

    random_state : int, RandomState instance or None, default=None
        Controls the shuffling applied to the data before applying the
        split. Pass an int for reproducible output across multiple calls.

    shuffle : bool, default=True
        Whether to shuffle the data before splitting. For IVAP, if
        shuffle=False then stratify must be None. For CVAP, whether to
        shuffle each class's samples before splitting into batches.

    stratify : array-like of shape (n_samples,), default=None
        For IVAP only. If not None, data is split in a stratified fashion,
        using this as the class labels.

    precision : int, default=None
        Optional number of decimal points to which Venn-Abers calibration
        probabilities p_cal are rounded to. Yields significantly faster
        computation time for larger calibration datasets.

    cv_ensemble : bool, default=True
        If False, test-set predictions come from the classifier trained on
        the whole training set instead of the one trained on the split
        (IVAP) or on each fold (CVAP).
    """

    def __init__(
        self,
        estimator,
        inductive,
        n_splits=None,
        cal_size=None,
        train_proper_size=None,
        random_state=None,
        shuffle=True,
        stratify=None,
        precision=None,
        cv_ensemble=True,
    ):
        self.estimator = estimator
        self.inductive = inductive
        self.n_splits = n_splits
        self.cal_size = cal_size
        self.train_proper_size = train_proper_size
        self.random_state = random_state
        self.shuffle = shuffle
        self.stratify = stratify
        self.multi_class_model = []
        self.n_classes = None
        self.classes = None
        self.pairwise_id = []
        self.clf_ovo = None
        self.multiclass_cal = []
        self.multiclass_va_estimators = []
        self.multiclass_probs = []
        self.multiclass_p0p1 = []
        self.precision = precision
        self.cv_ensemble = cv_ensemble

    def fit(self, _x_train, _y_train, sample_weight=None):
        """
        Fit one pairwise Venn-ABERS calibrator per pair of classes.

        Parameters
        ----------
        _x_train : ndarray of shape (n_samples, n_features)
            Training set features.

        _y_train : ndarray of shape (n_samples,)
            Associated class labels.

        sample_weight : array-like of shape (n_samples,), optional
            Sample weights for fitting the estimators.
            If None, then samples are equally weighted.

        Raises
        ------
        ValueError
            If CVAP is requested without ``n_splits``, or if IVAP is
            requested for an unfitted estimator with neither ``cal_size``
            nor ``train_proper_size``.
        """
        # integrity checks
        if not self.inductive and self.n_splits is None:
            raise ValueError("For Cross Venn ABERS please provide n_splits")
        try:
            check_is_fitted(self.estimator)
        except NotFittedError:
            if (self.inductive and self.cal_size is None) and (
                self.train_proper_size is None
            ):
                raise ValueError(
                    "For Inductive Venn-ABERS please provide either calibration "
                    "or proper train set size"
                )

        # Reset per-fit state so that refitting does not pile new pairs and
        # calibrators on top of those from a previous call.
        self.pairwise_id = []
        self.multiclass_va_estimators = []

        self.classes = np.unique(_y_train)
        self.n_classes = len(self.classes)

        for i in range(self.n_classes):
            for j in range(i + 1, self.n_classes):
                self.pairwise_id.append([self.classes[i], self.classes[j]])

        if sample_weight is not None:
            # Boolean-mask indexing below needs an ndarray.
            sample_weight = np.asarray(sample_weight)

        # Fit the OneVsOne classifier with sample weights if provided.
        fit_params = {}
        if sample_weight is not None:
            fit_params["sample_weight"] = sample_weight

        # OneVsOneClassifier clones the estimator internally, so the
        # original estimator is left untouched. The fitted wrapper is kept
        # on the instance as ``clf_ovo``.
        self.clf_ovo = OneVsOneClassifier(self.estimator)
        self.clf_ovo.fit(_x_train, _y_train, **fit_params)

        for first, second in self.pairwise_id:
            _pairwise_indices = (_y_train == first) | (_y_train == second)

            # Extract sample weights for this pair if provided.
            pair_sample_weight = None
            if sample_weight is not None:
                pair_sample_weight = sample_weight[_pairwise_indices]

            # The stratification labels must be restricted to the same rows
            # as the data handed to the pairwise calibrator, otherwise
            # train_test_split receives arrays of different lengths.
            pair_stratify = None
            if self.stratify is not None:
                pair_stratify = np.asarray(self.stratify)[_pairwise_indices]

            va_cv = VennAbersCV(
                self.estimator,
                inductive=self.inductive,
                n_splits=self.n_splits,
                cal_size=self.cal_size,
                train_proper_size=self.train_proper_size,
                random_state=self.random_state,
                shuffle=self.shuffle,
                stratify=pair_stratify,
                precision=self.precision,
                cv_ensemble=self.cv_ensemble,
            )
            va_cv.fit(
                _x_train[_pairwise_indices],
                np.array(_y_train[_pairwise_indices] == second).reshape(-1, 1),
                sample_weight=pair_sample_weight,
            )
            self.multiclass_va_estimators.append(va_cv)

    def predict_proba(self, _x_test, loss="log", p0_p1_output=False):
        """
        Generate Venn-ABERS calibrated probabilities.

        Parameters
        ----------
        _x_test : ndarray of shape (n_samples, n_features)
            Test set features.

        loss : str, default='log'
            Forwarded to each pairwise calibrator (log or Brier loss). For
            further details of calculation see Section 4 in
            https://arxiv.org/pdf/1511.00213.pdf

        p0_p1_output : bool, default=False
            If True, the function also returns the p0/p1 outputs of every
            pairwise calibrator.

        Returns
        -------
        p_prime : ndarray of shape (n_samples, n_classes)
            Venn-ABERS calibrated probabilities, normalised per sample.

        p0_p1 : list of ndarray
            Only returned if ``p0_p1_output`` is True: one p0/p1 array per
            class pair, in the order of ``self.pairwise_id``.

        Raises
        ------
        NotFittedError
            If ``fit`` has not been called yet.
        """
        if self.classes is None:
            raise NotFittedError(
                "This VennAbersMultiClass instance is not fitted yet. "
                "Call 'fit' before using 'predict_proba'."
            )

        self.multiclass_probs = []
        self.multiclass_p0p1 = []

        for va_estimator in self.multiclass_va_estimators:
            if p0_p1_output:
                _p_prime, _p0_p1 = va_estimator.predict_proba(
                    _x_test, loss=loss, p0_p1_output=True
                )
                self.multiclass_p0p1.append(_p0_p1)
            else:
                _p_prime = va_estimator.predict_proba(_x_test, loss=loss)
            self.multiclass_probs.append(_p_prime)

        p_prime = np.zeros((len(_x_test), self.n_classes))

        for col, cl_id in enumerate(self.classes):
            # Column 0 of a pair is the probability of its first class,
            # column 1 the probability of its second class.
            stack_i = [
                p[:, 0].reshape(-1, 1)
                for (first, _), p in zip(self.pairwise_id, self.multiclass_probs)
                if first == cl_id
            ]
            stack_j = [
                p[:, 1].reshape(-1, 1)
                for (_, second), p in zip(self.pairwise_id, self.multiclass_probs)
                if second == cl_id
            ]
            p_stack = stack_i + stack_j

            p_prime[:, col] = 1 / (
                np.sum(np.hstack([(1 / p) for p in p_stack]), axis=1)
                - (self.n_classes - 2)
            )

        p_prime = p_prime / np.sum(p_prime, axis=1).reshape(-1, 1)

        if p0_p1_output:
            return p_prime, self.multiclass_p0p1
        return p_prime
class VennAbersCalibrator(BaseEstimator, ClassifierMixin):
    """
    Venn-ABERS calibration for binary and multi-class problems.

    A class implementing binary [1] or multi-class [2] Venn-ABERS calibration.
    This calibrator provides well-calibrated probabilities with validity guarantees.
    The implementation is based on the reference implementation by the user ip200 [3].

    Can be used in 3 different forms:
    - Prefit Venn-ABERS: estimator is already fitted, only calibration is performed
    - Inductive Venn-ABERS (IVAP): splits data into training and calibration sets
    - Cross Venn-ABERS (CVAP): uses cross-validation for calibration

    Parameters
    ----------
    estimator : ClassifierMixin
        The classifier whose output needs to be calibrated to provide more
        accurate `predict_proba` outputs. Must be a scikit-learn compatible
        classifier with `fit` and `predict_proba` methods. May also be a
        ``sklearn.pipeline.Pipeline`` whose last step is such a classifier;
        the preceding steps are then used as feature transformers.

    cv : Optional[str], default=None
        The cross-validation strategy:

        - ``"prefit"``: Assumes that ``estimator`` has been fitted already.
          All data provided in ``fit`` are used for calibration only.
        - ``None``: Uses inductive or cross validation based on the
          ``inductive`` parameter.

    inductive : bool, default=True
        Determines the calibration strategy when ``cv=None``:

        - ``True``: Inductive Venn-ABERS (IVAP) - splits data into proper
          training and calibration sets.
        - ``False``: Cross Venn-ABERS (CVAP) - uses k-fold cross-validation.

    n_splits : Optional[int], default=None
        Number of folds for Cross Venn-ABERS (CVAP). Must be at least 2.
        Only used when ``inductive=False`` and ``cv=None``.
        Uses ``sklearn.model_selection.StratifiedKFold`` functionality.

    train_proper_size : Optional[float], default=None
        Proportion of the dataset to use for proper training in Inductive
        Venn-ABERS (IVAP). Only used when ``inductive=True`` and ``cv=None``.

        - If float, should be between 0.0 and 1.0.
        - If int, represents the absolute number of training samples.
        - If ``None``, automatically set to complement of ``cal_size``.

    random_state : Optional[int], default=None
        Controls the shuffling applied to the data before splitting.
        Pass an int for reproducible output across multiple function calls.
        Used only when ``fit`` is called with ``random_state=None``.

    shuffle : bool, default=True
        Whether to shuffle the data before splitting.

        - For IVAP: if ``shuffle=False``, then ``stratify`` must be ``None``.
        - For CVAP: controls whether to shuffle each class's samples before
          splitting into batches.

        Note that ``fit`` has its own ``shuffle`` argument which defaults to
        ``True``; this constructor value is only used when ``fit`` is called
        with ``shuffle=None``.

    stratify : Optional[ArrayLike], default=None
        For Inductive Venn-ABERS (IVAP) only. If not ``None``, data is split
        in a stratified fashion, using this as the class labels.
        Used only when ``fit`` is called with ``stratify=None``.

    precision : Optional[int], default=None
        Number of decimal points to round Venn-ABERS calibration probabilities.
        Yields significantly faster computation for larger calibration datasets.
        Trade-off between speed and precision.

    cv_ensemble : bool, default=True
        If False then the predictions for the test set are generated using
        the underlying classifier trained on the whole training set, instead
        of on the split (in the case of IVAP) or folds (in the case of CVAP).

    Attributes
    ----------
    classes_ : NDArray
        Array with the name of each class.

    n_classes_ : int
        Number of classes in the training dataset.

    va_calibrator_ : Union[VennAbersMultiClass, VennAbers, None]
        The fitted Venn-ABERS calibrator instance.
        May be None in prefit mode with multi-class classification.

    transformers_ : Optional[Pipeline]
        Transformer steps of the pipeline (all steps except the last one),
        applied to the data before it reaches the classifier. ``None`` when
        ``estimator`` is not a pipeline or has no transformer step.

    single_estimator_ : Optional[ClassifierMixin]
        The fitted estimator (only for prefit mode).

    p_cal_ : Optional[NDArray]
        Calibration probabilities (only for prefit mode with multi-class).

    y_cal_ : Optional[NDArray]
        Calibration labels (only for prefit mode with multi-class).

    References
    ----------
    [1] Vovk, Vladimir, Ivan Petej, and Valentina Fedorova.
    "Large-scale probabilistic predictors with and without guarantees
    of validity." Advances in Neural Information Processing Systems 28
    (2015). https://arxiv.org/pdf/1511.00213.pdf

    [2] Manokhin, Valery. "Multi-class probabilistic classification using
    inductive and cross Venn–Abers predictors." In Conformal and
    Probabilistic Prediction and Applications, pp. 228-240. PMLR, 2017.

    [3] Reference implementation:
    https://github.com/ip200/venn-abers/blob/main/src/venn_abers.py

    Examples
    --------
    >>> import numpy as np
    >>> from sklearn.datasets import make_classification
    >>> from sklearn.model_selection import train_test_split
    >>> from sklearn.naive_bayes import GaussianNB
    >>> from mapie.calibration import VennAbersCalibrator

    **Example 1: Prefit mode**

    >>> X, y = make_classification(n_samples=1000, n_features=20,
    ...                            n_classes=3, n_informative=10,
    ...                            random_state=42)
    >>> X_train, X_test, y_train, y_test = train_test_split(
    ...     X, y, test_size=0.2, random_state=42
    ... )
    >>> # Fit the base classifier
    >>> clf = GaussianNB()
    >>> _ = clf.fit(X_train, y_train)
    >>> # Calibrate using prefit mode
    >>> va_cal = VennAbersCalibrator(estimator=clf, cv="prefit")
    >>> _ = va_cal.fit(X_test, y_test)  # Use test set for calibration
    >>> # Get calibrated probabilities
    >>> calibrated_probs = va_cal.predict_proba(X_test)

    **Example 2: Inductive Venn-ABERS (IVAP)**

    >>> X, y = make_classification(n_samples=1000, n_features=20,
    ...                            n_classes=2, random_state=42)
    >>> X_train, X_test, y_train, y_test = train_test_split(
    ...     X, y, test_size=0.2, random_state=42
    ... )
    >>> # Inductive mode with the default calibration split (calib_size=0.33)
    >>> clf = GaussianNB()
    >>> va_cal = VennAbersCalibrator(
    ...     estimator=clf,
    ...     inductive=True,
    ...     random_state=42
    ... )
    >>> _ = va_cal.fit(X_train, y_train)
    >>> calibrated_probs = va_cal.predict_proba(X_test)
    >>> predictions = va_cal.predict(X_test)

    **Example 3: Cross Venn-ABERS (CVAP)**

    >>> X, y = make_classification(n_samples=1000, n_features=20,
    ...                            n_informative=10, n_classes=3,
    ...                            random_state=42)
    >>> X_train, X_test, y_train, y_test = train_test_split(
    ...     X, y, test_size=0.2, random_state=42
    ... )
    >>> # Cross validation mode with 5 folds
    >>> clf = GaussianNB()
    >>> va_cal = VennAbersCalibrator(
    ...     estimator=clf,
    ...     inductive=False,
    ...     n_splits=5,
    ...     random_state=42
    ... )
    >>> _ = va_cal.fit(X_train, y_train)
    >>> calibrated_probs = va_cal.predict_proba(X_test)
    >>> predictions = va_cal.predict(X_test)

    Notes
    -----
    - Venn-ABERS calibration provides probabilistic predictions with
      validity guarantees under the exchangeability assumption.
    - For binary classification, the method produces well-calibrated
      probabilities with minimal assumptions.
    - For multi-class problems, the method uses a one-vs-one approach
      to extend binary Venn-ABERS to multiple classes.
    - The ``precision`` parameter can significantly speed up computation
      for large datasets with minimal impact on calibration quality.
    - When using ``cv="prefit"``, ensure the estimator is fitted on a
      different dataset than the one used for calibration to avoid
      overfitting.

    See Also
    --------
    TopLabelCalibrator : Top-label calibration for multi-class problems.
    sklearn.calibration.CalibratedClassifierCV : Scikit-learn's probability
        calibration with isotonic regression or Platt scaling.
    """

    fit_attributes = ["va_calibrator_", "classes_", "n_classes_"]

    valid_cv = ["prefit", None]

    def __init__(
        self,
        estimator: Optional[ClassifierMixin] = None,
        cv: Optional[str] = None,
        inductive: bool = True,
        n_splits: Optional[int] = None,
        train_proper_size: Optional[float] = None,
        random_state: Optional[int] = None,
        shuffle: bool = True,
        stratify: Optional[ArrayLike] = None,
        precision: Optional[int] = None,
        cv_ensemble: bool = True,
    ) -> None:
        self.estimator = estimator
        self.cv = cv
        self.inductive = inductive
        self.n_splits = n_splits
        self.train_proper_size = train_proper_size
        self.random_state = random_state
        self.shuffle = shuffle
        self.stratify = stratify
        self.precision = precision
        self.cv_ensemble = cv_ensemble

        # Initialize attributes that will be set during fit
        self.va_calibrator_: Optional[Union[VennAbersMultiClass, VennAbers]] = None
        self.classes_: Optional[NDArray] = None
        self.n_classes_: Optional[int] = None
        self.transformers_: Optional[Pipeline] = None
        self.single_estimator_: Optional[ClassifierMixin] = None
        self.p_cal_: Optional[NDArray] = None
        self.y_cal_: Optional[NDArray] = None

    def _check_cv(self, cv: Optional[str]) -> Optional[str]:
        """
        Check if cross-validator is valid.

        Parameters
        ----------
        cv : Optional[str]
            Cross-validator to check.

        Returns
        -------
        Optional[str]
            'prefit' or None.

        Raises
        ------
        ValueError
            If the cross-validator is not valid.
        """
        if cv in self.valid_cv:
            return cv
        # f-string so that the allowed values are actually shown to the user.
        raise ValueError(
            f"Invalid cv argument. Allowed values are {self.valid_cv}."
        )

    def fit(
        self,
        X: ArrayLike,
        y: ArrayLike,
        sample_weight: Optional[NDArray] = None,
        calib_size: Optional[float] = 0.33,
        random_state: Optional[Union[int, np.random.RandomState, None]] = None,
        shuffle: Optional[bool] = True,
        stratify: Optional[ArrayLike] = None,
        **fit_params,
    ) -> "VennAbersCalibrator":
        """
        Fits the Venn-ABERS calibrator.

        Parameters
        ----------
        X : ArrayLike of shape (n_samples, n_features)
            Training data.

        y : ArrayLike of shape (n_samples,)
            Training labels.

        sample_weight : Optional[NDArray] of shape (n_samples,)
            Sample weights for fitting the out-of-fold models.
            If ``None``, then samples are equally weighted.
            Note that the sample weight defined are only for the training, not
            for the calibration procedure.
            By default ``None``.

        calib_size : Optional[float], default=0.33
            Proportion of the dataset to use for calibration when using
            Inductive Venn-ABERS (IVAP) mode (``inductive=True`` and ``cv=None``).
            It should be between 0.0 and 1.0 and represents
            the proportion of the dataset to include in the calibration split.
            This parameter is ignored when ``cv="prefit"`` or when using
            Cross Venn-ABERS (``inductive=False``).

        random_state : Optional[Union[int, np.random.RandomState, None]], default=None
            Controls the shuffling applied to the data before applying the split.
            Pass an int for reproducible output across multiple function calls.
            If ``None``, the constructor's ``random_state`` is used.

        shuffle : Optional[bool], default=True
            Whether to shuffle the data before splitting. If shuffle=False
            then stratify must be None. The constructor's ``shuffle`` is
            only used when ``None`` is passed here.

        stratify : Optional[ArrayLike], default=None
            If not None, data is split in a stratified fashion, using this as
            the class labels. If ``None``, the constructor's ``stratify``
            is used.

        **fit_params : dict
            Additional parameters for the underlying estimator.
            Accepted for API compatibility; this method does not forward
            them to the estimator.

        Returns
        -------
        VennAbersCalibrator
            The fitted calibrator.

        Raises
        ------
        ValueError
            If ``cv`` is invalid, if no estimator is provided, if the
            estimator is not fitted in prefit mode, or if ``n_splits`` is
            missing for Cross Venn-ABERS.
        """
        cv = self._check_cv(self.cv)

        # Check for manual mode (backward compatibility)
        # If estimator is None, we expect this to be manual mode
        if self.estimator is None:
            raise ValueError(
                "For VennAbersCalibrator, an estimator must be provided. "
                "For manual calibration with pre-computed probabilities, "
                "please use the VennAbers class directly from mapie._venn_abers"
            )

        # Validate inputs
        X, y = indexable(X, y)
        y = _check_y(y)
        sample_weight, X, y = _check_null_weight(sample_weight, X, y)

        # Handle categorical features
        last_estimator = self.estimator
        X_processed = X
        # Reset so that a previous fit with a pipeline cannot leak into
        # predict_proba after a refit with a plain estimator.
        self.transformers_ = None

        if isinstance(self.estimator, Pipeline):
            # Separate transformers and final estimator
            last_estimator = self.estimator[-1]  # usually a classifier

            # A one-step pipeline has no transformer; slicing it would give
            # an empty pipeline that cannot be fitted or applied.
            if len(self.estimator.steps) > 1:
                transformers = self.estimator[:-1]  # all steps except last

                if cv == "prefit":
                    # The pipeline is already fitted: only apply the
                    # transformers. Re-fitting them on the calibration data
                    # would modify the user's pipeline and give the
                    # classifier features it was not trained on.
                    try:
                        X_processed = transformers.transform(X)
                    except NotFittedError as exc:
                        raise ValueError(
                            "For cv='prefit', the estimator must be already fitted"
                        ) from exc
                else:
                    X_processed = transformers.fit_transform(X, y)

                self.transformers_ = transformers

        # Set up classes
        self.classes_ = np.unique(y)
        self.n_classes_ = len(self.classes_)

        # Prefit mode: estimator is already fitted, only calibrate
        if cv == "prefit":
            try:
                check_is_fitted(last_estimator)
            except NotFittedError as exc:
                raise ValueError(
                    "For cv='prefit', the estimator must be already fitted"
                ) from exc

            # Set up classes from the fitted estimator
            self.single_estimator_ = last_estimator
            self.classes_ = self.single_estimator_.classes_

            # Type guard: ensure classes_ is not None
            if self.classes_ is None:
                raise RuntimeError(
                    "classes_ should not be None after fitting estimator"
                )

            self.n_classes_ = len(self.classes_)

            # Get predictions from the fitted estimator
            p_cal_pred = self.single_estimator_.predict_proba(X_processed)

            # Fit Venn-ABERS calibrator on these predictions
            if self.n_classes_ <= 2:
                self.va_calibrator_ = VennAbers()
                self.va_calibrator_.fit(p_cal_pred, y, self.precision)
            else:
                # For multi-class, store calibration data for later use
                self.p_cal_ = np.asarray(p_cal_pred)
                self.y_cal_ = np.asarray(y)
                self.va_calibrator_ = None  # Will be used in predict_proba

            return self

        # Standard inductive or cross validation mode
        # Integrity checks
        if not self.inductive and self.n_splits is None:
            raise ValueError("For Cross Venn-ABERS please provide n_splits")

        # Check random state: the value given to fit takes precedence
        random_state_to_use: Optional[Union[int, np.random.RandomState]] = None
        if random_state is not None:
            random_state_to_use = random_state
        else:
            random_state_to_use = self.random_state

        # Initialize and fit the Venn-ABERS calibrator
        self.va_calibrator_ = VennAbersMultiClass(
            estimator=last_estimator,
            inductive=self.inductive,
            n_splits=self.n_splits,
            cal_size=calib_size,
            train_proper_size=self.train_proper_size,
            random_state=random_state_to_use,
            shuffle=shuffle if shuffle is not None else self.shuffle,
            stratify=stratify if stratify is not None else self.stratify,
            precision=self.precision,
            cv_ensemble=self.cv_ensemble,
        )

        self.va_calibrator_.fit(X_processed, y, sample_weight=sample_weight)

        return self

    def predict_proba(self, X: ArrayLike, loss="log") -> NDArray:
        """
        Prediction of the calibrated scores using fitted classifier and
        Venn-ABERS calibrator.

        Parameters
        ----------
        X : ArrayLike of shape (n_samples, n_features)
            Test data.

        loss : str, default="log"
            Loss forwarded to the underlying Venn-ABERS calibrator in
            inductive/cross-validation mode. Not used in prefit mode.

        Returns
        -------
        NDArray of shape (n_samples, n_classes)
            Venn-ABERS calibrated probabilities.

        Raises
        ------
        RuntimeError
            If the fitted state is inconsistent with the selected mode.
        """
        check_is_fitted(self, self.fit_attributes)

        cv = self._check_cv(self.cv)

        # Process test data
        if self.transformers_ is not None:
            X_processed = self.transformers_.transform(X)
        else:
            X_processed = X

        # Prefit mode: use fitted estimator to get probabilities, then calibrate
        if cv == "prefit":
            if self.single_estimator_ is None:
                raise RuntimeError(
                    "single_estimator_ should not be None in prefit mode"
                )

            p_test_pred = self.single_estimator_.predict_proba(X_processed)

            # Type guard: ensure n_classes_ is not None after fit
            if self.n_classes_ is None:
                raise RuntimeError("n_classes_ should not be None after fitting")

            if self.n_classes_ <= 2:
                # Binary classification
                if self.va_calibrator_ is None:
                    raise RuntimeError(
                        "va_calibrator_ should not be None for binary classification"
                    )
                p_prime, _ = self.va_calibrator_.predict_proba(p_test_pred)
            else:
                # Multi-class classification: calibrate from the stored
                # calibration probabilities and labels
                p_prime, _ = predict_proba_prefitted_va(
                    self.p_cal_,
                    self.y_cal_,
                    p_test_pred,
                    precision=self.precision,
                    va_tpe="one_vs_one",
                )

            return p_prime

        # Standard inductive or cross validation mode
        if self.va_calibrator_ is None:
            raise RuntimeError(
                "va_calibrator_ should not be None in inductive/cross-validation mode"
            )

        # Type guard: ensure we have VennAbersMultiClass instance
        if not isinstance(self.va_calibrator_, VennAbersMultiClass):
            raise RuntimeError(
                "va_calibrator_ should be VennAbersMultiClass instance in "
                "inductive/cross-validation mode"
            )

        # Only pass ``loss`` when the calibrator's predict_proba accepts it
        if "loss" in signature(self.va_calibrator_.predict_proba).parameters:
            p_prime = self.va_calibrator_.predict_proba(
                X_processed, loss=loss, p0_p1_output=False
            )
        else:
            p_prime = self.va_calibrator_.predict_proba(X_processed, p0_p1_output=False)

        return p_prime

    def predict(self, X: ArrayLike, loss="log") -> NDArray:
        """
        Predict the class of the estimator after Venn-ABERS calibration.

        Parameters
        ----------
        X : ArrayLike of shape (n_samples, n_features)
            Test data.

        loss : str, default="log"
            Passed unchanged to ``predict_proba``.

        Returns
        -------
        NDArray of shape (n_samples,)
            The predicted class labels.
        """
        check_is_fitted(self, self.fit_attributes)

        # Type guard: ensure n_classes_ is not None after fit
        if self.n_classes_ is None:
            raise RuntimeError("n_classes_ should not be None after fitting")

        # Type guard: ensure classes_ is not None after fit
        if self.classes_ is None:
            raise RuntimeError("classes_ should not be None after fitting")

        # Get calibrated probabilities
        p_prime = self.predict_proba(X, loss=loss)

        # Store classes_ in a local variable to help type checker
        classes: NDArray = self.classes_
        n_classes = self.n_classes_

        # Convert probabilities to class predictions
        if n_classes <= 2:
            # Binary classification: second class when its probability
            # reaches 0.5
            y_pred = classes[(p_prime[:, 1] >= 0.5).astype(int)]
        else:
            # Multi-class classification
            y_pred = classes[np.argmax(p_prime, axis=1)]

        return y_pred
"""
Tests for VennAbersCalibrator class.
"""

from inspect import signature
from typing import Optional, Dict, Any, List, Tuple

import numpy as np
import pandas as pd
import pytest
import sklearn
from sklearn.base import ClassifierMixin
from sklearn.compose import ColumnTransformer
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.exceptions import NotFittedError
from sklearn.utils.validation import check_is_fitted
from mapie.calibration import VennAbersCalibrator
from mapie._venn_abers import VennAbers, VennAbersMultiClass, predict_proba_prefitted_va

random_state = 42

ESTIMATORS = [
    LogisticRegression(random_state=random_state),
    RandomForestClassifier(random_state=random_state),
    GaussianNB(),
]

# Binary classification dataset
X_binary, y_binary = make_classification(
    n_samples=10000,
    n_features=20,
    n_classes=2,
    n_informative=10,
    random_state=random_state,
)

X_binary_train, X_binary_test, y_binary_train, y_binary_test = train_test_split(
    X_binary, y_binary, test_size=0.2, random_state=random_state
)

X_binary_proper, X_binary_cal, y_binary_proper, y_binary_cal = train_test_split(
    X_binary_train, y_binary_train, test_size=0.3, random_state=random_state
)

# Multi-class classification dataset
X_multi, y_multi = make_classification(
    n_samples=10000,
    n_features=20,
    n_classes=3,
    n_informative=10,
    random_state=random_state,
)

X_multi_train, X_multi_test, y_multi_train, y_multi_test = train_test_split(
    X_multi, y_multi, test_size=0.2, random_state=random_state
)

X_multi_proper, X_multi_cal, y_multi_proper, y_multi_cal = train_test_split(
    X_multi_train, y_multi_train, test_size=0.3, random_state=random_state
)


# ============================================================================
# Basic Initialization Tests
# ============================================================================


def test_initialized() -> None:
    """Test that initialization does not crash."""
    VennAbersCalibrator()


def test_default_parameters() -> None:
    """Test default values of input parameters."""
    va_cal = VennAbersCalibrator()
    assert va_cal.estimator is None
    assert va_cal.cv is None
    assert va_cal.inductive is True
    assert va_cal.n_splits is None
    assert va_cal.train_proper_size is None
    assert va_cal.random_state is None
    assert va_cal.shuffle is True
    assert va_cal.stratify is None
    assert va_cal.precision is None
    assert va_cal.cv_ensemble is True


def test_default_fit_params() -> None:
    """Test default sample weights and other parameters."""
    va_cal = VennAbersCalibrator()
    assert signature(va_cal.fit).parameters["sample_weight"].default is None
    assert signature(va_cal.fit).parameters["calib_size"].default == 0.33
    assert signature(va_cal.fit).parameters["random_state"].default is None
    assert signature(va_cal.fit).parameters["shuffle"].default is True
    assert signature(va_cal.fit).parameters["stratify"].default is None


# ============================================================================
# CV Parameter Tests
# ============================================================================


@pytest.mark.parametrize("cv", ["prefit", None])
def test_valid_cv_argument(cv: Optional[str]) -> None:
    """Test that valid cv methods work."""
    if cv == "prefit":
        est = GaussianNB().fit(X_binary_train, y_binary_train)
        va_cal = VennAbersCalibrator(estimator=est, cv=cv)
        va_cal.fit(X_binary_cal, y_binary_cal)
    else:
        va_cal = VennAbersCalibrator(estimator=GaussianNB(), cv=cv, inductive=True)
        va_cal.fit(X_binary_train, y_binary_train)


@pytest.mark.parametrize("cv", ["split", "invalid", "cross"])
def test_invalid_cv_argument(cv: str) -> None:
    """Test that invalid cv methods raise ValueError."""
    # Construction never validates ``cv``; only ``fit`` is expected to raise.
    va_cal = VennAbersCalibrator(estimator=GaussianNB(), cv=cv)
    with pytest.raises(
        ValueError,
        match=r".*Invalid cv argument*",
    ):
        va_cal.fit(X_binary_train, y_binary_train)


def test_prefit_unfitted_estimator_raises_error() -> None:
    """
    Test that VennAbersCalibrator in 'prefit' mode raises a ValueError
    if the estimator is not fitted.
    """
    clf = GaussianNB()  # Unfitted estimator
    va_cal = VennAbersCalibrator(estimator=clf, cv="prefit")
    with pytest.raises(
        ValueError, match=r".*For cv='prefit', the estimator must be already fitted*"
    ):
        va_cal.fit(X_binary_cal, y_binary_cal)


def test_prefit_requires_estimator() -> None:
    """Test that prefit mode requires a fitted estimator."""
    va_cal = VennAbersCalibrator(cv="prefit")
    with pytest.raises(ValueError, match=r".*an estimator must be provided*"):
        va_cal.fit(X_binary_train, y_binary_train)


# ============================================================================
# Inductive vs Cross Validation Tests
# ============================================================================


def test_inductive_mode_binary() -> None:
    """Test Inductive Venn-ABERS (IVAP) for binary classification."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=random_state
    )
    va_cal.fit(X_binary_train, y_binary_train)
    probs = va_cal.predict_proba(X_binary_test)

    assert probs.shape == (len(X_binary_test), 2)
    assert np.allclose(probs.sum(axis=1), 1.0)
    assert np.all((probs >= 0) & (probs <= 1))


def test_inductive_mode_multiclass() -> None:
    """Test Inductive Venn-ABERS (IVAP) for multi-class classification."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=random_state
    )
    va_cal.fit(X_multi_train, y_multi_train)
    probs = va_cal.predict_proba(X_multi_test)

    assert probs.shape == (len(X_multi_test), 3)
    assert np.allclose(probs.sum(axis=1), 1.0)
    assert np.all((probs >= 0) & (probs <= 1))


def test_cross_validation_mode_binary() -> None:
    """Test Cross Venn-ABERS (CVAP) for binary classification."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=False, n_splits=5, random_state=random_state
    )
    va_cal.fit(X_binary_train, y_binary_train)
    probs = va_cal.predict_proba(X_binary_test)

    assert probs.shape == (len(X_binary_test), 2)
    assert np.allclose(probs.sum(axis=1), 1.0)
    assert np.all((probs >= 0) & (probs <= 1))


def test_cross_validation_mode_multiclass() -> None:
    """Test Cross Venn-ABERS (CVAP) for multi-class classification."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=False, n_splits=5, random_state=random_state
    )
    va_cal.fit(X_multi_train, y_multi_train)
    probs = va_cal.predict_proba(X_multi_test)

    assert probs.shape == (len(X_multi_test), 3)
    assert np.allclose(probs.sum(axis=1), 1.0)
    assert np.all((probs >= 0) & (probs <= 1))


def test_cross_validation_requires_n_splits() -> None:
    """Test that CVAP requires n_splits parameter."""
    va_cal = VennAbersCalibrator(estimator=GaussianNB(), inductive=False, n_splits=None)
    with pytest.raises(
        ValueError, match=r".*For Cross Venn-ABERS please provide n_splits*"
    ):
        va_cal.fit(X_binary_train, y_binary_train)


def test_cross_validation_with_shuffle() -> None:
    """Test Cross Venn-ABERS with shuffle parameter."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(),
        inductive=False,
        n_splits=5,
        shuffle=True,
        random_state=random_state,
    )
    va_cal.fit(X_binary_train, y_binary_train)
    probs = va_cal.predict_proba(X_binary_test)

    assert probs.shape == (len(X_binary_test), 2)


def test_cross_validation_with_stratify() -> None:
    """Test Cross Venn-ABERS with stratify parameter."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(),
        inductive=False,
        n_splits=5,
        stratify=y_binary_train,
        random_state=random_state,
    )
    va_cal.fit(X_binary_train, y_binary_train)
    probs = va_cal.predict_proba(X_binary_test)

    assert probs.shape == (len(X_binary_test), 2)


# ============================================================================
# Prefit Mode Tests
# ============================================================================


def test_prefit_mode_binary() -> None:
    """Test prefit mode for binary classification."""
    clf = GaussianNB()
    clf.fit(X_binary_proper, y_binary_proper)

    va_cal = VennAbersCalibrator(estimator=clf, cv="prefit")
    va_cal.fit(X_binary_cal, y_binary_cal)
    probs = va_cal.predict_proba(X_binary_test)

    assert probs.shape == (len(X_binary_test), 2)
    assert np.allclose(probs.sum(axis=1), 1.0)
    assert np.all((probs >= 0) & (probs <= 1))


def test_prefit_mode_multiclass() -> None:
    """Test prefit mode for multi-class classification."""
    clf = GaussianNB()
    clf.fit(X_multi_proper, y_multi_proper)

    va_cal = VennAbersCalibrator(estimator=clf, cv="prefit")
    va_cal.fit(X_multi_cal, y_multi_cal)
    probs = va_cal.predict_proba(X_multi_test)

    assert probs.shape == (len(X_multi_test), 3)
    assert np.allclose(probs.sum(axis=1), 1.0)
    assert np.all((probs >= 0) & (probs <= 1))


def test_prefit_inductive_consistency() -> None:
    """Test that prefit and inductive modes produce outputs of the same shape."""
    # Fit estimator on proper training set
    clf = GaussianNB()
    clf.fit(X_binary_proper, y_binary_proper)

    # Prefit mode
    va_cal_prefit = VennAbersCalibrator(estimator=clf, cv="prefit")
    va_cal_prefit.fit(X_binary_cal, y_binary_cal)
    probs_prefit = va_cal_prefit.predict_proba(X_binary_test)

    # Inductive mode on the union of the proper and calibration sets
    va_cal_inductive = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=random_state
    )
    # Combine proper and cal sets
    X_combined = np.vstack([X_binary_proper, X_binary_cal])
    y_combined = np.hstack([y_binary_proper, y_binary_cal])
    va_cal_inductive.fit(X_combined, y_combined)
    probs_inductive = va_cal_inductive.predict_proba(X_binary_test)

    # The internal split differs from the manual one, so values are not
    # expected to match exactly; only the output shape is compared.
    assert probs_prefit.shape == probs_inductive.shape


# ============================================================================
# Estimator Tests
# ============================================================================


@pytest.mark.parametrize("estimator", ESTIMATORS)
def test_different_estimators_binary(estimator: ClassifierMixin) -> None:
    """Test VennAbersCalibrator with different base estimators (binary)."""
    va_cal = VennAbersCalibrator(
        estimator=estimator, inductive=True, random_state=random_state
    )
    va_cal.fit(X_binary_train, y_binary_train)
    probs = va_cal.predict_proba(X_binary_test)

    assert probs.shape == (len(X_binary_test), 2)
    assert np.allclose(probs.sum(axis=1), 1.0)
    assert np.all((probs >= 0) & (probs <= 1))


@pytest.mark.parametrize("estimator", ESTIMATORS)
def test_different_estimators_multiclass(estimator: ClassifierMixin) -> None:
    """Test VennAbersCalibrator with different base estimators (multi-class)."""
    va_cal = VennAbersCalibrator(
        estimator=estimator, inductive=True, random_state=random_state
    )
    va_cal.fit(X_multi_train, y_multi_train)
    probs = va_cal.predict_proba(X_multi_test)

    assert probs.shape == (len(X_multi_test), 3)
    assert np.allclose(probs.sum(axis=1), 1.0)
    assert np.all((probs >= 0) & (probs <= 1))


def test_estimator_none_raises_error() -> None:
    """Test that None estimator raises ValueError."""
    va_cal = VennAbersCalibrator(estimator=None)
    with pytest.raises(ValueError, match=r".*an estimator must be provided*"):
        va_cal.fit(X_binary_train, y_binary_train)


def test_predict_method_multiclass() -> None:
    """Test predict method for multi-class classification."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=random_state
    )
    va_cal.fit(X_multi_train, y_multi_train)
    predictions = va_cal.predict(X_multi_test)

    assert predictions.shape == (len(X_multi_test),)
    assert va_cal.classes_ is not None
    assert np.all(np.isin(predictions, va_cal.classes_))


def test_predict_proba_consistency() -> None:
    """Test that predict is consistent with predict_proba."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=random_state
    )
    va_cal.fit(X_binary_train, y_binary_train)

    predictions = va_cal.predict(X_binary_test)
    probs = va_cal.predict_proba(X_binary_test)

    assert va_cal.classes_ is not None
    predictions_from_probs = va_cal.classes_[np.argmax(probs, axis=1)]

    np.testing.assert_array_equal(predictions, predictions_from_probs)


def test_predict_proba_shape_binary() -> None:
    """Test that predict_proba returns correct shape for binary classification."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=random_state
    )
    va_cal.fit(X_binary_train, y_binary_train)
    probs = va_cal.predict_proba(X_binary_test)

    assert probs.shape == (len(X_binary_test), va_cal.n_classes_)
    assert va_cal.n_classes_ == 2


def test_predict_proba_shape_multiclass() -> None:
    """Test that predict_proba returns correct shape for multi-class classification."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=random_state
    )
    va_cal.fit(X_multi_train, y_multi_train)
    probs = va_cal.predict_proba(X_multi_test)

    assert probs.shape == (len(X_multi_test), va_cal.n_classes_)
    assert va_cal.n_classes_ == 3


def test_gradient_boosting_with_early_stopping() -> None:
    """Test VennAbersCalibrator with GradientBoosting and early stopping."""
    # ``n_iter_no_change`` is what actually switches early stopping on;
    # without it all ``n_estimators`` stages are always fitted.
    gb = GradientBoostingClassifier(
        n_estimators=100,
        n_iter_no_change=5,
        validation_fraction=0.1,
        random_state=random_state,
    )

    va_cal = VennAbersCalibrator(
        estimator=gb, inductive=True, random_state=random_state
    )

    va_cal.fit(X_binary_train, y_binary_train)
    probs = va_cal.predict_proba(X_binary_test)

    assert probs.shape == (len(X_binary_test), 2)


# ============================================================================
# Sample Weight Tests
# ============================================================================


def test_sample_weights_none() -> None:
    """Test that sample_weight=None works correctly."""
    # config_context restores the global configuration on exit, so metadata
    # routing does not stay enabled for the tests that run afterwards.
    with sklearn.config_context(enable_metadata_routing=True):
        va_cal = VennAbersCalibrator(
            estimator=GaussianNB(), inductive=True, random_state=random_state
        )
        va_cal.fit(X_binary_train, y_binary_train, sample_weight=None)
        probs = va_cal.predict_proba(X_binary_test)

    assert probs.shape == (len(X_binary_test), 2)


def test_sample_weights_constant() -> None:
    """Test that constant sample weights give same results as None."""
    n_samples = len(X_binary_train)

    # config_context restores the global configuration on exit, so metadata
    # routing does not stay enabled for the tests that run afterwards.
    with sklearn.config_context(enable_metadata_routing=True):
        weighted_estimator = GaussianNB().set_fit_request(sample_weight=True)

        va_cal_none = VennAbersCalibrator(
            estimator=weighted_estimator, inductive=True, random_state=random_state
        )
        va_cal_none.fit(X_binary_train, y_binary_train, sample_weight=None)

        va_cal_ones = VennAbersCalibrator(
            estimator=weighted_estimator, inductive=True, random_state=random_state
        )
        va_cal_ones.fit(
            X_binary_train, y_binary_train, sample_weight=np.ones(n_samples)
        )

        va_cal_fives = VennAbersCalibrator(
            estimator=weighted_estimator, inductive=True, random_state=random_state
        )
        va_cal_fives.fit(
            X_binary_train, y_binary_train, sample_weight=np.ones(n_samples) * 5
        )

        probs_none = va_cal_none.predict_proba(X_binary_test)
        probs_ones = va_cal_ones.predict_proba(X_binary_test)
        probs_fives = va_cal_fives.predict_proba(X_binary_test)

    np.testing.assert_allclose(probs_none, probs_ones, rtol=1e-2, atol=1e-2)
    np.testing.assert_allclose(probs_none, probs_fives, rtol=1e-2, atol=1e-2)


def test_sample_weights_variable() -> None:
    """Test that variable sample weights affect the results."""
    n_samples = len(X_binary_train)

    # config_context restores the global configuration on exit, so metadata
    # routing does not stay enabled for the tests that run afterwards.
    with sklearn.config_context(enable_metadata_routing=True):
        va_cal_uniform = VennAbersCalibrator(
            estimator=RandomForestClassifier(random_state=random_state),
            inductive=True,
            random_state=random_state,
        )
        va_cal_uniform.fit(X_binary_train, y_binary_train, sample_weight=None)

        # Create non-uniform weights
        sample_weights = np.random.RandomState(random_state).uniform(
            0.1, 2.0, size=n_samples
        )

        estimator_weighted = RandomForestClassifier(
            random_state=random_state
        ).set_fit_request(sample_weight=True)

        va_cal_weighted = VennAbersCalibrator(
            estimator=estimator_weighted, inductive=True, random_state=random_state
        )
        va_cal_weighted.fit(
            X_binary_train, y_binary_train, sample_weight=sample_weights
        )

        probs_uniform = va_cal_uniform.predict_proba(X_binary_test)
        probs_weighted = va_cal_weighted.predict_proba(X_binary_test)

    # Results should be different with non-uniform weights
    assert not np.allclose(probs_uniform, probs_weighted)


# ============================================================================
# Random State and Reproducibility Tests
# ============================================================================


def test_random_state_reproducibility() -> None:
    """Test that random_state ensures reproducible results."""
    va_cal1 = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=42
    )
    va_cal1.fit(X_binary_train, y_binary_train)
    probs1 = va_cal1.predict_proba(X_binary_test)

    va_cal2 = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=42
    )
    va_cal2.fit(X_binary_train, y_binary_train)
    probs2 = va_cal2.predict_proba(X_binary_test)

    np.testing.assert_array_equal(probs1, probs2)
random_state=42 + ) + va_cal1.fit(X_binary_train, y_binary_train, random_state=123) + probs1 = va_cal1.predict_proba(X_binary_test) + + va_cal2 = VennAbersCalibrator( + estimator=GaussianNB(), + inductive=True, + random_state=999, # Different from fit + ) + va_cal2.fit(X_binary_train, y_binary_train, random_state=123) + probs2 = va_cal2.predict_proba(X_binary_test) + + np.testing.assert_array_equal(probs1, probs2) + + +def test_different_random_states_give_different_results() -> None: + """Test that different random states give different results.""" + va_cal1 = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=42 + ) + va_cal1.fit(X_binary_train, y_binary_train) + probs1 = va_cal1.predict_proba(X_binary_test) + + va_cal2 = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=123 + ) + va_cal2.fit(X_binary_train, y_binary_train) + probs2 = va_cal2.predict_proba(X_binary_test) + + # Results should be different with different random states + assert not np.array_equal(probs1, probs2) + + +# ============================================================================ +# Shuffle and Stratify Tests +# ============================================================================ + + +def test_shuffle_parameter() -> None: + """Test that shuffle parameter works correctly.""" + va_cal_shuffle = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state, shuffle=True + ) + va_cal_shuffle.fit(X_binary_train, y_binary_train) + probs_shuffle = va_cal_shuffle.predict_proba(X_binary_test) + + va_cal_no_shuffle = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state, shuffle=False + ) + va_cal_no_shuffle.fit(X_binary_train, y_binary_train) + probs_no_shuffle = va_cal_no_shuffle.predict_proba(X_binary_test) + + assert probs_shuffle.shape == probs_no_shuffle.shape + + +def test_shuffle_in_fit_overrides() -> None: + """Test that shuffle in fit() overrides constructor 
parameter.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state, shuffle=False + ) + # Override with shuffle=True in fit + va_cal.fit(X_binary_train, y_binary_train, shuffle=True) + probs = va_cal.predict_proba(X_binary_test) + + assert probs.shape == (len(X_binary_test), 2) + + +def test_stratify_parameter() -> None: + """Test that stratify parameter works correctly.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), + inductive=True, + random_state=random_state, + stratify=y_binary_train, + ) + va_cal.fit(X_binary_train, y_binary_train) + probs = va_cal.predict_proba(X_binary_test) + + assert probs.shape == (len(X_binary_test), 2) + + +def test_stratify_in_fit_overrides() -> None: + """Test that stratify in fit() overrides constructor parameter.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state, stratify=None + ) + # Override with stratify in fit + va_cal.fit(X_binary_train, y_binary_train, stratify=y_binary_train) + probs = va_cal.predict_proba(X_binary_test) + + assert probs.shape == (len(X_binary_test), 2) + + +# ============================================================================ +# Calibration Size Tests +# ============================================================================ + + +@pytest.mark.parametrize("cal_size", [0.2, 0.3, 0.4, 0.5]) +def test_different_calibration_sizes(cal_size: float) -> None: + """Test that different calibration sizes work correctly.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_binary_train, y_binary_train, calib_size=cal_size) + probs = va_cal.predict_proba(X_binary_test) + + assert probs.shape == (len(X_binary_test), 2) + assert np.allclose(probs.sum(axis=1), 1.0) + + +def test_cal_size_in_fit_overrides() -> None: + """Test that calib_size in fit() overrides constructor parameter.""" + va_cal = VennAbersCalibrator( + 
estimator=GaussianNB(), inductive=True, random_state=random_state + ) + # Override with calib_size in fit + va_cal.fit(X_binary_train, y_binary_train, calib_size=0.4) + probs = va_cal.predict_proba(X_binary_test) + + assert probs.shape == (len(X_binary_test), 2) + + +def test_train_proper_size_parameter() -> None: + """Test that train_proper_size parameter works correctly.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), + inductive=True, + train_proper_size=0.6, + random_state=random_state, + ) + va_cal.fit(X_binary_train, y_binary_train) + probs = va_cal.predict_proba(X_binary_test) + + assert probs.shape == (len(X_binary_test), 2) + + +# ============================================================================ +# N_splits Tests +# ============================================================================ + + +@pytest.mark.parametrize("n_splits", [2, 3, 5, 10]) +def test_different_n_splits(n_splits: int) -> None: + """Test that different n_splits values work correctly.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), + inductive=False, + n_splits=n_splits, + random_state=random_state, + ) + va_cal.fit(X_binary_train, y_binary_train) + probs = va_cal.predict_proba(X_binary_test) + + assert probs.shape == (len(X_binary_test), 2) + assert np.allclose(probs.sum(axis=1), 1.0) + + +def test_n_splits_too_small_raises_error() -> None: + """Test that n_splits < 2 raises an error.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=False, n_splits=1, random_state=random_state + ) + with pytest.raises(ValueError): + va_cal.fit(X_binary_train, y_binary_train) + + +# ============================================================================ +# Attributes Tests +# ============================================================================ + + +def test_fitted_attributes_inductive() -> None: + """Test that fitted attributes are set correctly for inductive mode.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), 
inductive=True, random_state=random_state + ) + va_cal.fit(X_binary_train, y_binary_train) + + assert hasattr(va_cal, "classes_") + assert hasattr(va_cal, "n_classes_") + assert hasattr(va_cal, "va_calibrator_") + assert va_cal.n_classes_ is not None + assert va_cal.classes_ is not None + assert va_cal.n_classes_ == 2 + assert len(va_cal.classes_) == 2 + + +def test_fitted_attributes_cross_validation() -> None: + """Test that fitted attributes are set correctly for cross validation mode.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=False, n_splits=5, random_state=random_state + ) + va_cal.fit(X_binary_train, y_binary_train) + + assert hasattr(va_cal, "classes_") + assert hasattr(va_cal, "n_classes_") + assert hasattr(va_cal, "va_calibrator_") + assert va_cal.n_classes_ is not None + assert va_cal.classes_ is not None + assert va_cal.n_classes_ == 2 + assert len(va_cal.classes_) == 2 + + +def test_fitted_attributes_prefit() -> None: + """Test that fitted attributes are set correctly for prefit mode.""" + clf = GaussianNB() + clf.fit(X_binary_proper, y_binary_proper) + + va_cal = VennAbersCalibrator(estimator=clf, cv="prefit") + va_cal.fit(X_binary_cal, y_binary_cal) + + assert hasattr(va_cal, "classes_") + assert hasattr(va_cal, "n_classes_") + assert hasattr(va_cal, "single_estimator_") + assert va_cal.n_classes_ is not None + assert va_cal.classes_ is not None + assert va_cal.n_classes_ == 2 + assert len(va_cal.classes_) == 2 + + +# ============================================================================ +# Pipeline Compatibility Tests +# ============================================================================ + + +def test_pipeline_compatibility() -> None: + """Test that VennAbersCalibrator works with sklearn pipelines.""" + X_df = pd.DataFrame( + { + "x_cat": ["A", "A", "B", "A", "A", "B"] * 10, + "x_num": [0, 1, 1, 4, np.nan, 5] * 10, + } + ) + y_series = pd.Series([0, 1, 0, 1, 0, 1] * 10) + + numeric_preprocessor = Pipeline( + 
[ + ("imputer", SimpleImputer(strategy="mean")), + ] + ) + categorical_preprocessor = Pipeline( + steps=[("encoding", OneHotEncoder(handle_unknown="ignore"))] + ) + preprocessor = ColumnTransformer( + [ + ("cat", categorical_preprocessor, ["x_cat"]), + ("num", numeric_preprocessor, ["x_num"]), + ] + ) + pipe = make_pipeline(preprocessor, LogisticRegression(random_state=random_state)) + pipe.fit(X_df, y_series) + + va_cal = VennAbersCalibrator( + estimator=pipe, inductive=True, random_state=random_state + ) + va_cal.fit(X_df, y_series) + predictions = va_cal.predict(X_df) + probs = va_cal.predict_proba(X_df) + + assert predictions.shape == (len(y_series),) + assert probs.shape == (len(y_series), 2) + + +def test_pipeline_prefit_mode() -> None: + """Test that VennAbersCalibrator works with prefit pipelines.""" + X_df = pd.DataFrame( + { + "x_cat": ["A", "A", "B", "A", "A", "B"] * 10, + "x_num": [0, 1, 1, 4, np.nan, 5] * 10, + } + ) + y_series = pd.Series([0, 1, 0, 1, 0, 1] * 10) + + numeric_preprocessor = Pipeline( + [ + ("imputer", SimpleImputer(strategy="mean")), + ] + ) + categorical_preprocessor = Pipeline( + steps=[("encoding", OneHotEncoder(handle_unknown="ignore"))] + ) + preprocessor = ColumnTransformer( + [ + ("cat", categorical_preprocessor, ["x_cat"]), + ("num", numeric_preprocessor, ["x_num"]), + ] + ) + pipe = make_pipeline(preprocessor, LogisticRegression(random_state=random_state)) + pipe.fit(X_df, y_series) + + va_cal = VennAbersCalibrator(estimator=pipe, cv="prefit") + va_cal.fit(X_df, y_series) + predictions = va_cal.predict(X_df) + probs = va_cal.predict_proba(X_df) + + assert predictions.shape == (len(y_series),) + assert probs.shape == (len(y_series), 2) + + +def test_with_pipeline() -> None: + """Test VennAbersCalibrator with sklearn Pipeline.""" + from sklearn.preprocessing import StandardScaler + + pipeline = make_pipeline(StandardScaler(), GaussianNB()) + + va_cal = VennAbersCalibrator( + estimator=pipeline, inductive=True, 
random_state=random_state + ) + va_cal.fit(X_binary_train, y_binary_train) + probs = va_cal.predict_proba(X_binary_test) + + assert probs.shape == (len(X_binary_test), 2) + assert np.allclose(probs.sum(axis=1), 1.0) + + +def test_with_column_transformer() -> None: + """Test VennAbersCalibrator with ColumnTransformer.""" + # Create a mixed dataset + X_mixed = np.column_stack( + [X_binary_train, np.random.choice(["A", "B", "C"], size=len(X_binary_train))] + ) + + preprocessor = ColumnTransformer( + transformers=[ + ( + "num", + SimpleImputer(strategy="mean"), + list(range(X_binary_train.shape[1])), + ), + ("cat", OneHotEncoder(handle_unknown="ignore"), [X_binary_train.shape[1]]), + ] + ) + + pipeline = Pipeline([("preprocessor", preprocessor), ("classifier", GaussianNB())]) + + va_cal = VennAbersCalibrator( + estimator=pipeline, inductive=True, random_state=random_state + ) + + X_test_mixed = np.column_stack( + [X_binary_test, np.random.choice(["A", "B", "C"], size=len(X_binary_test))] + ) + + va_cal.fit(X_mixed, y_binary_train) + probs = va_cal.predict_proba(X_test_mixed) + + assert probs.shape == (len(X_binary_test), 2) + + +# ============================================================================ +# Multiclass Strategy Tests +# ============================================================================ + + +def test_multiclass_one_vs_one_strategy() -> None: + """Test multiclass with one_vs_one strategy.""" + # Create calibrator with explicit one_vs_one + clf = GaussianNB() + clf.fit(X_multi_proper, y_multi_proper) + + va_cal = VennAbersCalibrator(estimator=clf, cv="prefit") + va_cal.fit(X_multi_cal, y_multi_cal) + probs = va_cal.predict_proba(X_multi_test) + + assert probs.shape == (len(X_multi_test), 3) + assert np.allclose(probs.sum(axis=1), 1.0) + + +# ============================================================================ +# Check Fitted Tests +# ============================================================================ + + +def 
test_check_is_fitted_after_fit() -> None: + """Test that check_is_fitted passes after fitting.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_binary_train, y_binary_train) + + # Should not raise an error + check_is_fitted(va_cal) + + +# ============================================================================ +# Edge Cases and Error Handling Tests +# ============================================================================ + + +def test_empty_dataset_raises_error() -> None: + """Test that empty dataset raises an error.""" + X_empty = np.array([]).reshape(0, 20) + y_empty = np.array([]) + + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + with pytest.raises(ValueError): + va_cal.fit(X_empty, y_empty) + + +def test_single_class_raises_error() -> None: + """Test that single class dataset raises an error.""" + X_single = X_binary_train[:10] + y_single = np.zeros(10) # All same class + + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + with pytest.raises(ValueError): + va_cal.fit(X_single, y_single) + + +def test_mismatched_X_y_length_raises_error() -> None: + """Test that mismatched X and y lengths raise an error.""" + X_mismatch = X_binary_train[:50] + y_mismatch = y_binary_train[:40] + + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + with pytest.raises(ValueError): + va_cal.fit(X_mismatch, y_mismatch) + + +def test_predict_before_fit_raises_error() -> None: + """Test that calling predict before fit raises an error.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + with pytest.raises(Exception): # NotFittedError or AttributeError + va_cal.predict(X_binary_test) + + +def test_predict_proba_before_fit_raises_error() -> None: + """Test that calling predict_proba before 
fit raises an error.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + with pytest.raises(Exception): # NotFittedError or AttributeError + va_cal.predict_proba(X_binary_test) + + +def test_invalid_cal_size_raises_error() -> None: + """Test that invalid cal_size values raise an error.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + with pytest.raises(ValueError): + va_cal.fit(X_binary_train, y_binary_train, calib_size=1.5) # Invalid: > 1.0 + + +def test_negative_cal_size_raises_error() -> None: + """Test that negative calib_size raises an error.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + with pytest.raises(ValueError): + va_cal.fit(X_binary_train, y_binary_train, calib_size=-0.1) + + +def test_empty_calibration_set_raises_error() -> None: + """Test that empty calibration set raises an error.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + # This should work but with a very small training set + try: + # Very large calib_size leaves almost no training data + va_cal.fit(X_binary_train[:10], y_binary_train[:10], calib_size=0.99) + except ValueError: + # Expected if the split is invalid + pass + + +def test_very_small_dataset() -> None: + """Test with a very small dataset.""" + X_small = X_binary_train[:20] + y_small = y_binary_train[:20] + + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_small, y_small) + probs = va_cal.predict_proba(X_binary_test[:5]) + + assert probs.shape == (5, 2) + + +# ============================================================================ +# Calibration Quality Tests +# ============================================================================ + + +def test_calibration_improves_probabilities() -> None: + """Test that Venn-ABERS 
calibration improves probability estimates.""" + # Train uncalibrated model + clf = RandomForestClassifier(random_state=random_state) + clf.fit(X_binary_proper, y_binary_proper) + uncalibrated_probs = clf.predict_proba(X_binary_test) + + # Train calibrated model + va_cal = VennAbersCalibrator(estimator=clf, cv="prefit") + va_cal.fit(X_binary_cal, y_binary_cal) + calibrated_probs = va_cal.predict_proba(X_binary_test) + + # Both should have valid probability distributions + assert np.allclose(uncalibrated_probs.sum(axis=1), 1.0) + assert np.allclose(calibrated_probs.sum(axis=1), 1.0) + + # Calibrated probabilities should be different + assert not np.allclose(uncalibrated_probs, calibrated_probs) + + +def test_probabilities_sum_to_one() -> None: + """Test that predicted probabilities sum to 1 for all samples.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_binary_train, y_binary_train) + probs = va_cal.predict_proba(X_binary_test) + + # Check that probabilities sum to 1 for each sample + prob_sums = probs.sum(axis=1) + np.testing.assert_allclose(prob_sums, np.ones(len(X_binary_test)), rtol=1e-5) + + +def test_probabilities_in_valid_range() -> None: + """Test that all predicted probabilities are in [0, 1].""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_binary_train, y_binary_train) + probs = va_cal.predict_proba(X_binary_test) + + assert np.all(probs >= 0) + assert np.all(probs <= 1) + + +def test_multiclass_probabilities_sum_to_one() -> None: + """Test that multi-class predicted probabilities sum to 1.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_multi_train, y_multi_train) + probs = va_cal.predict_proba(X_multi_test) + + prob_sums = probs.sum(axis=1) + np.testing.assert_allclose(prob_sums, np.ones(len(X_multi_test)), rtol=1e-5) + + +def 
test_multiclass_probabilities_in_valid_range() -> None: + """Test that all multi-class predicted probabilities are in [0, 1].""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_multi_train, y_multi_train) + probs = va_cal.predict_proba(X_multi_test) + + assert np.all(probs >= 0) + assert np.all(probs <= 1) + + +# ============================================================================ +# Comparison Tests Between Modes +# ============================================================================ + + +def test_inductive_vs_cross_validation_different_results() -> None: + """Test that inductive and cross validation modes give different results.""" + va_cal_inductive = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal_inductive.fit(X_binary_train, y_binary_train) + probs_inductive = va_cal_inductive.predict_proba(X_binary_test) + + va_cal_cv = VennAbersCalibrator( + estimator=GaussianNB(), inductive=False, n_splits=5, random_state=random_state + ) + va_cal_cv.fit(X_binary_train, y_binary_train) + probs_cv = va_cal_cv.predict_proba(X_binary_test) + + # Results should be different between modes + assert not np.allclose(probs_inductive, probs_cv) + + +def test_all_modes_produce_valid_probabilities() -> None: + """Test that all calibration modes produce valid probability distributions.""" + modes: List[Tuple[str, Dict[str, Any]]] = [ + ("inductive", {"inductive": True}), + ("cross_val", {"inductive": False, "n_splits": 5}), + ] + + for mode_name, mode_params in modes: + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), random_state=random_state, **mode_params + ) + va_cal.fit(X_binary_train, y_binary_train) + probs = va_cal.predict_proba(X_binary_test) + + # Check valid probabilities + assert np.all(probs >= 0), f"Mode {mode_name} produced negative probabilities" + assert np.all(probs <= 1), f"Mode {mode_name} produced probabilities > 1" + 
assert np.allclose(probs.sum(axis=1), 1.0), ( + f"Mode {mode_name} probabilities don't sum to 1" + ) + + +# ============================================================================ +# Special Cases Tests +# ============================================================================ + + +def test_perfect_predictions_no_calibration_needed() -> None: + """Test behavior when base estimator already makes perfect predictions.""" + # Create a simple linearly separable dataset + from sklearn.datasets import make_blobs + + X_perfect, y_perfect = make_blobs( + n_samples=100, + n_features=2, + centers=2, + cluster_std=0.5, + random_state=random_state, + ) + + X_train_p, X_test_p, y_train_p, y_test_p = train_test_split( + X_perfect, y_perfect, test_size=0.2, random_state=random_state + ) + + va_cal = VennAbersCalibrator( + estimator=LogisticRegression(random_state=random_state), + inductive=True, + random_state=random_state, + ) + va_cal.fit(X_train_p, y_train_p) + probs = va_cal.predict_proba(X_test_p) + predictions = va_cal.predict(X_test_p) + + # Should still produce valid probabilities + assert probs.shape == (len(X_test_p), 2) + assert np.allclose(probs.sum(axis=1), 1.0) + + # Predictions should be accurate + accuracy = np.mean(predictions == y_test_p) + assert accuracy > 0.9 # Should be very accurate for linearly separable data + + +def test_imbalanced_dataset() -> None: + """Test VennAbersCalibrator with highly imbalanced dataset.""" + # Create imbalanced dataset (90% class 0, 10% class 1) + X_imb, y_imb = make_classification( + n_samples=200, + n_features=20, + n_classes=2, + weights=[0.9, 0.1], + random_state=random_state, + ) + + X_train_imb, X_test_imb, y_train_imb, y_test_imb = train_test_split( + X_imb, y_imb, test_size=0.2, random_state=random_state, stratify=y_imb + ) + + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), + inductive=True, + random_state=random_state, + stratify=y_train_imb, + ) + va_cal.fit(X_train_imb, y_train_imb) + probs = 
va_cal.predict_proba(X_test_imb) + + # Should still produce valid probabilities + assert probs.shape == (len(X_test_imb), 2) + assert np.allclose(probs.sum(axis=1), 1.0) + assert np.all((probs >= 0) & (probs <= 1)) + + +def test_many_classes() -> None: + """Test VennAbersCalibrator with many classes.""" + # Create dataset with 10 classes + X_many, y_many = make_classification( + n_samples=500, + n_features=20, + n_classes=10, + n_informative=15, + random_state=random_state, + ) + + X_train_many, X_test_many, y_train_many, y_test_many = train_test_split( + X_many, y_many, test_size=0.2, random_state=random_state + ) + + va_cal = VennAbersCalibrator( + estimator=RandomForestClassifier(random_state=random_state), + inductive=True, + random_state=random_state, + ) + va_cal.fit(X_train_many, y_train_many) + probs = va_cal.predict_proba(X_test_many) + + assert probs.shape == (len(X_test_many), 10) + assert np.allclose(probs.sum(axis=1), 1.0) + assert np.all((probs >= 0) & (probs <= 1)) + + +def test_small_calibration_set() -> None: + """Test behavior with very small calibration set.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit( + X_binary_train, y_binary_train, calib_size=0.1 + ) # Very small calibration set + probs = va_cal.predict_proba(X_binary_test) + + # Should still work, though calibration quality may be lower + assert probs.shape == (len(X_binary_test), 2) + assert np.allclose(probs.sum(axis=1), 1.0) + + +def test_large_calibration_set() -> None: + """Test behavior with very large calibration set.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit( + X_binary_train, y_binary_train, calib_size=0.8 + ) # Very large calibration set + probs = va_cal.predict_proba(X_binary_test) + + # Should still work, though training set is small + assert probs.shape == (len(X_binary_test), 2) + assert np.allclose(probs.sum(axis=1), 1.0) + + 
+# ============================================================================ +# Consistency Tests +# ============================================================================ + + +def test_multiple_fits_same_data() -> None: + """Test that fitting multiple times with same data gives same results.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + + va_cal.fit(X_binary_train, y_binary_train) + probs1 = va_cal.predict_proba(X_binary_test) + + va_cal.fit(X_binary_train, y_binary_train) + probs2 = va_cal.predict_proba(X_binary_test) + + np.testing.assert_array_equal(probs1, probs2) + + +def test_predict_single_sample() -> None: + """Test prediction on a single sample.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_binary_train, y_binary_train) + + single_sample = X_binary_test[0:1] + probs = va_cal.predict_proba(single_sample) + pred = va_cal.predict(single_sample) + + assert probs.shape == (1, 2) + assert pred.shape == (1,) + assert np.allclose(probs.sum(), 1.0) + + +def test_predict_multiple_times_same_result() -> None: + """Test that multiple predictions on same data give same results.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_binary_train, y_binary_train) + + probs1 = va_cal.predict_proba(X_binary_test) + probs2 = va_cal.predict_proba(X_binary_test) + + np.testing.assert_array_equal(probs1, probs2) + + +# ============================================================================ +# Data Type Tests +# ============================================================================ + + +def test_pandas_dataframe_input() -> None: + """Test that VennAbersCalibrator works with pandas DataFrames.""" + X_df = pd.DataFrame(X_binary_train) + y_series = pd.Series(y_binary_train) + X_test_df = pd.DataFrame(X_binary_test) + + va_cal = VennAbersCalibrator( + 
estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_df, y_series) + probs = va_cal.predict_proba(X_test_df) + predictions = va_cal.predict(X_test_df) + + assert probs.shape == (len(X_test_df), 2) + assert predictions.shape == (len(X_test_df),) + + +def test_numpy_array_input() -> None: + """Test that VennAbersCalibrator works with numpy arrays.""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_binary_train, y_binary_train) + probs = va_cal.predict_proba(X_binary_test) + predictions = va_cal.predict(X_binary_test) + + assert isinstance(probs, np.ndarray) + assert isinstance(predictions, np.ndarray) + + +def test_mixed_input_types() -> None: + """Test with mixed input types (DataFrame for X, array for y).""" + X_df = pd.DataFrame(X_binary_train) + y_array = np.array(y_binary_train) + X_test_df = pd.DataFrame(X_binary_test) + + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_df, y_array) + probs = va_cal.predict_proba(X_test_df) + + assert probs.shape == (len(X_test_df), 2) + + +def test_with_pandas_dataframe() -> None: + """Test VennAbersCalibrator with pandas DataFrame.""" + X_train_df = pd.DataFrame(X_binary_train) + X_test_df = pd.DataFrame(X_binary_test) + + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_train_df, y_binary_train) + probs = va_cal.predict_proba(X_test_df) + + assert probs.shape == (len(X_binary_test), 2) + assert np.allclose(probs.sum(axis=1), 1.0) + + +def test_with_pandas_series() -> None: + """Test VennAbersCalibrator with pandas Series for y.""" + y_train_series = pd.Series(y_binary_train) + + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_binary_train, y_train_series) + probs = va_cal.predict_proba(X_binary_test) + + assert 
probs.shape == (len(X_binary_test), 2) + + +# ============================================================================ +# Integration Tests +# ============================================================================ + + +def test_integration_with_cross_validation() -> None: + """Test integration with sklearn's cross-validation utilities.""" + from sklearn.model_selection import cross_val_score + + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + + # This should work with cross_val_score + scores = cross_val_score(va_cal, X_binary, y_binary, cv=3, scoring="accuracy") + + assert len(scores) == 3 + assert np.all(scores >= 0) and np.all(scores <= 1) + + +# def test_integration_with_grid_search() -> None: +# """Test integration with sklearn's GridSearchCV.""" +# from sklearn.model_selection import GridSearchCV + +# va_cal = VennAbersCalibrator( +# estimator=GaussianNB(), +# inductive=True, +# random_state=random_state +# ) + +# param_grid = { +# 'cal_size': [0.2, 0.3, 0.4], +# } + +# grid_search = GridSearchCV( +# va_cal, param_grid, cv=3, scoring='accuracy' +# ) +# grid_search.fit(X_binary_train, y_binary_train) + +# assert hasattr(grid_search, 'best_params_') +# assert 'cal_size' in grid_search.best_params_ + + +def test_clone_estimator() -> None: + """Test that VennAbersCalibrator can be cloned.""" + from sklearn.base import clone + + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_binary_train, y_binary_train) + + va_cal_clone = clone(va_cal) + + is_fitted = True + try: + check_is_fitted(va_cal_clone.estimator) + except NotFittedError: + is_fitted = False + + # Clone should have same parameters but not be fitted + assert va_cal_clone.inductive == va_cal.inductive + assert is_fitted is False + + +# ============================================================================ +# Performance and Scalability Tests +# 
# ============================================================================


def test_large_dataset_performance() -> None:
    """Test that fit + predict_proba on a larger dataset stays within budget.

    ``precision=2`` rounds the calibration probabilities, which keeps the
    calibration step fast on larger calibration sets.
    """
    import time

    X_large, y_large = make_classification(
        n_samples=5000, n_features=50, n_classes=2, random_state=random_state
    )

    X_train_large, X_test_large, y_train_large, _ = train_test_split(
        X_large, y_large, test_size=0.2, random_state=random_state
    )

    va_cal = VennAbersCalibrator(
        estimator=RandomForestClassifier(n_estimators=10, random_state=random_state),
        inductive=True,
        random_state=random_state,
        precision=2,  # Use precision for faster computation
    )

    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments the way time.time() can.
    start = time.perf_counter()
    va_cal.fit(X_train_large, y_train_large)
    va_cal.predict_proba(X_test_large)
    elapsed = time.perf_counter() - start

    # Should complete in reasonable time (< 60 seconds)
    assert elapsed < 60


def test_high_dimensional_data() -> None:
    """Test with high-dimensional data."""
    X_high_dim, y_high_dim = make_classification(
        n_samples=200,
        n_features=100,
        n_informative=50,
        n_classes=2,
        random_state=random_state,
    )

    X_train_hd, X_test_hd, y_train_hd, _ = train_test_split(
        X_high_dim, y_high_dim, test_size=0.2, random_state=random_state
    )

    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=random_state
    )
    va_cal.fit(X_train_hd, y_train_hd)
    probs = va_cal.predict_proba(X_test_hd)

    assert probs.shape == (len(X_test_hd), 2)
    assert np.allclose(probs.sum(axis=1), 1.0)


# ============================================================================
# Documentation and Examples Tests
# ============================================================================


def test_basic_example_from_docstring() -> None:
    """Test the basic example from the class docstring.

    The data generation, the split and the calibrator are all seeded so that
    a failure of this test can be reproduced.
    """
    X, y = make_classification(
        n_samples=1000, n_classes=2, n_informative=10, random_state=random_state
    )
    X_train, X_test, y_train, _ = train_test_split(X, y, random_state=random_state)

    clf = GaussianNB()
    va_cal = VennAbersCalibrator(
        estimator=clf, inductive=True, random_state=random_state
    )
    va_cal.fit(X_train, y_train)

    p_prime = va_cal.predict_proba(X_test)

    assert p_prime.shape == (len(X_test), 2)
    assert np.allclose(p_prime.sum(axis=1), 1.0)


def test_prefit_example() -> None:
    """Test prefit example workflow."""
    X_train_proper, X_cal, y_train_proper, y_cal = train_test_split(
        X_binary_train, y_binary_train, test_size=0.2, shuffle=False
    )

    clf = GaussianNB()
    clf.fit(X_train_proper, y_train_proper)

    va_cal = VennAbersCalibrator(estimator=clf, cv="prefit")
    va_cal.fit(X_cal, y_cal)

    p_prime = va_cal.predict_proba(X_binary_test)

    assert p_prime.shape == (len(X_binary_test), 2)


def test_cross_validation_example() -> None:
    """Test cross-validation example workflow."""
    va_cal = VennAbersCalibrator(estimator=GaussianNB(), inductive=False, n_splits=5)
    va_cal.fit(X_binary_train, y_binary_train)

    p_prime = va_cal.predict_proba(X_binary_test)

    assert p_prime.shape == (len(X_binary_test), 2)


# ============================================================================
# Comparison with Other Calibration Methods Tests
# ============================================================================


def test_comparison_with_uncalibrated() -> None:
    """Compare calibrated vs uncalibrated predictions."""
    # Uncalibrated
    clf_uncal = RandomForestClassifier(random_state=random_state)
    clf_uncal.fit(X_binary_train, y_binary_train)
    probs_uncal = clf_uncal.predict_proba(X_binary_test)

    # Calibrated
    va_cal = VennAbersCalibrator(
        estimator=RandomForestClassifier(random_state=random_state),
        inductive=True,
        random_state=random_state,
    )
    va_cal.fit(X_binary_train, y_binary_train)
    probs_cal = va_cal.predict_proba(X_binary_test)

    # Both should be valid probabilities
    assert np.allclose(probs_uncal.sum(axis=1), 1.0)
    assert np.allclose(probs_cal.sum(axis=1), 1.0)

    # Calibrated should be different from uncalibrated
    assert not np.allclose(probs_uncal, probs_cal)


# ============================================================================
# Regression Tests (ensure no breaking changes)
# ============================================================================


def test_backward_compatibility_basic_usage() -> None:
    """Test that basic usage pattern remains compatible."""
    # This test ensures the most common usage pattern doesn't break
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=random_state
    )
    va_cal.fit(X_binary_train, y_binary_train)
    probs = va_cal.predict_proba(X_binary_test)
    preds = va_cal.predict(X_binary_test)

    assert probs.shape == (len(X_binary_test), 2)
    assert preds.shape == (len(X_binary_test),)
    assert np.allclose(probs.sum(axis=1), 1.0)


def test_backward_compatibility_prefit() -> None:
    """Test that prefit mode usage pattern remains compatible."""
    clf = GaussianNB()
    clf.fit(X_binary_proper, y_binary_proper)

    va_cal = VennAbersCalibrator(estimator=clf, cv="prefit")
    va_cal.fit(X_binary_cal, y_binary_cal)
    probs = va_cal.predict_proba(X_binary_test)

    assert probs.shape == (len(X_binary_test), 2)


def test_backward_compatibility_cross_val() -> None:
    """Test that cross-validation mode usage pattern remains compatible."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=False, n_splits=5, random_state=random_state
    )
    va_cal.fit(X_binary_train, y_binary_train)
    probs = va_cal.predict_proba(X_binary_test)

    assert probs.shape == (len(X_binary_test), 2)


# ============================================================================
# Edge Cases for Different Modes
# ============================================================================


def test_prefit_with_unfitted_estimator_raises_error() -> None:
    """Test that prefit mode with unfitted estimator raises an error."""
    clf = GaussianNB()  # Not fitted

    va_cal = VennAbersCalibrator(estimator=clf, cv="prefit")

    with pytest.raises(ValueError, match=".*must be already fitted.*"):
        va_cal.fit(X_binary_cal, y_binary_cal)


def test_cross_val_without_n_splits_raises_error() -> None:
    """Test that cross-validation mode without n_splits raises an error."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(),
        inductive=False,
        n_splits=None,  # Missing n_splits
    )

    with pytest.raises(ValueError, match=".*please provide n_splits.*"):
        va_cal.fit(X_binary_train, y_binary_train)


def test_inductive_with_very_small_dataset() -> None:
    """Test inductive mode with very small dataset."""
    X_small = X_binary_train[:20]
    y_small = y_binary_train[:20]

    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=random_state
    )

    # Should work but might have limited calibration quality
    va_cal.fit(X_small, y_small)
    probs = va_cal.predict_proba(X_binary_test)

    assert probs.shape == (len(X_binary_test), 2)


# ============================================================================
# Attribute Access Tests
# ============================================================================


def test_classes_attribute() -> None:
    """Test that classes_ attribute is correctly set."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=random_state
    )
    va_cal.fit(X_binary_train, y_binary_train)

    assert hasattr(va_cal, "classes_")
    assert va_cal.classes_ is not None
    assert len(va_cal.classes_) == 2
    np.testing.assert_array_equal(va_cal.classes_, np.unique(y_binary_train))


def test_n_classes_attribute() -> None:
    """Test that n_classes_ attribute is correctly set."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=random_state
    )
    va_cal.fit(X_binary_train, y_binary_train)

    assert hasattr(va_cal, "n_classes_")
    assert va_cal.n_classes_ == 2


def test_va_calibrator_attribute() -> None:
    """Test that va_calibrator_ attribute is correctly set."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=random_state
    )
    va_cal.fit(X_binary_train, y_binary_train)

    assert hasattr(va_cal, "va_calibrator_")
    assert va_cal.va_calibrator_ is not None


def test_single_estimator_attribute_prefit() -> None:
    """Test that single_estimator_ attribute is set in prefit mode."""
    clf = GaussianNB()
    clf.fit(X_binary_proper, y_binary_proper)

    va_cal = VennAbersCalibrator(estimator=clf, cv="prefit")
    va_cal.fit(X_binary_cal, y_binary_cal)

    assert hasattr(va_cal, "single_estimator_")
    assert va_cal.single_estimator_ is not None


# ============================================================================
# Multi-class Specific Tests
# ============================================================================


def test_multiclass_binary_calibration() -> None:
    """Test that multi-class uses binary calibration for each class pair."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=random_state
    )
    va_cal.fit(X_multi_train, y_multi_train)
    probs = va_cal.predict_proba(X_multi_test)

    # For 3 classes, should have 3 probability columns
    assert probs.shape == (len(X_multi_test), 3)

    # Each row should sum to 1
    np.testing.assert_allclose(probs.sum(axis=1), 1.0, rtol=1e-5)


def test_multiclass_prefit_mode() -> None:
    """Test multi-class calibration in prefit mode."""
    clf = RandomForestClassifier(random_state=random_state)
    clf.fit(X_multi_proper, y_multi_proper)

    va_cal = VennAbersCalibrator(estimator=clf, cv="prefit")
    va_cal.fit(X_multi_cal, y_multi_cal)
    probs = va_cal.predict_proba(X_multi_test)

    assert probs.shape == (len(X_multi_test), 3)
    assert np.allclose(probs.sum(axis=1), 1.0)


def test_multiclass_cross_validation_mode() -> None:
    """Test multi-class calibration in cross-validation mode."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=False, n_splits=5, random_state=random_state
    )
    va_cal.fit(X_multi_train, y_multi_train)
    probs = va_cal.predict_proba(X_multi_test)

    assert probs.shape == (len(X_multi_test), 3)
    assert np.allclose(probs.sum(axis=1), 1.0)


def test_multiclass_predictions_match_argmax() -> None:
    """Test that multi-class predictions match argmax of probabilities."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=random_state
    )
    va_cal.fit(X_multi_train, y_multi_train)

    probs = va_cal.predict_proba(X_multi_test)
    preds = va_cal.predict(X_multi_test)

    # Predictions should match the class with highest probability
    assert va_cal.classes_ is not None
    expected_preds = va_cal.classes_[np.argmax(probs, axis=1)]
    np.testing.assert_array_equal(preds, expected_preds)


def test_multiclass_with_different_estimators() -> None:
    """Test multi-class calibration with different base estimators."""
    estimators = [
        GaussianNB(),
        RandomForestClassifier(n_estimators=10, random_state=random_state),
        LogisticRegression(random_state=random_state, max_iter=1000),
    ]

    for estimator in estimators:
        va_cal = VennAbersCalibrator(
            estimator=estimator, inductive=True, random_state=random_state
        )
        va_cal.fit(X_multi_train, y_multi_train)
        probs = va_cal.predict_proba(X_multi_test)

        assert probs.shape == (len(X_multi_test), 3)
        assert np.allclose(probs.sum(axis=1), 1.0)
        assert np.all((probs >= 0) & (probs <= 1))


# ============================================================================
# Precision Parameter Tests
# ============================================================================


@pytest.mark.parametrize("precision", [None, 2, 4, 6])
def test_precision_parameter(precision: Optional[int]) -> None:
    """Test that precision parameter works correctly."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(),
        inductive=True,
        random_state=random_state,
        precision=precision,
    )
    va_cal.fit(X_binary_train, y_binary_train)
    probs = va_cal.predict_proba(X_binary_test)

    assert probs.shape == (len(X_binary_test), 2)
    assert np.allclose(probs.sum(axis=1), 1.0)


def test_precision_speeds_up_computation() -> None:
    """Test that using ``precision`` does not make calibration slower.

    On a small dataset the rounded run is not guaranteed to be faster, and two
    single wall-clock samples are subject to scheduler noise. The comparison
    therefore tolerates a fixed slack instead of a strict ``<=``, which would
    fail at random on a loaded machine.
    """
    import time

    timing_slack = 1.0  # seconds of measurement noise tolerated

    def _fit_predict_seconds(precision: Optional[int]) -> float:
        """Return the seconds spent in fit + predict_proba for ``precision``."""
        va_cal = VennAbersCalibrator(
            estimator=GaussianNB(),
            inductive=True,
            random_state=random_state,
            precision=precision,
        )
        # Only fit and predict_proba are timed; construction is excluded.
        start = time.perf_counter()
        va_cal.fit(X_binary_train, y_binary_train)
        va_cal.predict_proba(X_binary_test)
        return time.perf_counter() - start

    time_no_precision = _fit_predict_seconds(None)
    time_with_precision = _fit_predict_seconds(2)

    # With precision should be faster or similar
    # (may not always be faster for small datasets)
    assert time_with_precision <= time_no_precision + timing_slack


@pytest.mark.parametrize("precision", [1, 2, 3, 4])
def test_different_precision_values(precision: int) -> None:
    """Test that different precision values work correctly."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(),
        inductive=True,
        random_state=random_state,
        precision=precision,
    )
    va_cal.fit(X_binary_train, y_binary_train)
    probs = va_cal.predict_proba(X_binary_test)

    assert probs.shape == (len(X_binary_test), 2)
    assert np.allclose(probs.sum(axis=1), 1.0)


def test_precision_maintains_calibration_quality() -> None:
    """Test that high and low precision both yield valid probabilities.

    Only validity (rows sum to 1) and matching output shapes are checked; the
    two outputs are not compared value by value.
    """
    va_cal_high_prec = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=random_state, precision=4
    )
    va_cal_high_prec.fit(X_binary_train, y_binary_train)
    probs_high = va_cal_high_prec.predict_proba(X_binary_test)

    va_cal_low_prec = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, random_state=random_state, precision=2
    )
    va_cal_low_prec.fit(X_binary_train, y_binary_train)
    probs_low = va_cal_low_prec.predict_proba(X_binary_test)

    # Both should be valid probabilities
    assert np.allclose(probs_high.sum(axis=1), 1.0)
    assert np.allclose(probs_low.sum(axis=1), 1.0)

    # Values may differ between precisions; only the shapes must agree
    assert probs_high.shape == probs_low.shape


def test_precision_parameter_multiclass() -> None:
    """Test that precision parameter works correctly for multiclass."""
    va_cal = VennAbersCalibrator(
        estimator=GaussianNB(), inductive=True, precision=6, random_state=random_state
    )
    va_cal.fit(X_multi_train, y_multi_train)
    probs = va_cal.predict_proba(X_multi_test)

    assert probs.shape == (len(X_multi_test), 3)
    assert np.allclose(probs.sum(axis=1), 1.0)


# ============================================================================
# Error Message Quality Tests
# ============================================================================


def test_error_message_for_missing_estimator() -> None:
    """Test that missing estimator gives clear error message."""
    va_cal = VennAbersCalibrator(estimator=None)

    with pytest.raises(ValueError, match=".*estimator must be provided.*"):
        va_cal.fit(X_binary_train, y_binary_train)


def test_error_message_for_invalid_cv() -> None:
    """Test that invalid cv parameter raises a ValueError."""
    va_cal = VennAbersCalibrator(estimator=GaussianNB(), cv="invalid_cv_option")

    with pytest.raises(ValueError):
        va_cal.fit(X_binary_train, y_binary_train)


# 
============================================================================ +# Final Comprehensive Test +# ============================================================================ + + +def test_venn_abers_cv_with_sample_weight() -> None: + """Test VennAbersCV with sample weights in cross-validation mode.""" + # Create sample weights - higher weights for some samples + sklearn.set_config(enable_metadata_routing=True) + sample_weight = np.ones(len(y_binary_train)) + sample_weight[: len(y_binary_train) // 2] = 2.0 # Double weight for first half + weighted_estimator = GaussianNB().set_fit_request(sample_weight=True) + va_cal = VennAbersCalibrator( + estimator=weighted_estimator, + inductive=False, # Use cross-validation mode + n_splits=3, + random_state=random_state, + ) + + # Fit with sample weights + va_cal.fit(X_binary_train, y_binary_train, sample_weight=sample_weight) + probs = va_cal.predict_proba(X_binary_test) + + # Should produce valid probabilities + assert probs.shape == (len(X_binary_test), 2) + assert np.allclose(probs.sum(axis=1), 1.0) + assert np.all((probs >= 0) & (probs <= 1)) + + # Fit without sample weights for comparison + va_cal_no_weight = VennAbersCalibrator( + estimator=GaussianNB(), inductive=False, n_splits=3, random_state=random_state + ) + va_cal_no_weight.fit(X_binary_train, y_binary_train) + probs_no_weight = va_cal_no_weight.predict_proba(X_binary_test) + + # Results should be different when using sample weights + with pytest.raises(AssertionError): + np.testing.assert_array_almost_equal(probs, probs_no_weight) + + +def test_venn_abers_cv_sample_weight_all_folds() -> None: + """Test that sample weights are properly used across all CV folds.""" + sklearn.set_config(enable_metadata_routing=True) + sample_weight = np.random.RandomState(42).uniform(0.5, 2.0, len(y_binary_train)) + weighted_estimator = GaussianNB().set_fit_request(sample_weight=True) + va_cal = VennAbersCalibrator( + estimator=weighted_estimator, + inductive=False, + 
n_splits=5, # Multiple folds to ensure all are tested + random_state=random_state, + ) + + # Should not raise any errors + va_cal.fit(X_binary_train, y_binary_train, sample_weight=sample_weight) + probs = va_cal.predict_proba(X_binary_test) + + # Verify output validity + assert probs.shape == (len(X_binary_test), 2) + assert np.allclose(probs.sum(axis=1), 1.0) + assert np.all((probs >= 0) & (probs <= 1)) + + +def test_comprehensive_workflow() -> None: + """Comprehensive test covering multiple aspects of VennAbersCalibrator.""" + # Test all three modes with binary classification + modes: List[Tuple[str, Dict[str, Any]]] = [ + ("inductive", {"inductive": True}), + ("cross_val", {"inductive": False, "n_splits": 5}), + ] + + for mode_name, mode_params in modes: + # Binary classification + va_cal_binary = VennAbersCalibrator( + estimator=RandomForestClassifier( + n_estimators=10, random_state=random_state + ), + random_state=random_state, + **mode_params, + ) + va_cal_binary.fit(X_binary_train, y_binary_train) + + probs_binary = va_cal_binary.predict_proba(X_binary_test) + preds_binary = va_cal_binary.predict(X_binary_test) + + # Validate binary results + assert probs_binary.shape == (len(X_binary_test), 2) + assert preds_binary.shape == (len(X_binary_test),) + assert np.allclose(probs_binary.sum(axis=1), 1.0) + assert np.all((probs_binary >= 0) & (probs_binary <= 1)) + + # Multi-class classification + va_cal_multi = VennAbersCalibrator( + estimator=RandomForestClassifier( + n_estimators=10, random_state=random_state + ), + random_state=random_state, + **mode_params, + ) + va_cal_multi.fit(X_multi_train, y_multi_train) + + probs_multi = va_cal_multi.predict_proba(X_multi_test) + preds_multi = va_cal_multi.predict(X_multi_test) + + # Validate multi-class results + assert probs_multi.shape == (len(X_multi_test), 3) + assert preds_multi.shape == (len(X_multi_test),) + assert np.allclose(probs_multi.sum(axis=1), 1.0) + assert np.all((probs_multi >= 0) & (probs_multi <= 1)) 
+ + # Test prefit mode separately + clf_binary = RandomForestClassifier(n_estimators=10, random_state=random_state) + clf_binary.fit(X_binary_proper, y_binary_proper) + + va_cal_prefit = VennAbersCalibrator(estimator=clf_binary, cv="prefit") + va_cal_prefit.fit(X_binary_cal, y_binary_cal) + + probs_prefit = va_cal_prefit.predict_proba(X_binary_test) + assert probs_prefit.shape == (len(X_binary_test), 2) + assert np.allclose(probs_prefit.sum(axis=1), 1.0) + + +def test_predict_proba_prefitted_va_one_vs_all(): + """ + Test predict_proba_prefitted_va with one_vs_all strategy + to cover lines 345-368. + """ + # Generate multiclass classification data + X, y = make_classification( + n_samples=500, + n_classes=3, + n_informative=10, + n_redundant=0, + n_clusters_per_class=1, + random_state=42, + ) + + # Split into train, calibration, and test sets + X_train, X_temp, y_train, y_temp = train_test_split( + X, y, test_size=0.4, random_state=42 + ) + X_cal, X_test, y_cal, y_test = train_test_split( + X_temp, y_temp, test_size=0.5, random_state=42 + ) + + # Train a classifier + clf = GaussianNB() + clf.fit(X_train, y_train) + + # Get probability predictions + p_cal = clf.predict_proba(X_cal) + p_test = clf.predict_proba(X_test) + + # Test one_vs_all strategy + p_calibrated, p0p1 = predict_proba_prefitted_va( + p_cal, y_cal, p_test, precision=None, va_tpe="one_vs_all" + ) + + # Assertions + assert p_calibrated.shape == p_test.shape + assert np.allclose(p_calibrated.sum(axis=1), 1.0) + assert len(p0p1) == 3 # One for each class + assert all(p.shape == (len(p_test), 2) for p in p0p1) + + # Test with precision parameter + p_calibrated_prec, p0p1_prec = predict_proba_prefitted_va( + p_cal, y_cal, p_test, precision=3, va_tpe="one_vs_all" + ) + + assert p_calibrated_prec.shape == p_test.shape + assert np.allclose(p_calibrated_prec.sum(axis=1), 1.0) + + +def test_predict_proba_prefitted_va_one_vs_one(): + """ + Test predict_proba_prefitted_va with one_vs_one strategy + for comparison 
and completeness. + """ + # Generate multiclass classification data + X, y = make_classification( + n_samples=500, + n_classes=3, + n_informative=10, + n_redundant=0, + n_clusters_per_class=1, + random_state=42, + ) + + # Split into train, calibration, and test sets + X_train, X_temp, y_train, y_temp = train_test_split( + X, y, test_size=0.4, random_state=42 + ) + X_cal, X_test, y_cal, y_test = train_test_split( + X_temp, y_temp, test_size=0.5, random_state=42 + ) + + # Train a classifier + clf = GaussianNB() + clf.fit(X_train, y_train) + + # Get probability predictions + p_cal = clf.predict_proba(X_cal) + p_test = clf.predict_proba(X_test) + + # Test one_vs_one strategy + p_calibrated, p0p1 = predict_proba_prefitted_va( + p_cal, y_cal, p_test, precision=None, va_tpe="one_vs_one" + ) + + # Assertions + assert p_calibrated.shape == p_test.shape + assert np.allclose(p_calibrated.sum(axis=1), 1.0) + assert len(p0p1) == 3 # C(3,2) = 3 pairs + + +def test_predict_proba_prefitted_va_invalid_type(): + """ + Test that invalid va_tpe raises ValueError. + """ + # Generate simple data + X, y = make_classification(n_samples=100, n_classes=2, random_state=42) + X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42) + + clf = GaussianNB() + clf.fit(X_train, y_train) + + p_cal = clf.predict_proba(X_train) + p_test = clf.predict_proba(X_test) + + with pytest.raises(ValueError, match="Invalid va_tpe"): + predict_proba_prefitted_va(p_cal, y_train, p_test, va_tpe="invalid_type") + + +def test_venn_abers_basic(): + """ + Test basic VennAbers functionality for binary classification. 
+ """ + # Generate binary classification data + X, y = make_classification(n_samples=500, n_classes=2, random_state=42) + X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42) + + # Further split training data + X_train_proper, X_cal, y_train_proper, y_cal = train_test_split( + X_train, y_train, test_size=0.2, random_state=42 + ) + + # Train classifier + clf = GaussianNB() + clf.fit(X_train_proper, y_train_proper) + + # Get probabilities + p_cal = clf.predict_proba(X_cal) + p_test = clf.predict_proba(X_test) + + # Apply Venn-ABERS calibration + va = VennAbers() + va.fit(p_cal, y_cal) + p_prime, p0_p1 = va.predict_proba(p_test) + + # Assertions + assert p_prime.shape == (len(X_test), 2) + assert p0_p1.shape == (len(X_test), 2) + assert np.allclose(p_prime.sum(axis=1), 1.0) + + # Test with precision + va_prec = VennAbers() + va_prec.fit(p_cal, y_cal, precision=3) + p_prime_prec, _ = va_prec.predict_proba(p_test) + assert p_prime_prec.shape == (len(X_test), 2) + + +def test_venn_abers_cv_brier_loss() -> None: + """Test VennAbersCV with Brier loss (non-log loss).""" + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=False, n_splits=3, random_state=random_state + ) + va_cal.fit(X_binary_train, y_binary_train) + + # Use 'brier' loss to trigger the else branch + probs_brier = va_cal.predict_proba(X_binary_test, loss="brier") + + # Should produce valid probabilities + assert probs_brier.shape == (len(X_binary_test), 2) + assert np.allclose(probs_brier.sum(axis=1), 1.0) + assert np.all((probs_brier >= 0) & (probs_brier <= 1)) + + +def test_venn_abers_cv_p0_p1_output() -> None: + """Test VennAbersCV predict_proba with p0_p1_output=True.""" + from sklearn.naive_bayes import GaussianNB + from mapie._venn_abers import VennAbersCV + + # Create and fit VennAbersCV in inductive mode + va_cv = VennAbersCV( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cv.fit(X_binary_train, y_binary_train) + + # Call predict_proba 
with p0_p1_output=True to reach the target code + p_prime, p0_p1 = va_cv.predict_proba(X_binary_test, p0_p1_output=True) + + # Verify the outputs + assert p_prime.shape == (len(X_binary_test), 2) + assert p0_p1.shape == (len(X_binary_test), 2) # Should have p0 and p1 stacked + assert np.allclose(p_prime.sum(axis=1), 1.0) + assert np.all((p_prime >= 0) & (p_prime <= 1)) + assert np.all((p0_p1 >= 0) & (p0_p1 <= 1)) + + +def test_multiclass_cross_validation_requires_n_splits() -> None: + """Test that VennAbersMultiClass in CVAP mode requires n_splits parameter.""" + from mapie._venn_abers import VennAbersMultiClass + + va_multi = VennAbersMultiClass( + estimator=GaussianNB(), + inductive=False, + n_splits=None, # Missing n_splits for cross-validation mode + ) + + with pytest.raises( + Exception, match=r".*For Cross Venn ABERS please provide n_splits.*" + ): + va_multi.fit(X_multi_train, y_multi_train) + + +def test_inductive_missing_size_parameters_raises_error(): + """Test that inductive mode raises error + when train_proper_size is None. 
+ """ + # Generate multi-class dataset + X, y = make_classification( + n_samples=100, n_classes=3, n_informative=10, n_redundant=0, random_state=42 + ) + + # Create VennAbersMultiClass with inductive=True but no size parameters + va_multi = VennAbersMultiClass( + estimator=GaussianNB(), inductive=True, train_proper_size=None, random_state=42 + ) + + # Should raise Exception when fitting without size parameters + with pytest.raises( + Exception, match="For Inductive Venn-ABERS please provide either calibration" + ): + va_multi.fit(X, y) + + +def test_multiclass_p0_p1_output() -> None: + """Test VennAbersMultiClass with p0_p1_output=True.""" + from mapie._venn_abers import VennAbersMultiClass + from sklearn.naive_bayes import GaussianNB + import numpy as np + + # Use the existing test data fixtures + random_state = 42 + np.random.seed(random_state) + + # Generate multiclass data + n_samples = 100 + n_features = 4 + n_classes = 3 + + X_train = np.random.randn(n_samples, n_features) + y_train = np.random.randint(0, n_classes, n_samples) + + X_test = np.random.randn(30, n_features) + + # Create and fit VennAbersMultiClass + estimator = GaussianNB() + va_multi = VennAbersMultiClass( + estimator=estimator, inductive=True, cal_size=0.3, random_state=random_state + ) + + va_multi.fit(X_train, y_train) + + # Test with p0_p1_output=True + p_prime, p0_p1_list = va_multi.predict_proba(X_test, loss="log", p0_p1_output=True) + + # Verify p_prime shape and properties + assert p_prime.shape == (len(X_test), n_classes) + assert np.allclose(p_prime.sum(axis=1), 1.0) + assert np.all((p_prime >= 0) & (p_prime <= 1)) + + # Verify p0_p1_list structure + # For 3 classes, we should have C(3,2) = 3 pairwise comparisons + n_pairs = n_classes * (n_classes - 1) // 2 + assert len(p0_p1_list) == n_pairs + + # Verify each p0_p1 entry has correct shape + # Each entry should have shape (n_test_samples, 2*n_splits) for IVAP + for p0_p1 in p0_p1_list: + assert p0_p1.shape[0] == len(X_test) + assert 
p0_p1.shape[1] >= 2 # At least p0 and p1 for one split + + # Verify multiclass_probs and multiclass_p0p1 are populated + assert len(va_multi.multiclass_probs) == n_pairs + assert len(va_multi.multiclass_p0p1) == n_pairs + + # Verify each multiclass_probs entry is binary probabilities + for probs in va_multi.multiclass_probs: + assert probs.shape == (len(X_test), 2) + assert np.allclose(probs.sum(axis=1), 1.0) + + +def test_venn_abers_multiclass_p0_p1_output() -> None: + """Test VennAbersMultiClass.predict_proba with p0_p1_output=True.""" + + # Setup test data + random_state = 42 + np.random.seed(random_state) + + n_samples = 150 + n_features = 4 + n_classes = 3 + + X_train = np.random.randn(n_samples, n_features) + y_train = np.random.randint(0, n_classes, n_samples) + X_test = np.random.randn(30, n_features) + + # Test with inductive mode + estimator = GaussianNB() + va_multi = VennAbersMultiClass( + estimator=estimator, inductive=True, cal_size=0.3, random_state=random_state + ) + + va_multi.fit(X_train, y_train) + + # Test with p0_p1_output=True + p_prime, p0_p1_list = va_multi.predict_proba(X_test, loss="log", p0_p1_output=True) + + # Verify p_prime shape and properties + assert p_prime.shape == (len(X_test), n_classes) + assert np.allclose(p_prime.sum(axis=1), 1.0) + assert np.all((p_prime >= 0) & (p_prime <= 1)) + + # Verify p0_p1_list structure + # For 3 classes with one-vs-one, we should have C(3,2) = 3 pairwise comparisons + n_pairs = n_classes * (n_classes - 1) // 2 + assert len(p0_p1_list) == n_pairs + + # Verify each p0_p1 entry has correct shape + for p0_p1 in p0_p1_list: + assert p0_p1.shape[0] == len(X_test) + # For inductive mode with n_splits=1, should have 2 columns (p0 and p1) + assert p0_p1.shape[1] == 2 + assert np.all((p0_p1 >= 0) & (p0_p1 <= 1)) + + # Verify multiclass_p0p1 attribute is populated + assert len(va_multi.multiclass_p0p1) == n_pairs + assert va_multi.multiclass_p0p1 == p0_p1_list + + # Test with p0_p1_output=False (default 
behavior) + p_prime_only = va_multi.predict_proba(X_test, loss="log", p0_p1_output=False) + + # Verify it returns only p_prime + assert isinstance(p_prime_only, np.ndarray) + assert p_prime_only.shape == (len(X_test), n_classes) + assert np.allclose(p_prime_only.sum(axis=1), 1.0) + + # Test with cross-validation mode + va_multi_cv = VennAbersMultiClass( + estimator=GaussianNB(), inductive=False, n_splits=3, random_state=random_state + ) + + va_multi_cv.fit(X_train, y_train) + + p_prime_cv, p0_p1_list_cv = va_multi_cv.predict_proba( + X_test, loss="log", p0_p1_output=True + ) + + # Verify CV mode results + assert p_prime_cv.shape == (len(X_test), n_classes) + assert len(p0_p1_list_cv) == n_pairs + + # For CV mode with n_splits=3, each p0_p1 should have 6 columns (2 * n_splits) + for p0_p1_cv in p0_p1_list_cv: + assert p0_p1_cv.shape[0] == len(X_test) + assert p0_p1_cv.shape[1] == 2 * 3 # 2 * n_splits + assert np.all((p0_p1_cv >= 0) & (p0_p1_cv <= 1)) + + # Test with Brier loss + p_prime_brier, p0_p1_brier = va_multi.predict_proba( + X_test, loss="brier", p0_p1_output=True + ) + + assert p_prime_brier.shape == (len(X_test), n_classes) + assert len(p0_p1_brier) == n_pairs + assert np.allclose(p_prime_brier.sum(axis=1), 1.0) + + +def test_prefit_predict_proba_without_single_estimator() -> None: + """ + Test that predict_proba raises RuntimeError when single_estimator_ + is None in prefit mode. 
+ """ + + clf = GaussianNB() + clf.fit(X_binary_proper, y_binary_proper) + + va_cal = VennAbersCalibrator(estimator=clf, cv="prefit") + va_cal.fit(X_binary_cal, y_binary_cal) + + # Manually set single_estimator_ to None to simulate the error condition + va_cal.single_estimator_ = None + + with pytest.raises( + RuntimeError, match=r"single_estimator_ should not be None in prefit mode" + ): + va_cal.predict_proba(X_binary_test) + + +def test_prefit_predict_proba_without_n_classes() -> None: + """ + Test that predict_proba raises RuntimeError when n_classes_ + is None after fitting in prefit mode. + """ + + clf = GaussianNB() + clf.fit(X_binary_proper, y_binary_proper) + + va_cal = VennAbersCalibrator(estimator=clf, cv="prefit") + va_cal.fit(X_binary_cal, y_binary_cal) + + # Manually set n_classes_ to None to simulate the error condition + va_cal.n_classes_ = None + + with pytest.raises( + RuntimeError, match=r"n_classes_ should not be None after fitting" + ): + va_cal.predict_proba(X_binary_test) + + +def test_prefit_predict_proba_binary_without_va_calibrator() -> None: + """ + Test that predict_proba raises RuntimeError when va_calibrator_ + is None for binary classification in prefit mode. + """ + + clf = GaussianNB() + clf.fit(X_binary_proper, y_binary_proper) + + va_cal = VennAbersCalibrator(estimator=clf, cv="prefit") + va_cal.fit(X_binary_cal, y_binary_cal) + + # Manually set va_calibrator_ to None to simulate the error condition + va_cal.va_calibrator_ = None + + with pytest.raises( + RuntimeError, + match=r"va_calibrator_ should not be None for binary classification", + ): + va_cal.predict_proba(X_binary_test) + + +def test_prefit_predict_proba_binary_with_loss_parameter() -> None: + """ + Test that predict_proba correctly uses loss parameter when available + in va_calibrator_.predict_proba for binary classification in prefit mode. 
+ """ + + clf = GaussianNB() + clf.fit(X_binary_proper, y_binary_proper) + + va_cal = VennAbersCalibrator(estimator=clf, cv="prefit") + va_cal.fit(X_binary_cal, y_binary_cal) + + # Test with default loss='log' + probs_log = va_cal.predict_proba(X_binary_test, loss="log") + + # Test with loss='brier' + probs_brier = va_cal.predict_proba(X_binary_test, loss="brier") + + # Verify output shape and properties + assert probs_log.shape == (len(X_binary_test), 2) + assert probs_brier.shape == (len(X_binary_test), 2) + assert np.allclose(probs_log.sum(axis=1), 1.0) + assert np.allclose(probs_brier.sum(axis=1), 1.0) + + +def test_inductive_predict_proba_with_wrong_calibrator_type() -> None: + """ + Test that predict_proba raises RuntimeError when va_calibrator_ + is not a VennAbersMultiClass instance in inductive/cross-validation mode. + """ + + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_binary_train, y_binary_train) + + # Manually set va_calibrator_ to wrong type + # (VennAbers instead of VennAbersMultiClass) + va_cal.va_calibrator_ = VennAbers() + + with pytest.raises( + RuntimeError, + match=r"va_calibrator_ should be VennAbersMultiClass instance in " + r"inductive/cross-validation mode", + ): + va_cal.predict_proba(X_binary_test) + + +def test_inductive_predict_proba_without_loss_parameter() -> None: + """ + Test that predict_proba works correctly when va_calibrator_.predict_proba + doesn't have a loss parameter in inductive/cross-validation mode. 
+ """ + import inspect + + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_binary_train, y_binary_train) + + # Create a mock that inherits from VennAbersMultiClass + class MockVennAbersMultiClass(VennAbersMultiClass): + def predict_proba(self, X, p0_p1_output=False): + """Mock predict_proba without loss parameter.""" + probs = np.random.rand(len(X), 2) + probs = probs / probs.sum(axis=1, keepdims=True) + return probs + + # Replace with mock that doesn't have loss parameter + mock_calibrator = MockVennAbersMultiClass(estimator=GaussianNB(), inductive=True) + + # Verify the mock's predict_proba doesn't have 'loss' parameter + sig = inspect.signature(mock_calibrator.predict_proba) + assert "loss" not in sig.parameters + + va_cal.va_calibrator_ = mock_calibrator + + # Call predict_proba - should use the else branch without loss parameter + probs = va_cal.predict_proba(X_binary_test) + + # Verify output shape + assert probs.shape == (len(X_binary_test), 2) + assert np.allclose(probs.sum(axis=1), 1.0) + + +def test_predict_without_n_classes() -> None: + """ + Test that predict raises RuntimeError when n_classes_ + is None after fitting. + """ + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_binary_train, y_binary_train) + + # Manually set n_classes_ to None to simulate the error condition + va_cal.n_classes_ = None + + with pytest.raises( + RuntimeError, match=r"n_classes_ should not be None after fitting" + ): + va_cal.predict(X_binary_test) + + +def test_predict_without_classes() -> None: + """ + Test that predict raises RuntimeError when classes_ + is None after fitting. 
+ """ + va_cal = VennAbersCalibrator( + estimator=GaussianNB(), inductive=True, random_state=random_state + ) + va_cal.fit(X_binary_train, y_binary_train) + + # Manually set classes_ to None to simulate the error condition + va_cal.classes_ = None + + with pytest.raises( + RuntimeError, match=r"classes_ should not be None after fitting" + ): + va_cal.predict(X_binary_test) + + +def test_prefit_classes_none_after_fitting() -> None: + """ + Test that fit raises RuntimeError when classes_ is None + after fitting estimator in prefit mode. + """ + from sklearn.naive_bayes import GaussianNB + + # Create and fit a base estimator + clf = GaussianNB() + clf.fit(X_binary_train, y_binary_train) + + # Create VennAbersCalibrator in prefit mode + va_cal = VennAbersCalibrator(estimator=clf, cv="prefit", random_state=random_state) + + # Manually set the classes_ attribute to None + # to simulate the error condition + clf.classes_ = None + + with pytest.raises( + RuntimeError, match=r"classes_ should not be None after fitting estimator" + ): + va_cal.fit(X_binary_test, y_binary_test) + + +@pytest.mark.parametrize("cv_ensemble", [True, False]) +def test_cv_ensemble_cross_binary(cv_ensemble): + """Test cv_ensemble parameter with cross-validation mode for binary classification.""" + + clf = LogisticRegression(random_state=42) + va_cal = VennAbersCalibrator( + estimator=clf, + inductive=False, + n_splits=3, + cv_ensemble=cv_ensemble, + random_state=42, + ) + + va_cal.fit(X_binary_train, y_binary_train) + + # Verify predictions work + proba = va_cal.predict_proba(X_binary_test) + predictions = va_cal.predict(X_binary_test) + + assert proba.shape == (len(X_binary_test), 2) + assert predictions.shape == (len(X_binary_test),) + assert np.allclose(proba.sum(axis=1), 1.0) + assert np.all((proba >= 0) & (proba <= 1)) diff --git a/mapie/utils.py b/mapie/utils.py index 860bc5066..fa1463df1 100644 --- a/mapie/utils.py +++ b/mapie/utils.py @@ -2,7 +2,6 @@ import warnings from inspect import 
signature from typing import Any, Iterable, Optional, Tuple, Union, cast - import numpy as np from sklearn.base import ClassifierMixin, RegressorMixin from sklearn.linear_model import LogisticRegression