Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/ProtocolZoo/ProtocolZoo.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module ProtocolZoo

using QuantumSavory
import QuantumSavory: get_time_tracker, Tag, isolderthan, onchange
import QuantumSavory: get_time_tracker, Tag, isolderthan, onchange, QueryOnRegResult
using QuantumSavory: Wildcard
using QuantumSavory.CircuitZoo: EntanglementSwap, LocalEntanglementSwap

Expand Down Expand Up @@ -402,7 +402,7 @@ EntanglementTracker(net::RegisterNet, node::Int) = EntanglementTracker(get_time_
continue
end

error("`EntanglementTracker` on node $(prot.node) received a message $(msg) that it does not know how to handle (due to the absence of corresponding `EntanglementCounterpart` or `EntanglementHistory` or `EntanglementDelete` tags). This might have happened due to `CutoffProt` deleting qubits while swaps are happening. Make sure that the retention times in `CutoffProt` are sufficiently larger than the `agelimit` in `SwapperProt`. Otherwise, this is a bug in the protocol and should not happen -- please report an issue at QuantumSavory's repository.")
error("`EntanglementTracker` on node $(prot.node) received a message $(msg) that it does not know how to handle (due to the absence of corresponding `EntanglementCounterpart` or `EntanglementHistory` or `EntanglementDelete` tags). This might happen due to a race condition from infinitely fast classical messages. Make sure that `classical_delay` in `RegisterNet` is not zero. This might also happen due to `CutoffProt` deleting qubits while swaps are happening. Make sure that the retention times in `CutoffProt` are sufficiently larger than the `agelimit` in `SwapperProt`. Otherwise, this is a bug in the protocol and should not happen -- please report an issue at QuantumSavory's repository.")
end
end
@debug "EntanglementTracker @$(prot.node): Starting message wait at $(now(prot.sim)) with MessageBuffer containing: $(mb.buffer)"
Expand Down
74 changes: 41 additions & 33 deletions src/ProtocolZoo/cutoff.jl
Original file line number Diff line number Diff line change
Expand Up @@ -36,45 +36,53 @@ CutoffProt(net::RegisterNet, node::Int; kwargs...) = CutoffProt(get_time_tracker

@resumable function (prot::CutoffProt)()
reg = prot.net[prot.node]
for slot in reg
@process per_slot_cutoff(prot.sim, slot, prot)
end
end

@resumable function per_slot_cutoff(sim, slot::RegRef, prot::CutoffProt)
empty_query = false
while true
for slot in reg # TODO these should be done in parallel, otherwise we will be waiting on each slot, greatly slowing down the cutoffs
islocked(slot) && continue
@yield lock(slot)
info = query(slot, EntanglementCounterpart, ❓, ❓)
if isnothing(info)
unlock(slot)
continue
end
if now(prot.sim) - reg.tag_info[info.id][3] > prot.retention_time # TODO this should be part of the query interface, not using non-public implementation details
untag!(slot, info.id)
traceout!(slot)
msg = Tag(EntanglementDelete, prot.node, slot.idx, info.tag[2], info.tag[3])
tag!(slot, msg)
(prot.announce) && put!(channel(prot.net, prot.node=>msg[4]; permit_forward=true), msg)
@debug "CutoffProt @$(prot.node): Send message to $(msg[4]) | message=`$msg` | time=$(now(prot.sim))"
if empty_query
if isnothing(prot.period)
@yield onchange(slot, Tag) # TODO this should be just for the slot, not for the whole register
else
@yield timeout(prot.sim, prot.period::Float64)
end
end
@yield lock(slot)
info = query(slot, EntanglementCounterpart, ❓, ❓)
if isnothing(info) || now(sim) - info.time < prot.retention_time
empty_query = true
unlock(slot)
continue
end
println("$(now(sim)) $slot info $info")

# TODO the tag deletions below are not necessary when announce=true and EntanglementTracker is running on other nodes. Verify the veracity of that statement, make tests for both cases, and document.

# delete old history tags
info = query(slot, EntanglementHistory, ❓, ❓, ❓, ❓, ❓;filo=false) # TODO we should have a warning if `queryall` returns more than one result -- what does it even mean to have multiple history tags here
if !isnothing(info) && now(prot.sim) - reg.tag_info[info.id][3] > prot.retention_time # TODO this should be part of the query interface, not using non-public implementation details
untag!(slot, info.id)
end
untag!(slot, info.id)
traceout!(slot)
println("$(now(sim)) $slot traced out")
msg = Tag(EntanglementDelete, prot.node, slot.idx, info.tag[2], info.tag[3])
tag!(slot, msg)
(prot.announce) && put!(channel(prot.net, prot.node=>msg[4]; permit_forward=true), msg)
@debug "CutoffProt @$(prot.node): Send message to $(msg[4]) | message=`$msg` | time=$(now(prot.sim))"

# delete old EntanglementDelete tags
# TODO Why do we have separate entanglementhistory and entanglementupdate but we have only a single entanglementdelete that serves both roles? We should probably have both be pairs of tags, for consistency and ease of reasoning
info = query(slot, EntanglementDelete, prot.node, slot.idx , ❓, ❓) # TODO we should have a warning if `queryall` returns more than one result -- what does it even mean to have multiple delete tags here
if !isnothing(info) && now(prot.sim) - reg.tag_info[info.id][3] > prot.retention_time # TODO this should be part of the query interface, not using non-public implementation details
untag!(slot, info.id)
end
# TODO the tag deletions below are not necessary when announce=true and EntanglementTracker is running on other nodes. Verify the veracity of that statement, make tests for both cases, and document.

unlock(slot)
# delete old history tags
info = query(slot, EntanglementHistory, ❓, ❓, ❓, ❓, ❓;filo=false) # TODO we should have a warning if `queryall` returns more than one result -- what does it even mean to have multiple history tags here
if !isnothing(info) && now(prot.sim) - info.time > prot.retention_time
untag!(slot, info.id)
end
if isnothing(prot.period)
@yield onchange(reg, Tag)
else
@yield timeout(prot.sim, prot.period::Float64)

# delete old EntanglementDelete tags
# TODO Why do we have separate entanglementhistory and entanglementupdate but we have only a single entanglementdelete that serves both roles? We should probably have both be pairs of tags, for consistency and ease of reasoning
info = query(slot, EntanglementDelete, prot.node, slot.idx , ❓, ❓) # TODO we should have a warning if `queryall` returns more than one result -- what does it even mean to have multiple delete tags here
if !isnothing(info) && now(prot.sim) - info.time > prot.retention_time
untag!(slot, info.id)
end

unlock(slot)
end
end
2 changes: 1 addition & 1 deletion src/ProtocolZoo/swapping.jl
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ SwapperProt(net::RegisterNet, node::Int; kwargs...) = SwapperProt(get_time_track
continue
end
# The compiler is not smart enough to figure out that qubit_pair_ is not nothing, so we need to tell it explicitly. A new variable name is needed due to @resumable.
qubit_pair = qubit_pair_::NTuple{2, Base.NamedTuple{(:slot, :id, :tag), Base.Tuple{RegRef, Int128, Tag}}} # TODO: replace by `NTuple{2, @NamedTuple{slot::RegRef, id::Int128, tag::Tag}}` once https://github.com/JuliaDynamics/ResumableFunctions.jl/issues/104 is resolved
qubit_pair = qubit_pair_::NTuple{2, QueryOnRegResult}

(q1, id1, tag1) = qubit_pair[1].slot, qubit_pair[1].id, qubit_pair[1].tag
(q2, id2, tag2) = qubit_pair[2].slot, qubit_pair[2].id, qubit_pair[2].tag
Expand Down
6 changes: 4 additions & 2 deletions src/QuantumSavory.jl
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ export
observable,
# uptotime.jl
uptotime!, overwritetime!,
# tags.jl and queries.jl
Tag, tag!, untag!, W, ❓, query, queryall, querydelete!, findfreeslot,
# tags.jl and queries.jl and querywait.jl
Tag, tag!, untag!, W, ❓, query, queryall, querydelete!, query_wait, querydelete_wait!,
findfreeslot,
#messagebuffer.jl
MessageBuffer,
# quantumchannel.jl
Expand Down Expand Up @@ -140,6 +141,7 @@ include("baseops/uptotime.jl")
include("baseops/observable.jl")

include("queries.jl")
include("querywait.jl")

include("representations.jl")
include("backgrounds.jl")
Expand Down
33 changes: 7 additions & 26 deletions src/messagebuffer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ struct MessageBuffer{T}
net # TODO ::RegisterNet -- this can not be typed due to circular dependency, see https://github.com/JuliaLang/julia/issues/269
node::Int
buffer::Vector{NamedTuple{(:src,:tag), Tuple{Union{Nothing, Int},T}}}
waiters::IdDict{Resource,Resource}
no_wait::Ref{Int} # keeps track of the situation when something is pushed in the buffer and no waiters are present. In that case, when the waiters are available after it they would get locked while the code that was supposed to unlock them has already run. So, we keep track the number of times this happens and put no lock on the waiters in this situation.
tag_waiter::AsymmetricSemaphore
end

function peektags(mb::MessageBuffer)
Expand All @@ -34,6 +33,7 @@ function Base.put!(mb::MessageBuffer, tag)
put_and_unlock_waiters(mb, nothing, convert(Tag,tag))
nothing
end
Base.put!(mb::MessageBuffer, args...) = put!(mb, Tag(args...))

tag!(::MessageBuffer, args...) = throw(ArgumentError("MessageBuffer does not support `tag!`. Use `put!(::MessageBuffer, Tag(...))` instead."))

Expand Down Expand Up @@ -65,42 +65,23 @@ end

function put_and_unlock_waiters(mb::MessageBuffer, src, tag)
@debug "MessageBuffer @$(mb.node) at t=$(now(mb.sim)): Receiving from source $(src) | message=`$(tag)`"
length(mb.waiters) == 0 && @debug "MessageBuffer @$(mb.node) received a message from $(src), but there is no one waiting on that message buffer. The message was `$(tag)`."
if length(mb.waiters) == 0
mb.no_wait[] += 1
end
islocked(mb.tag_waiter) || @debug "MessageBuffer @$(mb.node) received a message from $(src), but there is no one waiting on that message buffer. The message was `$(tag)`."
push!(mb.buffer, (;src,tag));
for waiter in keys(mb.waiters)
unlock(waiter)
end
unlock(mb.tag_waiter)
end

function MessageBuffer(net, node::Int, qs::Vector{NamedTuple{(:src,:channel), Tuple{Int, DelayQueue{T}}}}) where {T}
sim = get_time_tracker(net)
signal = IdDict{Resource,Resource}()
no_wait = Ref{Int}(0)
mb = MessageBuffer{T}(sim, net, node, Tuple{Int,T}[], signal, no_wait)
mb = MessageBuffer{T}(sim, net, node, Tuple{Int,T}[], AsymmetricSemaphore(sim))
for (;src, channel) in qs
@process take_loop_mb(sim, channel, src, mb)
end
mb
end

@resumable function wait_process(sim, mb)
if mb.no_wait[] != 0 # This happens only in the specific case when something is put in the buffer before there any waiters.
mb.no_wait[] -= 1
return
end
waitresource = Resource(sim)
lock(waitresource)
mb.waiters[waitresource] = waitresource
@yield lock(waitresource)
pop!(mb.waiters, waitresource)
end

function Base.wait(mb::MessageBuffer)
Base.depwarn("wait(::MessageBuffer) is deprecated, use onchange(::MessageBuffer) instead", :wait)
@process wait_process(mb.sim, mb)
return lock(mb.tag_waiter)
end

"""
Expand All @@ -110,7 +91,7 @@ E.g. `onchange(r, Tag)` will wait only on changes to tags and metadata.
function onchange end

function onchange(mb::MessageBuffer)
@process wait_process(mb.sim, mb)
return lock(mb.tag_waiter)
end

function onchange(mb::MessageBuffer, ::Type{Tag})
Expand Down
6 changes: 4 additions & 2 deletions src/networks.jl
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,10 @@ See also: [`channel`](@ref)
"""
function messagebuffer(ref::RegOrRegRef)
reg = get_register(ref)
net = reg.netparent[]
return messagebuffer(net, net.reverse_lookup[reg])
net = parent(reg)
idx = parentindex(reg)
isnothing(net) && throw(ArgumentError("The register does not have a parent network and thus it does not have an assigned message buffer."))
return messagebuffer(net, idx)
end

function achannel(net::RegisterNet, src::Int, dst::Int, ::Val{:C}; permit_forward=false)
Expand Down
14 changes: 8 additions & 6 deletions src/queries.jl
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const QueryOnRegResult = NamedTuple{(:slot, :id, :tag, :time), Tuple{RegRef, Int128, Tag, Float64}}

struct QueryError <: Exception
msg
f
Expand Down Expand Up @@ -190,17 +192,17 @@ for i in 1:10 # Vararg{Union{...}, N} does not specialize well, so we are explic
) where {allB, filoB} # queryargs is so specifically typed in order to trigger the compiler heuristics for specialization, leading to very significant performance improvements
ref = isa(reg, RegRef) ? reg : nothing
reg = get_register(reg)
res = NamedTuple{(:slot, :id, :tag), Tuple{RegRef, Int128, Tag}}[]
res = QueryOnRegResult[]
l = length(reg.guids)
indices = filoB ? (l:-1:1) : (1:l)
for i in indices
i = reg.guids[i]
tag = reg.tag_info[i].tag
(;tag, time) = reg.tag_info[i]
slot = reg[reg.tag_info[i].slot]
if _nothingor(ref, slot) && _nothingor(locked, islocked(slot)) && _nothingor(assigned, isassigned(slot))
good = query_good(tag, $(args...))
if good
allB ? push!(res, (slot=slot, id=i, tag=tag)) : return (slot=slot, id=i, tag=tag)
allB ? push!(res, (;slot, id=i, tag, time)) : return (;slot, id=i, tag, time)
end
end
end
Expand Down Expand Up @@ -363,15 +365,15 @@ tag!(tagcontainer, args...) = tag!(tagcontainer, Tag(args...))
function _query(reg::RegOrRegRef, ::Val{allB}, ::Val{filoB}, query::Tag; locked::Union{Nothing,Bool}=nothing, assigned::Union{Nothing,Bool}=nothing) where {allB, filoB}
ref = isa(reg, RegRef) ? reg : nothing
reg = get_register(reg)
res = NamedTuple{(:slot, :id, :tag), Tuple{RegRef, Int128, Tag}}[]
res = QueryOnRegResult[]
l = length(reg.guids)
indices = filoB ? (l:-1:1) : (1:l)
for i in indices
i = reg.guids[i]
tag = reg.tag_info[i].tag
(;tag, time) = reg.tag_info[i]
slot = reg[reg.tag_info[i].slot]
if _nothingor(ref, slot) && _nothingor(locked, islocked(slot)) && _nothingor(assigned, isassigned(slot)) && tag==query
allB ? push!(res, (slot=slot, id=i, tag=tag)) : return (slot=slot, id=i, tag=tag)
allB ? push!(res, (;slot, id=i, tag, time)) : return (;slot, id=i, tag, time)
end
end
allB ? res : nothing
Expand Down
49 changes: 49 additions & 0 deletions src/querywait.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# TODO weird ordering of what should be kwargs due to https://github.com/JuliaDynamics/ResumableFunctions.jl/issues/135
@resumable function _query_wait(sim, reg::Register, locked::Union{Nothing,Bool}, assigned::Union{Nothing,Bool}, args...)
q = query(reg, args...; locked, assigned)
while isnothing(q)
@yield onchange_tag(reg)
q = query(reg, args...; locked, assigned)
end
return q
end
function query_wait(store::Register, args...; locked::Union{Nothing,Bool}=nothing, assigned::Union{Nothing,Bool}=nothing)
# TODO weird ordering of what should be kwargs due to https://github.com/JuliaDynamics/ResumableFunctions.jl/issues/135
return @process _query_wait(get_time_tracker(store), store, locked, assigned, args...)
end

@resumable function _query_wait(sim, mb::MessageBuffer, args...)
q = query(mb, args...)
while isnothing(q)
@yield wait(mb)
q = query(mb, args...)
end
return q
end
function query_wait(store::MessageBuffer, args...)
return @process _query_wait(get_time_tracker(store), store, args...)
end

@resumable function _querydelete_wait(sim, mb::Register, locked::Union{Nothing,Bool}, assigned::Union{Nothing,Bool}, args...)
q = querydelete!(mb, args...; locked, assigned)
while isnothing(q)
@yield onchange_tag(mb)
q = querydelete!(mb, args...; locked, assigned)
end
return q
end
function querydelete_wait!(store::Register, args...; locked::Union{Nothing,Bool}=nothing, assigned::Union{Nothing,Bool}=nothing)
return @process _querydelete_wait(get_time_tracker(store), store, locked, assigned, args...)
end

@resumable function _querydelete_wait(sim, mb::MessageBuffer, args...)
q = querydelete!(mb, args...)
while isnothing(q)
@yield wait(mb)
q = querydelete!(mb, args...)
end
return q
end
function querydelete_wait!(store::MessageBuffer, args...)
return @process _querydelete_wait(get_time_tracker(store), store, args...)
end
6 changes: 6 additions & 0 deletions src/semaphore.jl
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,9 @@ function islocked(s::AsymmetricSemaphore)
sem = s.semaphorepair[s.current_semaphore]
return islocked(sem)
end

function nbwaiters(s::AsymmetricSemaphore)
#s1, s2 = s.semaphorepair
#return s1.nbwaiters + s2.nbwaiters
return s.semaphorepair[s.current_semaphore].nbwaiters
end
33 changes: 33 additions & 0 deletions test/test_messagebuffer.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
@testitem "Message Buffer" tags=[:messagebuffer] begin
using Test
using QuantumSavory
using QuantumSavory: tag_types
using QuantumSavory.ProtocolZoo
using ResumableFunctions, ConcurrentSim
Expand Down Expand Up @@ -41,4 +43,35 @@ proc4 = put!(messagebuffer(net[1]), SwitchRequest(2,3))
run(sim, 10)
@test QuantumSavory.peektags(messagebuffer(net,1)) == [Tag(SwitchRequest(2,3)), Tag(SwitchRequest(2,3)), Tag(SwitchRequest(2,3)), Tag(SwitchRequest(2,3))]
@test_throws "does not support `tag!`" tag!(messagebuffer(net, 1), EntanglementCounterpart, 1, 10)

##

reg = Register(10)
net = RegisterNet([reg])
sim = get_time_tracker(net)
mb = messagebuffer(reg)
put!(mb, Tag(:something, 1, 2))
put!(mb, Tag(:something, 1, 2))
put!(mb, Tag(:something, 1, 2))
put!(mb, Tag(:something, 1, 2))
@resumable function receiver(sim, LOG)
@yield wait(mb)
push!(LOG, "got result immediately")
end
LOG = []
@process receiver(sim, LOG)
put!(mb, Tag(:something, 1, 2))
@process receiver(sim, LOG)
@process receiver(sim, LOG)
@process receiver(sim, LOG)
run(sim, 1.0)
@process receiver(sim, LOG)
@process receiver(sim, LOG)
put!(mb, Tag(:something, 1, 2))
put!(mb, Tag(:something, 1, 2))
@process receiver(sim, LOG)
@process receiver(sim, LOG)
run(sim, 2.0)
@test length(LOG) == 7

end
Loading
Loading