Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions design/mvp/CanonicalABI.md
Original file line number Diff line number Diff line change
Expand Up @@ -1450,7 +1450,7 @@ remains blocked:
self.set_pending(inst, dst_buffer, on_copy, on_copy_done)
else:
assert(self.t == dst_buffer.t == self.pending_buffer.t)
trap_if(inst is self.pending_inst and self.t is not None) # temporary
trap_if(inst is self.pending_inst and not none_or_number_type(self.t)) # temporary
if self.pending_buffer.remain() > 0:
if dst_buffer.remain() > 0:
n = min(dst_buffer.remain(), self.pending_buffer.remain())
Expand All @@ -1462,11 +1462,12 @@ remains blocked:
self.set_pending(inst, dst_buffer, on_copy, on_copy_done)
```
Currently, there is a trap when both the `read` and `write` come from the same
component instance and there is a non-empty element type. This trap will be
removed in a subsequent release; the reason for the trap is that when lifting
and lowering can alias the same memory, interleavings can be complex and must
be handled carefully. Future improvements to the Canonical ABI ([lazy lowering])
can greatly simplify this interleaving and be more practical to implement.
component instance and there is a non-empty, non-number element type. This trap
will be removed in a subsequent release; the reason for the trap is that when
lifting and lowering can alias the same memory, interleavings can be complex
and must be handled carefully. Future improvements to the Canonical ABI ([lazy
lowering]) can greatly simplify this interleaving and be more practical to
implement.

The `write` method implements `WritableStream.write` and is called by the
`stream.write` built-in (noting that the host cannot be passed the writable end
Expand All @@ -1483,7 +1484,7 @@ pending:
self.set_pending(inst, src_buffer, on_copy, on_copy_done)
else:
assert(self.t == src_buffer.t == self.pending_buffer.t)
trap_if(inst is self.pending_inst and self.t is not None) # temporary
trap_if(inst is self.pending_inst and not none_or_number_type(self.t)) # temporary
if self.pending_buffer.remain() > 0:
if src_buffer.remain() > 0:
n = min(src_buffer.remain(), self.pending_buffer.remain())
Expand All @@ -1506,6 +1507,15 @@ notifying the reader end and allowing it to rendezvous with a non-zero-length
`read` and make progress. See the [stream readiness] section in the async
explainer for more background on purpose of zero-length reads and writes.

The `none_or_number_type` predicate used above includes both the integer and
floating point number types:
```python
def none_or_number_type(t):
return t is None or isinstance(t, U8Type | U16Type | U32Type | U64Type |
S8Type | S16Type | S32Type | S64Type |
F32Type | F64Type)
```

The two ends of a stream are stored as separate elements in the component
instance's table and each end has a separate `CopyState` that reflects what
*that end* is currently doing or has done. This `state` field is factored
Expand Down Expand Up @@ -1649,7 +1659,7 @@ end was dropped before receiving a value.
if not self.pending_buffer:
self.set_pending(inst, dst_buffer, on_copy_done)
else:
trap_if(inst is self.pending_inst and self.t is not None) # temporary
trap_if(inst is self.pending_inst and not none_or_number_type(self.t)) # temporary
dst_buffer.write(self.pending_buffer.read(1))
self.reset_and_notify_pending(CopyResult.COMPLETED)
on_copy_done(CopyResult.COMPLETED)
Expand All @@ -1661,14 +1671,14 @@ end was dropped before receiving a value.
elif not self.pending_buffer:
self.set_pending(inst, src_buffer, on_copy_done)
else:
trap_if(inst is self.pending_inst and self.t is not None) # temporary
trap_if(inst is self.pending_inst and not none_or_number_type(self.t)) # temporary
self.pending_buffer.write(src_buffer.read(1))
self.reset_and_notify_pending(CopyResult.COMPLETED)
on_copy_done(CopyResult.COMPLETED)
```
As with streams, the `# temporary` limitation shown above is that a future
cannot be read and written from the same component instance when it has a
non-empty value type.
non-empty, non-number value type.

Lastly, the `{Readable,Writable}FutureEnd` classes are mostly symmetric with
`{Readable,Writable}StreamEnd`, with the only difference being that
Expand Down
5 changes: 3 additions & 2 deletions design/mvp/Concurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,9 @@ signalled by performing a `0`-length read or write (see the [Stream State]
section in the Canonical ABI explainer for details).

As a temporary limitation, if a `read` and `write` for a single stream or
future occur from within the same component and the element type is non-empty,
there is a trap. In the future this limitation will be removed.
future occur from within the same component and the element type is a
non-empty, non-number type, there is a trap. In the future this limitation will
be removed.

The `T` element type of streams and futures is optional, such that `future` and
`stream` can be written in WIT without a trailing `<T>`. In this case, the
Expand Down
13 changes: 9 additions & 4 deletions design/mvp/canonical-abi/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ def read(self, inst, dst_buffer, on_copy, on_copy_done):
self.set_pending(inst, dst_buffer, on_copy, on_copy_done)
else:
assert(self.t == dst_buffer.t == self.pending_buffer.t)
trap_if(inst is self.pending_inst and self.t is not None) # temporary
trap_if(inst is self.pending_inst and not none_or_number_type(self.t)) # temporary
if self.pending_buffer.remain() > 0:
if dst_buffer.remain() > 0:
n = min(dst_buffer.remain(), self.pending_buffer.remain())
Expand All @@ -869,7 +869,7 @@ def write(self, inst, src_buffer, on_copy, on_copy_done):
self.set_pending(inst, src_buffer, on_copy, on_copy_done)
else:
assert(self.t == src_buffer.t == self.pending_buffer.t)
trap_if(inst is self.pending_inst and self.t is not None) # temporary
trap_if(inst is self.pending_inst and not none_or_number_type(self.t)) # temporary
if self.pending_buffer.remain() > 0:
if src_buffer.remain() > 0:
n = min(src_buffer.remain(), self.pending_buffer.remain())
Expand All @@ -882,6 +882,11 @@ def write(self, inst, src_buffer, on_copy, on_copy_done):
self.reset_and_notify_pending(CopyResult.COMPLETED)
self.set_pending(inst, src_buffer, on_copy, on_copy_done)

