Commit c9fbd6e

Add arrow version to on-disk sampling (#121)
1 parent d23397e

2 files changed (+90, -45 lines)


docs/src/example.md

Lines changed: 88 additions & 43 deletions
@@ -1,94 +1,139 @@
 
-# Illustrative Examples
+# Some Applications
 
 ## Sampling from Data on Disk
 
 Suppose we want to sample from large datasets stored on disk. `StreamSampling.jl`
 is very suited for this task. Let's simulate this task by generating some data in
-HDF5 format and batch sampling them. You will need 10GB of space on disk for running
-this example. If not available you can set a smaller size for `totaltuples`.
+HDF5 and Arrow formats and batch sampling them. You will need 20GB of space on disk
+for running this example. If not available you can set a smaller size for `totaltpl`.
+
+We first generate the datasets and store them with
 
-We first generate the dataset and store it with
 
 ```julia
-using StreamSampling, Random, ChunkSplitters, HDF5
+using StreamSampling, Random, ChunkSplitters
+using HDF5, Arrow
 
 const dtype = @NamedTuple{a::Float64, b::Float64, c::Float64, d::Float64}
-const totaltuples = 10^10÷32
-const chunktuples = 5*10^5
-const numchunks = ceil(Int, totaltuples / chunktuples)
-
-function generate_large_hdf5_file(filename)
-    h5open(filename, "w") do file
-        dset = create_dataset(file, "data", dtype, (totaltuples,), chunk=(chunktuples,))
-        Threads.@threads for i in 1:numchunks
-            startrow, endrow = (i-1)*chunktuples+1, min(i*chunktuples, totaltuples)
-            dset[startrow:endrow] = map(i -> (a=rand(), b=rand(), c=rand(), d=rand()),
-                                        1:endrow-startrow+1)
+const totaltpl = 10^10÷32
+const chunktpl = 5*10^5
+const numchunks = ceil(Int, totaltpl / chunktpl)
+
+function generate_file(filename, format)
+    if format == :hdf5
+        h5open(filename, "w") do file
+            dset = create_dataset(file, "data", dtype, (totaltpl,), chunk=(chunktpl,))
+            Threads.@threads for i in 1:numchunks
+                starttpl, endtpl = (i-1)*chunktpl+1, min(i*chunktpl, totaltpl)
+                dset[starttpl:endtpl] = map(i -> (a=rand(), b=rand(), c=rand(), d=rand()),
+                                            1:endtpl-starttpl+1)
+            end
+        end
+    elseif format == :arrow
+        open(Arrow.Writer, filename) do writer
+            for i in 1:numchunks
+                starttpl, endtpl = (i-1)*chunktpl+1, min(i*chunktpl, totaltpl)
+                Arrow.write(writer, (data=map(i -> (a=rand(), b=rand(), c=rand(), d=rand()),
+                                              1:endtpl-starttpl+1),))
+            end
         end
     end
 end
 
-!isfile("large_random_data.h5") && generate_large_hdf5_file("large_random_data.h5")
+!isfile("random_data.h5") && generate_file("random_data.h5", :hdf5)
+!isfile("random_data.arrow") && generate_file("random_data.arrow", :arrow)
 ```
 
-Then we can sample it using 1 thread with
+Then we can sample them using 1 thread with
 
 ```julia
-function sample_large_hdf5_file(filename, rng, n, alg)
+function sample_file(filename, rng, n, alg, format)
     rs = ReservoirSampler{dtype}(rng, n, alg)
-    h5open(filename, "r") do file
-        dset = file["data"]
-        for i in 1:numchunks
-            startrow, endrow = (i-1)*chunktuples+1, min(i*chunktuples, totaltuples)
-            data_chunk = dset[startrow:endrow]
-            for d in data_chunk
-                fit!(rs, d)
+    if format == :hdf5
+        h5open(filename, "r") do file
+            dset = file["data"]
+            for i in 1:numchunks
+                starttpl, endtpl = (i-1)*chunktpl+1, min(i*chunktpl, totaltpl)
+                data_chunk = dset[starttpl:endtpl]
+                for d in data_chunk
+                    fit!(rs, d)
+                end
             end
         end
+    elseif format == :arrow
+        rs = ReservoirSampler{dtype}(rng, n, alg)
+        data = Arrow.Table(filename).data
+        @inbounds for i in 1:length(data)
+            fit!(rs, data[i])
+        end
     end
     return rs
 end
 
 rng = Xoshiro(42)
-@time rs = sample_large_hdf5_file("large_random_data.h5", rng, 10^7, AlgRSWRSKIP())
+@time rs = sample_file("random_data.h5", rng, 10^7, AlgRSWRSKIP(), :hdf5)
 ```
 ```julia
 43.514238 seconds (937.21 M allocations: 42.502 GiB, 2.57% gc time)
 ```
+```julia
+@time rs = sample_file("random_data.arrow", rng, 10^7, AlgRSWRSKIP(), :arrow)
+```
+```julia
+38.635389 seconds (1.25 G allocations: 33.500 GiB, 3.52% gc time, 75763 lock conflicts)
+```
 
 We can try to improve the performance by using multiple threads. Here, I started Julia
-with `julia -t6 --gcthreads=6,1` on my machine
+with `julia -t4 --gcthreads=4,1` on my machine
 
 ```julia
-function psample_large_hdf5_file(filename, rngs, n, alg)
+function psample_file(filename, rngs, n, alg, format)
     rsv = [ReservoirSampler{dtype}(rngs[i], n, alg) for i in 1:Threads.nthreads()]
-    h5open(filename, "r") do file
-        dset = file["data"]
-        for c in chunks(1:numchunks; n=ceil(Int, numchunks/Threads.nthreads()))
-            Threads.@threads for (j, i) in collect(enumerate(c))
-                startrow, endrow = (i-1)*chunktuples+1, min(i*chunktuples, totaltuples)
-                data_chunk, rs = dset[startrow:endrow], rsv[j]
-                for d in data_chunk
-                    fit!(rs, d)
+    if format == :hdf5
+        h5open(filename, "r") do file
+            dset = file["data"]
+            for c in chunks(1:numchunks; n=ceil(Int, numchunks/Threads.nthreads()))
+                Threads.@threads for (j, i) in collect(enumerate(c))
+                    starttpl, endtpl = (i-1)*chunktpl+1, min(i*chunktpl, totaltpl)
+                    data_chunk, rs = dset[starttpl:endtpl], rsv[j]
+                    for d in data_chunk
+                        fit!(rs, d)
+                    end
                 end
             end
         end
+    elseif format == :arrow
+        data = Arrow.Table(filename).data
+        Threads.@threads for (i, c) in enumerate(chunks(1:length(data), n=Threads.nthreads()))
+            @inbounds for j in c
+                fit!(rsv[i], data[j])
+            end
+        end
     end
     return merge(rsv...)
 end
 
 rngs = [Xoshiro(i) for i in 1:Threads.nthreads()]
-@time rs = psample_large_hdf5_file("large_random_data.h5", rngs, 10^7, AlgRSWRSKIP())
+@time rs = psample_file("random_data.h5", rngs, 10^7, AlgRSWRSKIP(), :hdf5)
 ```
 ```julia
-21.545665 seconds (937.21 M allocations: 46.525 GiB, 9.50% gc time, 14913 lock conflicts)
+23.240628 seconds (937.23 M allocations: 45.185 GiB, 9.52% gc time, 9375 lock conflicts)
 ```
+```julia
+@time rs = psample_file("random_data.arrow", rngs, 10^7, AlgRSWRSKIP(), :arrow)
+```
+```julia
+5.868995 seconds (175.91 k allocations: 3.288 GiB, 6.44% gc time, 64714 lock conflicts)
+```
+
+As you can see, the speed-up is not linear in the number of threads for an HDF5 file. This is
+mainly due to the fact that accessing the chunks is single-threaded, so one would need to use
+`MPI.jl` as explained at https://juliaio.github.io/HDF5.jl/stable/mpi/ to improve the multi-threading
+performance. Though, we are already sampling at 500MB/s, which is not bad!
 
-As you can see, the speed-up is not linear in the number of threads. This is mainly due to
-the fact that accessing the chunks is single-threaded, so one would need to use `MPI.jl` as
-explained at [https://juliaio.github.io/HDF5.jl/stable/mpi/](https://juliaio.github.io/HDF5.jl/stable/mpi/)
-to improve the multi-threading performance. Though, we are already sampling at 500MB/S, which is not bad!
+Using `Arrow.jl` gives even better performance, and scalability that is somehow better than
+linear, reaching a 2GB/s sampling speed!
 
 ## Monitoring
 
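For reference, the quoted throughput figures follow from each file holding `totaltpl*32` bytes = 10 GB of tuples: roughly 10 GB / 23.2 s ≈ 0.43 GB/s for threaded HDF5 and 10 GB / 5.9 s ≈ 1.7 GB/s for Arrow. Once `sample_file` or `psample_file` returns, the sample itself can be extracted from the reservoir. A minimal sketch, assuming the `value` accessor from the `fit!`/`value`/`merge` interface that `StreamSampling.jl` follows:

```julia
# Sketch under the assumptions above: `rs` is the reservoir returned by
# sample_file/psample_file. With AlgRSWRSKIP the reservoir holds a fixed-size
# sample drawn with replacement, so duplicates may appear.
sample = value(rs)                                  # assumed accessor
@assert length(sample) == 10^7                      # the requested sample size
mean_a = sum(t.a for t in sample) / length(sample)  # ≈ 0.5 for rand() data
```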
src/WeightedSamplingMulti.jl

Lines changed: 2 additions & 2 deletions
@@ -211,7 +211,7 @@ end
 function Base.merge!(s1::MultiAlgAResSampler, ss::MultiAlgAResSampler...)
     length(typeof(s1.value.valtree).parameters) == 3 && error("Merging ordered reservoirs is not possible")
     s1.n > minimum(s.n for s in ss) && error("The size of the mutated reservoir should be the minimum size between all merged reservoirs")
-    empty!(s1.value)
+    empty!(s1.value.valtree)
     newvalue = reduce_samples(TypeS(), s1, ss...)
     for e in newvalue
         push!(s1.value, e[1] => e[2])
@@ -222,7 +222,7 @@ end
 function Base.merge!(s1::MultiAlgAExpJSampler, ss::MultiAlgAExpJSampler...)
     length(typeof(s1.value.valtree).parameters) == 3 && error("Merging ordered reservoirs is not possible")
     s1.n > minimum(s.n for s in ss) && error("The size of the mutated reservoir should be the minimum size between all merged reservoirs")
-    empty!(s1.value)
+    empty!(s1.value.valtree)
     newvalue = reduce_samples(TypeS(), s1, ss...)
     for e in newvalue
         push!(s1.value, e[1] => e[2])
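Both hunks apply the same fix to the weighted `merge!` methods: `empty!` now clears the reservoir's underlying `valtree` storage instead of the wrapper, before the merged sample is pushed back in with `push!(s1.value, e[1] => e[2])`. For context, a minimal usage sketch of this destructive merge; the weighted constructor and the `fit!(rs, x, w)` weighted update are assumptions based on the unweighted code above and the package docs:

```julia
using StreamSampling, Random

rng = Xoshiro(1)
# Two weighted reservoirs of size 100; AlgAExpJ is the weighted A-ExpJ
# algorithm, which the MultiAlgAExpJSampler methods above back (assumed names).
rs1 = ReservoirSampler{Int}(rng, 100, AlgAExpJ())
rs2 = ReservoirSampler{Int}(rng, 100, AlgAExpJ())
for x in 1:10^5
    fit!(rs1, x, rand(rng))   # assumed weighted update: (element, weight)
    fit!(rs2, x, rand(rng))
end
merge!(rs1, rs2)              # merges rs2's sample into rs1 in place
```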
