@@ -721,7 +721,10 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice)
721721
722722/* Channel Methods */
723723
724- static Janet cfun_channel_push (int32_t argc , Janet * argv ) {
724+ JANET_CORE_FN (cfun_channel_push ,
725+ "(ev/give channel value)" ,
726+ "Write a value to a channel, suspending the current fiber if the channel is full."
727+ ) {
725728 janet_fixarity (argc , 2 );
726729 JanetChannel * channel = janet_getabstract (argv , 0 , & ChannelAT );
727730 if (janet_channel_push (channel , argv [1 ], 0 )) {
@@ -730,7 +733,10 @@ static Janet cfun_channel_push(int32_t argc, Janet *argv) {
730733 return argv [0 ];
731734}
732735
733- static Janet cfun_channel_pop (int32_t argc , Janet * argv ) {
736+ JANET_CORE_FN (cfun_channel_pop ,
737+ "(ev/take channel)" ,
738+ "Read from a channel, suspending the current fiber if no value is available."
739+ ) {
734740 janet_fixarity (argc , 1 );
735741 JanetChannel * channel = janet_getabstract (argv , 0 , & ChannelAT );
736742 Janet item ;
@@ -740,7 +746,13 @@ static Janet cfun_channel_pop(int32_t argc, Janet *argv) {
740746 janet_await ();
741747}
742748
743- static Janet cfun_channel_choice (int32_t argc , Janet * argv ) {
749+ JANET_CORE_FN (cfun_channel_choice ,
750+ "(ev/select & clauses)" ,
751+ "Block until the first of several channel operations occur. Returns a tuple of the form [:give chan] or [:take chan x], where "
752+ "a :give tuple is the result of a write and :take tuple is the result of a write. Each clause must be either a channel (for "
753+ "a channel take operation) or a tuple [channel x] for a channel give operation. Operations are tried in order, such that the first "
754+ "clauses will take precedence over later clauses."
755+ ) {
744756 janet_arity (argc , 1 , -1 );
745757 int32_t len ;
746758 const Janet * data ;
@@ -782,19 +794,28 @@ static Janet cfun_channel_choice(int32_t argc, Janet *argv) {
782794 janet_await ();
783795}
784796
785- static Janet cfun_channel_full (int32_t argc , Janet * argv ) {
797+ JANET_CORE_FN (cfun_channel_full ,
798+ "(ev/full channel)" ,
799+ "Check if a channel is full or not."
800+ ) {
786801 janet_fixarity (argc , 1 );
787802 JanetChannel * channel = janet_getabstract (argv , 0 , & ChannelAT );
788803 return janet_wrap_boolean (janet_q_count (& channel -> items ) >= channel -> limit );
789804}
790805
791- static Janet cfun_channel_capacity (int32_t argc , Janet * argv ) {
806+ JANET_CORE_FN (cfun_channel_capacity ,
807+ "(ev/capacity channel)" ,
808+ "Get the number of items a channel will store before blocking writers."
809+ ) {
792810 janet_fixarity (argc , 1 );
793811 JanetChannel * channel = janet_getabstract (argv , 0 , & ChannelAT );
794812 return janet_wrap_integer (channel -> limit );
795813}
796814
797- static Janet cfun_channel_count (int32_t argc , Janet * argv ) {
815+ JANET_CORE_FN (cfun_channel_count ,
816+ "(ev/count channel)" ,
817+ "Get the number of items currently waiting in a channel."
818+ ) {
798819 janet_fixarity (argc , 1 );
799820 JanetChannel * channel = janet_getabstract (argv , 0 , & ChannelAT );
800821 return janet_wrap_integer (janet_q_count (& channel -> items ));
@@ -810,12 +831,19 @@ static void fisher_yates_args(int32_t argc, Janet *argv) {
810831 }
811832}
812833
813- static Janet cfun_channel_rchoice (int32_t argc , Janet * argv ) {
834+ JANET_CORE_FN (cfun_channel_rchoice ,
835+ "(ev/rselect & clauses)" ,
836+ "Similar to ev/select, but will try clauses in a random order for fairness."
837+ ) {
814838 fisher_yates_args (argc , argv );
815839 return cfun_channel_choice (argc , argv );
816840}
817841
818- static Janet cfun_channel_new (int32_t argc , Janet * argv ) {
842+ JANET_CORE_FN (cfun_channel_new ,
843+ "(ev/chan &opt capacity)" ,
844+ "Create a new channel. capacity is the number of values to queue before "
845+ "blocking writers, defaults to 0 if not provided. Returns a new channel."
846+ ) {
819847 janet_arity (argc , 0 , 1 );
820848 int32_t limit = janet_optnat (argv , argc , 0 , 0 );
821849 JanetChannel * channel = janet_abstract (& ChannelAT , sizeof (JanetChannel ));
@@ -2116,7 +2144,14 @@ int janet_make_pipe(JanetHandle handles[2], int mode) {
21162144
21172145/* C functions */
21182146
2119- static Janet cfun_ev_go (int32_t argc , Janet * argv ) {
2147+ JANET_CORE_FN (cfun_ev_go ,
2148+ "(ev/go fiber &opt value supervisor)" ,
2149+ "Put a fiber on the event loop to be resumed later. Optionally pass "
2150+ "a value to resume with, otherwise resumes with nil. Returns the fiber. "
2151+ "An optional `core/channel` can be provided as well as a supervisor. When various "
2152+ "events occur in the newly scheduled fiber, an event will be pushed to the supervisor. "
2153+ "If not provided, the new fiber will inherit the current supervisor."
2154+ ) {
21202155 janet_arity (argc , 1 , 3 );
21212156 JanetFiber * fiber = janet_getfiber (argv , 0 );
21222157 Janet value = argc >= 2 ? argv [1 ] : janet_wrap_nil ();
@@ -2168,7 +2203,14 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) {
21682203 return args ;
21692204}
21702205
2171- static Janet cfun_ev_thread (int32_t argc , Janet * argv ) {
2206+ JANET_CORE_FN (cfun_ev_thread ,
2207+ "(ev/thread fiber &opt value flags)" ,
2208+ "Resume a (copy of a) `fiber` in a new operating system thread, optionally passing `value` "
2209+ "to resume with. "
2210+ "Unlike `ev/go`, this function will suspend the current fiber until the thread is complete. "
2211+ "If you want to run the thread without waiting for a result, pass the `:n` flag to return nil immediately. "
2212+ "Otherwise, returns (a copy of) the final result from the fiber on the new thread."
2213+ ) {
21722214 janet_arity (argc , 1 , 3 );
21732215 janet_getfiber (argv , 0 );
21742216 Janet value = argc >= 2 ? argv [1 ] : janet_wrap_nil ();
@@ -2200,7 +2242,12 @@ static Janet cfun_ev_thread(int32_t argc, Janet *argv) {
22002242 }
22012243}
22022244
2203- static Janet cfun_ev_give_supervisor (int32_t argc , Janet * argv ) {
2245+ JANET_CORE_FN (cfun_ev_give_supervisor ,
2246+ "(ev/give-supervisor tag & payload)" ,
2247+ "Send a message to the current supervior channel if there is one. The message will be a "
2248+ "tuple of all of the arguments combined into a single message, where the first element is tag. "
2249+ "By convention, tag should be a keyword indicating the type of message. Returns nil."
2250+ ) {
22042251 janet_arity (argc , 1 , -1 );
22052252 JanetChannel * chan = janet_vm .root_fiber -> supervisor_channel ;
22062253 if (NULL != chan ) {
@@ -2222,13 +2269,22 @@ JANET_NO_RETURN void janet_sleep_await(double sec) {
22222269 janet_await ();
22232270}
22242271
2225- static Janet cfun_ev_sleep (int32_t argc , Janet * argv ) {
2272+ JANET_CORE_FN (cfun_ev_sleep ,
2273+ "(ev/sleep sec)" ,
2274+ "Suspend the current fiber for sec seconds without blocking the event loop."
2275+ ) {
22262276 janet_fixarity (argc , 1 );
22272277 double sec = janet_getnumber (argv , 0 );
22282278 janet_sleep_await (sec );
22292279}
22302280
2231- static Janet cfun_ev_deadline (int32_t argc , Janet * argv ) {
2281+ JANET_CORE_FN (cfun_ev_deadline ,
2282+ "(ev/deadline sec &opt tocancel tocheck)" ,
2283+ "Set a deadline for a fiber `tocheck`. If `tocheck` is not finished after `sec` seconds, "
2284+ "`tocancel` will be canceled as with `ev/cancel`. "
2285+ "If `tocancel` and `tocheck` are not given, they default to `(fiber/root)` and "
2286+ "`(fiber/current)` respectively. Returns `tocancel`."
2287+ ) {
22322288 janet_arity (argc , 1 , 3 );
22332289 double sec = janet_getnumber (argv , 0 );
22342290 JanetFiber * tocancel = janet_optfiber (argv , argc , 1 , janet_vm .root_fiber );
@@ -2243,22 +2299,36 @@ static Janet cfun_ev_deadline(int32_t argc, Janet *argv) {
22432299 return janet_wrap_fiber (tocancel );
22442300}
22452301
2246- static Janet cfun_ev_cancel (int32_t argc , Janet * argv ) {
2302+ JANET_CORE_FN (cfun_ev_cancel ,
2303+ "(ev/cancel fiber err)" ,
2304+ "Cancel a suspended fiber in the event loop. Differs from cancel in that it returns the canceled fiber immediately"
2305+ ) {
22472306 janet_fixarity (argc , 2 );
22482307 JanetFiber * fiber = janet_getfiber (argv , 0 );
22492308 Janet err = argv [1 ];
22502309 janet_cancel (fiber , err );
22512310 return argv [0 ];
22522311}
22532312
2254- Janet janet_cfun_stream_close (int32_t argc , Janet * argv ) {
2313+ JANET_CORE_FN (janet_cfun_stream_close ,
2314+ "(ev/close stream)" ,
2315+ "Close a stream. This should be the same as calling (:close stream) for all streams."
2316+ ) {
22552317 janet_fixarity (argc , 1 );
22562318 JanetStream * stream = janet_getabstract (argv , 0 , & janet_stream_type );
22572319 janet_stream_close (stream );
22582320 return argv [0 ];
22592321}
22602322
2261- Janet janet_cfun_stream_read (int32_t argc , Janet * argv ) {
2323+ JANET_CORE_FN (janet_cfun_stream_read ,
2324+ "(ev/read stream n &opt buffer timeout)" ,
2325+ "Read up to n bytes into a buffer asynchronously from a stream. `n` can also be the keyword "
2326+ "`:all` to read into the buffer until end of stream. "
2327+ "Optionally provide a buffer to write into "
2328+ "as well as a timeout in seconds after which to cancel the operation and raise an error. "
2329+ "Returns the buffer if the read was successful or nil if end-of-stream reached. Will raise an "
2330+ "error if there are problems with the IO operation."
2331+ ) {
22622332 janet_arity (argc , 2 , 4 );
22632333 JanetStream * stream = janet_getabstract (argv , 0 , & janet_stream_type );
22642334 janet_stream_flags (stream , JANET_STREAM_READABLE );
@@ -2275,7 +2345,11 @@ Janet janet_cfun_stream_read(int32_t argc, Janet *argv) {
22752345 janet_await ();
22762346}
22772347
2278- Janet janet_cfun_stream_chunk (int32_t argc , Janet * argv ) {
2348+ JANET_CORE_FN (janet_cfun_stream_chunk ,
2349+ "(ev/chunk stream n &opt buffer timeout)" ,
2350+ "Same as ev/read, but will not return early if less than n bytes are available. If an end of "
2351+ "stream is reached, will also return early with the collected bytes."
2352+ ) {
22792353 janet_arity (argc , 2 , 4 );
22802354 JanetStream * stream = janet_getabstract (argv , 0 , & janet_stream_type );
22812355 janet_stream_flags (stream , JANET_STREAM_READABLE );
@@ -2287,7 +2361,11 @@ Janet janet_cfun_stream_chunk(int32_t argc, Janet *argv) {
22872361 janet_await ();
22882362}
22892363
2290- Janet janet_cfun_stream_write (int32_t argc , Janet * argv ) {
2364+ JANET_CORE_FN (janet_cfun_stream_write ,
2365+ "(ev/write stream data &opt timeout)" ,
2366+ "Write data to a stream, suspending the current fiber until the write "
2367+ "completes. Takes an optional timeout in seconds, after which will return nil. "
2368+ "Returns nil, or raises an error if the write failed." ) {
22912369 janet_arity (argc , 2 , 3 );
22922370 JanetStream * stream = janet_getabstract (argv , 0 , & janet_stream_type );
22932371 janet_stream_flags (stream , JANET_STREAM_WRITABLE );
@@ -2303,127 +2381,30 @@ Janet janet_cfun_stream_write(int32_t argc, Janet *argv) {
23032381 janet_await ();
23042382}
23052383
2306- static const JanetReg ev_cfuns [] = {
2307- {
2308- "ev/go" , cfun_ev_go ,
2309- JDOC ("(ev/go fiber &opt value supervisor)\n\n"
2310- "Put a fiber on the event loop to be resumed later. Optionally pass "
2311- "a value to resume with, otherwise resumes with nil. Returns the fiber. "
2312- "An optional `core/channel` can be provided as well as a supervisor. When various "
2313- "events occur in the newly scheduled fiber, an event will be pushed to the supervisor. "
2314- "If not provided, the new fiber will inherit the current supervisor." )
2315- },
2316- {
2317- "ev/thread" , cfun_ev_thread ,
2318- JDOC ("(ev/thread fiber &opt value flags)\n\n"
2319- "Resume a (copy of a) `fiber` in a new operating system thread, optionally passing `value` "
2320- "to resume with. "
2321- "Unlike `ev/go`, this function will suspend the current fiber until the thread is complete. "
2322- "If you want to run the thread without waiting for a result, pass the `:n` flag to return nil immediately. "
2323- "Otherwise, returns (a copy of) the final result from the fiber on the new thread." )
2324- },
2325- {
2326- "ev/give-supervisor" , cfun_ev_give_supervisor ,
2327- JDOC ("(ev/give-supervsior tag & payload)\n\n"
2328- "Send a message to the current supervior channel if there is one. The message will be a "
2329- "tuple of all of the arguments combined into a single message, where the first element is tag. "
2330- "By convention, tag should be a keyword indicating the type of message. Returns nil." )
2331- },
2332- {
2333- "ev/sleep" , cfun_ev_sleep ,
2334- JDOC ("(ev/sleep sec)\n\n"
2335- "Suspend the current fiber for sec seconds without blocking the event loop." )
2336- },
2337- {
2338- "ev/deadline" , cfun_ev_deadline ,
2339- JDOC ("(ev/deadline sec &opt tocancel tocheck)\n\n"
2340- "Set a deadline for a fiber `tocheck`. If `tocheck` is not finished after `sec` seconds, "
2341- "`tocancel` will be canceled as with `ev/cancel`. "
2342- "If `tocancel` and `tocheck` are not given, they default to `(fiber/root)` and "
2343- "`(fiber/current)` respectively. Returns `tocancel`." )
2344- },
2345- {
2346- "ev/chan" , cfun_channel_new ,
2347- JDOC ("(ev/chan &opt capacity)\n\n"
2348- "Create a new channel. capacity is the number of values to queue before "
2349- "blocking writers, defaults to 0 if not provided. Returns a new channel." )
2350- },
2351- {
2352- "ev/give" , cfun_channel_push ,
2353- JDOC ("(ev/give channel value)\n\n"
2354- "Write a value to a channel, suspending the current fiber if the channel is full." )
2355- },
2356- {
2357- "ev/take" , cfun_channel_pop ,
2358- JDOC ("(ev/take channel)\n\n"
2359- "Read from a channel, suspending the current fiber if no value is available." )
2360- },
2361- {
2362- "ev/full" , cfun_channel_full ,
2363- JDOC ("(ev/full channel)\n\n"
2364- "Check if a channel is full or not." )
2365- },
2366- {
2367- "ev/capacity" , cfun_channel_capacity ,
2368- JDOC ("(ev/capacity channel)\n\n"
2369- "Get the number of items a channel will store before blocking writers." )
2370- },
2371- {
2372- "ev/count" , cfun_channel_count ,
2373- JDOC ("(ev/count channel)\n\n"
2374- "Get the number of items currently waiting in a channel." )
2375- },
2376- {
2377- "ev/cancel" , cfun_ev_cancel ,
2378- JDOC ("(ev/cancel fiber err)\n\n"
2379- "Cancel a suspended fiber in the event loop. Differs from cancel in that it returns the canceled fiber immediately" )
2380- },
2381- {
2382- "ev/select" , cfun_channel_choice ,
2383- JDOC ("(ev/select & clauses)\n\n"
2384- "Block until the first of several channel operations occur. Returns a tuple of the form [:give chan] or [:take chan x], where "
2385- "a :give tuple is the result of a write and :take tuple is the result of a write. Each clause must be either a channel (for "
2386- "a channel take operation) or a tuple [channel x] for a channel give operation. Operations are tried in order, such that the first "
2387- "clauses will take precedence over later clauses." )
2388- },
2389- {
2390- "ev/rselect" , cfun_channel_rchoice ,
2391- JDOC ("(ev/rselect & clauses)\n\n"
2392- "Similar to ev/select, but will try clauses in a random order for fairness." )
2393- },
2394- {
2395- "ev/close" , janet_cfun_stream_close ,
2396- JDOC ("(ev/close stream)\n\n"
2397- "Close a stream. This should be the same as calling (:close stream) for all streams." )
2398- },
2399- {
2400- "ev/read" , janet_cfun_stream_read ,
2401- JDOC ("(ev/read stream n &opt buffer timeout)\n\n"
2402- "Read up to n bytes into a buffer asynchronously from a stream. `n` can also be the keyword "
2403- "`:all` to read into the buffer until end of stream. "
2404- "Optionally provide a buffer to write into "
2405- "as well as a timeout in seconds after which to cancel the operation and raise an error. "
2406- "Returns the buffer if the read was successful or nil if end-of-stream reached. Will raise an "
2407- "error if there are problems with the IO operation." )
2408- },
2409- {
2410- "ev/chunk" , janet_cfun_stream_chunk ,
2411- JDOC ("(ev/chunk stream n &opt buffer timeout)\n\n"
2412- "Same as ev/read, but will not return early if less than n bytes are available. If an end of "
2413- "stream is reached, will also return early with the collected bytes." )
2414- },
2415- {
2416- "ev/write" , janet_cfun_stream_write ,
2417- JDOC ("(ev/write stream data &opt timeout)\n\n"
2418- "Write data to a stream, suspending the current fiber until the write "
2419- "completes. Takes an optional timeout in seconds, after which will return nil. "
2420- "Returns nil, or raises an error if the write failed." )
2421- },
2422- {NULL , NULL , NULL }
2423- };
2424-
24252384void janet_lib_ev (JanetTable * env ) {
2426- janet_core_cfuns (env , NULL , ev_cfuns );
2385+ JanetRegExt ev_cfuns_ext [] = {
2386+ JANET_CORE_REG ("ev/give" , cfun_channel_push ),
2387+ JANET_CORE_REG ("ev/take" , cfun_channel_pop ),
2388+ JANET_CORE_REG ("ev/full" , cfun_channel_full ),
2389+ JANET_CORE_REG ("ev/capacity" , cfun_channel_capacity ),
2390+ JANET_CORE_REG ("ev/count" , cfun_channel_count ),
2391+ JANET_CORE_REG ("ev/select" , cfun_channel_choice ),
2392+ JANET_CORE_REG ("ev/rselect" , cfun_channel_rchoice ),
2393+ JANET_CORE_REG ("ev/chan" , cfun_channel_new ),
2394+ JANET_CORE_REG ("ev/go" , cfun_ev_go ),
2395+ JANET_CORE_REG ("ev/thread" , cfun_ev_thread ),
2396+ JANET_CORE_REG ("ev/give-supervisor" , cfun_ev_give_supervisor ),
2397+ JANET_CORE_REG ("ev/sleep" , cfun_ev_sleep ),
2398+ JANET_CORE_REG ("ev/deadline" , cfun_ev_deadline ),
2399+ JANET_CORE_REG ("ev/cancel" , cfun_ev_cancel ),
2400+ JANET_CORE_REG ("ev/close" , janet_cfun_stream_close ),
2401+ JANET_CORE_REG ("ev/read" , janet_cfun_stream_read ),
2402+ JANET_CORE_REG ("ev/chunk" , janet_cfun_stream_chunk ),
2403+ JANET_CORE_REG ("ev/write" , janet_cfun_stream_write ),
2404+ JANET_REG_END
2405+ };
2406+
2407+ janet_core_cfuns_ext (env , NULL , ev_cfuns_ext );
24272408 janet_register_abstract_type (& janet_stream_type );
24282409}
24292410
0 commit comments