diff --git a/tslearn/barycenters/dba.py b/tslearn/barycenters/dba.py
index 774edaacd..266406fc0 100644
--- a/tslearn/barycenters/dba.py
+++ b/tslearn/barycenters/dba.py
@@ -3,6 +3,7 @@
 from scipy.interpolate import interp1d
 from sklearn.exceptions import ConvergenceWarning
 from sklearn.utils import check_random_state
+from joblib import Parallel, delayed
 
 from ..metrics import dtw_path
 from ..utils import to_time_series_dataset, ts_size
@@ -148,7 +149,8 @@ def dtw_barycenter_averaging_petitjean(X, barycenter_size=None,
        for dynamic time warping, with applications to clustering. Pattern
        Recognition, Elsevier, 2011, Vol. 44, Num. 3, pp. 678-693
     """
-    X_ = to_time_series_dataset(X)
+    # X is assumed to be a preformatted dataset; conversion is now the
+    # caller's responsibility so that it happens once, not on every call.
+    X_ = X
     if barycenter_size is None:
         barycenter_size = X_.shape[1]
     weights = _set_weights(weights, X_.shape[0])
@@ -176,7 +178,7 @@ def dtw_barycenter_averaging_petitjean(X, barycenter_size=None,
     return barycenter
 
 
-def _mm_assignment(X, barycenter, weights, metric_params=None):
+def _mm_assignment(X, barycenter, weights, metric_params=None, n_threads=15):
     """Computes item assignement based on DTW alignments and return cost as a
     bonus.
@@ -210,15 +212,23 @@ def _mm_assignment(X, barycenter, weights, metric_params=None):
     n = X.shape[0]
-    cost = 0.
-    list_p_k = []
-    for i in range(n):
-        path, dist_i = dtw_path(barycenter, X[i], **metric_params)
-        cost += dist_i ** 2 * weights[i]
-        list_p_k.append(path)
+    # Align every series to the barycenter in parallel: the n dtw_path calls
+    # are independent and dominate the cost of a DBA epoch. User-supplied
+    # constraints still flow through metric_params.
+    res = Parallel(n_jobs=n_threads, backend="threading")(
+        delayed(dtw_path)(barycenter, X[i], **metric_params)
+        for i in range(n))
+    list_p_k, dists = map(list, zip(*res))
+    cost = numpy.sum(numpy.power(dists, 2) * weights)
     cost /= weights.sum()
     return list_p_k, cost
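Note: the hunk above replaces the sequential alignment loop with a joblib thread pool. A minimal standalone sketch of the same pattern, with made-up shapes and worker count (not values taken from this patch):

import numpy
from joblib import Parallel, delayed
from tslearn.metrics import dtw_path

rng = numpy.random.RandomState(0)
X = rng.randn(20, 64, 1)             # 20 univariate series of length 64
barycenter = X.mean(axis=0)          # crude initial barycenter
weights = numpy.ones(X.shape[0])

# One dtw_path call per series; threads read X and barycenter in place.
res = Parallel(n_jobs=4, backend="threading")(
    delayed(dtw_path)(barycenter, X[i]) for i in range(X.shape[0]))
paths, dists = zip(*res)
cost = numpy.sum(numpy.power(dists, 2) * weights) / weights.sum()
print(len(paths), cost)

Threading only pays off if dtw_path spends its time outside the GIL (e.g. in compiled numba routines); if profiling shows otherwise, falling back to the default process-based backend is the usual alternative.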
@@ -230,7 +233,8 @@
 
 
-def _subgradient_valence_warping(list_p_k, barycenter_size, weights):
+def _subgradient_valence_warping(list_p_k, barycenter_size, weights,
+                                 n_threads=15):
     """Compute Valence and Warping matrices from paths.
 
     Valence matrices are denoted :math:`V^{(k)}` and Warping matrices are
@@ -253,19 +263,22 @@ def _subgradient_valence_warping(list_p_k, barycenter_size, weights):
        for Averaging in Dynamic Time Warping Spaces.
        Pattern Recognition, 74, 340-358.
     """
-    list_v_k = []
-    list_w_k = []
-    for k, p_k in enumerate(list_p_k):
-        sz_k = p_k[-1][1] + 1
-        w_k = numpy.zeros((barycenter_size, sz_k))
+    def _create_w_k(p_k):
+        # Build the binary warping matrix W^(k) from one alignment path.
+        w_k = numpy.zeros((barycenter_size, p_k[-1][1] + 1))
         for i, j in p_k:
             w_k[i, j] = 1.
-        list_w_k.append(w_k * weights[k])
-        list_v_k.append(w_k.sum(axis=1) * weights[k])
+        return w_k
+
+    # Only the matrix construction is worth farming out; the weightings are
+    # cheap elementwise operations best left to plain comprehensions.
+    w_k_ones = Parallel(n_jobs=n_threads, backend="threading")(
+        delayed(_create_w_k)(p_k) for p_k in list_p_k)
+    list_w_k = [w_k * weights[k] for k, w_k in enumerate(w_k_ones)]
+    list_v_k = [w_k.sum(axis=1) * weights[k] for k, w_k in enumerate(w_k_ones)]
     return list_v_k, list_w_k
 
 
-def _mm_valence_warping(list_p_k, barycenter_size, weights):
+def _mm_valence_warping(list_p_k, barycenter_size, weights, n_threads=15):
     """Compute Valence and Warping matrices from paths.
 
     Valence matrices are denoted :math:`V^{(k)}` and Warping matrices are
@@ -304,13 +317,15 @@ def _mm_valence_warping(list_p_k, barycenter_size, weights):
-        list_p_k=list_p_k, barycenter_size=barycenter_size, weights=weights)
-    diag_sum_v_k = numpy.zeros(list_v_k[0].shape)
-    for v_k in list_v_k:
-        diag_sum_v_k += v_k
+        list_p_k=list_p_k, barycenter_size=barycenter_size, weights=weights,
+        n_threads=n_threads)
+    # Summing a few short vectors is cheap; plain numpy avoids the dispatch
+    # overhead a parallel call would add here.
+    diag_sum_v_k = numpy.sum(numpy.array(list_v_k), axis=0)
     return diag_sum_v_k, list_w_k
 
 
-def _mm_update_barycenter(X, diag_sum_v_k, list_w_k):
+def _mm_update_barycenter(X, diag_sum_v_k, list_w_k, n_threads=15):
     """Update barycenters using the formula from Algorithm 2 in [1]_.
 
     Parameters
     ----------
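For intuition about what _subgradient_valence_warping builds, here is a tiny sequential check of the W^(k) and V^(k) construction; the path and weight are toy values chosen only for illustration:

import numpy

barycenter_size = 4
p_k = [(0, 0), (1, 1), (2, 1), (3, 2)]   # toy DTW path of (i, j) pairs
weight = 0.5

# W^(k): one row per barycenter index, one column per series index.
w_k = numpy.zeros((barycenter_size, p_k[-1][1] + 1))
for i, j in p_k:
    w_k[i, j] = 1.

print(w_k * weight)                      # weighted warping matrix
print(w_k.sum(axis=1) * weight)          # valence vector V^(k): row sums

The parallel version must reproduce exactly these values; only the order in which the W^(k) are built changes.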
@@ -336,12 +351,17 @@ def _mm_update_barycenter(X, diag_sum_v_k, list_w_k):
        for Averaging in Dynamic Time Warping Spaces.
        Pattern Recognition, 74, 340-358.
     """
-    d = X.shape[2]
-    barycenter_size = diag_sum_v_k.shape[0]
-    sum_w_x = numpy.zeros((barycenter_size, d))
-    for k, (w_k, x_k) in enumerate(zip(list_w_k, X)):
-        sum_w_x += w_k.dot(x_k[:ts_size(x_k)])
-    barycenter = numpy.diag(1. / diag_sum_v_k).dot(sum_w_x)
+    # The W^(k) . x_k products dominate this update, so they run in parallel;
+    # the reduction stays in plain numpy. Summing with axis=0 keeps the
+    # result correct for multivariate series (d > 1).
+    products = Parallel(n_jobs=n_threads, backend="threading")(
+        delayed(numpy.dot)(w_k, x_k[:ts_size(x_k)])
+        for (w_k, x_k) in zip(list_w_k, X))
+    sum_w_x = numpy.sum(numpy.array(products), axis=0)
+    barycenter = numpy.diag(numpy.reciprocal(diag_sum_v_k)).dot(sum_w_x)
     return barycenter
 
 
@@ -392,7 +412,7 @@ def _subgradient_update_barycenter(X, list_diag_v_k, list_w_k, weights_sum,
 
 def dtw_barycenter_averaging(X, barycenter_size=None, init_barycenter=None,
-                             max_iter=30, tol=1e-5, weights=None,
+                             max_iter=30, tol=1e-5, weights=None, n_threads=15,
                              metric_params=None,
                              verbose=False, n_init=1):
     """DTW Barycenter Averaging (DBA) method estimated through
@@ -514,6 +534,7 @@
 def dtw_barycenter_averaging_one_init(X, barycenter_size=None,
                                       init_barycenter=None,
                                       max_iter=30, tol=1e-5, weights=None,
+                                      n_threads=15,
                                       metric_params=None,
                                       verbose=False):
     """DTW Barycenter Averaging (DBA) method estimated through
@@ -576,23 +597,23 @@ def dtw_barycenter_averaging_one_init(X, barycenter_size=None,
        for Averaging in Dynamic Time Warping Spaces.
        Pattern Recognition, 74, 340-358.
     """
-    X_ = to_time_series_dataset(X)
+    # X is assumed to be preformatted by the caller (see note above).
     if barycenter_size is None:
-        barycenter_size = X_.shape[1]
-    weights = _set_weights(weights, X_.shape[0])
+        barycenter_size = X.shape[1]
+    weights = _set_weights(weights, X.shape[0])
     if init_barycenter is None:
-        barycenter = _init_avg(X_, barycenter_size)
+        barycenter = _init_avg(X, barycenter_size)
     else:
         barycenter_size = init_barycenter.shape[0]
         barycenter = init_barycenter
     cost_prev, cost = numpy.inf, numpy.inf
     for it in range(max_iter):
-        list_p_k, cost = _mm_assignment(X_, barycenter, weights, metric_params)
+        list_p_k, cost = _mm_assignment(X, barycenter, weights, metric_params,
+                                        n_threads)
         diag_sum_v_k, list_w_k = _mm_valence_warping(list_p_k, barycenter_size,
-                                                     weights)
+                                                     weights, n_threads)
         if verbose:
             print("[DBA] epoch %d, cost: %.3f" % (it + 1, cost))
-        barycenter = _mm_update_barycenter(X_, diag_sum_v_k, list_w_k)
+        barycenter = _mm_update_barycenter(X, diag_sum_v_k, list_w_k,
+                                           n_threads)
         if abs(cost_prev - cost) < tol:
             break
         elif cost_prev < cost:
@@ -698,7 +719,8 @@ def dtw_barycenter_averaging_subgradient(X, barycenter_size=None,
     """
     rng = check_random_state(random_state)
 
-    X_ = to_time_series_dataset(X)
+    # As above, X is assumed to be preformatted by the caller.
+    X_ = X
     if barycenter_size is None:
         barycenter_size = X_.shape[1]
     weights = _set_weights(weights, X_.shape[0])
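Taken together, the dba.py changes are exercised as below. Two assumptions to keep in mind: inputs must now be preformatted with to_time_series_dataset by the caller (the in-function conversion was removed), and n_threads is presumed to be forwarded from dtw_barycenter_averaging to dtw_barycenter_averaging_one_init, a call site this diff does not show:

import numpy
from tslearn.utils import to_time_series_dataset
from tslearn.barycenters import dtw_barycenter_averaging

# Conversion happens once, outside the barycenter routine.
X = to_time_series_dataset([[1., 2., 3., 4.],
                            [2., 3., 4., 5.],
                            [1., 3., 4., 4.]])
bar = dtw_barycenter_averaging(X, max_iter=10, tol=1e-4, n_threads=4)
print(bar.shape)                         # (barycenter_size, d)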
diff --git a/tslearn/clustering/kmeans.py b/tslearn/clustering/kmeans.py
index 7df35dde0..ad83c7ef0 100644
--- a/tslearn/clustering/kmeans.py
+++ b/tslearn/clustering/kmeans.py
@@ -12,6 +12,7 @@ from scipy.spatial.distance import cdist
 import numpy
 import warnings
+from joblib import Parallel, delayed
 
 from tslearn.metrics import cdist_gak, cdist_dtw, cdist_soft_dtw, sigma_gak
 from tslearn.barycenters import euclidean_barycenter, \
@@ -692,23 +693,37 @@ def _assign(self, X, update_class_attributes=True):
 
     def _update_centroids(self, X):
         metric_params = self._get_metric_params()
+        if self.metric == "dtw":
+            # Fit all cluster barycenters concurrently: one DBA job per
+            # cluster, all sharing X through the threading backend.
+            X_list = [X[self.labels_ == k] for k in range(self.n_clusters)]
+            cluster_centers = Parallel(n_jobs=self.n_clusters,
+                                       backend="threading",
+                                       verbose=self.verbose)(
+                delayed(dtw_barycenter_averaging)(
+                    X=X_list[k],
+                    barycenter_size=None,
+                    init_barycenter=self.cluster_centers_[k],
+                    metric_params=metric_params,
+                    max_iter=self.max_iter_barycenter,
+                    verbose=False) for k in range(self.n_clusters))
+            for k in range(self.n_clusters):
+                self.cluster_centers_[k] = cluster_centers[k]
+            return
         for k in range(self.n_clusters):
-            if self.metric == "dtw":
-                self.cluster_centers_[k] = dtw_barycenter_averaging(
-                    X=X[self.labels_ == k],
-                    barycenter_size=None,
-                    init_barycenter=self.cluster_centers_[k],
-                    metric_params=metric_params,
-                    verbose=False)
-            elif self.metric == "softdtw":
+            if self.metric == "softdtw":
                 self.cluster_centers_[k] = softdtw_barycenter(
                     X=X[self.labels_ == k],
                     max_iter=self.max_iter_barycenter,
                     init=self.cluster_centers_[k],
                     **metric_params)
             else:
                 self.cluster_centers_[k] = euclidean_barycenter(
                     X=X[self.labels_ == k])
 
     def fit(self, X, y=None):
         """Compute k-means clustering.
@@ -740,7 +755,7 @@ def fit(self, X, y=None):
 
         max_attempts = max(self.n_init, 10)
 
-        X_ = to_time_series_dataset(X)
+        # Inputs are assumed to be preformatted once by the caller.
+        X_ = X
         rs = check_random_state(self.random_state)
 
         if self.init == "k-means++" and self.metric == "euclidean":
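_update_centroids now fits one barycenter per cluster concurrently instead of looping. Reduced to its skeleton (fake_barycenter is a hypothetical stand-in for dtw_barycenter_averaging, not part of the patch):

import numpy
from joblib import Parallel, delayed

def fake_barycenter(Xk):
    # Hypothetical stand-in for dtw_barycenter_averaging.
    return Xk.mean(axis=0)

rng = numpy.random.RandomState(0)
X = rng.randn(30, 16, 1)
labels = rng.randint(0, 3, size=30)

# One job per cluster; the threading backend lets every job read X in place.
X_list = [X[labels == k] for k in range(3)]
centers = Parallel(n_jobs=3, backend="threading")(
    delayed(fake_barycenter)(X_list[k]) for k in range(3))
print([c.shape for c in centers])

Since n_jobs is tied to self.n_clusters, each cluster gets its own worker; with many clusters it may be worth capping this at the number of physical cores.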
-    
+
     Parameters
     ----------
     s1 : array, shape = (sz1,)
diff --git a/tslearn/metrics/utils.py b/tslearn/metrics/utils.py
index ece61c435..630d68be6 100644
--- a/tslearn/metrics/utils.py
+++ b/tslearn/metrics/utils.py
@@ -36,21 +31,17 @@ def _cdist_generic(dist_fun, dataset1, dataset2, n_jobs, verbose,
     Returns
     -------
     cdist : numpy.ndarray
         Cross-similarity matrix
     """  # noqa: E501
-    dataset1 = to_time_series_dataset(dataset1, dtype=dtype)
+    # dataset1 is assumed to be preformatted by the caller; converting on
+    # every call was redundant once callers pass 3d arrays.
 
     if dataset2 is None:
         # Inspired from code by @GillesVandewiele:
@@ -60,7 +51,7 @@
         matrix[indices] = Parallel(n_jobs=n_jobs,
-                                   prefer="threads",
+                                   prefer="processes",
                                    verbose=verbose)(
             delayed(dist_fun)(
                 dataset1[i], dataset1[j],
@@ -74,12 +65,13 @@
         matrix[indices] = matrix.T[indices]
         return matrix
     else:
-        dataset2 = to_time_series_dataset(dataset2, dtype=dtype)
-        matrix = Parallel(n_jobs=n_jobs, prefer="threads", verbose=verbose)(
+        matrix = Parallel(n_jobs=n_jobs, prefer="processes", verbose=verbose)(
             delayed(dist_fun)(
                 dataset1[i], dataset2[j],
                 *args, **kwargs
             )
             for i in range(len(dataset1)) for j in range(len(dataset2))
         )
     return numpy.array(matrix).reshape((len(dataset1), -1))
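The utils.py hunks flip _cdist_generic from threads to processes, trading GIL contention for per-task pickling overhead. Which side wins depends on series length and dataset size, so it is worth measuring on representative data rather than assuming; a minimal timing harness (sizes are arbitrary):

import time
import numpy
from tslearn.metrics import cdist_dtw

X = numpy.random.RandomState(0).randn(50, 128, 1)

t0 = time.time()
cdist_dtw(X, n_jobs=4)                   # goes through _cdist_generic
print("pairwise DTW, 4 workers: %.2fs" % (time.time() - t0))

Running this once before and once after the prefer="processes" switch is the quickest way to justify (or revert) the backend change.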