Skip to content

Commit ab64d08

Browse files
committed
pr feedback
1 parent de2b36f commit ab64d08

File tree

6 files changed

+34
-44
lines changed

6 files changed

+34
-44
lines changed

client/src/main/java/io/split/engine/SDKReadinessGates.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,7 @@ public boolean isSDKReady(long milliseconds) throws InterruptedException {
4545

4646
timeLeft = end - System.currentTimeMillis();
4747

48-
boolean ready = areSegmentsReady(timeLeft);
49-
50-
if (ready) sdkInternalReady();
51-
52-
return ready;
48+
return areSegmentsReady(timeLeft);
5349
}
5450

5551
public boolean isSDKReadyNow() {
@@ -172,18 +168,10 @@ public boolean areSplitsReady(long milliseconds) throws InterruptedException {
172168
}
173169

174170
public void sdkInternalReady() {
175-
if (_internalReady.getCount() == 0) {
176-
return;
177-
}
178-
179171
_internalReady.countDown();
180172
}
181173

182174
public void waitUntilInternalReady() throws InterruptedException {
183-
if (_internalReady.getCount() == 0) {
184-
return;
185-
}
186-
187175
_internalReady.await();
188176
}
189177
}

client/src/main/java/io/split/engine/common/SyncManagerImp.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,7 @@
1212
import org.slf4j.Logger;
1313
import org.slf4j.LoggerFactory;
1414

15-
import java.util.concurrent.ExecutorService;
16-
import java.util.concurrent.Executors;
17-
import java.util.concurrent.Future;
18-
import java.util.concurrent.LinkedBlockingQueue;
15+
import java.util.concurrent.*;
1916
import java.util.concurrent.atomic.AtomicBoolean;
2017

2118
import static com.google.common.base.Preconditions.checkNotNull;
@@ -29,6 +26,8 @@ public class SyncManagerImp implements SyncManager {
2926
private final AtomicBoolean _shutdown;
3027
private final LinkedBlockingQueue<PushManager.Status> _incomingPushStatus;
3128
private final ExecutorService _executorService;
29+
private final ExecutorService _pollingExecutorService;
30+
private final SDKReadinessGates _gates;
3231
private Future<?> _pushStatusMonitorTask;
3332
private Backoff _backoff;
3433

@@ -37,7 +36,8 @@ public class SyncManagerImp implements SyncManager {
3736
Synchronizer synchronizer,
3837
PushManager pushManager,
3938
LinkedBlockingQueue<PushManager.Status> pushMessages,
40-
int authRetryBackOffBase) {
39+
int authRetryBackOffBase,
40+
SDKReadinessGates gates) {
4141
_streamingEnabledConfig = new AtomicBoolean(streamingEnabledConfig);
4242
_synchronizer = checkNotNull(synchronizer);
4343
_pushManager = checkNotNull(pushManager);
@@ -47,7 +47,12 @@ public class SyncManagerImp implements SyncManager {
4747
.setNameFormat("SPLIT-PushStatusMonitor-%d")
4848
.setDaemon(true)
4949
.build());
50+
_pollingExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
51+
.setNameFormat("SPLIT-PollingMode-%d")
52+
.setDaemon(true)
53+
.build());
5054
_backoff = new Backoff(authRetryBackOffBase);
55+
_gates = checkNotNull(gates);
5156
}
5257

5358
public static SyncManagerImp build(boolean streamingEnabledConfig,
@@ -66,7 +71,7 @@ public static SyncManagerImp build(boolean streamingEnabledConfig,
6671
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
6772
Synchronizer synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentSynchronizationTaskImp, splitCache, segmentCache, streamingRetryDelay, gates);
6873
PushManager pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, pushMessages, sseHttpClient);
69-
return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages, authRetryBackOffBase);
74+
return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages, authRetryBackOffBase, gates);
7075
}
7176

7277
@Override
@@ -76,7 +81,7 @@ public void start() {
7681
if (_streamingEnabledConfig.get()) {
7782
startStreamingMode();
7883
} else {
79-
startPollingMode();
84+
_pollingExecutorService.submit(this::startPollingMode);
8085
}
8186
}
8287

@@ -96,6 +101,12 @@ private void startStreamingMode() {
96101
}
97102

98103
private void startPollingMode() {
104+
try {
105+
_gates.waitUntilInternalReady();
106+
} catch (InterruptedException ex) {
107+
_log.debug(ex.getMessage());
108+
}
109+
99110
_log.debug("Starting in polling mode ...");
100111
_synchronizer.startPeriodicFetching();
101112
}

client/src/main/java/io/split/engine/common/SynchronizerImp.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
1313

14-
import java.util.ArrayList;
1514
import java.util.concurrent.Executors;
1615
import java.util.concurrent.ScheduledExecutorService;
1716
import java.util.concurrent.ThreadFactory;

