Skip to content

Commit d7f9b52

Browse files
Tests and final fix
1 parent 90b3d03 commit d7f9b52

File tree

9 files changed

+96
-38
lines changed

9 files changed

+96
-38
lines changed

client/src/main/java/io/split/engine/common/PushManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ enum Status {
1313
void stop();
1414
void startWorkers();
1515
void stopWorkers();
16+
void scheduleConnectionReset();
1617
}

client/src/main/java/io/split/engine/common/PushManagerImp.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,25 +32,23 @@ public class PushManagerImp implements PushManager {
3232

3333
private final AuthApiClient _authApiClient;
3434
private final EventSourceClient _eventSourceClient;
35-
private final Backoff _backoff;
3635
private final SplitsWorker _splitsWorker;
3736
private final Worker<SegmentQueueDto> _segmentWorker;
3837
private final PushStatusTracker _pushStatusTracker;
3938

4039
private Future<?> _nextTokenRefreshTask;
4140
private final ScheduledExecutorService _scheduledExecutorService;
41+
private long _expirationTime;
4242

4343
@VisibleForTesting
4444
/* package private */ PushManagerImp(AuthApiClient authApiClient,
4545
EventSourceClient eventSourceClient,
4646
SplitsWorker splitsWorker,
4747
Worker<SegmentQueueDto> segmentWorker,
48-
Backoff backoff,
4948
PushStatusTracker pushStatusTracker) {
5049

5150
_authApiClient = checkNotNull(authApiClient);
5251
_eventSourceClient = checkNotNull(eventSourceClient);
53-
_backoff = checkNotNull(backoff);
5452
_splitsWorker = splitsWorker;
5553
_segmentWorker = segmentWorker;
5654
_pushStatusTracker = pushStatusTracker;
@@ -74,7 +72,6 @@ public static PushManagerImp build(Synchronizer synchronizer,
7472
EventSourceClientImp.build(streamingUrl, splitsWorker, segmentWorker, pushStatusTracker, sseHttpClient),
7573
splitsWorker,
7674
segmentWorker,
77-
new Backoff(authRetryBackOffBase),
7875
pushStatusTracker);
7976
}
8077

@@ -83,14 +80,13 @@ public synchronized void start() {
8380
AuthenticationResponse response = _authApiClient.Authenticate();
8481
_log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled()));
8582
if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())) {
86-
scheduleConnectionReset(response.getExpiration());
87-
_backoff.reset();
83+
_expirationTime = response.getExpiration();
8884
return;
8985
}
9086

9187
stop();
9288
if (response.isRetry()) {
93-
scheduleConnectionReset(_backoff.interval());
89+
_pushStatusTracker.forceRetryableError();//retriable error
9490
} else {
9591
_pushStatusTracker.forcePushDisable();
9692
}
@@ -106,13 +102,15 @@ public synchronized void stop() {
106102
}
107103
}
108104

109-
private void scheduleConnectionReset(long time) {
110-
_log.debug(String.format("scheduleNextTokenRefresh in %s SECONDS", time));
105+
@Override
106+
public synchronized void scheduleConnectionReset() {
107+
_expirationTime = 120l;
108+
_log.debug(String.format("scheduleNextTokenRefresh in %s SECONDS", _expirationTime));
111109
_nextTokenRefreshTask = _scheduledExecutorService.schedule(() -> {
112110
_log.debug("Starting scheduleNextTokenRefresh ...");
113111
stop();
114112
start();
115-
}, time, TimeUnit.SECONDS);
113+
}, _expirationTime, TimeUnit.SECONDS);
116114
}
117115