def none_or_number_type(t):
return t is None or isinstance(t, U8Type | U16Type | U32Type | U64Type |
S8Type | S16Type | S32Type | S64Type |
F32Type | F64Type)

class CopyState(Enum):
IDLE = 1
SYNC_COPYING = 2
Expand Down Expand Up @@ -983,7 +988,7 @@ def read(self, inst, dst_buffer, on_copy_done):
if not self.pending_buffer:
self.set_pending(inst, dst_buffer, on_copy_done)
else:
trap_if(inst is self.pending_inst and self.t is not None) # temporary
trap_if(inst is self.pending_inst and not none_or_number_type(self.t)) # temporary
dst_buffer.write(self.pending_buffer.read(1))
self.reset_and_notify_pending(CopyResult.COMPLETED)
on_copy_done(CopyResult.COMPLETED)
Expand All @@ -995,7 +1000,7 @@ def write(self, inst, src_buffer, on_copy_done):
elif not self.pending_buffer:
self.set_pending(inst, src_buffer, on_copy_done)
else:
trap_if(inst is self.pending_inst and self.t is not None) # temporary
trap_if(inst is self.pending_inst and not none_or_number_type(self.t)) # temporary
self.pending_buffer.write(src_buffer.read(1))
self.reset_and_notify_pending(CopyResult.COMPLETED)
on_copy_done(CopyResult.COMPLETED)
Expand Down
30 changes: 16 additions & 14 deletions design/mvp/canonical-abi/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2476,46 +2476,46 @@ def on_resolve(results):
assert(got[0] == 42)


def test_self_empty():
def test_self_copy(elemt):
store = Store()
inst = ComponentInstance(store)
mem = bytearray(24)
mem = bytearray(40)
sync_opts = mk_opts(memory=mem, async_=False)
async_opts = mk_opts(memory=mem, async_=True)

ft = FuncType([],[])
def core_func(thread, args):
[seti] = canon_waitable_set_new(thread)

[packed] = canon_future_new(FutureType(None), thread)
[packed] = canon_future_new(FutureType(elemt), thread)
rfi,wfi = unpack_new_ends(packed)

[ret] = canon_future_write(FutureType(None), async_opts, thread, wfi, 0xdeadbeef)
[ret] = canon_future_write(FutureType(elemt), async_opts, thread, wfi, 0)
assert(ret == definitions.BLOCKED)

[ret] = canon_future_read(FutureType(None), async_opts, thread, rfi, 0xdeadbeef)
[ret] = canon_future_read(FutureType(elemt), async_opts, thread, rfi, 0)
assert(ret == CopyResult.COMPLETED)
[] = canon_future_drop_readable(FutureType(None), thread, rfi)
[] = canon_future_drop_readable(FutureType(elemt), thread, rfi)

[] = canon_waitable_join(thread, wfi, seti)
[event] = canon_waitable_set_wait(True, mem, thread, seti, 0)
assert(event == EventCode.FUTURE_WRITE)
assert(mem[0] == wfi)
assert(mem[4] == CopyResult.COMPLETED)
[] = canon_future_drop_writable(FutureType(None), thread, wfi)
[] = canon_future_drop_writable(FutureType(elemt), thread, wfi)

[packed] = canon_stream_new(StreamType(None), thread)
[packed] = canon_stream_new(StreamType(elemt), thread)
rsi,wsi = unpack_new_ends(packed)
[ret] = canon_stream_write(StreamType(None), async_opts, thread, wsi, 10000, 3)
[ret] = canon_stream_write(StreamType(elemt), async_opts, thread, wsi, 0, 3)
assert(ret == definitions.BLOCKED)

[ret] = canon_stream_read(StreamType(None), async_opts, thread, rsi, 2000, 1)
[ret] = canon_stream_read(StreamType(elemt), async_opts, thread, rsi, 0, 1)
result,n = unpack_result(ret)
assert(n == 1 and result == CopyResult.COMPLETED)
[ret] = canon_stream_read(StreamType(None), async_opts, thread, rsi, 2000, 4)
[ret] = canon_stream_read(StreamType(elemt), async_opts, thread, rsi, 0, 4)
result,n = unpack_result(ret)
assert(n == 2 and result == CopyResult.COMPLETED)
[] = canon_stream_drop_readable(StreamType(None), thread, rsi)
[] = canon_stream_drop_readable(StreamType(elemt), thread, rsi)

[] = canon_waitable_join(thread, wsi, seti)
[event] = canon_waitable_set_wait(True, mem, thread, seti, 0)
Expand All @@ -2524,7 +2524,7 @@ def core_func(thread, args):
result,n = unpack_result(mem[4])
assert(result == CopyResult.DROPPED)
assert(n == 3)
[] = canon_stream_drop_writable(StreamType(None), thread, wsi)
[] = canon_stream_drop_writable(StreamType(elemt), thread, wsi)

[] = canon_waitable_set_drop(thread, seti)
return []
Expand Down Expand Up @@ -2743,7 +2743,9 @@ def core_consumer(thread, args):
test_cancel_copy()
test_futures()
test_cancel_subtask()
test_self_empty()
test_self_copy(None)
test_self_copy(U8Type())
test_self_copy(F64Type())
test_async_flat_params()
test_threads()
test_thread_cancel_callback()
Expand Down