Skip to content

Commit e76b8da

Browse files
committed
Add semantics for closing channels.
This makes certain algorithms simpler as channels now have an explicit lifetime - multiple readers can coordinate closing without needing to ensure the same number of reads as writes.
1 parent 7e5f226 commit e76b8da

File tree

2 files changed

+94
-15
lines changed

2 files changed

+94
-15
lines changed

src/core/ev.c

Lines changed: 52 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,4 @@
1-
/*
2-
* Copyright (c) 2021 Calvin Rose and contributors.
3-
*
4-
* Permission is hereby granted, free of charge, to any person obtaining a copy
5-
* of this software and associated documentation files (the "Software"), to
6-
* deal in the Software without restriction, including without limitation the
7-
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8-
* sell copies of the Software, and to permit persons to whom the Software is
9-
* furnished to do so, subject to the following conditions:
10-
*
11-
* The above copyright notice and this permission notice shall be included in
1+
/* The above copyright notice and this permission notice shall be included in
122
* all copies or substantial portions of the Software.
133
*
144
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
@@ -70,6 +60,7 @@ typedef struct {
7060
JanetQueue read_pending;
7161
JanetQueue write_pending;
7262
int32_t limit;
63+
int closed;
7364
} JanetChannel;
7465

7566
typedef struct {
@@ -571,6 +562,7 @@ void janet_ev_dec_refcount(void) {
571562

572563
static void janet_chan_init(JanetChannel *chan, int32_t limit) {
573564
chan->limit = limit;
565+
chan->closed = 0;
574566
janet_q_init(&chan->items);
575567
janet_q_init(&chan->read_pending);
576568
janet_q_init(&chan->write_pending);
@@ -660,6 +652,13 @@ static Janet make_read_result(JanetChannel *channel, Janet x) {
660652
return janet_wrap_tuple(janet_tuple_end(tup));
661653
}
662654

655+
static Janet make_close_result(JanetChannel *channel) {
656+
Janet *tup = janet_tuple_begin(2);
657+
tup[0] = janet_ckeywordv("close");
658+
tup[1] = janet_wrap_abstract(channel);
659+
return janet_wrap_tuple(janet_tuple_end(tup));
660+
}
661+
663662
/* Push a value to a channel, and return 1 if channel should block, zero otherwise.
664663
* If the push would block, will add to the write_pending queue in the channel. */
665664
static int janet_channel_push(JanetChannel *channel, Janet x, int mode) {
@@ -723,9 +722,13 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice)
723722

