Skip to content

Commit b553e3e

Browse files
authored
Fix pending tracks race condition (#928)
1 parent 6adfb15 commit b553e3e

File tree

7 files changed

+368
-8
lines changed

7 files changed

+368
-8
lines changed

.changes/pending-track-queue

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
patch type="fixed" "race condition with pending tracks"

lib/src/constants.dart

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ class Timeouts {
1616
final Duration connection;
1717
final Duration debounce;
1818
final Duration publish;
19+
final Duration subscribe;
1920
final Duration peerConnection;
2021
final Duration iceRestart;
2122

2223
const Timeouts({
2324
required this.connection,
2425
required this.debounce,
2526
required this.publish,
27+
required this.subscribe,
2628
required this.peerConnection,
2729
required this.iceRestart,
2830
});
@@ -31,6 +33,7 @@ class Timeouts {
3133
connection: Duration(seconds: 10),
3234
debounce: Duration(milliseconds: 100),
3335
publish: Duration(seconds: 10),
36+
subscribe: Duration(seconds: 10),
3437
peerConnection: Duration(seconds: 10),
3538
iceRestart: Duration(seconds: 10),
3639
);

lib/src/core/engine.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
102102
String? url;
103103
String? token;
104104

105-
late ConnectOptions connectOptions;
105+
ConnectOptions connectOptions;
106106
RoomOptions roomOptions;
107107
FastConnectOptions? fastConnectOptions;
108108

@@ -188,6 +188,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
188188
}
189189

190190
Engine({
191+
required this.connectOptions,
191192
required this.roomOptions,
192193
SignalClient? signalClient,
193194
PeerConnectionCreate? peerConnectionCreate,
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// Copyright 2024 LiveKit, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc;
16+
import 'package:meta/meta.dart';
17+
18+
import '../events.dart';
19+
import '../logger.dart';
20+
import '../types/other.dart';
21+
22+
typedef PendingTrackSubscriber = Future<bool> Function(PendingTrack entry);
23+
typedef TrackExceptionEmitter = void Function(TrackSubscriptionExceptionEvent event);
24+
25+
/// Helper that queues subscriber tracks when participant metadata isn't ready yet.
26+
@internal
27+
class PendingTrackQueue {
28+
final int maxSize;
29+
Duration ttl;
30+
final TrackExceptionEmitter emitException;
31+
32+
// keyed by participant sid
33+
final Map<String, List<PendingTrack>> _pending = {};
34+
35+
PendingTrackQueue({
36+
required this.ttl,
37+
required this.emitException,
38+
this.maxSize = 100,
39+
});
40+
41+
void updateTtl(Duration ttl) {
42+
this.ttl = ttl;
43+
}
44+
45+
void clear() {
46+
_pending.clear();
47+
}
48+
49+
void enqueue({
50+
required rtc.MediaStreamTrack track,
51+
required rtc.MediaStream stream,
52+
required rtc.RTCRtpReceiver? receiver,
53+
required String participantSid,
54+
required String trackSid,
55+
required ConnectionState connectionState,
56+
}) {
57+
// If we're already disconnected, drop immediately.
58+
if (connectionState == ConnectionState.disconnected) {
59+
final event = TrackSubscriptionExceptionEvent(
60+
participant: null,
61+
sid: trackSid,
62+
reason: TrackSubscribeFailReason.noParticipantFound,
63+
);
64+
logger.warning('Dropping pending track while disconnected trackSid:$trackSid participantSid:$participantSid');
65+
emitException(event);
66+
return;
67+
}
68+
69+
_removeExpired();
70+
71+
final totalPending = _pending.values.fold<int>(0, (sum, list) => sum + list.length);
72+
if (totalPending >= maxSize) {
73+
final event = TrackSubscriptionExceptionEvent(
74+
participant: null,
75+
sid: trackSid,
76+
reason: TrackSubscribeFailReason.noParticipantFound,
77+
);
78+
logger.severe('Pending track queue full, dropping trackSid:$trackSid participantSid:$participantSid');
79+
emitException(event);
80+
return;
81+
}
82+
83+
final expiresAt = DateTime.now().add(ttl);
84+
logger.fine('Queueing pending trackSid:$trackSid participantSid:$participantSid until metadata is ready');
85+
final entry = PendingTrack(
86+
track: track,
87+
stream: stream,
88+
receiver: receiver,
89+
participantSid: participantSid,
90+
trackSid: trackSid,
91+
expiresAt: expiresAt,
92+
);
93+
final list = _pending.putIfAbsent(participantSid, () => []);
94+
list.add(entry);
95+
}
96+
97+
@internal
98+
Future<void> flush({
99+
required bool isConnected,
100+
String? participantSid,
101+
required PendingTrackSubscriber subscriber,
102+
}) async {
103+
_removeExpired();
104+
if (!isConnected) return;
105+
106+
final Iterable<PendingTrack> source = participantSid != null
107+
? List<PendingTrack>.from(_pending[participantSid] ?? const [])
108+
: _pending.values.expand((e) => e).toList();
109+
110+
for (final item in source) {
111+
final success = await subscriber(item);
112+
if (success) {
113+
_pending[item.participantSid]?.remove(item);
114+
}
115+
}
116+
}
117+
118+
void _removeExpired() {
119+
final now = DateTime.now();
120+
_pending.forEach((sid, list) {
121+
final expired = list.where((p) => p.expiresAt.isBefore(now)).toList();
122+
for (final item in expired) {
123+
list.remove(item);
124+
final event = TrackSubscriptionExceptionEvent(
125+
participant: null,
126+
sid: item.trackSid,
127+
reason: TrackSubscribeFailReason.noParticipantFound,
128+
);
129+
logger.warning('Pending track expired waiting for participant metadata: $event');
130+
emitException(event);
131+
}
132+
});
133+
}
134+
}
135+
136+
@internal
137+
class PendingTrack {
138+
final rtc.MediaStreamTrack track;
139+
final rtc.MediaStream stream;
140+
final rtc.RTCRtpReceiver? receiver;
141+
final String participantSid;
142+
final String trackSid;
143+
final DateTime expiresAt;
144+
145+
PendingTrack({
146+
required this.track,
147+
required this.stream,
148+
required this.receiver,
149+
required this.participantSid,
150+
required this.trackSid,
151+
required this.expiresAt,
152+
});
153+
}

lib/src/core/room.dart

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import '../types/rpc.dart';
5454
import '../types/transcription_segment.dart';
5555
import '../utils.dart' show unpackStreamId;
5656
import 'engine.dart';
57+
import 'pending_track_queue.dart';
5758

5859
/// Room is the primary construct for LiveKit conferences. It contains a
5960
/// group of [Participant]s, each publishing and subscribing to [Track]s.
@@ -135,6 +136,9 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
135136
@internal
136137
late final PreConnectAudioBuffer preConnectAudioBuffer;
137138

139+
// Pending subscriber tracks keyed by participantSid, for tracks arriving before metadata or before the room connected.
140+
late final PendingTrackQueue _pendingTrackQueue;
141+
138142
// for testing
139143
@internal
140144
Map<String, RpcRequestHandler> get rpcHandlers => _rpcHandlers;
@@ -152,6 +156,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
152156
Engine? engine,
153157
}) : engine = engine ??
154158
Engine(
159+
connectOptions: connectOptions,
155160
roomOptions: roomOptions,
156161
) {
157162
//
@@ -161,11 +166,18 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
161166
_signalListener = this.engine.signalClient.createListener();
162167
_setUpSignalListeners();
163168

169+
_pendingTrackQueue = PendingTrackQueue(
170+
ttl: this.engine.connectOptions.timeouts.subscribe,
171+
emitException: (event) => events.emit(event),
172+
);
173+
164174
// Any event emitted will trigger ChangeNotifier
165175
events.listen((event) {
166176
logger.finer('[RoomEvent] $event, will notifyListeners()');
167177
notifyListeners();
168178
});
179+
// Keep a connected flush as a fallback in case tracks arrive pre-connected but before metadata.
180+
events.on<RoomConnectedEvent>((event) => _flushPendingTracks());
169181

170182
_setupRpcListeners();
171183

@@ -232,6 +244,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
232244
}) async {
233245
var roomOptions = this.roomOptions;
234246
connectOptions ??= ConnectOptions();
247+
_pendingTrackQueue.updateTtl(connectOptions.timeouts.subscribe);
235248
// ignore: deprecated_member_use_from_same_package
236249
if ((roomOptions.encryption != null || roomOptions.e2eeOptions != null) && engine.e2eeManager == null) {
237250
if (!lkPlatformSupportsE2EE()) {
@@ -596,12 +609,18 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
596609
reason: TrackSubscribeFailReason.invalidServerResponse,
597610
);
598611
}
599-
if (participant == null) {
600-
throw TrackSubscriptionExceptionEvent(
601-
participant: participant,
602-
sid: trackSid,
603-
reason: TrackSubscribeFailReason.noParticipantFound,
612+
613+
final shouldDefer = connectionState != ConnectionState.connected || participant == null;
614+
if (shouldDefer) {
615+
_pendingTrackQueue.enqueue(
616+
track: event.track,
617+
stream: event.stream,
618+
receiver: event.receiver,
619+
participantSid: participantSid,
620+
trackSid: trackSid,
621+
connectionState: connectionState,
604622
);
623+
return;
605624
}
606625
await participant.addSubscribedMediaTrack(
607626
event.track,
@@ -678,6 +697,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
678697

679698
_remoteParticipants[result.participant.identity] = result.participant;
680699
_sidToIdentity[result.participant.sid] = result.participant.identity;
700+
await _flushPendingTracks(participant: result.participant);
681701
return result;
682702
}
683703

@@ -722,10 +742,12 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
722742
}
723743
}
724744
_sidToIdentity[info.sid] = info.identity;
745+
await _flushPendingTracks(participant: result.participant);
725746
} else {
726747
final wasUpdated = await result.participant.updateFromInfo(info);
727748
if (wasUpdated) {
728749
_sidToIdentity[info.sid] = info.identity;
750+
await _flushPendingTracks(participant: result.participant);
729751
}
730752
}
731753
}
@@ -760,6 +782,32 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
760782
emitWhenConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers));
761783
}
762784

