Skip to content

Commit 2dd455c

Browse files
Making syncManager start async and telemetry init
1 parent f5b39c5 commit 2dd455c

File tree

17 files changed

+136
-137
lines changed

17 files changed

+136
-137
lines changed

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import io.split.telemetry.storage.InMemoryTelemetryStorage;
2929
import io.split.telemetry.storage.TelemetryStorage;
3030
import io.split.telemetry.synchronizer.SynchronizerMemory;
31-
import io.split.telemetry.synchronizer.TelemetryConfigInitializer;
3231
import io.split.telemetry.synchronizer.TelemetrySyncTask;
3332
import io.split.telemetry.synchronizer.TelemetrySynchronizer;
3433
import org.apache.hc.client5.http.auth.AuthScope;
@@ -95,7 +94,6 @@ public class SplitFactoryImpl implements SplitFactory {
9594
private final TelemetrySynchronizer _telemetrySynchronizer;
9695
private final TelemetrySyncTask _telemetrySyncTask;
9796
private final long _startTime;
98-
private final TelemetryConfigInitializer _telemetryConfigInitializer;
9997

10098
public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyntaxException {
10199
_startTime = System.currentTimeMillis();
@@ -125,9 +123,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
125123
// Cache Initialisations
126124
_segmentCache = new SegmentCacheInMemoryImpl();
127125
_splitCache = new InMemoryCacheImp();
128-
_telemetrySynchronizer = new SynchronizerMemory(_httpclient, URI.create(config.get_telemetryURL()), _telemetryStorage, _splitCache, _segmentCache, _telemetryStorage);
129-
_telemetrySyncTask = new TelemetrySyncTask(config.get_telemetryRefreshRate(), _telemetrySynchronizer);
130-
_telemetryConfigInitializer = new TelemetryConfigInitializer(_telemetrySynchronizer,_gates,config);
126+
_telemetrySynchronizer = new SynchronizerMemory(_httpclient, URI.create(config.get_telemetryURL()), _telemetryStorage, _splitCache, _segmentCache, _telemetryStorage, _startTime);
131127

132128

133129
// Segments
@@ -145,9 +141,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
145141
// EventClient
146142
_eventClient = EventClientImpl.create(_httpclient, _eventsRootTarget, config.eventsQueueSize(), config.eventFlushIntervalInMillis(), config.waitBeforeShutdown(), _telemetryStorage);
147143

148-
// SyncManager
149-
_syncManager = SyncManagerImp.build(config.streamingEnabled(), _splitSynchronizationTask, _splitFetcher, _segmentSynchronizationTaskImp, _splitCache, config.authServiceURL(), _httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(apiToken, config), _segmentCache, config.streamingRetryDelay(), _gates, _telemetryStorage);
150-
_syncManager.start();
144+
_telemetrySyncTask = new TelemetrySyncTask(config.get_telemetryRefreshRate(), _telemetrySynchronizer);
151145

152146
// Evaluator
153147
_evaluator = new EvaluatorImp(_splitCache);
@@ -158,6 +152,12 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
158152
// SplitManager
159153
_manager = new SplitManagerImpl(_splitCache, config, _gates, _telemetryStorage);
160154

155+
// SyncManager
156+
_syncManager = SyncManagerImp.build(config.streamingEnabled(), _splitSynchronizationTask, _splitFetcher, _segmentSynchronizationTaskImp, _splitCache,
157+
config.authServiceURL(), _httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(apiToken, config),
158+
_segmentCache, config.streamingRetryDelay(), _gates, _telemetryStorage, _telemetrySynchronizer,config);
159+
_syncManager.start();
160+
161161
// DestroyOnShutDown
162162
if (config.destroyOnShutDown()) {
163163
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
@@ -316,7 +316,7 @@ private SplitFetcher buildSplitFetcher() throws URISyntaxException {
316316
SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher.create(_httpclient, _rootTarget, _telemetryStorage);
317317
SplitParser splitParser = new SplitParser(_segmentSynchronizationTaskImp, _segmentCache);
318318

319-
return new SplitFetcherImp(splitChangeFetcher, splitParser, _gates, _splitCache, _telemetryStorage);
319+
return new SplitFetcherImp(splitChangeFetcher, splitParser, _splitCache, _telemetryStorage);
320320
}
321321

322322
private ImpressionsManagerImpl buildImpressionsManager(SplitClientConfig config) throws URISyntaxException {

client/src/main/java/io/split/engine/SDKReadinessGates.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,7 @@ public class SDKReadinessGates {
3535
* @throws InterruptedException if this operation was interrupted.
3636
*/
3737
public boolean isSDKReady(long milliseconds) throws InterruptedException {
38-
long end = System.currentTimeMillis() + milliseconds;
39-
long timeLeft = milliseconds;
40-
41-
boolean splits = areSplitsReady(timeLeft);
42-
if (!splits) {
43-
return false;
44-
}
45-
46-
timeLeft = end - System.currentTimeMillis();
47-
48-
return areSegmentsReady(timeLeft);
38+
return _internalReady.await(milliseconds, TimeUnit.MILLISECONDS);
4939
}
5040

5141
public boolean isSDKReadyNow() {

client/src/main/java/io/split/engine/common/SyncManagerImp.java

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,21 @@
44
import com.google.common.util.concurrent.ThreadFactoryBuilder;
55
import io.split.cache.SegmentCache;
66
import io.split.cache.SplitCache;
7+
import io.split.client.ApiKeyCounter;
8+
import io.split.client.SplitClientConfig;
79
import io.split.engine.SDKReadinessGates;
810
import io.split.engine.experiments.SplitFetcher;
911
import io.split.engine.experiments.SplitSynchronizationTask;
1012
import io.split.engine.segments.SegmentSynchronizationTaskImp;
1113
import io.split.telemetry.domain.StreamingEvent;
1214
import io.split.telemetry.domain.enums.StreamEventsEnum;
1315
import io.split.telemetry.storage.TelemetryRuntimeProducer;
16+
import io.split.telemetry.synchronizer.TelemetrySynchronizer;
1417
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
1518
import org.slf4j.Logger;
1619
import org.slf4j.LoggerFactory;
1720

21+
import java.util.ArrayList;
1822
import java.util.concurrent.ExecutorService;
1923
import java.util.concurrent.Executors;
2024
import java.util.concurrent.Future;
@@ -34,19 +38,23 @@ public class SyncManagerImp implements SyncManager {
3438
private final AtomicBoolean _shutdown;
3539
private final LinkedBlockingQueue<PushManager.Status> _incomingPushStatus;
3640
private final ExecutorService _executorService;
37-
private final ExecutorService _pollingExecutorService;
41+
private final ExecutorService _startExecutorService;
3842
private final SDKReadinessGates _gates;
3943
private Future<?> _pushStatusMonitorTask;
4044
private Backoff _backoff;
4145
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
46+
private final TelemetrySynchronizer _telemetrySynchronizer;
47+
private final SplitClientConfig _config;
4248

4349
@VisibleForTesting
4450
/* package private */ SyncManagerImp(boolean streamingEnabledConfig,
4551
Synchronizer synchronizer,
4652
PushManager pushManager,
4753
LinkedBlockingQueue<PushManager.Status> pushMessages,
4854
int authRetryBackOffBase,
49-
SDKReadinessGates gates, TelemetryRuntimeProducer telemetryRuntimeProducer) {
55+
SDKReadinessGates gates, TelemetryRuntimeProducer telemetryRuntimeProducer,
56+
TelemetrySynchronizer telemetrySynchronizer,
57+
SplitClientConfig config) {
5058
_streamingEnabledConfig = new AtomicBoolean(streamingEnabledConfig);
5159
_synchronizer = checkNotNull(synchronizer);
5260
_pushManager = checkNotNull(pushManager);
@@ -56,13 +64,15 @@ public class SyncManagerImp implements SyncManager {
5664
.setNameFormat("SPLIT-PushStatusMonitor-%d")
5765
.setDaemon(true)
5866
.build());
59-
_pollingExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
67+
_startExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
6068
.setNameFormat("SPLIT-PollingMode-%d")
6169
.setDaemon(true)
6270
.build());
6371
_backoff = new Backoff(authRetryBackOffBase);
6472
_gates = checkNotNull(gates);
6573
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
74+
_telemetrySynchronizer = checkNotNull(telemetrySynchronizer);
75+
_config = checkNotNull(config);
6676
}
6777

6878
public static SyncManagerImp build(boolean streamingEnabledConfig,
@@ -78,21 +88,34 @@ public static SyncManagerImp build(boolean streamingEnabledConfig,
7888
SegmentCache segmentCache,
7989
int streamingRetryDelay,
8090
SDKReadinessGates gates,
81-
TelemetryRuntimeProducer telemetryRuntimeProducer) {
91+
TelemetryRuntimeProducer telemetryRuntimeProducer,
92+
TelemetrySynchronizer telemetrySynchronizer,
93+
SplitClientConfig config) {
8294
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
8395
Synchronizer synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentSynchronizationTaskImp, splitCache, segmentCache, streamingRetryDelay, gates);
8496
PushManager pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, pushMessages, sseHttpClient, telemetryRuntimeProducer);
85-
return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages, authRetryBackOffBase, gates, telemetryRuntimeProducer);
97+
return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages, authRetryBackOffBase, gates, telemetryRuntimeProducer,telemetrySynchronizer, config);
8698
}
8799

88100
@Override
89101
public void start() {
90-
_synchronizer.syncAll();
91-
if (_streamingEnabledConfig.get()) {
92-
startStreamingMode();
93-
} else {
94-
_pollingExecutorService.submit(this::startPollingMode);
95-
}
102+
_startExecutorService.submit(() -> {
103+
while(!_synchronizer.syncAll()) {
104+
try {
105+
Thread.currentThread().sleep(1000);
106+
} catch (InterruptedException e) {
107+
e.printStackTrace();
108+
Thread.currentThread().interrupt();
109+
}
110+
}
111+
_gates.sdkInternalReady();
112+
_telemetrySynchronizer.synchronizeConfig(_config, System.currentTimeMillis(), ApiKeyCounter.getApiKeyCounterInstance().getFactoryInstances(), new ArrayList<>());
113+
if (_streamingEnabledConfig.get()) {
114+
startStreamingMode();
115+
} else {
116+
startPollingMode();
117+
}
118+
});
96119
}
97120

98121
@Override
@@ -112,12 +135,6 @@ private void startStreamingMode() {
112135
}
113136

114137
private void startPollingMode() {
115-
try {
116-
_gates.waitUntilInternalReady();
117-
} catch (InterruptedException ex) {
118-
_log.debug(ex.getMessage());
119-
}
120-
121138
_log.debug("Starting in polling mode ...");
122139
_synchronizer.startPeriodicFetching();
123140
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SYNC_MODE_UPDATE.get_type(), POLLING_STREAMING_EVENT, System.currentTimeMillis()));

client/src/main/java/io/split/engine/common/Synchronizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.split.engine.common;
22

33
public interface Synchronizer {
4-
void syncAll();
4+
boolean syncAll();
55
void startPeriodicFetching();
66
void stopPeriodicFetching();
77
void refreshSplits(long targetChangeNumber);

client/src/main/java/io/split/engine/common/SynchronizerImp.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,8 @@
1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
1313

14-
import java.util.concurrent.Executors;
15-
import java.util.concurrent.ScheduledExecutorService;
16-
import java.util.concurrent.ThreadFactory;
17-
import java.util.concurrent.TimeUnit;
14+
import java.util.concurrent.*;
15+
import java.util.concurrent.atomic.AtomicBoolean;
1816

1917
import static com.google.common.base.Preconditions.checkNotNull;
2018

@@ -54,12 +52,13 @@ public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask,
5452
}
5553

5654
@Override
57-
public void syncAll() {
58-
_syncAllScheduledExecutorService.schedule(() -> {
59-
_splitFetcher.fetchAll(true);
60-
_segmentSynchronizationTaskImp.fetchAllSynchronous();
61-
_gates.sdkInternalReady();
62-
}, 0, TimeUnit.SECONDS);
55+
public boolean syncAll() {
56+
AtomicBoolean syncStatus = new AtomicBoolean(false);
57+
if(_splitFetcher.fetchAll(true) &&
58+
_segmentSynchronizationTaskImp.fetchAllSynchronous()) {
59+
syncStatus.set(true);
60+
}
61+
return syncStatus.get();
6362
}
6463

6564
@Override

client/src/main/java/io/split/engine/experiments/SplitFetcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ public interface SplitFetcher extends Runnable {
1414
* Forces a sync of ALL splits, outside of any scheduled
1515
* syncs. This method MUST NOT throw any exceptions.
1616
*/
17-
void fetchAll(boolean addCacheHeader);
17+
boolean fetchAll(boolean addCacheHeader);
1818
}

client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ public class SplitFetcherImp implements SplitFetcher {
2525
private final SplitParser _parser;
2626
private final SplitChangeFetcher _splitChangeFetcher;
2727
private final SplitCache _splitCache;
28-
private final SDKReadinessGates _gates;
2928
private final Object _lock = new Object();
3029
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
3130

@@ -39,10 +38,9 @@ public class SplitFetcherImp implements SplitFetcher {
3938
* an ARCHIVED split is received, we know if we need to remove a traffic type from the multiset.
4039
*/
4140

42-
public SplitFetcherImp(SplitChangeFetcher splitChangeFetcher, SplitParser parser, SDKReadinessGates gates, SplitCache splitCache, TelemetryRuntimeProducer telemetryRuntimeProducer) {
41+
public SplitFetcherImp(SplitChangeFetcher splitChangeFetcher, SplitParser parser, SplitCache splitCache, TelemetryRuntimeProducer telemetryRuntimeProducer) {
4342
_splitChangeFetcher = checkNotNull(splitChangeFetcher);
4443
_parser = checkNotNull(parser);
45-
_gates = checkNotNull(gates);
4644
_splitCache = checkNotNull(splitCache);
4745
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
4846
}
@@ -146,16 +144,18 @@ private void runWithoutExceptionHandling(boolean addCacheHeader) throws Interrup
146144
}
147145
}
148146
@Override
149-
public void fetchAll(boolean addCacheHeader) {
147+
public boolean fetchAll(boolean addCacheHeader) {
148+
boolean fetchAllStatus = true;
150149
_log.debug("Fetch splits starting ...");
151150
long start = _splitCache.getChangeNumber();
152151
try {
153152
runWithoutExceptionHandling(addCacheHeader);
154-
_gates.splitsAreReady();
155153
} catch (InterruptedException e) {
154+
fetchAllStatus = false;
156155
_log.warn("Interrupting split fetcher task");
157156
Thread.currentThread().interrupt();
158157
} catch (Throwable t) {
158+
fetchAllStatus = false;
159159
_log.error("RefreshableSplitFetcher failed: " + t.getMessage());
160160
if (_log.isDebugEnabled()) {
161161
_log.debug("Reason:", t);
@@ -165,5 +165,6 @@ public void fetchAll(boolean addCacheHeader) {
165165
_log.debug("split fetch before: " + start + ", after: " + _splitCache.getChangeNumber());
166166
}
167167
}
168+
return fetchAllStatus;
168169
}
169170
}

client/src/main/java/io/split/engine/segments/SegmentSynchronizationTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,5 @@ public interface SegmentSynchronizationTask extends Runnable {
3333
/**
3434
* fetch every Segment Synchronous
3535
*/
36-
void fetchAllSynchronous();
36+
boolean fetchAllSynchronous();
3737
}

client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,8 @@ public void fetchAll(boolean addCacheHeader) {
165165
}
166166

167167
@Override
168-
public void fetchAllSynchronous() {
168+
public boolean fetchAllSynchronous() {
169+
AtomicBoolean fetchAllStatus = new AtomicBoolean(true);
169170
_segmentFetchers
170171
.entrySet()
171172
.stream().map(e -> _scheduledExecutorService.submit(e.getValue()::runWhitCacheHeader))
@@ -174,7 +175,9 @@ public void fetchAllSynchronous() {
174175
try {
175176
future.get();
176177
} catch (Exception ex) {
178+
fetchAllStatus.set(false);
177179
_log.error(ex.getMessage());
178180
}});
181+
return fetchAllStatus.get();
179182
}
180183
}

client/src/main/java/io/split/telemetry/synchronizer/SynchronizerMemory.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,20 @@ public class SynchronizerMemory implements TelemetrySynchronizer{
3232
private TelemetryStorageConsumer _teleTelemetryStorageConsumer;
3333
private SplitCache _splitCache;
3434
private SegmentCache _segmentCache;
35+
private final long _initStartTime;
3536

3637
public SynchronizerMemory(CloseableHttpClient client, URI telemetryRootEndpoint, TelemetryStorageConsumer telemetryStorageConsumer, SplitCache splitCache,
37-
SegmentCache segmentCache, TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException {
38+
SegmentCache segmentCache, TelemetryRuntimeProducer telemetryRuntimeProducer, long initStartTime) throws URISyntaxException {
3839
_httpHttpTelemetryMemorySender = HttpTelemetryMemorySender.create(client, telemetryRootEndpoint, telemetryRuntimeProducer);
3940
_teleTelemetryStorageConsumer = telemetryStorageConsumer;
4041
_splitCache = splitCache;
4142
_segmentCache = segmentCache;
43+
_initStartTime = initStartTime;
4244
}
4345

4446
@Override
45-
public void synchronizeConfig(SplitClientConfig config, long timeUntilReady, Map<String, Long> factoryInstances, List<String> tags) {
46-
_httpHttpTelemetryMemorySender.postConfig(generateConfig(config, timeUntilReady, factoryInstances, tags));
47+
public void synchronizeConfig(SplitClientConfig config, long readyTimeStamp, Map<String, Long> factoryInstances, List<String> tags) {
48+
_httpHttpTelemetryMemorySender.postConfig(generateConfig(config, readyTimeStamp, factoryInstances, tags));
4749
}
4850

4951
@Override
@@ -74,7 +76,7 @@ private Stats generateStats() throws Exception {
7476
return stats;
7577
}
7678

77-
private Config generateConfig(SplitClientConfig splitClientConfig, long timeUntilReady, Map<String, Long> factoryInstances, List<String> tags) {
79+
private Config generateConfig(SplitClientConfig splitClientConfig, long readyTimestamp, Map<String, Long> factoryInstances, List<String> tags) {
7880
Config config = new Config();
7981
Rates rates = new Rates();
8082
URLOverrides urlOverrides = new URLOverrides();
@@ -110,7 +112,7 @@ private Config generateConfig(SplitClientConfig splitClientConfig, long timeUnti
110112
config.set_eventsQueueSize(splitClientConfig.eventsQueueSize());
111113
config.set_tags(getListMaxSize(tags));
112114
config.set_activeFactories(factoryInstances.size());
113-
config.set_timeUntilReady(timeUntilReady);
115+
config.set_timeUntilReady(readyTimestamp - _initStartTime);
114116
config.set_rates(rates);
115117
config.set_urlOverrides(urlOverrides);
116118
config.set_streamingEnabled(splitClientConfig.streamingEnabled());

0 commit comments

Comments
 (0)