Skip to content

Commit eeeb747

Browse files
authored
Merge pull request #216 from splitio/add-internal-ready
sdk internal ready implementation
2 parents 5eb5a9e + c21fcd1 commit eeeb747

File tree

11 files changed

+86
-34
lines changed

11 files changed

+86
-34
lines changed

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import java.util.ArrayList;
5252
import java.util.List;
5353
import java.util.Random;
54-
import java.util.concurrent.TimeUnit;
5554
import java.util.stream.Collectors;
5655

5756
public class SplitFactoryImpl implements SplitFactory {
@@ -127,12 +126,11 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
127126
// Impressions
128127
_impressionsManager = buildImpressionsManager(config);
129128

130-
131129
// EventClient
132130
_eventClient = EventClientImpl.create(_httpclient, _eventsRootTarget, config.eventsQueueSize(), config.eventFlushIntervalInMillis(), config.waitBeforeShutdown());
133131

134132
// SyncManager
135-
_syncManager = SyncManagerImp.build(config.streamingEnabled(), _splitSynchronizationTask, _splitFetcher, _segmentSynchronizationTaskImp, _splitCache, config.authServiceURL(), _httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(config), _segmentCache, config.streamingRetryDelay());
133+
_syncManager = SyncManagerImp.build(config.streamingEnabled(), _splitSynchronizationTask, _splitFetcher, _segmentSynchronizationTaskImp, _splitCache, config.authServiceURL(), _httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(config), _segmentCache, config.streamingRetryDelay(), _gates);
136134
_syncManager.start();
137135

138136
// Evaluator

client/src/main/java/io/split/engine/SDKReadinessGates.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ public class SDKReadinessGates {
1616
private static final Logger _log = LoggerFactory.getLogger(SDKReadinessGates.class);
1717

1818
private final CountDownLatch _splitsAreReady = new CountDownLatch(1);
19+
private final CountDownLatch _internalReady = new CountDownLatch(1);
1920
private final ConcurrentMap<String, CountDownLatch> _segmentsAreReady = new ConcurrentHashMap<>();
2021

21-
2222
/**
2323
* Returns true if the SDK is ready. The SDK is ready when:
2424
* <ol>
@@ -166,4 +166,12 @@ public boolean areSegmentsReady(long milliseconds) throws InterruptedException {
166166
public boolean areSplitsReady(long milliseconds) throws InterruptedException {
167167
return _splitsAreReady.await(milliseconds, TimeUnit.MILLISECONDS);
168168
}
169+
170+
public void sdkInternalReady() {
171+
_internalReady.countDown();
172+
}
173+
174+
public void waitUntilInternalReady() throws InterruptedException {
175+
_internalReady.await();
176+
}
169177
}

client/src/main/java/io/split/engine/common/SyncManager.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package io.split.engine.common;
22

3-
import io.split.engine.sse.listeners.FeedbackLoopListener;
4-
import io.split.engine.sse.listeners.NotificationKeeperListener;
5-
63
public interface SyncManager {
74
void start();
85
void shutdown();

client/src/main/java/io/split/engine/common/SyncManagerImp.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.google.common.util.concurrent.ThreadFactoryBuilder;
55
import io.split.cache.SegmentCache;
66
import io.split.cache.SplitCache;
7+
import io.split.engine.SDKReadinessGates;
78
import io.split.engine.experiments.SplitFetcher;
89
import io.split.engine.experiments.SplitSynchronizationTask;
910
import io.split.engine.segments.SegmentSynchronizationTaskImp;
@@ -28,6 +29,8 @@ public class SyncManagerImp implements SyncManager {
2829
private final AtomicBoolean _shutdown;
2930
private final LinkedBlockingQueue<PushManager.Status> _incomingPushStatus;
3031
private final ExecutorService _executorService;
32+
private final ExecutorService _pollingExecutorService;
33+
private final SDKReadinessGates _gates;
3134
private Future<?> _pushStatusMonitorTask;
3235
private Backoff _backoff;
3336

@@ -36,7 +39,8 @@ public class SyncManagerImp implements SyncManager {
3639
Synchronizer synchronizer,
3740
PushManager pushManager,
3841
LinkedBlockingQueue<PushManager.Status> pushMessages,
39-
int authRetryBackOffBase) {
42+
int authRetryBackOffBase,
43+
SDKReadinessGates gates) {
4044
_streamingEnabledConfig = new AtomicBoolean(streamingEnabledConfig);
4145
_synchronizer = checkNotNull(synchronizer);
4246
_pushManager = checkNotNull(pushManager);
@@ -46,7 +50,12 @@ public class SyncManagerImp implements SyncManager {
4650
.setNameFormat("SPLIT-PushStatusMonitor-%d")
4751
.setDaemon(true)
4852
.build());
53+
_pollingExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
54+
.setNameFormat("SPLIT-PollingMode-%d")
55+
.setDaemon(true)
56+
.build());
4957
_backoff = new Backoff(authRetryBackOffBase);
58+
_gates = checkNotNull(gates);
5059
}
5160

5261
public static SyncManagerImp build(boolean streamingEnabledConfig,
@@ -60,19 +69,22 @@ public static SyncManagerImp build(boolean streamingEnabledConfig,
6069
int authRetryBackOffBase,
6170
CloseableHttpClient sseHttpClient,
6271
SegmentCache segmentCache,
63-
int streamingRetryDelay) {
72+
int streamingRetryDelay,
73+
SDKReadinessGates gates) {
6474
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
65-
Synchronizer synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentSynchronizationTaskImp, splitCache, segmentCache, streamingRetryDelay);
75+
Synchronizer synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentSynchronizationTaskImp, splitCache, segmentCache, streamingRetryDelay, gates);
6676
PushManager pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, pushMessages, sseHttpClient);
67-
return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages, authRetryBackOffBase);
77+
return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages, authRetryBackOffBase, gates);
6878
}
6979

7080
@Override
7181
public void start() {
82+
_synchronizer.syncAll();
83+
7284
if (_streamingEnabledConfig.get()) {
7385
startStreamingMode();
7486
} else {
75-
startPollingMode();
87+
_pollingExecutorService.submit(this::startPollingMode);
7688
}
7789
}
7890

@@ -85,15 +97,19 @@ public void shutdown() {
8597

8698
private void startStreamingMode() {
8799
_log.debug("Starting in streaming mode ...");
88-
_synchronizer.syncAll();
89100
if (null == _pushStatusMonitorTask) {
90101
_pushStatusMonitorTask = _executorService.submit(this::incomingPushStatusHandler);
91102
}
92103
_pushManager.start();
93-
94104
}
95105

96106
private void startPollingMode() {
107+
try {
108+
_gates.waitUntilInternalReady();
109+
} catch (InterruptedException ex) {
110+
_log.debug(ex.getMessage());
111+
}
112+
97113
_log.debug("Starting in polling mode ...");
98114
_synchronizer.startPeriodicFetching();
99115
}

client/src/main/java/io/split/engine/common/SynchronizerImp.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.google.common.util.concurrent.ThreadFactoryBuilder;
44
import io.split.cache.SegmentCache;
55
import io.split.cache.SplitCache;
6+
import io.split.engine.SDKReadinessGates;
67
import io.split.engine.experiments.SplitFetcher;
78
import io.split.engine.experiments.SplitSynchronizationTask;
89
import io.split.engine.segments.SegmentFetcher;
@@ -28,19 +29,22 @@ public class SynchronizerImp implements Synchronizer {
2829
private final SplitCache _splitCache;
2930
private final SegmentCache _segmentCache;
3031
private final int _onDemandFetchRetryDelayMs;
32+
private final SDKReadinessGates _gates;
3133

3234
public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask,
3335
SplitFetcher splitFetcher,
3436
SegmentSynchronizationTask segmentSynchronizationTaskImp,
3537
SplitCache splitCache,
3638
SegmentCache segmentCache,
37-
int onDemandFetchRetryDelayMs) {
39+
int onDemandFetchRetryDelayMs,
40+
SDKReadinessGates gates) {
3841
_splitSynchronizationTask = checkNotNull(splitSynchronizationTask);
3942
_splitFetcher = checkNotNull(splitFetcher);
4043
_segmentSynchronizationTaskImp = checkNotNull(segmentSynchronizationTaskImp);
4144
_splitCache = checkNotNull(splitCache);
4245
_segmentCache = checkNotNull(segmentCache);
4346
_onDemandFetchRetryDelayMs = checkNotNull(onDemandFetchRetryDelayMs);
47+
_gates = checkNotNull(gates);
4448

4549
ThreadFactory splitsThreadFactory = new ThreadFactoryBuilder()
4650
.setDaemon(true)
@@ -53,7 +57,8 @@ public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask,
5357
public void syncAll() {
5458
_syncAllScheduledExecutorService.schedule(() -> {
5559
_splitFetcher.fetchAll(true);
56-
_segmentSynchronizationTaskImp.fetchAll(true);
60+
_segmentSynchronizationTaskImp.fetchAllSynchronous();
61+
_gates.sdkInternalReady();
5762
}, 0, TimeUnit.SECONDS);
5863
}
5964

client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,4 @@ private void fetchAndUpdate(boolean addCacheHeader) {
152152
public void fetchAll() {
153153
this.fetchAndUpdate(false);
154154
}
155-
156-
157155
}

client/src/main/java/io/split/engine/segments/SegmentSynchronizationTask.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,9 @@ public interface SegmentSynchronizationTask extends Runnable {
2929
* @param addCacheHeader
3030
*/
3131
void fetchAll(boolean addCacheHeader);
32+
33+
/**
34+
* fetch every Segment Synchronous
35+
*/
36+
void fetchAllSynchronous();
3237
}

client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@
1111
import java.util.List;
1212
import java.util.Map;
1313
import java.util.concurrent.ConcurrentMap;
14+
import java.util.concurrent.Executors;
1415
import java.util.concurrent.ScheduledExecutorService;
1516
import java.util.concurrent.ScheduledFuture;
1617
import java.util.concurrent.ThreadFactory;
18+
import java.util.concurrent.TimeUnit;
1719
import java.util.concurrent.atomic.AtomicBoolean;
1820
import java.util.concurrent.atomic.AtomicLong;
19-
import java.util.concurrent.Executors;
20-
import java.util.concurrent.TimeUnit;
21+
import java.util.stream.Collectors;
2122

2223
import static com.google.common.base.Preconditions.checkArgument;
2324
import static com.google.common.base.Preconditions.checkNotNull;
@@ -102,7 +103,7 @@ public SegmentFetcher getFetcher(String segmentName) {
102103

103104
@Override
104105
public void startPeriodicFetching() {
105-
if (_running.getAndSet(true)) {
106+
if (_running.getAndSet(true) ) {
106107
_log.debug("Segments PeriodicFetching is running...");
107108
return;
108109
}
@@ -154,7 +155,22 @@ public void fetchAll(boolean addCacheHeader) {
154155
_scheduledExecutorService.submit(fetcher::runWhitCacheHeader);
155156
continue;
156157
}
158+
157159
_scheduledExecutorService.submit(fetcher::fetchAll);
158160
}
159161
}
162+
163+
@Override
164+
public void fetchAllSynchronous() {
165+
_segmentFetchers
166+
.entrySet()
167+
.stream().map(e -> _scheduledExecutorService.submit(e.getValue()::runWhitCacheHeader))
168+
.collect(Collectors.toList())
169+
.stream().forEach(future -> {
170+
try {
171+
future.get();
172+
} catch (Exception ex) {
173+
_log.error(ex.getMessage());
174+
}});
175+
}
160176
}

client/src/test/java/io/split/engine/common/SyncManagerTest.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.split.engine.common;
22

3+
import io.split.engine.SDKReadinessGates;
34
import org.junit.Before;
45
import org.junit.Test;
56
import org.mockito.Mockito;
@@ -10,25 +11,29 @@ public class SyncManagerTest {
1011
private static final int BACKOFF_BASE = 1;
1112
private Synchronizer _synchronizer;
1213
private PushManager _pushManager;
14+
private SDKReadinessGates _gates;
1315

1416
@Before
1517
public void setUp() {
1618
_synchronizer = Mockito.mock(Synchronizer.class);
1719
_pushManager = Mockito.mock(PushManager.class);
20+
_gates = Mockito.mock(SDKReadinessGates.class);
1821
}
1922

2023
@Test
21-
public void startWithStreamingFalseShouldStartPolling() {
22-
SyncManagerImp syncManager = new SyncManagerImp(false, _synchronizer, _pushManager, new LinkedBlockingQueue<>(), BACKOFF_BASE);
24+
public void startWithStreamingFalseShouldStartPolling() throws InterruptedException {
25+
_gates.sdkInternalReady();
26+
SyncManagerImp syncManager = new SyncManagerImp(false, _synchronizer, _pushManager, new LinkedBlockingQueue<>(), BACKOFF_BASE, _gates);
2327
syncManager.start();
28+
Thread.sleep(1000);
2429
Mockito.verify(_synchronizer, Mockito.times(1)).startPeriodicFetching();
25-
Mockito.verify(_synchronizer, Mockito.times(0)).syncAll();
30+
Mockito.verify(_synchronizer, Mockito.times(1)).syncAll();
2631
Mockito.verify(_pushManager, Mockito.times(0)).start();
2732
}
2833

2934
@Test
3035
public void startWithStreamingTrueShouldStartSyncAll() {
31-
SyncManager sm = new SyncManagerImp(true, _synchronizer, _pushManager, new LinkedBlockingQueue<>(), BACKOFF_BASE);
36+
SyncManager sm = new SyncManagerImp(true, _synchronizer, _pushManager, new LinkedBlockingQueue<>(), BACKOFF_BASE, _gates);
3237
sm.start();
3338
Mockito.verify(_synchronizer, Mockito.times(0)).startPeriodicFetching();
3439
Mockito.verify(_synchronizer, Mockito.times(1)).syncAll();
@@ -38,7 +43,7 @@ public void startWithStreamingTrueShouldStartSyncAll() {
3843
@Test
3944
public void onStreamingAvailable() throws InterruptedException {
4045
LinkedBlockingQueue<PushManager.Status> messsages = new LinkedBlockingQueue<>();
41-
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE);
46+
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates);
4247
Thread t = new Thread(syncManager::incomingPushStatusHandler);
4348
t.start();
4449
messsages.offer(PushManager.Status.STREAMING_READY);
@@ -52,7 +57,7 @@ public void onStreamingAvailable() throws InterruptedException {
5257
@Test
5358
public void onStreamingDisabled() throws InterruptedException {
5459
LinkedBlockingQueue<PushManager.Status> messsages = new LinkedBlockingQueue<>();
55-
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE);
60+
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates);
5661
Thread t = new Thread(syncManager::incomingPushStatusHandler);
5762
t.start();
5863
messsages.offer(PushManager.Status.STREAMING_DOWN);
@@ -66,7 +71,7 @@ public void onStreamingDisabled() throws InterruptedException {
6671
@Test
6772
public void onStreamingShutdown() throws InterruptedException {
6873
LinkedBlockingQueue<PushManager.Status> messsages = new LinkedBlockingQueue<>();
69-
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE);
74+
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates);
7075
Thread t = new Thread(syncManager::incomingPushStatusHandler);
7176
t.start();
7277
messsages.offer(PushManager.Status.STREAMING_OFF);
@@ -78,7 +83,7 @@ public void onStreamingShutdown() throws InterruptedException {
7883
@Test
7984
public void onConnected() throws InterruptedException {
8085
LinkedBlockingQueue<PushManager.Status> messsages = new LinkedBlockingQueue<>();
81-
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE);
86+
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates);
8287
Thread t = new Thread(syncManager::incomingPushStatusHandler);
8388
t.start();
8489
messsages.offer(PushManager.Status.STREAMING_READY);
@@ -91,7 +96,7 @@ public void onConnected() throws InterruptedException {
9196
@Test
9297
public void onDisconnect() throws InterruptedException {
9398
LinkedBlockingQueue<PushManager.Status> messsages = new LinkedBlockingQueue<>();
94-
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE);
99+
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates);
95100
Thread t = new Thread(syncManager::incomingPushStatusHandler);
96101
t.start();
97102
messsages.offer(PushManager.Status.STREAMING_OFF);
@@ -103,7 +108,7 @@ public void onDisconnect() throws InterruptedException {
103108
@Test
104109
public void onDisconnectAndReconnect() throws InterruptedException { // Check with mauro. reconnect should call pushManager.start again, right?
105110
LinkedBlockingQueue<PushManager.Status> messsages = new LinkedBlockingQueue<>();
106-
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE);
111+
SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates);
107112
syncManager.start();
108113
messsages.offer(PushManager.Status.STREAMING_BACKOFF);
109114
Thread.sleep(1200);

client/src/test/java/io/split/engine/common/SynchronizerTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.split.cache.SegmentCache;
44
import io.split.cache.SplitCache;
5+
import io.split.engine.SDKReadinessGates;
56
import io.split.engine.experiments.SplitFetcherImp;
67
import io.split.engine.experiments.SplitSynchronizationTask;
78
import io.split.engine.segments.SegmentFetcher;
@@ -17,6 +18,7 @@ public class SynchronizerTest {
1718
private SplitCache _splitCache;
1819
private Synchronizer _synchronizer;
1920
private SegmentCache _segmentCache;
21+
private SDKReadinessGates _gates;
2022

2123
@Before
2224
public void beforeMethod() {
@@ -25,8 +27,9 @@ public void beforeMethod() {
2527
_splitFetcher = Mockito.mock(SplitFetcherImp.class);
2628
_splitCache = Mockito.mock(SplitCache.class);
2729
_segmentCache = Mockito.mock(SegmentCache.class);
30+
_gates = Mockito.mock(SDKReadinessGates.class);
2831

29-
_synchronizer = new SynchronizerImp(_refreshableSplitFetcherTask, _splitFetcher, _segmentFetcher, _splitCache, _segmentCache, 50);
32+
_synchronizer = new SynchronizerImp(_refreshableSplitFetcherTask, _splitFetcher, _segmentFetcher, _splitCache, _segmentCache, 50, _gates);
3033
}
3134

3235
@Test
@@ -35,7 +38,8 @@ public void syncAll() throws InterruptedException {
3538

3639
Thread.sleep(100);
3740
Mockito.verify(_splitFetcher, Mockito.times(1)).fetchAll(true);
38-
Mockito.verify(_segmentFetcher, Mockito.times(1)).fetchAll(true);
41+
Mockito.verify(_segmentFetcher, Mockito.times(1)).fetchAllSynchronous();
42+
Mockito.verify(_gates, Mockito.times(1)).sdkInternalReady();
3943
}
4044

4145
@Test

0 commit comments

Comments
 (0)