724723
JANET_CORE_FN(cfun_channel_push,
725724
"(ev/give channel value)",
726-
"Write a value to a channel, suspending the current fiber if the channel is full.") {
725+
"Write a value to a channel, suspending the current fiber if the channel is full. "
726+
"Returns the channel if the write succeeded, nil otherwise.") {
727727
janet_fixarity(argc, 2);
728728
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
729+
if (channel->closed) {
730+
janet_panic("cannot write to closed channel");
731+
}
729732
if (janet_channel_push(channel, argv[1], 0)) {
730733
janet_await();
731734
}
@@ -738,6 +741,7 @@ JANET_CORE_FN(cfun_channel_pop,
738741
janet_fixarity(argc, 1);
739742
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
740743
Janet item;
744+
if (channel->closed) return janet_wrap_nil();
741745
if (janet_channel_pop(channel, &item, 0)) {
742746
janet_schedule(janet_vm.root_fiber, item);
743747
}
@@ -746,10 +750,10 @@ JANET_CORE_FN(cfun_channel_pop,
746750

747751
JANET_CORE_FN(cfun_channel_choice,
748752
"(ev/select & clauses)",
749-
"Block until the first of several channel operations occur. Returns a tuple of the form [:give chan] or [:take chan x], where "
753+
"Block until the first of several channel operations occur. Returns a tuple of the form [:give chan], [:take chan x], or [:close chan], where "
750754
"a :give tuple is the result of a write and :take tuple is the result of a write. Each clause must be either a channel (for "
751755
"a channel take operation) or a tuple [channel x] for a channel give operation. Operations are tried in order, such that the first "
752-
"clauses will take precedence over later clauses.") {
756+
"clauses will take precedence over later clauses. Both and give and take operation can return a [:close chan] tuple, which indicates that the specified channel was closed.") {
753757
janet_arity(argc, 1, -1);
754758
int32_t len;
755759
const Janet *data;
@@ -759,13 +763,15 @@ JANET_CORE_FN(cfun_channel_choice,
759763
if (janet_indexed_view(argv[i], &data, &len) && len == 2) {
760764
/* Write */
761765
JanetChannel *chan = janet_getabstract(data, 0, &ChannelAT);
766+
if (chan->closed) continue;
762767
if (janet_q_count(&chan->items) < chan->limit) {
763768
janet_channel_push(chan, data[1], 1);
764769
return make_write_result(chan);
765770
}
766771
} else {
767772
/* Read */
768773
JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT);
774+
if (chan->closed) continue;
769775
if (chan->items.head != chan->items.tail) {
770776
Janet item;
771777
janet_channel_pop(chan, &item, 1);
@@ -779,11 +785,13 @@ JANET_CORE_FN(cfun_channel_choice,
779785
if (janet_indexed_view(argv[i], &data, &len) && len == 2) {
780786
/* Write */
781787
JanetChannel *chan = janet_getabstract(data, 0, &ChannelAT);
788+
if (chan->closed) continue;
782789
janet_channel_push(chan, data[1], 1);
783790
} else {
784791
/* Read */
785792
Janet item;
786793
JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT);
794+
if (chan->closed) continue;
787795
janet_channel_pop(chan, &item, 1);
788796
}
789797
}
@@ -843,6 +851,34 @@ JANET_CORE_FN(cfun_channel_new,
843851
return janet_wrap_abstract(channel);
844852
}
845853

854+
JANET_CORE_FN(cfun_channel_close,
855+
"(ev/chan-close chan)",
856+
"Close a channel. A closed channel will cause all pending reads and writes to return nil. "
857+
"Returns the channel.") {
858+
janet_fixarity(argc, 1);
859+
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
860+
if (!channel->closed) {
861+
channel->closed = 1;
862+
JanetChannelPending writer;
863+
while (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
864+
if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) {
865+
janet_schedule(writer.fiber, janet_wrap_nil());
866+
} else {
867+
janet_schedule(writer.fiber, make_close_result(channel));
868+
}
869+
}
870+
JanetChannelPending reader;
871+
while (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) {
872+
if (reader.mode == JANET_CP_MODE_CHOICE_READ) {
873+
janet_schedule(reader.fiber, janet_wrap_nil());
874+
} else {
875+
janet_schedule(reader.fiber, make_close_result(channel));
876+
}
877+
}
878+
}
879+
return janet_wrap_abstract(channel);
880+
}
881+
846882
static const JanetMethod ev_chanat_methods[] = {
847883
{"select", cfun_channel_choice},
848884
{"rselect", cfun_channel_rchoice},
@@ -851,6 +887,7 @@ static const JanetMethod ev_chanat_methods[] = {
851887
{"give", cfun_channel_push},
852888
{"capacity", cfun_channel_capacity},
853889
{"full", cfun_channel_full},
890+
{"close", cfun_channel_close},
854891
{NULL, NULL}
855892
};
856893

@@ -2413,6 +2450,7 @@ void janet_lib_ev(JanetTable *env) {
24132450
JANET_CORE_REG("ev/read", janet_cfun_stream_read),
24142451
JANET_CORE_REG("ev/chunk", janet_cfun_stream_chunk),
24152452
JANET_CORE_REG("ev/write", janet_cfun_stream_write),
2453+
JANET_CORE_REG("ev/chan-close", cfun_channel_close),
24162454
JANET_REG_END
24172455
};
24182456

test/suite0009.janet

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@
116116
(assert (= "123\n456\n" (string (slurp "unique.txt"))) "File writing 4.2")
117117
(os/rm "unique.txt"))
118118

119-
120119
# ev/gather
121120

122121
(assert (deep= @[1 2 3] (ev/gather 1 2 3)) "ev/gather 1")
@@ -180,4 +179,46 @@
180179

181180
(assert (os/execute [janet "-e" `(+ 1 2 3)`] :xp) "os/execute self")
182181

182+
# Test some channel
183+
184+
(def c1 (ev/chan))
185+
(def c2 (ev/chan))
186+
(def arr @[])
187+
(ev/spawn
188+
(while (def x (ev/take c1))
189+
(array/push arr x))
190+
(ev/chan-close c2))
191+
(for i 0 1000
192+
(ev/give c1 i))
193+
(ev/chan-close c1)
194+
(ev/take c2)
195+
(assert (= (slice arr) (slice (range 1000))) "ev/chan-close 1")
196+
197+
(def c1 (ev/chan))
198+
(def c2 (ev/chan))
199+
(def arr @[])
200+
(ev/spawn
201+
(while (def x (ev/take c1))
202+
(array/push arr x))
203+
(ev/sleep 0.1)
204+
(ev/chan-close c2))
205+
(for i 0 100
206+
(ev/give c1 i))
207+
(ev/chan-close c1)
208+
(ev/select c2)
209+
(assert (= (slice arr) (slice (range 100))) "ev/chan-close 2")
210+
211+
(def c1 (ev/chan))
212+
(def c2 (ev/chan))
213+
(def arr @[])
214+
(ev/spawn
215+
(while (def x (ev/take c1))
216+
(array/push arr x))
217+
(ev/chan-close c2))
218+
(for i 0 100
219+
(ev/give c1 i))
220+
(ev/chan-close c1)
221+
(ev/rselect c2)
222+
(assert (= (slice arr) (slice (range 100))) "ev/chan-close 3")
223+
183224
(end-suite)

0 commit comments

Comments
 (0)