Skip to content

Commit 86c5ead

Browse files
authored
Introduce NIODecodedAsyncSequence for easy decoding of async sequences (#3407)
Previous PR: #3405 Add an API on top of `AsyncSequence<ByteBuffer>` which can dynamically decode values. ~~Add an API on top of the new `AsyncSequence<ByteBuffer>` APIs which splits the file based on its content.~~ ### Motivation: Provides a nice API to decode files, instead of users having to go though manually handling `BufferedReader.read(while:)`. I struggled with this, as documented in https://swift-open-source.slack.com/archives/C9MMT6VGB/p1760115481607159 ### Modifications: Add `NIODecodedAsyncSequence` + functions on `AsyncSequence<ByteBuffer>` to create such a sequence. ~~Add `NIOSplitMessageDecoder` + stdlib-like functions on `AsyncSequence<ByteBuffer>` to create such a sequence.~~ ### Result: Users can decode an async sequence of `ByteBuffer`s easier. ~~Users can easily split files based on their content.~~ ### Checklist See this comment for a checklist of the remaining things to do: #3407 (comment)
1 parent cdf721f commit 86c5ead

File tree

3 files changed

+426
-4
lines changed

3 files changed

+426
-4
lines changed
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftNIO open source project
4+
//
5+
// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
16+
extension AsyncSequence where Element == ByteBuffer {
17+
/// Decode the `AsyncSequence<ByteBuffer>` into a sequence of `Element`s,
18+
/// using the `Decoder`, where `Decoder.InboundOut` matches `Element`.
19+
///
20+
/// Usage:
21+
/// ```swift
22+
/// let myDecoder = MyNIOSingleStepByteToMessageDecoder()
23+
/// let baseSequence = MyAsyncSequence<ByteBuffer>(...)
24+
/// let decodedSequence = baseSequence.decode(using: myDecoder)
25+
///
26+
/// for try await element in decodedSequence {
27+
/// print("Decoded an element!", element)
28+
/// }
29+
/// ```
30+
///
31+
/// - Parameters:
32+
/// - decoder: The `Decoder` to use to decode the ``ByteBuffer``s.
33+
/// - maximumBufferSize: The maximum number of bytes to aggregate in-memory.
34+
/// An error will be thrown if after decoding an element there is more aggregated data than this amount.
35+
/// - Returns: A ``NIODecodedAsyncSequence`` that decodes the ``ByteBuffer``s into a sequence of `Element`s.
36+
@inlinable
37+
public func decode<Decoder: NIOSingleStepByteToMessageDecoder>(
38+
using decoder: Decoder,
39+
maximumBufferSize: Int? = nil
40+
) -> NIODecodedAsyncSequence<Self, Decoder> {
41+
NIODecodedAsyncSequence(
42+
asyncSequence: self,
43+
decoder: decoder,
44+
maximumBufferSize: maximumBufferSize
45+
)
46+
}
47+
}
48+
49+
/// A type that decodes an `AsyncSequence<ByteBuffer>` into a sequence of ``Element``s,
50+
/// using the `Decoder`, where `Decoder.InboundOut` matches ``Element``.
51+
///
52+
/// Use `AsyncSequence/decode(using:maximumBufferSize:)` to create a ``NIODecodedAsyncSequence``.
53+
///
54+
/// Usage:
55+
/// ```swift
56+
/// let myDecoder = MyNIOSingleStepByteToMessageDecoder()
57+
/// let baseSequence = MyAsyncSequence<ByteBuffer>(...)
58+
/// let decodedSequence = baseSequence.decode(using: myDecoder)
59+
///
60+
/// for try await element in decodedSequence {
61+
/// print("Decoded an element!", element)
62+
/// }
63+
/// ```
64+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
65+
public struct NIODecodedAsyncSequence<
66+
Base: AsyncSequence,
67+
Decoder: NIOSingleStepByteToMessageDecoder
68+
> where Base.Element == ByteBuffer {
69+
@usableFromInline
70+
var asyncSequence: Base
71+
@usableFromInline
72+
var decoder: Decoder
73+
@usableFromInline
74+
var maximumBufferSize: Int?
75+
76+
@inlinable
77+
init(asyncSequence: Base, decoder: Decoder, maximumBufferSize: Int? = nil) {
78+
self.asyncSequence = asyncSequence
79+
self.decoder = decoder
80+
self.maximumBufferSize = maximumBufferSize
81+
}
82+
}
83+
84+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
85+
extension NIODecodedAsyncSequence: AsyncSequence {
86+
public typealias Element = Decoder.InboundOut
87+
88+
/// Create an ``AsyncIterator`` for this ``NIODecodedAsyncSequence``.
89+
@inlinable
90+
public func makeAsyncIterator() -> AsyncIterator {
91+
AsyncIterator(base: self)
92+
}
93+
94+
/// An ``AsyncIterator`` over a ``NIODecodedAsyncSequence``.
95+
public struct AsyncIterator: AsyncIteratorProtocol {
96+
@usableFromInline
97+
enum State: Sendable {
98+
case readingFromBuffer
99+
case readLastChunkFromBuffer
100+
case finishedDecoding
101+
}
102+
103+
@usableFromInline
104+
var baseIterator: Base.AsyncIterator
105+
@usableFromInline
106+
var processor: NIOSingleStepByteToMessageProcessor<Decoder>
107+
@usableFromInline
108+
var state: State
109+
110+
@inlinable
111+
init(base: NIODecodedAsyncSequence) {
112+
self.baseIterator = base.asyncSequence.makeAsyncIterator()
113+
self.processor = NIOSingleStepByteToMessageProcessor(
114+
base.decoder,
115+
maximumBufferSize: base.maximumBufferSize
116+
)
117+
self.state = .readingFromBuffer
118+
}
119+
120+
/// Retrieve the next element from the ``NIODecodedAsyncSequence``.
121+
///
122+
/// The same as `next(isolation:)` but not isolated to an actor, which allows
123+
/// for less availability restrictions.
124+
@inlinable
125+
public mutating func next() async throws -> Element? {
126+
while true {
127+
switch self.state {
128+
case .finishedDecoding:
129+
return nil
130+
case .readingFromBuffer:
131+
let (decoded, ended) = try self.processor.decodeNext(
132+
decodeMode: .normal,
133+
seenEOF: false
134+
)
135+
136+
// We expect `decodeNext()` to only return `ended == true` only if we've notified it
137+
// that we've read the last chunk from the buffer, using `decodeMode: .last`.
138+
assert(!ended)
139+
140+
if let decoded {
141+
return decoded
142+
}
143+
144+
// Read more data into the buffer so we can decode more messages
145+
guard let nextBuffer = try await self.baseIterator.next() else {
146+
// Ran out of data to read.
147+
self.state = .readLastChunkFromBuffer
148+
continue
149+
}
150+
self.processor.append(nextBuffer)
151+
case .readLastChunkFromBuffer:
152+
let (decoded, ended) = try self.processor.decodeNext(
153+
decodeMode: .last,
154+
seenEOF: true
155+
)
156+
157+
if ended {
158+
self.state = .finishedDecoding
159+
}
160+
161+
return decoded
162+
}
163+
}
164+
165+
fatalError("Unreachable code")
166+
}
167+
168+
/// Retrieve the next element from the ``NIODecodedAsyncSequence``.
169+
///
170+
/// The same as `next()` but isolated to an actor.
171+
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
172+
@inlinable
173+
public mutating func next(isolation actor: isolated (any Actor)? = #isolation) async throws -> Element? {
174+
while true {
175+
switch self.state {
176+
case .finishedDecoding:
177+
return nil
178+
case .readingFromBuffer:
179+
let (decoded, ended) = try self.processor.decodeNext(
180+
decodeMode: .normal,
181+
seenEOF: false
182+
)
183+
184+
// We expect `decodeNext()` to only return `ended == true` only if we've notified it
185+
// that we've read the last chunk from the buffer, using `decodeMode: .last`.
186+
assert(!ended)
187+
188+
if let decoded {
189+
return decoded
190+
}
191+
192+
// Read more data into the buffer so we can decode more messages
193+
guard let nextBuffer = try await self.baseIterator.next(isolation: actor) else {
194+
// Ran out of data to read.
195+
self.state = .readLastChunkFromBuffer
196+
continue
197+
}
198+
self.processor.append(nextBuffer)
199+
case .readLastChunkFromBuffer:
200+
let (decoded, ended) = try self.processor.decodeNext(
201+
decodeMode: .last,
202+
seenEOF: true
203+
)
204+
205+
if ended {
206+
self.state = .finishedDecoding
207+
}
208+
209+
return decoded
210+
}
211+
}
212+
213+
fatalError("Unreachable code")
214+
}
215+
}
216+
}
217+
218+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
219+
extension NIODecodedAsyncSequence: Sendable where Base: Sendable, Decoder: Sendable {}
220+
221+
@available(*, unavailable)
222+
extension NIODecodedAsyncSequence.AsyncIterator: Sendable {}

Sources/NIOCore/SingleStepByteToMessageDecoder.swift

Lines changed: 76 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -206,14 +206,16 @@ public final class NIOSingleStepByteToMessageProcessor<Decoder: NIOSingleStepByt
206206
/// - Parameters:
207207
/// - decoder: The `NIOSingleStepByteToMessageDecoder` to decode the bytes into message.
208208
/// - maximumBufferSize: The maximum number of bytes to aggregate in-memory.
209+
/// An error will be thrown if after decoding elements there is more aggregated data than this amount.
209210
@inlinable
210211
public init(_ decoder: Decoder, maximumBufferSize: Int? = nil) {
211212
self.decoder = decoder
212213
self.maximumBufferSize = maximumBufferSize
213214
}
214215

216+
/// Append a new buffer to this processor.
215217
@inlinable
216-
func _append(_ buffer: ByteBuffer) {
218+
func append(_ buffer: ByteBuffer) {
217219
if self._buffer == nil || self._buffer!.readableBytes == 0 {
218220
self._buffer = buffer
219221
} else {
@@ -247,7 +249,7 @@ public final class NIOSingleStepByteToMessageProcessor<Decoder: NIOSingleStepByt
247249
) throws {
248250
// we want to call decodeLast once with an empty buffer if we have nothing
249251
if decodeMode == .last && (self._buffer == nil || self._buffer!.readableBytes == 0) {
250-
var emptyBuffer = self._buffer == nil ? ByteBuffer() : self._buffer!
252+
var emptyBuffer = self._buffer ?? ByteBuffer()
251253
if let message = try self.decoder.decodeLast(buffer: &emptyBuffer, seenEOF: seenEOF) {
252254
try messageReceiver(message)
253255
}
@@ -269,6 +271,70 @@ public final class NIOSingleStepByteToMessageProcessor<Decoder: NIOSingleStepByt
269271
try messageReceiver(message)
270272
}
271273

274+
try _postDecodeCheck()
275+
}
276+
277+
/// Decode the next message from the `NIOSingleStepByteToMessageProcessor`
278+
///
279+
/// This function is useful to manually decode the next message from the `NIOSingleStepByteToMessageProcessor`.
280+
/// It should be used in combination with the `append(_:)` function.
281+
/// Whenever you receive a new chunk of data, feed it into the `NIOSingleStepByteToMessageProcessor` using `append(_:)`,
282+
/// then call this function to decode the next message.
283+
///
284+
/// When you've already received the last chunk of data, call this function with `receivedLastChunk` set to `true`.
285+
/// In this case, if the function returns `nil`, you are done decoding and there are no more messages to decode.
286+
/// Note that you might need to call `decodeNext` _multiple times_, even if `receivedLastChunk` is true, as there
287+
/// might be multiple messages left in the buffer.
288+
///
289+
/// If `decodeMode` is `.normal`, this function will never return `ended == true`.
290+
///
291+
/// If `decodeMode` is `.last`, this function will try to decode a message even if it means only with an empty buffer.
292+
/// It'll then return the decoded message with `ended == true`. When you've received `ended == true`, you should
293+
/// simply end the decoding process.
294+
///
295+
/// `seenEOF` should only be true if `decodeMode == .last`. Otherwise it'll be ignored.
296+
///
297+
/// After a `decoder.decode(buffer:)` or `decoder.decodeLast(buffer:seenEOF:)` returns without throwing,
298+
/// the aggregated buffer will have to contain less than or equal to `maximumBufferSize` amount of bytes.
299+
/// Otherwise an error will be thrown.
300+
///
301+
/// - Parameters:
302+
/// - decodeMode: Either 'normal', or 'last' if the last chunk has been received and appended to the processor.
303+
/// - seenEOF: Whether an EOF was seen on the stream
304+
/// - Returns: A tuple containing the decoded message and a boolean indicating whether the decoding has ended.
305+
@inlinable
306+
func decodeNext(
307+
decodeMode: DecodeMode,
308+
seenEOF: Bool = false
309+
) throws -> (decoded: Decoder.InboundOut?, ended: Bool) {
310+
// we want to call decodeLast once with an empty buffer if we have nothing
311+
if decodeMode == .last && (self._buffer == nil || self._buffer!.readableBytes == 0) {
312+
var emptyBuffer = self._buffer ?? ByteBuffer()
313+
let message = try self.decoder.decodeLast(buffer: &emptyBuffer, seenEOF: seenEOF)
314+
return (message, true)
315+
}
316+
317+
if self._buffer == nil {
318+
return (nil, false)
319+
}
320+
321+
func decodeOnce(buffer: inout ByteBuffer) throws -> Decoder.InboundOut? {
322+
if decodeMode == .normal {
323+
return try self.decoder.decode(buffer: &buffer)
324+
} else {
325+
return try self.decoder.decodeLast(buffer: &buffer, seenEOF: seenEOF)
326+
}
327+
}
328+
329+
let message = try self._withNonCoWBuffer(decodeOnce)
330+
331+
try _postDecodeCheck()
332+
333+
return (message, false)
334+
}
335+
336+
@inlinable
337+
func _postDecodeCheck() throws {
272338
if let maximumBufferSize = self.maximumBufferSize, self._buffer!.readableBytes > maximumBufferSize {
273339
throw ByteToMessageDecoderError.PayloadTooLargeError()
274340
}
@@ -292,20 +358,26 @@ extension NIOSingleStepByteToMessageProcessor {
292358
self._buffer?.readableBytes ?? 0
293359
}
294360

295-
/// Feed data into the `NIOSingleStepByteToMessageProcessor`
361+
/// Feed data into the `NIOSingleStepByteToMessageProcessor` and process it
362+
///
363+
/// This function will decode as many `Decoder.InboundOut` messages from the `NIOSingleStepByteToMessageProcessor` as possible,
364+
/// and call the `messageReceiver` closure for each message.
296365
///
297366
/// - Parameters:
298367
/// - buffer: The `ByteBuffer` containing the next data in the stream
299368
/// - messageReceiver: A closure called for each message produced by the `Decoder`
300369
@inlinable
301370
public func process(buffer: ByteBuffer, _ messageReceiver: (Decoder.InboundOut) throws -> Void) throws {
302-
self._append(buffer)
371+
self.append(buffer)
303372
try self._decodeLoop(decodeMode: .normal, messageReceiver)
304373
}
305374

306375
/// Call when there is no data left in the stream. Calls `Decoder`.`decodeLast` one or more times. If there is no data left
307376
/// `decodeLast` will be called one time with an empty `ByteBuffer`.
308377
///
378+
/// This function will decode as many `Decoder.InboundOut` messages from the `NIOSingleStepByteToMessageProcessor` as possible,
379+
/// and call the `messageReceiver` closure for each message.
380+
///
309381
/// - Parameters:
310382
/// - seenEOF: Whether an EOF was seen on the stream.
311383
/// - messageReceiver: A closure called for each message produced by the `Decoder`.

0 commit comments

Comments
 (0)