@@ -32,36 +32,39 @@ The list of strings available to define the metric are:
3232* `"manhattan"`, `"cityblock"`, `"taxicab"` or `"min"` for the Manhattan or L1 norm
3333 (`Cityblock()` in `Distances`).
3434"""
# Single-trajectory entry point: delegate to the internal kernel with the
# metric resolved and the parallel flag lifted to the type domain via `Val`.
# `size(x, 1)` avoids allocating the full size tuple (vs `size(x)[1]`).
# The threshold of 500 points is the heuristic cutoff for threading to pay off.
distancematrix(x, metric::Union{Metric,String} = DEFAULT_METRIC, parallel = size(x, 1) > 500) =
    _distancematrix(x, getmetric(metric), Val(parallel))
3636
3737# For 1-dimensional arrays (vectors), the distance does not depend on the metric
# For 1-dimensional arrays (vectors) every metric reduces to |xᵢ - yⱼ|,
# so `metric` and `parallel` are accepted for interface uniformity but ignored.
# `size(x, 1)` avoids allocating the size tuple (vs `size(x)[1]`).
distancematrix(x::Vector, y::Vector, metric = DEFAULT_METRIC, parallel = size(x, 1) > 500) =
    abs.(x .- y')
3939
4040# If the metric is supplied as a string, get the corresponding Metric from Distances
# If the metric is supplied as a string, translate it to the corresponding
# `Metric` instance from Distances and re-dispatch.
# `size(x, 1)` avoids allocating the size tuple (vs `size(x)[1]`).
distancematrix(x, y, metric::String, parallel = size(x, 1) > 500) =
    distancematrix(x, y, getmetric(metric), parallel)
4242
# Above this dimensionality, dense-matrix `pairwise` (BLAS) beats the
# Dataset point-wise kernel for the Euclidean metric.
const MAXDIM = 9
function distancematrix(x::Tx, y::Ty, metric::Metric = DEFAULT_METRIC, parallel = size(x, 1) > 500) where
        {Tx<:Union{AbstractMatrix, Dataset}} where {Ty<:Union{AbstractMatrix, Dataset}}
    sx, sy = size(x), size(y)
    # Input validation must not use `@assert`: asserts are for internal
    # invariants and may be disabled at higher optimization levels.
    sx[2] == sy[2] || throw(DimensionMismatch(
        "the dimensions of the data points in `x` and `y` must be equal; " *
        "found dim(x)=$(sx[2]), dim(y)=$(sy[2])"))
    if sx[2] ≥ MAXDIM && metric isa Euclidean
        # High-dimensional Euclidean distances: densify so `pairwise`
        # can use the BLAS-optimized path.
        return _distancematrix(Matrix(x), Matrix(y), metric, Val(parallel))
    elseif Tx <: Matrix && Ty <: Matrix && metric == Chebyshev()
        # Chebyshev on raw matrices has a dedicated matrix kernel.
        return _distancematrix(x, y, metric, Val(parallel))
    else
        # General case: Datasets give the best point-iteration performance.
        return _distancematrix(Dataset(x), Dataset(y), metric, Val(parallel))
    end
end
5860
5961# Core function for Matrices (wrapper of `pairwise` from the Distances package)
# Serial core method for matrices: a thin wrapper over `pairwise` from the
# Distances package, which reads data column-wise — hence the adjoints.
function _distancematrix(x::AbstractMatrix, y::AbstractMatrix, metric::Metric, ::Val{false})
    return pairwise(metric, x', y', dims = 2)
end
63+
64+ # First we define the serial versions of the functions.
6265# Core function for Datasets
6366function _distancematrix (x:: Dataset{S,Tx} , y:: Dataset{S,Ty} ,
64- metric:: Metric ) where {S, Tx, Ty}
67+ metric:: Metric , :: Val{false} ) where {S, Tx, Ty}
6568
6669 x = x. data
6770 y = y. data
@@ -74,6 +77,127 @@ function _distancematrix(x::Dataset{S,Tx}, y::Dataset{S,Ty},
7477 return d
7578end
7679
80+ # Now, we define the parallel versions.
81+
# Parallel core method for Datasets: columns of the output are distributed
# across threads; every (i, j) entry is independent, so no synchronization
# is needed.
function _distancematrix(x::Dataset{S,Tx}, y::Dataset{S,Ty},
        metric::Metric, ::Val{true}) where {S, Tx, Ty}
    xpts = x.data
    ypts = y.data
    d = zeros(promote_type(Tx, Ty), length(xpts), length(ypts))
    Threads.@threads for col in eachindex(ypts)
        for row in eachindex(xpts)
            @inbounds d[row, col] = evaluate(metric, xpts[row], ypts[col])
        end
    end
    return d
end
95+
# Parallel core method for matrices, where each row is one data point.
# BUG FIX: the original did `x = x.data`, but `Matrix` has no `data` field,
# so this method always threw an error; it also sized the output with
# `length(x)` (total number of *elements*), not the number of points (rows).
function _distancematrix(x::Matrix{Tx}, y::Matrix{Ty},
        metric::Metric, ::Val{true}) where {Tx, Ty}
    d = zeros(promote_type(Tx, Ty), size(x, 1), size(y, 1))
    Threads.@threads for j in axes(y, 1)
        for i in axes(x, 1)
            # Row views avoid allocating a copy of each point.
            @inbounds d[i, j] = evaluate(metric, view(x, i, :), view(y, j, :))
        end
    end
    return d
end
109+
110+
111+ # Now, we define methods for the single trajectory.
112+ # We can speed this up by only calculating the lower triangle,
113+ # which halves the number of computations needed.
114+
115+ # Again, we'll define the serial version first:
# Serial single-trajectory method for vectors. Only the upper triangle is
# computed (the matrix is symmetric) and the `Symmetric` wrapper mirrors it.
# The metric argument is ignored: scalar distances are always |xᵢ - xⱼ|.
function _distancematrix(x::Vector{T}, metric::Metric, ::Val{false}) where T
    n = length(x)
    d = zeros(T, n, n)
    @inbounds for col in 1:n, row in 1:col
        d[row, col] = abs(x[row] - x[col])
    end
    return Symmetric(d, :U)
end
128+
# Serial single-trajectory method for Datasets: compute only the upper
# triangle and mirror it with a `Symmetric` wrapper, halving the work.
function _distancematrix(x::Dataset{S, T}, metric::Metric, ::Val{false}) where T where S
    n = length(x)
    d = zeros(T, n, n)
    @inbounds for col in 1:n, row in 1:col
        d[row, col] = evaluate(metric, x[row], x[col])
    end
    return Symmetric(d, :U)
end
141+
142+ # Now, we define the parallel version. There's a twist, though.
143+
144+ # The single trajectory case is a little tricky. If you try it naively,
145+ # using the method we used for the serial computation above, the points are
# partitioned out unequally between threads. This means that performance actually
147+ # **decreases** compared to the full parallel version. To mitigate this, we need
148+ # to partition the computation equally among all threads.
149+ # The function I've written below does essentially this - given a length,
150+ # it calculates the appropriate partitioning, and returns a list of indices,
151+ # by utilizing the fact that the area is proportional to the square of the height.
152+ # It partitions the "triangle" which needs to be computed into "trapezoids",
153+ # which all have an equal area.
"""
    partition_indices(len) → Vector{UnitRange{Int}}

