Skip to content

Commit 43c1dde

Browse files
committed
Number of alive fibers need not be atomic in the FIFO scheduler
1 parent 4334333 commit 43c1dde

File tree

1 file changed

+10
-40
lines changed

1 file changed

+10
-40
lines changed

lib/picos_mux.fifo/picos_mux_fifo.ml

Lines changed: 10 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ open Picos
22

33
let[@inline never] quota_non_positive () = invalid_arg "quota must be positive"
44

5-
(* As a minor optimization, we avoid allocating closures, which take slightly
6-
more memory than values of this type. *)
75
type ready =
86
| Spawn of Fiber.t * (Fiber.t -> unit)
97
| Continue of Fiber.t * (unit, unit) Effect.Deep.continuation
@@ -17,7 +15,6 @@ module Mpscq = Picos_aux_mpscq
1715
type t = {
1816
ready : ready Mpscq.t;
1917
needs_wakeup : bool Atomic.t;
20-
num_alive_fibers : int Atomic.t;
2118
mutex : Mutex.t;
2219
condition : Condition.t;
2320
resume :
@@ -33,6 +30,7 @@ type t = {
3330
quota : int;
3431
mutable fiber : Fiber.Maybe.t;
3532
mutable remaining_quota : int;
33+
mutable num_alive_fibers : int;
3634
}
3735

3836
let rec next t =
@@ -55,15 +53,11 @@ let rec next t =
5553
Fiber.resume fiber k
5654
| exception Mpscq.Empty ->
5755
t.fiber <- Fiber.Maybe.nothing;
58-
if Atomic.get t.num_alive_fibers <> 0 then begin
56+
if t.num_alive_fibers <> 0 then begin
5957
if Atomic.get t.needs_wakeup then begin
6058
Mutex.lock t.mutex;
6159
match
62-
if Atomic.get t.needs_wakeup then
63-
(* We assume that there is no poll point after the above
64-
[Mutex.lock] and before the below [Condition.wait] is ready to
65-
be woken up by a [Condition.broadcast]. *)
66-
Condition.wait t.condition t.mutex
60+
if Atomic.get t.needs_wakeup then Condition.wait t.condition t.mutex
6761
with
6862
| () -> Mutex.unlock t.mutex
6963
| exception exn ->
@@ -85,15 +79,13 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main =
8579
Select.check_configured ();
8680
let ready = Mpscq.create ~padded:true ()
8781
and needs_wakeup = Atomic.make false |> Multicore_magic.copy_as_padded
88-
and num_alive_fibers = Atomic.make 1 |> Multicore_magic.copy_as_padded
8982
and mutex = Mutex.create ()
9083
and condition = Condition.create () in
9184
let rec t =
9285
{
9386
ready;
9487
fiber = Fiber.Maybe.nothing;
9588
needs_wakeup;
96-
num_alive_fibers;
9789
mutex;
9890
condition;
9991
resume;
@@ -104,11 +96,9 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main =
10496
handler;
10597
quota;
10698
remaining_quota = quota;
99+
num_alive_fibers = 1;
107100
}
108101
and current =
109-
(* The current handler must never propagate cancelation, but it would be
110-
possible to continue some other fiber and resume the current fiber
111-
later. *)
112102
Some
113103
(fun k ->
114104
let fiber = Fiber.Maybe.to_fiber t.fiber in
@@ -140,24 +130,17 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main =
140130
and[@alert "-handler"] effc :
141131
type a. a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option =
142132
function
143-
| Fiber.Current ->
144-
(* We handle [Current] first as it is perhaps the most latency
145-
sensitive effect. *)
146-
t.current
133+
| Fiber.Current -> t.current
147134
| Fiber.Spawn r ->
148-
(* We check cancelation status once and then either perform the
149-
whole operation or discontinue the fiber. *)
150135
let fiber = Fiber.Maybe.to_fiber t.fiber in
151136
if Fiber.is_canceled fiber then t.discontinue
152137
else begin
153-
Atomic.incr t.num_alive_fibers;
138+
t.num_alive_fibers <- t.num_alive_fibers + 1;
154139
Mpscq.push t.ready (Spawn (r.fiber, r.main));
155140
t.return
156141
end
157142
| Fiber.Yield -> t.yield
158143
| Computation.Cancel_after r -> begin
159-
(* We check cancelation status once and then either perform the
160-
whole operation or discontinue the fiber. *)
161144
let fiber = Fiber.Maybe.to_fiber t.fiber in
162145
if Fiber.is_canceled fiber then t.discontinue
163146
else
@@ -186,33 +169,20 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main =
186169
end)
187170
| _ -> None
188171
and retc () =
189-
Atomic.decr t.num_alive_fibers;
172+
t.num_alive_fibers <- t.num_alive_fibers - 1;
190173
next t
191174
and resume trigger fiber k =
192175
let resume = Resume (fiber, k) in
193-
if Fiber.unsuspend fiber trigger then
194-
(* The fiber has not been canceled, so we queue the fiber normally. *)
195-
Mpscq.push t.ready resume
196-
else
197-
(* The fiber has been canceled, so we give priority to it in this
198-
scheduler. *)
199-
Mpscq.push_head t.ready resume;
200-
(* As the trigger might have been signaled from another domain or systhread
201-
outside of the scheduler, we check whether the scheduler needs to be
202-
woken up and take care of it if necessary. *)
176+
if Fiber.unsuspend fiber trigger then Mpscq.push t.ready resume
177+
else Mpscq.push_head t.ready resume;
203178
if
204179
Atomic.get t.needs_wakeup
205180
&& Atomic.compare_and_set t.needs_wakeup true false
206181
then begin
207182
begin
208183
match Mutex.lock t.mutex with
209184
| () -> Mutex.unlock t.mutex
210-
| exception Sys_error _ ->
211-
(* This should mean that [resume] was called from a signal handler
212-
running on the scheduler thread. If the assumption about not
213-
having poll points holds, the [Condition.broadcast] should now be
214-
able to wake up the [Condition.wait] in the scheduler. *)
215-
()
185+
| exception Sys_error _ -> ()
216186
end;
217187
Condition.broadcast t.condition
218188
end

0 commit comments

Comments
 (0)