785+
Future<void> _flushPendingTracks({RemoteParticipant? participant}) => _pendingTrackQueue.flush(
786+
isConnected: connectionState == ConnectionState.connected,
787+
participantSid: participant?.sid,
788+
subscriber: (pending) async {
789+
final target = participant ?? _getRemoteParticipantBySid(pending.participantSid);
790+
if (target == null) return false;
791+
try {
792+
await target.addSubscribedMediaTrack(
793+
pending.track,
794+
pending.stream,
795+
pending.trackSid,
796+
receiver: pending.receiver,
797+
audioOutputOptions: roomOptions.defaultAudioOutputOptions,
798+
);
799+
return true;
800+
} on TrackSubscriptionExceptionEvent catch (event) {
801+
logger.severe('Track subscription failed during flush: ${event}');
802+
events.emit(event);
803+
return true;
804+
} catch (exception) {
805+
logger.warning('Unknown exception during pending track flush: ${exception}');
806+
return false;
807+
}
808+
},
809+
);
810+
763811
// from data channel
764812
// updates are sent only when there's a change to speaker ordering
765813
void _onEngineActiveSpeakersUpdateEvent(List<lk_models.SpeakerInfo> speakers) {
@@ -941,6 +989,7 @@ extension RoomPrivateMethods on Room {
941989
}
942990
_remoteParticipants.clear();
943991
_sidToIdentity.clear();
992+
_pendingTrackQueue.clear();
944993

945994
// clean up LocalParticipant
946995
await localParticipant?.unpublishAllTracks();

0 commit comments

Comments
 (0)