# Illustrative Examples

## Sampling from Data on Disk

Suppose we want to sample from a large dataset stored on disk. `StreamSampling.jl`
is well suited for this task. Let's simulate the task by generating some data in
HDF5 format and sampling it in batches. You will need 10 GB of free disk space to
run this example; if that is not available, set a smaller value for `totaltuples`.
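
To see where the dataset size comes from: each tuple holds four `Float64` fields at
8 bytes each, so a quick back-of-the-envelope check (plain Julia arithmetic, nothing
package-specific) gives the number of tuples that fit in 10 GB:

```julia
# 4 fields × 8 bytes = 32 bytes per tuple, so 10 GB holds
10^10 ÷ 32   # = 312_500_000 tuples, the value used for `totaltuples` below
```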

We first generate the dataset and store it with

```julia
using StreamSampling, Random, ChunkSplitters, HDF5

# each tuple has four Float64 fields, i.e. 32 bytes, so this is ~10 GB of data
const dtype = @NamedTuple{a::Float64, b::Float64, c::Float64, d::Float64}
const totaltuples = 10^10 ÷ 32
const chunktuples = 5 * 10^5
const numchunks = ceil(Int, totaltuples / chunktuples)

function generate_large_hdf5_file(filename)
    h5open(filename, "w") do file
        dset = create_dataset(file, "data", dtype, (totaltuples,), chunk=(chunktuples,))
        Threads.@threads for i in 1:numchunks
            # fill one chunk at a time with random tuples
            startrow, endrow = (i-1)*chunktuples + 1, min(i*chunktuples, totaltuples)
            dset[startrow:endrow] = map(_ -> (a=rand(), b=rand(), c=rand(), d=rand()),
                                        1:endrow-startrow+1)
        end
    end
end

!isfile("large_random_data.h5") && generate_large_hdf5_file("large_random_data.h5")
```

Then we can sample it using a single thread with

```julia
function sample_large_hdf5_file(filename, rng, n, alg)
    rs = ReservoirSampler{dtype}(rng, n, alg)
    h5open(filename, "r") do file
        dset = file["data"]
        for i in 1:numchunks
            # read one chunk into memory and feed its tuples to the reservoir
            startrow, endrow = (i-1)*chunktuples + 1, min(i*chunktuples, totaltuples)
            data_chunk = dset[startrow:endrow]
            for d in data_chunk
                fit!(rs, d)
            end
        end
    end
    return rs
end

rng = Xoshiro(42)
@time rs = sample_large_hdf5_file("large_random_data.h5", rng, 10^7, AlgRSWRSKIP())
```
```julia
 43.514238 seconds (937.21 M allocations: 42.502 GiB, 2.57% gc time)
```
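
The same fit-then-inspect pattern can be tried on a small in-memory stream, without
the 10 GB file. In this sketch, reading the sample back with `value(rs)` is an
assumption about the package's accessor interface (the example above only uses `fit!`):

```julia
using StreamSampling, Random, Statistics

rng = Xoshiro(0)
rs = ReservoirSampler{Float64}(rng, 100, AlgRSWRSKIP())
for x in rand(rng, 10^5)
    fit!(rs, x)   # stream each data point into the reservoir
end
# the reservoir keeps exactly 100 values; for uniform data their mean is near 0.5
length(value(rs)), nobs(rs)
```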

We can try to improve the performance by using multiple threads. Here, I started
Julia with `julia -t6 --gcthreads=6,1` on my machine:

```julia
function psample_large_hdf5_file(filename, rngs, n, alg)
    # one reservoir per thread, merged into a single sample at the end
    rsv = [ReservoirSampler{dtype}(rngs[i], n, alg) for i in 1:Threads.nthreads()]
    h5open(filename, "r") do file
        dset = file["data"]
        # split the chunk indices into groups of at most `nthreads` elements,
        # so that within each group chunk `j` is handled by reservoir `j`
        for c in chunks(1:numchunks; n=ceil(Int, numchunks/Threads.nthreads()))
            Threads.@threads for (j, i) in collect(enumerate(c))
                startrow, endrow = (i-1)*chunktuples + 1, min(i*chunktuples, totaltuples)
                data_chunk, rs = dset[startrow:endrow], rsv[j]
                for d in data_chunk
                    fit!(rs, d)
                end
            end
        end
    end
    return merge(rsv...)
end

rngs = [Xoshiro(i) for i in 1:Threads.nthreads()]
@time rs = psample_large_hdf5_file("large_random_data.h5", rngs, 10^7, AlgRSWRSKIP())
```
```julia
 21.545665 seconds (937.21 M allocations: 46.525 GiB, 9.50% gc time, 14913 lock conflicts)
```

As you can see, the speed-up is not linear in the number of threads. This is mainly
because reading the chunks is single-threaded; to improve multi-threaded performance
one would need `MPI.jl`, as explained at https://juliaio.github.io/HDF5.jl/stable/mpi/.
Still, we are already sampling at roughly 500 MB/s, which is not bad!
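
The `merge(rsv...)` step above can also be exercised on its own. Here is a small
sketch with a toy stream split into two halves (the sizes are made up for
illustration; the assumption is that the merged sampler reports the total number of
observations seen by both reservoirs):

```julia
using StreamSampling, Random

# two reservoirs fed from disjoint halves of a toy stream, then merged,
# mirroring the per-thread reservoirs in `psample_large_hdf5_file`
rngs = [Xoshiro(1), Xoshiro(2)]
rsv = [ReservoirSampler{Int}(rngs[k], 5, AlgRSWRSKIP()) for k in 1:2]
for k in 1:2
    for x in ((k - 1) * 1000 + 1):(k * 1000)
        fit!(rsv[k], x)
    end
end
merged = merge(rsv...)
nobs(merged)
```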

## Monitoring

Suppose you receive data about some process in the form of a stream and want
to detect whether anything is going wrong in the data being received. A reservoir
sampling approach can be useful for evaluating properties of the data stream.
This is a demonstration of such a use case using `StreamSampling.jl`. We will
assume that the monitored statistic is the mean of the data, and that you want
it to stay below a certain threshold; otherwise some malfunction is suspected.

```julia
using StreamSampling, Statistics, Random

function monitor(stream, thr)
    # ... (body elided in this excerpt)
end
```
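
The body of `monitor` is not shown in this excerpt. Purely as an illustration, here
is one way such a function could be written; the reservoir size, the check interval,
and reading the sample back with `value` are all assumptions, not the author's code:

```julia
using StreamSampling, Statistics, Random

# hypothetical sketch: keep a fixed-size random sample of the stream and
# periodically test whether its mean has crossed the threshold
function monitor(stream, thr)
    rng = Xoshiro(42)
    rs = ReservoirSampler{eltype(stream)}(rng, 10^4, AlgRSWRSKIP())
    for x in stream
        fit!(rs, x)
        # checking on every element would be wasteful; check every 10^5 items
        if nobs(rs) % 10^5 == 0 && mean(value(rs)) >= thr
            break   # detection triggered: stop consuming the stream
        end
    end
    return rs
end
```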

We use some toy data for illustration

```julia
stream = 1:10^8;   # the data stream
thr = 2 * 10^7;    # the threshold for the mean monitoring
```

Then, we run the monitoring

```julia
rs = monitor(stream, thr);
```

The number of observations until the detection is triggered is given by

```julia
nobs(rs)
```