Partition `1:len` into `Threads.nthreads()` contiguous ranges whose
*triangular* workloads are approximately equal. When computing only the
upper triangle of a `len × len` matrix, the work in column `j` grows
linearly with `j`; splitting the triangle into equal-area "trapezoids"
means the split heights are proportional to the square roots of the
remaining areas. Ranges may be empty when `len < Threads.nthreads()`.
"""
function partition_indices(len)
    nthreads = Threads.nthreads()
    indices = Vector{UnitRange{Int}}(undef, nthreads)
    remaining = len   # renamed from `length` to avoid shadowing `Base.length`
    offset = 1
    # Iterate in reverse so each split leaves an equal area behind it.
    for n in nthreads:-1:1
        chunk = round(Int, remaining / sqrt(n))  # height varies as √area
        indices[n] = offset:(chunk + offset - 1)
        remaining -= chunk
        offset += chunk
    end
    return indices
end
169+
170+ # And now for the actual implementation:
# Parallel single-trajectory method for vectors. A naive `@threads` over the
# columns balances work badly (column j costs O(j)); instead each thread
# takes one equal-workload chunk from `partition_indices`.
function _distancematrix(x::Vector{T}, metric::Metric, ::Val{true}) where T
    d = zeros(T, length(x), length(x))
    Threads.@threads for chunk in partition_indices(length(x))
        @inbounds for col in chunk, row in 1:col
            d[row, col] = abs(x[row] - x[col])
        end
    end
    return Symmetric(d, :U)
end
185+
# Parallel single-trajectory method for Datasets: same load-balanced
# chunking as the vector method, with the metric evaluated per point pair.
function _distancematrix(x::Dataset{S, T}, metric::Metric, ::Val{true}) where T where S
    d = zeros(T, length(x), length(x))
    Threads.@threads for chunk in partition_indices(length(x))
        @inbounds for col in chunk, row in 1:col
            d[row, col] = evaluate(metric, x[row], x[col])
        end
    end
    return Symmetric(d, :U)
end
200+
77201# ###############################################################################
78202# AbstractRecurrenceMatrix type definitions and documentation strings
79203# ###############################################################################
@@ -308,12 +432,9 @@ Return a sparse matrix which encodes recurrence points.
308432
309433Note that `parallel` may be either `Val(true)` or `Val(false)`.
310434"""
311- recurrence_matrix (x, metric:: Metric , ε:: Real , parallel:: Val ) =
312- recurrence_matrix (x, x, metric, ε, parallel)
313-
314- # Convert Matrices to Datasets (better performance in all cases)
# Matrix entry point for cross-recurrence: forwards to the Dataset methods.
function recurrence_matrix(x::AbstractMatrix, y::AbstractMatrix,
        metric::Metric, ε, parallel::Val)
    # Convert Matrices to Datasets (better performance in all cases)
    return recurrence_matrix(Dataset(x), Dataset(y), metric, ε, parallel)