118116
private boolean startSse(String token, String channels) {

client/src/main/java/io/split/engine/common/SyncManagerImp.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@ public class SyncManagerImp implements SyncManager {
2626
private final LinkedBlockingQueue<PushManager.Status> _incomingPushStatus;
2727
private final ExecutorService _executorService;
2828
private Future<?> _pushStatusMonitorTask;
29+
private Backoff _backoff;
2930

3031
@VisibleForTesting
3132
/* package private */ SyncManagerImp(boolean streamingEnabledConfig,
3233
Synchronizer synchronizer,
3334
PushManager pushManager,
34-
LinkedBlockingQueue<PushManager.Status> pushMessages) {
35+
LinkedBlockingQueue<PushManager.Status> pushMessages,
36+
int authRetryBackOffBase) {
3537
_streamingEnabledConfig = new AtomicBoolean(streamingEnabledConfig);
3638
_synchronizer = checkNotNull(synchronizer);
3739
_pushManager = checkNotNull(pushManager);
@@ -41,6 +43,7 @@ public class SyncManagerImp implements SyncManager {
4143
.setNameFormat("SPLIT-PushStatusMonitor-%d")
4244
.setDaemon(true)
4345
.build());
46+
_backoff = new Backoff(authRetryBackOffBase);
4447
}
4548

4649
public static SyncManagerImp build(boolean streamingEnabledConfig,
@@ -54,7 +57,7 @@ public static SyncManagerImp build(boolean streamingEnabledConfig,
5457
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
5558
Synchronizer synchronizer = new SynchronizerImp(refreshableSplitFetcherProvider, segmentFetcher);
5659
PushManager pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, authRetryBackOffBase, pushMessages, sseHttpClient);
57-
return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages);
60+
return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages, authRetryBackOffBase);
5861
}
5962