client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,6 @@ public void forceRefresh(boolean addCacheHeader) {
6565

6666
@Override
6767
public void run() {
68-
try {
69-
_gates.waitUntilInternalReady();
70-
} catch (InterruptedException ex) {
71-
_log.debug(ex.getMessage());
72-
}
73-
7468
this.fetchAll(false);
7569
}
7670

client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,6 @@ public SegmentSynchronizationTaskImp(SegmentChangeFetcher segmentChangeFetcher,
5454

5555
@Override
5656
public void run() {
57-
try {
58-
_gates.waitUntilInternalReady();
59-
} catch (InterruptedException ex) {
60-
_log.debug(ex.getMessage());
61-
}
62-
63-
_running.set(true);
6457
_log.debug("Starting PeriodicFetching Segments ...");
6558
this.fetchAll(false);
6659
}
@@ -106,7 +99,7 @@ public SegmentFetcher getFetcher(String segmentName) {
10699

107100
@Override
108101
public void startPeriodicFetching() {
109-
if (_running.get()) {
102+
if (_running.getAndSet(true) ) {
110103
_log.debug("Segments PeriodicFetching is running...");
111104
return;
112105
}

client/src/test/java/io/split/engine/common/SyncManagerTest.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.split.engine.common;
22

3+
import io.split.engine.SDKReadinessGates;
34
import org.junit.Before;
45
import org.junit.Test;
56
import org.mockito.Mockito;
@@ -10,25 +11,29 @@ public class SyncManagerTest {
1011
private static final int BACKOFF_BASE = 1;
1112
private Synchronizer _synchronizer;
1213
private PushManager _pushManager;
14+
private SDKReadinessGates _gates;
1315

1416
@Before
1517
public void setUp() {
1618
_synchronizer = Mockito.mock(Synchronizer.class);
1719
_pushManager = Mockito.mock(PushManager.class);
20+
_gates = Mockito.mock(SDKReadinessGates.class);
1821
}
1922

2023
@Test
21-
public void startWithStreamingFalseShouldStartPolling() {
22-
SyncManagerImp syncManager = new SyncManagerImp(false, _synchronizer, _pushManager, new LinkedBlockingQueue<>(), BACKOFF_BASE);
24+
public void startWithStreamingFalseShouldStartPolling() throws InterruptedException {
25+
_gates.sdkInternalReady();
26+
SyncManagerImp syncManager = new SyncManagerImp(false, _synchronizer, _pushManager, new LinkedBlockingQueue<>(), BACKOFF_BASE, _gates);
2327
syncManager.start();
28+
Thread.sleep(1000);
2429
Mockito.verify(_synchronizer, Mockito.times(1)).startPeriodicFetching();
2530
Mockito.verify(_synchronizer, Mockito.times(1)).syncAll();
2631
Mockito.verify(_pushManager, Mockito.times(0)).start();
2732
}
2833

2934
@Test
3035
public void startWithStreamingTrueShouldStartSyncAll() {
31-
SyncManager sm = new SyncManagerImp(true, _synchronizer, _pushManager, new LinkedBlockingQueue<>(), BACKOFF_BASE);
36+
SyncManager sm = new SyncManagerImp(true, _synchronizer, _pushManager, new LinkedBlockingQueue<>(), BACKOFF_BASE, _gates);
3237
sm.start();
3338
Mockito.verify(_synchronizer, Mockito.times(0)).startPeriodicFetching();
3439
Mockito.verify(_synchronizer, Mockito.times(1)).syncAll();
@@ -38,7 +43,7 @@ public void startWithStreamingTrueShouldStartSyncAll() {
3843
@Test
3944
public void onStreamingAvailable() throws InterruptedException {
4045
LinkedBlockingQueue<PushManager.Status> messsages = new LinkedBlockingQueue<>();
41-
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE);
46+
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates);
4247
Thread t = new Thread(syncManager::incomingPushStatusHandler);
4348
t.start();
4449
messsages.offer(PushManager.Status.STREAMING_READY);
@@ -52,7 +57,7 @@ public void onStreamingAvailable() throws InterruptedException {
5257
@Test
5358
public void onStreamingDisabled() throws InterruptedException {
5459
LinkedBlockingQueue<PushManager.Status> messsages = new LinkedBlockingQueue<>();
55-
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE);
60+
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates);
5661
Thread t = new Thread(syncManager::incomingPushStatusHandler);
5762
t.start();
5863
messsages.offer(PushManager.Status.STREAMING_DOWN);
@@ -66,7 +71,7 @@ public void onStreamingDisabled() throws InterruptedException {
6671
@Test
6772
public void onStreamingShutdown() throws InterruptedException {
6873
LinkedBlockingQueue<PushManager.Status> messsages = new LinkedBlockingQueue<>();
69-
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE);
74+
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates);
7075
Thread t = new Thread(syncManager::incomingPushStatusHandler);
7176
t.start();
7277
messsages.offer(PushManager.Status.STREAMING_OFF);
@@ -78,7 +83,7 @@ public void onStreamingShutdown() throws InterruptedException {
7883
@Test
7984
public void onConnected() throws InterruptedException {
8085
LinkedBlockingQueue<PushManager.Status> messsages = new LinkedBlockingQueue<>();
81-
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE);
86+
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates);
8287
Thread t = new Thread(syncManager::incomingPushStatusHandler);
8388
t.start();
8489
messsages.offer(PushManager.Status.STREAMING_READY);
@@ -91,7 +96,7 @@ public void onConnected() throws InterruptedException {
9196
@Test
9297
public void onDisconnect() throws InterruptedException {
9398
LinkedBlockingQueue<PushManager.Status> messsages = new LinkedBlockingQueue<>();
94-
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE);
99+
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates);
95100
Thread t = new Thread(syncManager::incomingPushStatusHandler);
96101
t.start();
97102
messsages.offer(PushManager.Status.STREAMING_OFF);
@@ -103,7 +108,7 @@ public void onDisconnect() throws InterruptedException {
103108
@Test
104109
public void onDisconnectAndReconnect() throws InterruptedException { // Check with mauro. reconnect should call pushManager.start again, right?
105110
LinkedBlockingQueue<PushManager.Status> messsages = new LinkedBlockingQueue<>();
106-
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE);
111+
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates);
107112
syncManager.start();
108113
messsages.offer(PushManager.Status.STREAMING_BACKOFF);
109114
Thread.sleep(1200);

0 commit comments

Comments
 (0)