end
319440
@@ -357,6 +478,42 @@ function recurrence_matrix(x::AbstractVector, y::AbstractVector, metric, ε, par
357478 return sparse (rowvals, colvals, nzvals, length (x), length (y))
358479end
359480
# Serial single-trajectory recurrence matrix for vectors. Only the upper
# triangle is scanned; `Symmetric` mirrors it. The sparse triplets are built
# column by column so `sparse` receives them already column-sorted.
function recurrence_matrix(x::AbstractVector, metric::Metric, ε::Real, parallel::Val{false})
    rows = Int[]
    cols = Int[]
    for col in 1:length(x)
        @inbounds for row in 1:col
            if abs(x[row] - x[col]) ≤ ε
                push!(rows, row)
                push!(cols, col)
            end
        end
    end
    nz = fill(true, length(rows))
    return Symmetric(sparse(rows, cols, nz, length(x), length(x)), :U)
end
497+
# Serial single-trajectory recurrence matrix for Datasets: scan the upper
# triangle, record the recurrent (row, col) pairs, and mirror via `Symmetric`.
function recurrence_matrix(xx::Dataset, metric::Metric, ε::Real, parallel::Val{false})
    pts = xx.data
    rows = Int[]
    cols = Int[]
    for col in 1:length(pts)
        @inbounds for row in 1:col
            if evaluate(metric, pts[row], pts[col]) ≤ ε
                push!(rows, row)
                push!(cols, col)
            end
        end
    end
    nz = fill(true, length(rows))
    return Symmetric(sparse(rows, cols, nz, length(pts), length(pts)), :U)
end
515+
516+
360517# Now, we define the parallel versions of these functions.
361518
362519# Core function
@@ -412,3 +569,61 @@ function recurrence_matrix(x::AbstractVector, y::AbstractVector, metric, ε, par
412569 nzvals = fill (true , (length (finalrows),))
413570 return sparse (finalrows, finalcols, nzvals, length (x), length (y))
414571end
572+
573+
574+
# Parallel single-trajectory recurrence matrix for vectors.
# Columns are split into equal-workload chunks by `partition_indices`;
# each thread records recurrent (row, col) pairs in its own buffers.
function recurrence_matrix(xx::AbstractVector, metric::Metric, ε::Real, parallel::Val{true})
    x = xx
    # We create an `Array` of `Array`s, for each thread to have its
    # own array to push to. This avoids race conditions with
    # multiple threads pushing to the same `Array` (`Array`s are not atomic).
    rowvals = [Vector{Int}() for _ in 1:Threads.nthreads()]
    colvals = [Vector{Int}() for _ in 1:Threads.nthreads()]

    # This is the same logic as the serial function, but parallelized.
    # NOTE(review): buffers are selected with `Threads.threadid()`; this
    # assumes a loop iteration never migrates between threads mid-body —
    # confirm this holds for the `@threads` scheduler of the targeted
    # Julia version (task migration exists in newer releases).
    Threads.@threads for k in partition_indices(length(x))
        threadn = Threads.threadid()
        for j in k
            nzcol = 0  # number of recurrences found in column j
            for i in 1:j  # upper triangle only; mirrored by `Symmetric` below
                @inbounds if abs(x[i] - x[j]) ≤ ε
                    push!(rowvals[threadn], i) # push to the thread-specific row array
                    nzcol += 1
                end
            end
            append!(colvals[threadn], fill(j, (nzcol,)))
        end
    end
    finalrows = vcat(rowvals...) # merge into one array
    finalcols = vcat(colvals...) # merge into one array
    nzvals = fill(true, (length(finalrows),))
    return Symmetric(sparse(finalrows, finalcols, nzvals, length(x), length(x)), :U)
end
602+
# Parallel single-trajectory recurrence matrix for Datasets.
# Same chunked upper-triangle scan as the vector method, with the metric
# evaluated per point pair.
function recurrence_matrix(xx::Dataset, metric::Metric, ε::Real, parallel::Val{true})
    x = xx.data
    # We create an `Array` of `Array`s, for each thread to have its
    # own array to push to. This avoids race conditions with
    # multiple threads pushing to the same `Array` (`Array`s are not atomic).
    rowvals = [Vector{Int}() for _ in 1:Threads.nthreads()]
    colvals = [Vector{Int}() for _ in 1:Threads.nthreads()]

    # This is the same logic as the serial function, but parallelized.
    # NOTE(review): buffers are selected with `Threads.threadid()`; this
    # assumes a loop iteration never migrates between threads mid-body —
    # confirm this holds for the `@threads` scheduler of the targeted
    # Julia version (task migration exists in newer releases).
    Threads.@threads for k in partition_indices(length(x))
        threadn = Threads.threadid()
        for j in k
            nzcol = 0  # number of recurrences found in column j
            for i in 1:j  # upper triangle only; mirrored by `Symmetric` below
                @inbounds if evaluate(metric, x[i], x[j]) ≤ ε
                    push!(rowvals[threadn], i) # push to the thread-specific row array
                    nzcol += 1
                end
            end
            append!(colvals[threadn], fill(j, (nzcol,)))
        end
    end
    finalrows = vcat(rowvals...) # merge into one array
    finalcols = vcat(colvals...) # merge into one array
    nzvals = fill(true, (length(finalrows),))
    return Symmetric(sparse(finalrows, finalcols, nzvals, length(x), length(x)), :U)
end
0 commit comments