Skip to content

Commit 123a981

Browse files
committed
Add asynchronous event source
1 parent 893f0a6 commit 123a981

File tree

2 files changed

+203
-0
lines changed

2 files changed

+203
-0
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright 2019-2024 Spotify AB.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import Foundation
16+
17+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
18+
public extension CompositeEventSourceBuilder {
19+
/// Returns a new `CompositeEventSourceBuilder` with the specified `AsyncSequence` added to it.
20+
///
21+
/// - Note: The `consumerQueue` parameter is intended to be used when building a `MobiusLoop`.
22+
/// It can safely be omitted when building a `MobiusController`, which automatically handles sending events to the loop queue.
23+
///
24+
/// - Parameter sequence: An `AsyncSequence` producing `Event`s.
25+
/// - Parameter consumerQueue: An optional callback queue to consume events on.
26+
/// - Returns: A `CompositeEventSourceBuilder` that includes the given event source.
27+
func addEventSource<Sequence: AsyncSequence>(
28+
_ sequence: Sequence,
29+
on consumerQueue: DispatchQueue? = nil
30+
) -> CompositeEventSourceBuilder<Event> where Sequence.Element == Event {
31+
addEventSource(AsyncSequenceEventSource(sequence: sequence, consumerQueue: consumerQueue))
32+
}
33+
}
34+
35+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
36+
private final class AsyncSequenceEventSource<Sequence: AsyncSequence>: EventSource {
37+
private let sequence: Sequence
38+
private let consumerQueue: DispatchQueue?
39+
40+
init(sequence: Sequence, consumerQueue: DispatchQueue? = nil) {
41+
self.sequence = sequence
42+
self.consumerQueue = consumerQueue
43+
}
44+
45+
func subscribe(consumer: @escaping Consumer<Sequence.Element>) -> Disposable {
46+
// Prevents sending events after dispose by wrapping the consumer to enforce synchronous access.
47+
let protectedConsumer = Synchronized<Consumer<Sequence.Element>?>(value: consumer)
48+
let threadSafeConsumer = { event in protectedConsumer.read { consumer in consumer?(event) } }
49+
50+
let task = Task { [consumerQueue] in
51+
for try await event in sequence {
52+
if let consumerQueue {
53+
consumerQueue.async { threadSafeConsumer(event) }
54+
} else {
55+
threadSafeConsumer(event)
56+
}
57+
}
58+
}
59+
60+
return AnonymousDisposable {
61+
protectedConsumer.value = nil
62+
task.cancel()
63+
}
64+
}
65+
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// Copyright 2019-2024 Spotify AB.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import Foundation
16+
import MobiusCore
17+
import Nimble
18+
import Quick
19+
20+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
21+
final class CompositeEventSourceBuilder_ConcurrencyTests: QuickSpec {
22+
// swiftlint:disable:next function_body_length
23+
override func spec() {
24+
describe("CompositeEventSourceBuilder") {
25+
var sequence: AsyncStream<String>!
26+
var elementProducer: AsyncStream<String>.Continuation!
27+
28+
beforeEach {
29+
sequence = AsyncStream<String> { continuation in
30+
elementProducer = continuation
31+
}
32+
}
33+
34+
context("when configuring the composite event source builder") {
35+
var compositeEventSource: AnyEventSource<String>!
36+
var disposable: Disposable!
37+
var receivedEvents: [String]!
38+
39+
context("with an AsyncSequence event source") {
40+
beforeEach {
41+
let sut = CompositeEventSourceBuilder<String>()
42+
.addEventSource(sequence, on: .main)
43+
44+
compositeEventSource = sut.build()
45+
receivedEvents = []
46+
47+
disposable = compositeEventSource.subscribe {
48+
receivedEvents.append($0)
49+
}
50+
}
51+
52+
afterEach {
53+
disposable.dispose()
54+
}
55+
56+
it("should receive events from the sequence") {
57+
elementProducer.yield("foo")
58+
expect(receivedEvents).toEventually(equal(["foo"]))
59+
60+
elementProducer.yield("bar")
61+
expect(receivedEvents).toEventually(equal(["foo", "bar"]))
62+
}
63+
}
64+
}
65+
66+
describe("DelayedSequence") {
67+
context("MobiusLoop with an AsyncSequence event source") {
68+
var loop: MobiusLoop<String, String, String>!
69+
var receivedModels: [String]!
70+
71+
beforeEach {
72+
let effectHandler = EffectRouter<String, String>()
73+
.asConnectable
74+
75+
let eventSource = CompositeEventSourceBuilder<String>()
76+
.addEventSource(sequence, on: .main)
77+
.build()
78+
79+
loop = Mobius
80+
.loop(update: { _, event in .next(event) }, effectHandler: effectHandler)
81+
.withEventSource(eventSource)
82+
.start(from: "foo")
83+
84+
receivedModels = []
85+
loop.addObserver { model in receivedModels.append(model) }
86+
}
87+
88+
it("should prevent events from being submitted after dispose") {
89+
elementProducer.yield("bar")
90+
expect(receivedModels).toEventually(equal(["foo", "bar"]))
91+
92+
loop.dispose()
93+
94+
elementProducer.yield("baz")
95+
expect(receivedModels).toNever(equal(["foo", "bar", "baz"]))
96+
}
97+
}
98+
99+
context("MobiusController with an AsyncSequence event source") {
100+
let loopQueue = DispatchQueue(label: "loop queue")
101+
let viewQueue = DispatchQueue(label: "view queue")
102+
103+
var controller: MobiusController<String, String, String>!
104+
var view: RecordingTestConnectable!
105+
106+
beforeEach {
107+
let effectHandler = EffectRouter<String, String>()
108+
.asConnectable
109+
110+
let eventSource = CompositeEventSourceBuilder<String>()
111+
.addEventSource(sequence)
112+
.build()
113+
114+
controller = Mobius
115+
.loop(update: { _, event in .next(String(event)) }, effectHandler: effectHandler)
116+
.withEventSource(eventSource)
117+
.makeController(from: "foo", loopQueue: loopQueue, viewQueue: viewQueue)
118+
119+
view = RecordingTestConnectable(expectedQueue: viewQueue)
120+
controller.connectView(view)
121+
}
122+
123+
it("should prevent events from being submitted after dispose") {
124+
controller.start()
125+
126+
elementProducer.yield("bar")
127+
expect(view.recorder.items).toEventually(equal(["foo", "bar"]))
128+
129+
controller.stop()
130+
131+
elementProducer.yield("baz")
132+
expect(view.recorder.items).toNever(equal(["foo", "bar", "baz"]))
133+
}
134+
}
135+
}
136+
}
137+
}
138+
}

0 commit comments

Comments
 (0)