@@ -81,17 +81,10 @@ class AsyncChannel(AsyncIterable[T]):
8181 """
8282
8383 def __init__ (
84- self ,
85- source : Union [Iterable [T ], AsyncIterable [T ]] = tuple (),
86- * ,
87- buffer_limit : int = 0 ,
88- close : bool = False ,
84+ self , * , buffer_limit : int = 0 , close : bool = False ,
8985 ):
9086 self ._queue : asyncio .Queue [Union [T , object ]] = asyncio .Queue (buffer_limit )
9187 self ._closed = False
92- self ._sending_task = (
93- asyncio .ensure_future (self .send_from (source , close )) if source else None
94- )
9588 self ._waiting_recievers : int = 0
9689 # Track whether flush has been invoked so it can only happen once
9790 self ._flushed = False
@@ -132,7 +125,7 @@ def done(self) -> bool:
132125
133126 async def send_from (
134127 self , source : Union [Iterable [T ], AsyncIterable [T ]], close : bool = False
135- ):
128+ ) -> "AsyncChannel[T]" :
136129 """
137130 Iterates the given [Async]Iterable and sends all the resulting items.
138131 If close is set to True then subsequent send calls will be rejected with a
@@ -153,15 +146,17 @@ async def send_from(
153146 if close :
154147 # Complete the closing process
155148 self .close ()
149+ return self
156150
157- async def send (self , item : T ):
151+ async def send (self , item : T ) -> "AsyncChannel[T]" :
158152 """
159153 Send a single item over this channel.
160154 :param item: The item to send
161155 """
162156 if self ._closed :
163157 raise ChannelClosed ("Cannot send through a closed channel" )
164158 await self ._queue .put (item )
159+ return self
165160
166161 async def recieve (self ) -> Optional [T ]:
167162 """
@@ -185,8 +180,6 @@ def close(self):
185180 """
186181 Close this channel to new items
187182 """
188- if self ._sending_task is not None :
189- self ._sending_task .cancel ()
190183 self ._closed = True
191184 asyncio .ensure_future (self ._flush_queue ())
192185
0 commit comments