|
4 | 4 | ## |
5 | 5 | ############################################################################## |
6 | 6 |
|
7 | | -function drop_singletons!(esample, fes::Vector{<:FixedEffect}) |
8 | | - ns = Int[] |
9 | | - for fe in Iterators.cycle(fes) |
10 | | - # break loop if number of singletons did not change since the last time fe was iterated on |
11 | | - if length(ns) >= length(fes) && sum(view(ns, (length(ns)-length(fes)+1):length(ns))) == ns[end-length(fes)+1] |
12 | | - break |
| 7 | +function drop_singletons!(esample, fes::Vector{<:FixedEffect}, nthreads) |
| 8 | +# Main._esample = copy(esample); Main._fes=copy(fes); Main._nthreads=nthreads |
| 9 | + nsingletons = 0 |
| 10 | + dirtypasses = length(fes) |
| 11 | + |
| 12 | + bounds = ceil.(Int, (0:nthreads) * (length(esample) / nthreads)) |
| 13 | + chunks = [bounds[t]+1:bounds[t+1] for t ∈ 1:nthreads] |
| 14 | + |
| 15 | + caches = [Vector{UInt8}(undef, fes[i].n) for i ∈ eachindex(fes)] |
| 16 | + for (fe,cache) ∈ Iterators.cycle(zip(fes,caches)) # if 1 set of FE, only need 1 pass |
| 17 | + n = drop_singletons!(esample, fe, cache, chunks) |
| 18 | + if iszero(n) |
| 19 | + dirtypasses -= 1 |
| 20 | + else |
| 21 | + nsingletons += n |
| 22 | + dirtypasses = length(fes) # restart counter |
13 | 23 | end |
14 | | - push!(ns, drop_singletons!(esample, fe)) |
| 24 | + dirtypasses <= 1 && break # done if there are N FE groups and at least last N-1 have been clean (includes case N=1) |
15 | 25 | end |
16 | | - return sum(ns) |
| 26 | + |
| 27 | + nsingletons |
17 | 28 | end |
18 | 29 |
|
19 | | -function drop_singletons!(esample, fe::FixedEffect) |
20 | | - n = 0 |
21 | | - cache = zeros(Int, fe.n) |
22 | | - @inbounds for i in eachindex(esample) |
| 30 | +function drop_singletons!(esample, fe::FixedEffect, cache, chunks)::Int |
| 31 | +# Main._cache = cache; Main._chunks = copy(chunks) |
| 32 | + refs = fe.refs |
| 33 | + |
| 34 | + fill!(cache, zero(UInt8)) |
| 35 | + @inbounds for i ∈ eachindex(esample,refs) # count obs in each FE group |
23 | 36 | if esample[i] |
24 | | - cache[fe.refs[i]] += 1 |
| 37 | + cache[refs[i]] = min(0x02, cache[refs[i]] + 0x01) # stop counting obs in a group at 2, to keep counters in 8-bit integers |
25 | 38 | end |
26 | 39 | end |
27 | | - @inbounds for i in eachindex(esample) |
28 | | - if esample[i] && (cache[fe.refs[i]] == 1) |
29 | | - esample[i] = false |
30 | | - n += 1 |
| 40 | + |
| 41 | + tasks = map(chunks) do chunk # drop newly found singletons |
| 42 | + Threads.@spawn begin |
| 43 | + n = 0 |
| 44 | + @inbounds for i ∈ chunk |
| 45 | + if esample[i] && isone(cache[refs[i]]) |
| 46 | + esample[i] = false |
| 47 | + n += 1 |
| 48 | + end |
| 49 | + end |
| 50 | + n |
31 | 51 | end |
32 | 52 | end |
33 | | - return n |
| 53 | + |
| 54 | + sum(fetch.(tasks)) |
34 | 55 | end |
35 | 56 |
|
| 57 | + |
36 | 58 | ############################################################################## |
37 | 59 | ## |
38 | 60 | ## Number of distinct values (only ever call when fe without missing values) |
|
0 commit comments