@@ -36,21 +36,31 @@ interface ChannelInterface
3636 *
3737 * This function is used to send a message to the channel.
3838 */
39- public function send ($ message ): Generator ;
39+ public function sendGen ($ message ): Generator ;
4040
4141 /**
42+ * @param mixed $message
43+ * @return void
44+ *
45+ * This function is used to send a message to the channel.
46+ */
47+ public function send ($ message ): void ;
48+
49+ /**
50+ * @param callable $callback
4251 * @return Generator
4352 *
4453 * This function is used to receive a message from the channel.
4554 */
46- public function receiveGen (): Generator ;
55+ public function receiveGen (callable $ callback ): Generator ;
4756
4857 /**
49- * @return mixed
58+ * @param callable $callback
59+ * @return void
5060 *
5161 * This function is used to receive a message from the channel.
5262 */
53- public function receive (): mixed ;
63+ public function receive (callable $ callback ): void ;
5464
5565 /**
5666 * @return bool
@@ -87,7 +97,7 @@ final class Channel implements ChannelInterface
8797
8898 private bool $ closed = false ;
8999
90- public function send ($ message ): Generator
100+ public function sendGen ($ message ): Generator
91101 {
92102 $ this ->exceptionIfClosed ();
93103 while ($ this ->locked ) yield ;
@@ -96,29 +106,33 @@ public function send($message): Generator
96106 $ this ->locked = false ;
97107 }
98108
99- public function receiveGen ( ): Generator
109+ public function send ( $ message ): void
100110 {
101111 $ this ->exceptionIfClosed ();
102112 while ($ this ->locked ) {
103113 CoroutineGen::run ();
104- yield ;
105114 }
106115 $ this ->locked = true ;
107- $ message = array_shift ( $ this ->queue ) ;
116+ $ this ->queue [] = $ message ;
108117 $ this ->locked = false ;
109- return yield $ message ;
110118 }
111119
112- public function receive ( ): mixed
120+ public function receiveGen ( callable $ callback ): Generator
113121 {
114- $ this ->exceptionIfClosed ();
115- while ($ this ->locked ) {
122+ while (!$ this ->closed || !empty ($ this ->queue )) {
123+ $ message = array_shift ($ this ->queue );
124+ if ($ message !== null ) $ callback ($ message );
125+ yield ;
126+ }
127+ }
128+
129+ public function receive (callable $ callback ): void
130+ {
131+ while (!$ this ->closed || !empty ($ this ->queue )) {
132+ $ message = array_shift ($ this ->queue );
133+ if ($ message !== null ) $ callback ($ message );
116134 CoroutineGen::run ();
117135 }
118- $ this ->locked = true ;
119- $ message = array_shift ($ this ->queue );
120- $ this ->locked = false ;
121- return $ message ;
122136 }
123137
124138 public function isEmpty (): bool
0 commit comments