Source code for reval.best_nclust_cv

from sklearn.model_selection import RepeatedKFold, RepeatedStratifiedKFold
from reval.relative_validation import RelativeValidation
from collections import namedtuple
from scipy import stats
import numpy as np
import math
import logging
import multiprocessing as mp
import itertools
import pandas as pd

logging.basicConfig(format='%(asctime)s, %(levelname)s %(message)s',
                    datefmt='%H:%M:%S',
                    level=logging.INFO)


class FindBestClustCV(RelativeValidation):
    """
    Child class of :class:`reval.relative_validation.RelativeValidation`.
    It performs (repeated) k-fold cross validation on the training set to
    select the best number of clusters, i.e., the number that minimizes the
    normalized stability (i.e., average misclassification error /
    asymptotic misclassification rate).

    :param nfold: number of CV folds.
    :type nfold: int
    :param nclust_range: list with clusters to look for, default None.
    :type nclust_range: list of int
    :param s: classification object inherited from
        :class:`reval.relative_validation.RelativeValidation`.
    :type s: class
    :param c: clustering object inherited from
        :class:`reval.relative_validation.RelativeValidation`.
    :type c: class
    :param nrand: number of random labeling iterations to compute the
        asymptotic misclassification rate, inherited from
        :class:`reval.relative_validation.RelativeValidation`.
    :type nrand: int
    :param n_jobs: number of processes to be run in parallel, default 1.
    :type n_jobs: int

    :attribute: cv_results_ dataframe with cross-validation results.
        Columns are ncl = number of clusters;
        ms_tr = misclassification training;
        ms_val = misclassification validation.
    """

    def __init__(self, s, c, nrand, nfold=2, n_jobs=1, nclust_range=None):
        """
        Construct method.
        """
        super().__init__(s, c, nrand)
        self.nfold = nfold
        if abs(n_jobs) > mp.cpu_count():
            self.n_jobs = mp.cpu_count()
        else:
            self.n_jobs = abs(n_jobs)
        self.nclust_range = nclust_range
    def best_nclust(self, data, iter_cv=1, strat_vect=None):
        """
        This method takes as input the training dataset and the stratification
        vector (if available) and performs a (repeated) CV procedure to select
        the best number of clusters, i.e., the one that minimizes normalized
        stability.

        :param data: training dataset.
        :type data: ndarray, (n_samples, n_features)
        :param iter_cv: number of iterations for repeated CV, default 1.
        :type iter_cv: int
        :param strat_vect: vector for stratification, defaults to None.
        :type strat_vect: ndarray, (n_samples,)
        :return: CV metrics for training and validation sets, best number of
            clusters, and the training-set cluster labels of the best solution
            (only when the `n_clusters` parameter is not available or no
            cluster range is given).
        :rtype: dict, int, (list)
        """
        data_array = np.array(data)
        reval = RelativeValidation(self.class_method, self.clust_method, self.nrand)

        if strat_vect is not None:
            kfold = RepeatedStratifiedKFold(n_splits=self.nfold,
                                            n_repeats=iter_cv,
                                            random_state=42)
        else:
            kfold = RepeatedKFold(n_splits=self.nfold,
                                  n_repeats=iter_cv,
                                  random_state=42)
        fold_gen = kfold.split(data_array, strat_vect)
        if self.nclust_range is not None and 'n_clusters' in self.clust_method.get_params().keys():
            params = list(itertools.product([(data_array, reval)],
                                            fold_gen,
                                            self.nclust_range))
        else:
            params = list(itertools.product([(data_array, reval)],
                                            fold_gen))
        if self.n_jobs > 1:
            p = mp.Pool(processes=self.n_jobs)
            miscl = list(zip(*p.starmap(self._fit, params)))
            p.close()
            p.join()
            out = list(zip(*miscl))
        else:
            miscl = []
            for p in params:
                if len(p) > 2:
                    miscl.append(self._fit(data_obj=p[0], idxs=p[1], ncl=p[2]))
                else:
                    miscl.append(self._fit(data_obj=p[0], idxs=p[1]))
            out = miscl
        # Build the dataframe attribute (cv_results_) with CV scores.
        # If no points are labeled (e.g., all points assigned to the -1 class by HDBSCAN)
        # the corresponding rows are dropped; if that holds for every run the method returns None.
        cv_results_ = pd.DataFrame(out,
                                   columns=['ncl', 'ms_tr', 'ms_val',
                                            'tr_labels', 'val_labels'])
        ctrl_rows = cv_results_.shape[0]
        cv_results_.dropna(axis=0, inplace=True)
        if 0 < ctrl_rows - cv_results_.shape[0] < ctrl_rows:
            logging.info("Dropped results where clustering algorithm failed to identify clusters.")
            FindBestClustCV.cv_results_ = cv_results_
        elif ctrl_rows - cv_results_.shape[0] == 0:
            FindBestClustCV.cv_results_ = cv_results_
        else:
            logging.info(f"{self.clust_method} was not able to identify any cluster. Failed run with "
                         f"{self.class_method}.")
            return None
        metrics = {'train': {}, 'val': {}}
        for ncl in cv_results_.ncl.unique():
            norm_stab_tr = cv_results_.loc[cv_results_.ncl == ncl]['ms_tr']
            norm_stab_val = cv_results_.loc[cv_results_.ncl == ncl]['ms_val']
            metrics['train'][ncl] = (np.mean(norm_stab_tr), _confint(norm_stab_tr))
            metrics['val'][ncl] = (np.mean(norm_stab_val), _confint(norm_stab_val))
        val_score = np.array([val[0] for val in metrics['val'].values()])
        bestscore = min(val_score)
        # Select the solution with the minimum misclassification error;
        # ties are broken in favor of the maximum number of clusters
        if self.nclust_range is not None and 'n_clusters' in self.clust_method.get_params().keys():
            bestncl = self.nclust_range[np.flatnonzero(val_score == bestscore)[-1]]
            return metrics, bestncl
        else:
            bestncl = list(metrics['val'].keys())[np.flatnonzero(val_score == bestscore)[-1]]
            best_idx = cv_results_.loc[cv_results_.ncl == bestncl].ms_val.idxmin()
            idx_vect = np.concatenate((params[best_idx][1][-2], params[best_idx][1][-1]))
            label_vect = np.concatenate((out[best_idx][-2], out[best_idx][-1]))
            tr_lab = [lab for _, lab in sorted(zip(idx_vect, label_vect))]
            return metrics, bestncl, tr_lab
    def evaluate(self, data_tr, data_ts, nclust=None, tr_lab=None):
        """
        Method that applies the selected clustering algorithm with the best
        number of clusters to the test set. It returns clustering labels.

        :param data_tr: training dataset.
        :type data_tr: ndarray, (n_samples, n_features)
        :param data_ts: test dataset.
        :type data_ts: ndarray, (n_samples, n_features)
        :param nclust: best number of clusters, default None.
        :type nclust: int
        :param tr_lab: clustering labels for the training set. If not None, the
            clustering algorithm is not performed and only the classifier is
            fitted. Available for clustering methods without the `n_clusters`
            parameter. Default None.
        :type tr_lab: array-like
        :return: labels and accuracy for both training and test sets.
        :rtype: namedtuple, (train_cllab: array, train_acc: float,
            test_cllab: array, test_acc: float)
        """
        if isinstance(tr_lab, list):
            tr_lab = np.array(tr_lab)
        try:
            if 'n_clusters' in self.clust_method.get_params().keys():
                self.clust_method.n_clusters = nclust
                tr_misc, modelfit, labels_tr = super().train(data_tr)
            else:
                tr_misc, modelfit, labels_tr = super().train(data_tr, tr_lab)
            ts_misc, labels_ts = super().test(data_ts, modelfit)
        except TypeError:
            logging.info(f"Not possible to perform evaluation on the test set. "
                         f"No clusters were identified with "
                         f"{self.clust_method}")
            return None
        Eval = namedtuple('Eval',
                          ['train_cllab', 'train_acc', 'test_cllab', 'test_acc'])
        out = Eval(labels_tr, 1 - tr_misc, labels_ts, 1 - ts_misc)
        return out
    @staticmethod
    def _fit(data_obj, idxs, ncl=None):
        """
        Function that calls training, test, and random labeling.

        :param data_obj: dataset and `reval.RelativeValidation` class.
        :type data_obj: tuple
        :param idxs: lists of training and validation indices.
        :type idxs: tuple
        :param ncl: number of clusters, default None.
        :type ncl: int
        :return: number of clusters, misclassification errors for training and
            validation, and the corresponding training/validation labels.
        :rtype: tuple (int, float, float, ndarray, ndarray)
        """
        data_array, reval = data_obj
        tr_idx, val_idx = idxs
        tr_set, val_set = data_array[tr_idx], data_array[val_idx]
        if 'n_clusters' in reval.clust_method.get_params().keys():
            reval.clust_method.n_clusters = ncl
        # Case in which the number of clusters identified is not sufficient
        # or points are not classified
        # (it happens particularly for SpectralClustering and HDBSCAN)
        try:
            miscl_tr, modelfit, tr_labels = reval.train(tr_set)
            miscl_val, val_labels = reval.test(val_set, modelfit)
            rndmisc_mean_val = reval.rndlabels_traineval(tr_set, val_set,
                                                         tr_labels,
                                                         val_labels)
            # If random labeling gives perfect prediction, substitute
            # the normalized misclassification for the validation loop with 1.0
            if rndmisc_mean_val > 0.0:
                ms_val = miscl_val / rndmisc_mean_val
            else:
                ms_val = 1.0
            return len([n for n in np.unique(val_labels) if n >= 0]), miscl_tr, ms_val, tr_labels, val_labels
        except TypeError:
            return None, None, None, None, None
def _confint(vect):
    """
    Private function to compute confidence interval.

    :param vect: performance scores.
    :type vect: array-like
    :return: mean and error.
    :rtype: tuple
    """
    mean = np.mean(vect)
    # interval = 1.96 * math.sqrt((mean * (1 - mean)) / len(vect))
    interval = stats.t.ppf(1 - (0.05 / 2), len(vect) - 1) * (np.std(vect) / math.sqrt(len(vect)))
    return mean, interval
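
# Illustrative note (not part of the reval source): the error returned by
# _confint is the half-width of a 95% Student-t confidence interval around the
# mean score, so an interval can be reconstructed as mean +/- error. The scores
# below are made-up numbers used only for demonstration.
if __name__ == "__main__":
    _example_scores = np.array([0.12, 0.18, 0.15, 0.22, 0.17])
    _m, _err = _confint(_example_scores)
    print(f"Mean normalized stability: {_m:.3f}, "
          f"95% CI: ({_m - _err:.3f}, {_m + _err:.3f})")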
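
# Illustrative usage sketch (not part of the reval source): it assumes
# scikit-learn's KNeighborsClassifier/KMeans as the classifier/clustering pair
# and synthetic blob data; dataset and parameter choices are arbitrary and only
# meant to show how FindBestClustCV.best_nclust and evaluate fit together.
if __name__ == "__main__":
    from sklearn.cluster import KMeans
    from sklearn.neighbors import KNeighborsClassifier
    from sklearn.datasets import make_blobs
    from sklearn.model_selection import train_test_split

    X, y = make_blobs(n_samples=300, centers=3, n_features=5, random_state=42)
    X_tr, X_ts, y_tr, y_ts = train_test_split(X, y, test_size=0.3,
                                              random_state=42, stratify=y)

    findbest = FindBestClustCV(s=KNeighborsClassifier(n_neighbors=5),
                               c=KMeans(random_state=42),
                               nrand=10,
                               nfold=2,
                               n_jobs=1,
                               nclust_range=list(range(2, 6)))
    # Repeated stratified 2-fold CV over the candidate numbers of clusters
    metrics, best_ncl = findbest.best_nclust(X_tr, iter_cv=10, strat_vect=y_tr)
    # Refit on the whole training set with the selected number of clusters
    # and transfer the solution to the held-out test set
    result = findbest.evaluate(X_tr, X_ts, nclust=best_ncl)
    print(f"Best number of clusters: {best_ncl}, "
          f"test accuracy: {result.test_acc:.3f}")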