|
5 | 5 | ############################################################################## |
6 | 6 |
|
7 | 7 | function drop_singletons!(esample, fes::Vector{<:FixedEffect}, nthreads) |
8 | | -# Main._esample = copy(esample); Main._fes=copy(fes); Main._nthreads=nthreads |
9 | 8 | nsingletons = 0 |
10 | | - dirtypasses = length(fes) |
11 | | - |
12 | | - bounds = ceil.(Int, (0:nthreads) * (length(esample) / nthreads)) |
13 | | - chunks = [bounds[t]+1:bounds[t+1] for t ∈ 1:nthreads] |
14 | | - |
15 | | - caches = [Vector{UInt8}(undef, fes[i].n) for i ∈ eachindex(fes)] |
16 | | - for (fe,cache) ∈ Iterators.cycle(zip(fes,caches)) # if 1 set of FE, only need 1 pass |
17 | | - n = drop_singletons!(esample, fe, cache, chunks) |
18 | | - if iszero(n) |
19 | | - dirtypasses -= 1 |
| 9 | + ncleanpasses = 0 |
| 10 | + caches = [Vector{UInt8}(undef, fes[i].n) for i in eachindex(fes)] |
| 11 | + for (fe, cache) in Iterators.cycle(zip(fes,caches)) |
| 12 | + n = drop_singletons!(esample, fe, cache) |
| 13 | + nsingletons += n |
| 14 | + if n > 0 |
| 15 | + # found singletons, reset the counter |
| 16 | + ncleanpasses = 0 |
20 | 17 | else |
21 | | - nsingletons += n |
22 | | - dirtypasses = length(fes) # restart counter |
| 18 | + # otherwise, increment counter |
| 19 | + ncleanpasses += 1 |
23 | 20 | end |
24 | | - dirtypasses <= 1 && break # done if there are N FE groups and at least last N-1 have been clean (includes case N=1) |
| 21 | + # if the last N-1 passes have not found singletons (where N is number of FE groups), break the loop |
| 22 | + ncleanpasses >= length(fes) - 1 && break |
25 | 23 | end |
26 | | - |
27 | | - nsingletons |
| 24 | + return nsingletons |
28 | 25 | end |
29 | 26 |
|
30 | | -function drop_singletons!(esample, fe::FixedEffect, cache, chunks)::Int |
31 | | -# Main._cache = cache; Main._chunks = copy(chunks) |
| 27 | +function drop_singletons!(esample, fe::FixedEffect, cache) |
32 | 28 | refs = fe.refs |
33 | | - |
34 | | - fill!(cache, zero(UInt8)) |
35 | | - @inbounds for i ∈ eachindex(esample,refs) # count obs in each FE group |
| 29 | + fill!(cache, 0) |
| 30 | + @inbounds for i in eachindex(esample, refs) # count obs in each FE group |
36 | 31 | if esample[i] |
37 | | - cache[refs[i]] = min(0x02, cache[refs[i]] + 0x01) # stop counting obs in a group at 2, to keep counters in 8-bit integers |
| 32 | + # no need to keep counting obs after 2 (counters are 8-bit integers) |
| 33 | + cache[refs[i]] = min(0x02, cache[refs[i]] + 0x01) |
38 | 34 | end |
39 | 35 | end |
40 | | - |
41 | | - tasks = map(chunks) do chunk # drop newly found singletons |
42 | | - Threads.@spawn begin |
43 | | - n = 0 |
44 | | - @inbounds for i ∈ chunk |
45 | | - if esample[i] && isone(cache[refs[i]]) |
46 | | - esample[i] = false |
47 | | - n += 1 |
48 | | - end |
49 | | - end |
50 | | - n |
| 36 | + n = 0 |
| 37 | + @inbounds for i in eachindex(esample, refs) |
| 38 | + if esample[i] && cache[refs[i]] == 0x01 |
| 39 | + esample[i] = false |
| 40 | + n += 1 |
51 | 41 | end |
52 | 42 | end |
53 | | - |
54 | | - sum(fetch.(tasks)) |
| 43 | + return n |
55 | 44 | end |
56 | 45 |
|
57 | 46 |
|
|
0 commit comments