@@ -1422,48 +1422,64 @@ using blocking or completion-based I/O, no buffering is necessary. This
14221422buffering is analogous to the buffering performed in kernel memory by a
14231423` pipe() ` .
14241424
1425- Given the above, we can define the ` {Readable,Writable}StreamEnd ` classes that
1426- are actually stored in the component instance table. The classes are almost
1427- entirely symmetric, with the only difference being whether the polymorphic
1428- ` copy ` method (used below) calls ` read ` or ` write ` :
1425+ The two ends of a stream are stored as separate elements in the component
1426+ instance's table and each end has a separate ` CopyState ` that reflects what
1427+ * that end * is currently doing or has done. This ` state ` field is factored
1428+ out into the ` CopyEnd ` class that is derived below :
14291429``` python
1430- class StreamEnd ( Waitable ):
1431- shared: ReadableStream | WritableStream
1432- copying: bool
1433- done: bool
1430+ class CopyState ( Enum ):
1431+ IDLE = 1
1432+ COPYING = 2
1433+ DONE = 3
14341434
1435- def __init__ (self , shared ):
1435+ class CopyEnd (Waitable ):
1436+ state: CopyState
1437+
1438+ def __init__ (self ):
14361439 Waitable.__init__ (self )
1437- self .shared = shared
1438- self .copying = False
1439- self .done = False
1440+ self .state = CopyState.IDLE
14401441
14411442 def drop (self ):
1442- trap_if(self .copying)
1443- self .shared.drop()
1443+ trap_if(self .state == CopyState.COPYING )
14441444 Waitable.drop(self )
1445+ ```
1446+ As shown in ` drop ` , attempting to drop a readable or writable end while a copy
1447+ is in progress traps. This means that client code must take care to wait for
1448+ these operations to finish (potentially cancelling them via
1449+ ` stream.cancel-{read,write} ` ) before dropping.
1450+
1451+ Given the above, we can define the concrete ` {Readable,Writable}StreamEnd `
1452+ classes which are almost entirely symmetric, with the only difference being
1453+ whether the polymorphic ` copy ` method (used below) calls ` read ` or ` write ` :
1454+ ``` python
1455+ class ReadableStreamEnd (CopyEnd ):
1456+ shared: ReadableStream
1457+
1458+ def __init__ (self , shared ):
1459+ CopyEnd.__init__ (self )
1460+ self .shared = shared
14451461
1446- class ReadableStreamEnd (StreamEnd ):
14471462 def copy (self , inst , dst , on_copy , on_copy_done ):
14481463 self .shared.read(inst, dst, on_copy, on_copy_done)
14491464
1450- class WritableStreamEnd (StreamEnd ):
1465+ def drop (self ):
1466+ self .shared.drop()
1467+ CopyEnd.drop(self )
1468+
1469+ class WritableStreamEnd (CopyEnd ):
1470+ shared: WritableStream
1471+
1472+ def __init__ (self , shared ):
1473+ CopyEnd.__init__ (self )
1474+ self .shared = shared
1475+
14511476 def copy (self , inst , src , on_copy , on_copy_done ):
14521477 self .shared.write(inst, src, on_copy, on_copy_done)
1453- ```
1454- The ` copying ` field tracks whether there is an asynchronous read or write in
1455- progress and is maintained by the definitions of ` stream.{read,write} ` below.
1456- The ` done ` field tracks whether this end has been notified that the other end
1457- was dropped (via ` CopyResult.DROPPED ` ) and thus no further read/write
1458- operations are allowed. Importantly, ` copying ` and ` done ` are per-* end* , not
1459- per-* stream* (unlike the fields of ` SharedStreamImpl ` shown above, which are
1460- per-stream and shared by both ends via their ` shared ` field).
14611478
1462- Dropping a stream end while an asynchronous read or write is in progress traps
1463- since the async read or write cannot be cancelled without blocking and ` drop `
1464- (called by ` stream.drop-{readable,writable} ` ) is synchronous and non-blocking.
1465- This means that client code must take care to wait for these operations to
1466- finish before dropping.
1479+ def drop (self ):
1480+ self .shared.drop()
1481+ CopyEnd.drop(self )
1482+ ```
14671483
14681484
14691485#### Future State
@@ -1569,39 +1585,34 @@ Lastly, the `{Readable,Writable}FutureEnd` classes are mostly symmetric with
15691585` WritableFutureEnd.drop ` traps if the writer hasn't successfully written a
15701586value or been notified of the reader dropping their end:
15711587``` python
1572- class FutureEnd (Waitable ):
1573- shared: ReadableFuture| WritableFuture
1574- copying: bool
1575- done: bool
1588+ class ReadableFutureEnd (CopyEnd ):
1589+ shared: ReadableFuture
15761590
15771591 def __init__ (self , shared ):
1578- Waitable .__init__ (self )
1592+ CopyEnd .__init__ (self )
15791593 self .shared = shared
1580- self .copying = False
1581- self .done = False
1582-
1583- def drop (self ):
1584- trap_if(self .copying)
1585- Waitable.drop(self )
15861594
1587- class ReadableFutureEnd (FutureEnd ):
15881595 def copy (self , inst , src_buffer , on_copy_done ):
15891596 self .shared.read(inst, src_buffer, on_copy_done)
15901597
15911598 def drop (self ):
15921599 self .shared.drop()
1593- FutureEnd.drop(self )
1600+ CopyEnd.drop(self )
1601+
1602+ class WritableFutureEnd (CopyEnd ):
1603+ shared: WritableFuture
1604+
1605+ def __init__ (self , shared ):
1606+ CopyEnd.__init__ (self )
1607+ self .shared = shared
15941608
1595- class WritableFutureEnd (FutureEnd ):
15961609 def copy (self , inst , dst_buffer , on_copy_done ):
15971610 self .shared.write(inst, dst_buffer, on_copy_done)
15981611
15991612 def drop (self ):
1600- trap_if(not self .done )
1601- FutureEnd .drop(self )
1613+ trap_if(self .state != CopyState. DONE )
1614+ CopyEnd .drop(self )
16021615```
1603- The ` copying ` and ` done ` fields are maintained by the ` future ` built-ins
1604- defined below.
16051616
16061617
16071618### Despecialization
@@ -2066,8 +2077,8 @@ transitively-borrowed handle.
20662077
20672078Streams and futures are entirely symmetric, transferring ownership of the
20682079readable end from the lifting component to the host or lowering component and
2069- trapping if the readable end is in the middle of ` copying ` (which would create
2070- a dangling-pointer situation) or is already ` done ` (in which case the only
2080+ trapping if the readable end is in the middle of copying (which would create
2081+ a dangling-pointer situation) or is in the ` DONE ` state (in which case the only
20712082valid operation is ` {stream,future}.drop-{readable,writable} ` ).
20722083``` python
20732084def lift_stream (cx , i , t ):
@@ -2081,8 +2092,7 @@ def lift_async_value(ReadableEndT, cx, i, t):
20812092 e = cx.inst.table.remove(i)
20822093 trap_if(not isinstance (e, ReadableEndT))
20832094 trap_if(e.shared.t != t)
2084- trap_if(e.copying)
2085- trap_if(e.done)
2095+ trap_if(e.state != CopyState.IDLE )
20862096 return e.shared
20872097```
20882098
@@ -3953,16 +3963,16 @@ def canon_stream_write(stream_t, opts, task, i, ptr, n):
39533963```
39543964
39553965Introducing the ` stream_copy ` function in chunks, ` stream_copy ` first checks
3956- that the element at index ` i ` is of the right type, not already ` copying ` , and
3957- not already ` done ` (as defined next) . (In the future, the ` copying ` trap could
3958- be relaxed, allowing a finite number of pipelined reads or writes.)
3966+ that the element at index ` i ` is of the right type and allowed to start a new
3967+ copy . (In the future, the "trap if not ` IDLE ` " condition could be relaxed to
3968+ allow multiple pipelined reads or writes.)
39593969``` python
39603970def stream_copy (EndT , BufferT , event_code , stream_t , opts , task , i , ptr , n ):
39613971 trap_if(not task.inst.may_leave)
39623972 e = task.inst.table.get(i)
39633973 trap_if(not isinstance (e, EndT))
39643974 trap_if(e.shared.t != stream_t.t)
3965- trap_if(e.copying or e.done )
3975+ trap_if(e.state != CopyState. IDLE )
39663976```
39673977Then a readable or writable buffer is created which (in ` Buffer ` 's constructor)
39683978eagerly checks the alignment and bounds of (` i ` , ` n ` ). (In the future, the
@@ -3982,18 +3992,19 @@ event is delivered to core wasm. `stream_event` first calls `reclaim_buffer` to
39823992regain ownership of ` buffer ` and prevent any further partial reads/writes.
39833993Thus, up until event delivery, the other end of the stream is free to
39843994repeatedly read/write from/to ` buffer ` , ideally filling it up and minimizing
3985- context switches. Next, ` copying ` is cleared to reenable future
3986- ` stream.{read,write} ` calls. However, if the ` CopyResult ` is ` DROPPED ` , ` done `
3987- is set to disallow all future use of this stream end. Lastly, ` stream_event `
3988- packs the ` CopyResult ` and number of elements copied up until this point into a
3989- single ` i32 ` payload for core wasm.
3995+ context switches. Next, the stream's ` state ` is updated based on the result
3996+ being delivered to core wasm so that, once a stream end has been notified that
3997+ the other end dropped, calling anything other than ` stream.drop-* ` traps.
3998+ Lastly, ` stream_event ` packs the ` CopyResult ` and number of elements copied up
3999+ until this point into a single ` i32 ` payload for core wasm.
39904000``` python
39914001 def stream_event (result , reclaim_buffer ):
39924002 reclaim_buffer()
3993- assert (e.copying)
3994- e.copying = False
4003+ assert (e.state == CopyState.COPYING )
39954004 if result == CopyResult.DROPPED :
3996- e.done = True
4005+ e.state = CopyState.DONE
4006+ else :
4007+ e.state = CopyState.IDLE
39974008 assert (0 <= result < 2 ** 4 )
39984009 assert (buffer.progress <= Buffer.MAX_LENGTH < 2 ** 28 )
39994010 packed_result = result | (buffer.progress << 4 )
@@ -4005,7 +4016,7 @@ single `i32` payload for core wasm.
40054016 def on_copy_done (result ):
40064017 e.set_pending_event(partial(stream_event, result, reclaim_buffer = lambda :()))
40074018
4008- e.copying = True
4019+ e.state = CopyState. COPYING
40094020 e.copy(task.inst, buffer, on_copy, on_copy_done)
40104021```
40114022
@@ -4063,7 +4074,7 @@ def future_copy(EndT, BufferT, event_code, future_t, opts, task, i, ptr):
40634074 e = task.inst.table.get(i)
40644075 trap_if(not isinstance (e, EndT))
40654076 trap_if(e.shared.t != future_t.t)
4066- trap_if(e.copying or e.done )
4077+ trap_if(e.state != CopyState. IDLE )
40674078
40684079 assert (not contains_borrow(future_t))
40694080 cx = LiftLowerContext(opts, task.inst, borrow_scope = None )
@@ -4072,27 +4083,28 @@ def future_copy(EndT, BufferT, event_code, future_t, opts, task, i, ptr):
40724083Next, the ` copy ` method of ` {Readable,Writable}FutureEnd.copy ` is called to
40734084perform the actual read/write. Other than the simplifications allowed by the
40744085absence of repeated partial copies, the main difference in the following code
4075- from the stream code is that ` future_event ` sets the ` done ` flag for * both * the
4076- ` DROPPED ` and ` COMPLETED ` results, whereas ` stream_event ` sets ` done ` only for
4077- ` DROPPED ` . This ensures that futures are read/written at most once and futures
4078- are only passed to other components in a state where they are ready to be
4079- read/written. Another important difference is that, since the buffer length is
4080- always implied by the ` CopyResult ` , the number of elements copied is not packed
4081- in the high 28 bits; they're always zero.
4086+ from the stream code is that ` future_event ` transitions the end to the ` DONE `
4087+ state (in which the only valid operation is to call ` future.drop-* ` ) on
4088+ * either * the ` DROPPED ` and ` COMPLETED ` results . This ensures that futures are
4089+ read/written at most once and futures are only passed to other components in a
4090+ state where they are ready to be read/written. Another important difference is
4091+ that, since the buffer length is always implied by the ` CopyResult ` , the number
4092+ of elements copied is not packed in the high 28 bits; they're always zero.
40824093``` python
40834094 def future_event (result ):
40844095 assert ((buffer.remain() == 0 ) == (result == CopyResult.COMPLETED ))
4085- assert (e.copying)
4086- e.copying = False
4096+ assert (e.state == CopyState.COPYING )
40874097 if result == CopyResult.DROPPED or result == CopyResult.COMPLETED :
4088- e.done = True
4098+ e.state = CopyState.DONE
4099+ else :
4100+ e.state = CopyState.IDLE
40894101 return (event_code, i, result)
40904102
40914103 def on_copy_done (result ):
40924104 assert (result != CopyResult.DROPPED or event_code == EventCode.FUTURE_WRITE )
40934105 e.set_pending_event(partial(future_event, result))
40944106
4095- e.copying = True
4107+ e.state = CopyState. COPYING
40964108 e.copy(task.inst, buffer, on_copy_done)
40974109```
40984110
@@ -4144,7 +4156,7 @@ def cancel_copy(EndT, event_code, stream_or_future_t, sync, task, i):
41444156 e = task.inst.table.get(i)
41454157 trap_if(not isinstance (e, EndT))
41464158 trap_if(e.shared.t != stream_or_future_t.t)
4147- trap_if(not e.copying )
4159+ trap_if(e.state != CopyState. COPYING )
41484160 if not e.has_pending_event():
41494161 e.shared.cancel()
41504162 if not e.has_pending_event():
@@ -4153,7 +4165,7 @@ def cancel_copy(EndT, event_code, stream_or_future_t, sync, task, i):
41534165 else :
41544166 return [BLOCKED ]
41554167 code,index,payload = e.get_pending_event()
4156- assert (not e.copying and code == event_code and index == i)
4168+ assert (e.state != CopyState. COPYING and code == event_code and index == i)
41574169 return [payload]
41584170```
41594171The * first* check for ` e.has_pending_event() ` catches the case where the copy has
0 commit comments