6063
@Override
@@ -99,14 +102,20 @@ private void startPollingMode() {
99102
_synchronizer.stopPeriodicFetching();
100103
_synchronizer.syncAll();
101104
_pushManager.startWorkers();
105+
_pushManager.scheduleConnectionReset();
106+
_backoff.reset();
102107
break;
103108
case STREAMING_DOWN:
104109
_pushManager.stopWorkers();
105110
_synchronizer.startPeriodicFetching();
106111
break;
107112
case STREAMING_BACKOFF:
113+
long howLong = _backoff.interval() * 1000;
114+
_log.error(String.format("Retryable error in streaming subsystem. Switching to polling and retrying in %d seconds", howLong/1000));
108115
_synchronizer.startPeriodicFetching();
109116
_pushManager.stopWorkers();
117+
_pushManager.stop();
118+
Thread.sleep(howLong);
110119
_pushManager.start();
111120
break;
112121
case STREAMING_OFF:

client/src/main/java/io/split/engine/sse/PushStatusTracker.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ public interface PushStatusTracker {
1212
void handleSseStatus(SSEClient.StatusMessage newStatus);
1313
void forcePushDisable();
1414
void notifyStreamingReady();
15+
void forceRetryableError();
1516
}

client/src/main/java/io/split/engine/sse/PushStatusTrackerImp.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ public void handleIncomingAblyError(ErrorNotification notification) {
114114
}
115115
if (notification.getCode() >= 40140 && notification.getCode() <= 40149) {
116116
_statusMessages.offer(PushManager.Status.STREAMING_BACKOFF);
117+
return;
117118
}
118119
if (notification.getCode() >= 40000 && notification.getCode() <= 49999) {
119120
_statusMessages.offer(PushManager.Status.STREAMING_OFF);
@@ -134,4 +135,9 @@ public synchronized void forcePushDisable() {
134135
public void notifyStreamingReady() {
135136
_statusMessages.offer(PushManager.Status.STREAMING_READY);
136137
}
138+
139+
@Override
140+
public void forceRetryableError() {
141+
_statusMessages.offer(PushManager.Status.STREAMING_BACKOFF);
142+
}
137143
}

client/src/main/java/io/split/engine/sse/client/SSEClient.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.ExecutorService;
2020
import java.util.concurrent.Executors;
2121
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223
import java.util.concurrent.atomic.AtomicReference;
2324
import java.util.function.Function;
2425

@@ -54,13 +55,15 @@ private enum ConnectionState {
5455
private final AtomicReference<ConnectionState> _state = new AtomicReference<>(ConnectionState.CLOSED);
5556
private final AtomicReference<CloseableHttpResponse> _ongoingResponse = new AtomicReference<>();
5657
private final AtomicReference<HttpGet> _ongoingRequest = new AtomicReference<>();
58+
private AtomicBoolean _forcedStop;
5759

5860
public SSEClient(Function<RawEvent, Void> eventCallback,
5961
Function<StatusMessage, Void> statusCallback,
6062
CloseableHttpClient client) {
6163
_eventCallback = eventCallback;
6264
_statusCallback = statusCallback;
6365
_client = client;
66+
_forcedStop = new AtomicBoolean();
6467
}
6568

6669
public synchronized boolean open(URI uri) {
@@ -90,13 +93,14 @@ public boolean isOpen() {
9093
}
9194

9295
public synchronized void close() {
96+
_forcedStop.set(true);
9397
if (_state.compareAndSet(ConnectionState.OPEN, ConnectionState.CLOSED)) {
9498
if (_ongoingResponse.get() != null) {
9599
try {
96100
_ongoingRequest.get().abort();
97101
_ongoingResponse.get().close();
98102
} catch (IOException e) {
99-
_log.info(String.format("Error closing SSEClient: %s", e.getMessage()));
103+
_log.debug(String.format("SSEClient close forced: %s", e.getMessage()));
100104
}
101105
}
102106
}
@@ -127,9 +131,11 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {
127131
_statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
128132
return;
129133
} catch (IOException exc) { // Other type of connection error
130-
_log.info(String.format("SSE connection ended abruptly: %s. Retying", exc.getMessage()));
131-
_statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
132-
return;
134+
if(!_forcedStop.get()) {
135+
_log.debug(String.format("SSE connection ended abruptly: %s. Retying", exc.getMessage()));
136+
_statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
137+
return;
138+
}
133139
}
134140
}
135141
} catch (Exception e) { // Any other error non related to the connection disables streaming altogether
@@ -144,6 +150,7 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {
144150

145151
_state.set(ConnectionState.CLOSED);
146152
_log.debug("SSEClient finished.");
153+
_forcedStop.set(false);
147154
}
148155
}
149156

client/src/test/java/io/split/client/SplitClientIntegrationTest.java

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import javax.ws.rs.sse.OutboundSseEvent;
1414
import java.io.IOException;
1515
import java.net.URISyntaxException;
16+
import java.util.Date;
1617
import java.util.List;
1718
import java.util.concurrent.TimeUnit;
1819
import java.util.concurrent.TimeoutException;
@@ -376,7 +377,7 @@ public void splitClientMultiFactory() throws IOException, TimeoutException, Inte
376377
SplitClient client3 = factory3.client();
377378
client3.blockUntilReady();
378379

379-
SplitClientConfig config4 = buildSplitClientConfig("disabled", splitServer.getUrl(), sseServer4.getPort(), true, 50);
380+
SplitClientConfig config4 = buildSplitClientConfig("disabled", splitServer.getUrl(), sseServer4.getPort(), true, 100);
380381
SplitFactory factory4 = SplitFactoryBuilder.build("fake-api-token-4", config4);
381382
SplitClient client4 = factory4.client();
382383
client4.blockUntilReady();
@@ -393,17 +394,29 @@ public void splitClientMultiFactory() throws IOException, TimeoutException, Inte
393394
String result4 = client4.getTreatment("admin", "push_test");
394395
Assert.assertEquals("on_whitelist", result4);
395396

397+
398+
OutboundSseEvent sseEventInitial = new OutboundEvent
399+
.Builder()
400+
.comment("initializing")
401+
.id("fakeid")
402+
.build();
396403
OutboundSseEvent sseEventSplitUpdate = new OutboundEvent
397404
.Builder()
398405
.name("message")
399406
.data("{\"id\":\"22\",\"clientId\":\"22\",\"timestamp\":1592590436082,\"encoding\":\"json\",\"channel\":\"xxxx_xxxx_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1585948850111}\"}")
400407
.build();
408+
eventQueue1.push(sseEventInitial);
409+
eventQueue2.push(sseEventInitial);
410+
eventQueue3.push(sseEventInitial);
411+
eventQueue4.push(sseEventInitial);
412+
401413
eventQueue1.push(sseEventSplitUpdate);
402414

403415
Awaitility.await()
404416
.atMost(50L, TimeUnit.SECONDS)
405417
.until(() -> "split_killed".equals(client1.getTreatment("admin", "push_test")));
406418

419+
407420
Awaitility.await()
408421
.atMost(50L, TimeUnit.SECONDS)
409422
.until(() -> "on_whitelist".equals(client2.getTreatment("admin", "push_test")));
@@ -427,13 +440,14 @@ public void splitClientMultiFactory() throws IOException, TimeoutException, Inte
427440
.until(() -> "on_whitelist".equals(client2.getTreatment("admin", "push_test")));
428441

429442
Awaitility.await()
430-
.atMost(50L, TimeUnit.SECONDS)
443+
.atMost(100L, TimeUnit.SECONDS)
431444
.until(() -> "split_killed".equals(client3.getTreatment("admin", "push_test")));
432445

433446
Awaitility.await()
434447
.atMost(50L, TimeUnit.SECONDS)
435448
.until(() -> "on_whitelist".equals(client4.getTreatment("admin", "push_test")));
436449

450+
437451
client1.destroy();
438452
client2.destroy();
439453
client3.destroy();
@@ -486,17 +500,28 @@ public void testConnectionClosedByRemoteHostIsProperlyHandled() throws IOExcepti
486500
splitServer.start();
487501
sseServer.start();
488502

489-
SplitClientConfig config = buildSplitClientConfig("enabled", splitServer.getUrl(), sseServer.getPort(), true, 50);
503+
SplitClientConfig config = buildSplitClientConfig("enabled", splitServer.getUrl(), sseServer.getPort(), true, 100);
490504
SplitFactory factory = SplitFactoryBuilder.build("fake-api-token-1", config);
491505
SplitClient client = factory.client();
492506
client.blockUntilReady();
493507

508+
OutboundSseEvent sseEventInitial = new OutboundEvent
509+
.Builder()
510+
.comment("initializing")
511+
.id("fakeid")
512+
.name("message")
513+
.data("{\"id\":\"222\",\"timestamp\":1588254668328,\"encoding\":\"json\",\"channel\":\"[?occupancy=metrics.publishers]control_pri\",\"data\":\"{\\\"metrics\\\":{\\\"publishers\\\":2}}\",\"name\":\"[meta]occupancy\"}")
514+
.build();
515+
516+
eventQueue.push(sseEventInitial);
517+
494518
String result = client.getTreatment("admin", "push_test");
495519
Assert.assertEquals("on_whitelist", result);
520+
Thread.sleep(1000);
496521
eventQueue.push(SSEMockServer.CONNECTION_CLOSED_BY_REMOTE_HOST);
497522
Thread.sleep(1000);
498523
result = client.getTreatment("admin", "push_test");
499-
//Assert.assertNotEquals("on_whitelist", result);
524+
Assert.assertNotEquals("on_whitelist", result);
500525
}
501526

502527
@Test
@@ -508,18 +533,26 @@ public void testConnectionClosedIsProperlyHandled() throws IOException, TimeoutE
508533
splitServer.start();
509534
sseServer.start();
510535

511-
SplitClientConfig config = buildSplitClientConfig("enabled", splitServer.getUrl(), sseServer.getPort(), true, 50);
536+
SplitClientConfig config = buildSplitClientConfig("enabled", splitServer.getUrl(), sseServer.getPort(), true, 5);
512537
SplitFactory factory = SplitFactoryBuilder.build("fake-api-token-1", config);
513538
SplitClient client = factory.client();
514539
client.blockUntilReady();
515540

541+
OutboundSseEvent sseEventInitial = new OutboundEvent
542+
.Builder()
543+
.comment("initializing")
544+
.id("fakeid")
545+
.build();
546+
547+
eventQueue.push(sseEventInitial);
548+
516549
String result = client.getTreatment("admin", "push_test");
517550
Assert.assertEquals("on_whitelist", result);
518551
Thread.sleep(1000);
519552
sseServer.stop();
520553
Thread.sleep(1000);
521554
result = client.getTreatment("admin", "push_test");
522-
//Assert.assertNotEquals("on_whitelist", result);
555+
Assert.assertNotEquals("on_whitelist", result);
523556
}
524557

525558
private SSEMockServer buildSSEMockServer(SSEMockServer.SseEventQueue eventQueue) {

client/src/test/java/io/split/engine/common/PushManagerTest.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.split.engine.sse.AuthApiClient;
44
import io.split.engine.sse.EventSourceClient;
5+
import io.split.engine.sse.PushStatusTracker;
56
import io.split.engine.sse.PushStatusTrackerImp;
67
import io.split.engine.sse.dtos.AuthenticationResponse;
78
import io.split.engine.sse.workers.SegmentsWorkerImp;
@@ -17,18 +18,19 @@ public class PushManagerTest {
1718
private EventSourceClient _eventSourceClient;
1819
private Backoff _backoff;
1920
private PushManager _pushManager;
21+
private PushStatusTracker _pushStatusTracker;
2022

2123
@Before
2224
public void setUp() {
2325
_authApiClient = Mockito.mock(AuthApiClient.class);
2426
_eventSourceClient = Mockito.mock(EventSourceClient.class);
2527
_backoff = Mockito.mock(Backoff.class);
28+
_pushStatusTracker = Mockito.mock(PushStatusTrackerImp.class);
2629
_pushManager = new PushManagerImp(_authApiClient,
2730
_eventSourceClient,
2831
Mockito.mock(SplitsWorker.class),
2932
Mockito.mock(SegmentsWorkerImp.class),
30-
_backoff,
31-
new PushStatusTrackerImp(new LinkedBlockingQueue<>()));
33+
_pushStatusTracker);
3234
}
3335

3436
@Test
@@ -52,8 +54,9 @@ public void startWithPushEnabledShouldConnect() throws InterruptedException {
5254
Mockito.verify(_eventSourceClient, Mockito.times(1)).start(response.getChannels(), response.getToken());
5355

5456
Thread.sleep(1500);
55-
Mockito.verify(_authApiClient, Mockito.times(2)).Authenticate();
56-
Mockito.verify(_eventSourceClient, Mockito.times(1)).start(response2.getChannels(), response2.getToken());
57+
58+
Mockito.verify(_pushStatusTracker, Mockito.times(0)).forceRetryableError();
59+
Mockito.verify(_pushStatusTracker, Mockito.times(0)).forcePushDisable();
5760
}
5861

5962
@Test
@@ -89,13 +92,12 @@ public void startWithPushDisabledAndRetryTrueShouldConnect() throws InterruptedE
8992

9093
_pushManager.start();
9194

95+
9296
Mockito.verify(_authApiClient, Mockito.times(1)).Authenticate();
9397
Mockito.verify(_eventSourceClient, Mockito.never()).start(Mockito.any(String.class), Mockito.any(String.class));
9498
Mockito.verify(_eventSourceClient, Mockito.times(1)).stop();
9599

96100
Thread.sleep(1500);
97-
98-
Mockito.verify(_authApiClient, Mockito.times(2)).Authenticate();
99-
Mockito.verify(_eventSourceClient, Mockito.times(1)).start(response2.getChannels(), response2.getToken());
101+
Mockito.verify(_pushStatusTracker, Mockito.times(1)).forceRetryableError();
100102
}
101103
}

0 commit comments

Comments
 (0)