Skip to content

Commit 76669cd

Browse files
authored
Merge pull request #176 from splitio/split-fetcher-refactor-task
Splits fetcher: Decouple periodic task
2 parents f3d0ddb + d3f672d commit 76669cd

20 files changed

+363
-455
lines changed

client/src/main/java/io/split/engine/cache/InMemoryCacheImp.java renamed to client/src/main/java/io/split/cache/InMemoryCacheImp.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.split.engine.cache;
1+
package io.split.cache;
22

33
import com.google.common.collect.ConcurrentHashMultiset;
44
import com.google.common.collect.Maps;
@@ -11,7 +11,6 @@
1111
import java.util.ArrayList;
1212
import java.util.Collection;
1313
import java.util.List;
14-
import java.util.Set;
1514
import java.util.concurrent.ConcurrentMap;
1615
import java.util.concurrent.atomic.AtomicLong;
1716

@@ -100,6 +99,25 @@ public boolean trafficTypeExists(String trafficTypeName) {
10099
return Sets.newHashSet(_concurrentTrafficTypeNameSet.elementSet()).contains(trafficTypeName);
101100
}
102101

102+
@Override
103+
public void kill(String splitName, String defaultTreatment, long changeNumber) {
104+
ParsedSplit parsedSplit = _concurrentMap.get(splitName);
105+
106+
ParsedSplit updatedSplit = new ParsedSplit(parsedSplit.feature(),
107+
parsedSplit.seed(),
108+
true,
109+
defaultTreatment,
110+
parsedSplit.parsedConditions(),
111+
parsedSplit.trafficTypeName(),
112+
changeNumber,
113+
parsedSplit.trafficAllocation(),
114+
parsedSplit.trafficAllocationSeed(),
115+
parsedSplit.algo(),
116+
parsedSplit.configurations());
117+
118+
_concurrentMap.put(splitName, updatedSplit);
119+
}
120+
103121
@Override
104122
public void clear() {
105123
_concurrentMap.clear();

client/src/main/java/io/split/engine/cache/SplitCache.java renamed to client/src/main/java/io/split/cache/SplitCache.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
package io.split.engine.cache;
1+
package io.split.cache;
22

33
import io.split.engine.experiments.ParsedSplit;
44

55
import java.util.Collection;
66
import java.util.List;
7-
import java.util.Set;
87

98
public interface SplitCache {
109
void put(ParsedSplit split);
@@ -15,5 +14,6 @@ public interface SplitCache {
1514
long getChangeNumber();
1615
void setChangeNumber(long changeNumber);
1716
boolean trafficTypeExists(String trafficTypeName);
17+
void kill(String splitName, String defaultTreatment, long changeNumber);
1818
void clear();
1919
}

client/src/main/java/io/split/client/SplitClientImpl.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
import io.split.client.dtos.Event;
66
import io.split.client.impressions.Impression;
77
import io.split.client.impressions.ImpressionsManager;
8+
import io.split.cache.SplitCache;
89
import io.split.engine.evaluator.Evaluator;
910
import io.split.engine.SDKReadinessGates;
1011
import io.split.engine.evaluator.EvaluatorImp;
11-
import io.split.engine.experiments.SplitFetcher;
1212
import io.split.engine.metrics.Metrics;
1313
import io.split.grammar.Treatments;
1414
import org.slf4j.Logger;
@@ -32,11 +32,12 @@ public final class SplitClientImpl implements SplitClient {
3232
public static final SplitResult SPLIT_RESULT_CONTROL = new SplitResult(Treatments.CONTROL, null);
3333

3434
private static final String GET_TREATMENT_LABEL = "sdk.getTreatment";
35+
private static final String DEFINITION_NOT_FOUND = "definition not found";
3536

3637
private static final Logger _log = LoggerFactory.getLogger(SplitClientImpl.class);
3738

3839
private final SplitFactory _container;
39-
private final SplitFetcher _splitFetcher;
40+
private final SplitCache _splitCache;
4041
private final ImpressionsManager _impressionManager;
4142
private final Metrics _metrics;
4243
private final SplitClientConfig _config;
@@ -45,15 +46,15 @@ public final class SplitClientImpl implements SplitClient {
4546
private final Evaluator _evaluator;
4647

4748
public SplitClientImpl(SplitFactory container,
48-
SplitFetcher splitFetcher,
49+
SplitCache splitCache,
4950
ImpressionsManager impressionManager,
5051
Metrics metrics,
5152
EventClient eventClient,
5253
SplitClientConfig config,
5354
SDKReadinessGates gates,
5455
Evaluator evaluator) {
5556
_container = container;
56-
_splitFetcher = checkNotNull(splitFetcher);
57+
_splitCache = checkNotNull(splitCache);
5758
_impressionManager = checkNotNull(impressionManager);
5859
_metrics = metrics;
5960
_eventClient = eventClient;
@@ -192,7 +193,7 @@ private boolean track(Event event) {
192193
event.trafficTypeName = event.trafficTypeName.toLowerCase();
193194
}
194195

195-
if (!_splitFetcher.trafficTypeExists(event.trafficTypeName)) {
196+
if (!_splitCache.trafficTypeExists(event.trafficTypeName)) {
196197
_log.warn("track: Traffic Type " + event.trafficTypeName + " does not have any corresponding Splits in this environment, " +
197198
"make sure you’re tracking your events to a valid traffic type defined in the Split console.");
198199
}
@@ -314,6 +315,12 @@ private SplitResult getTreatmentWithConfigInternal(String label, String matching
314315

315316
EvaluatorImp.TreatmentLabelAndChangeNumber result = _evaluator.evaluateFeature(matchingKey, bucketingKey, split, attributes);
316317

318+
if (result.treatment.equals(Treatments.CONTROL) && result.label.equals(DEFINITION_NOT_FOUND) && _gates.isSDKReadyNow()) {
319+
_log.warn(
320+
"getTreatment: you passed \"" + split + "\" that does not exist in this environment, " +
321+
"please double check what Splits exist in the web console.");
322+
}
323+
317324
recordStats(
318325
matchingKey,
319326
bucketingKey,

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,16 @@
1111
import io.split.client.metrics.CachedMetrics;
1212
import io.split.client.metrics.FireAndForgetMetrics;
1313
import io.split.client.metrics.HttpMetrics;
14-
import io.split.engine.cache.InMemoryCacheImp;
15-
import io.split.engine.cache.SplitCache;
14+
import io.split.cache.InMemoryCacheImp;
15+
import io.split.cache.SplitCache;
1616
import io.split.engine.evaluator.Evaluator;
1717
import io.split.engine.evaluator.EvaluatorImp;
1818
import io.split.engine.SDKReadinessGates;
1919
import io.split.engine.common.SyncManager;
2020
import io.split.engine.common.SyncManagerImp;
21-
import io.split.engine.experiments.RefreshableSplitFetcherProvider;
21+
import io.split.engine.experiments.SplitFetcherImp;
22+
import io.split.engine.experiments.SplitSynchronizationTask;
2223
import io.split.engine.experiments.SplitChangeFetcher;
23-
import io.split.engine.experiments.SplitFetcher;
2424
import io.split.engine.experiments.SplitParser;
2525
import io.split.engine.segments.RefreshableSegmentFetcher;
2626
import io.split.engine.segments.SegmentChangeFetcher;
@@ -177,7 +177,6 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
177177

178178
}
179179

180-
181180
final CloseableHttpClient httpclient = buildHttpClient(apiToken, config);
182181

183182
URI rootTarget = URI.create(config.endpoint());
@@ -202,9 +201,9 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
202201
// Feature Changes
203202
SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher.create(httpclient, rootTarget, uncachedFireAndForget);
204203

205-
final SplitCache cache = new InMemoryCacheImp();
206-
final RefreshableSplitFetcherProvider splitFetcherProvider = new RefreshableSplitFetcherProvider(splitChangeFetcher, splitParser, findPollingPeriod(RANDOM, config.featuresRefreshRate()), gates, cache);
207-
204+
final SplitCache splitCache = new InMemoryCacheImp();
205+
final SplitFetcherImp splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, gates, splitCache);
206+
final SplitSynchronizationTask splitSynchronizationTask = new SplitSynchronizationTask(splitFetcher, splitCache, findPollingPeriod(RANDOM, config.featuresRefreshRate()));
208207

209208
List<ImpressionListener> impressionListeners = new ArrayList<>();
210209
// Setup integrations
@@ -227,16 +226,19 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
227226
final EventClient eventClient = EventClientImpl.create(httpclient, eventsRootTarget, config.eventsQueueSize(), config.eventFlushIntervalInMillis(), config.waitBeforeShutdown());
228227

229228
// SyncManager
230-
final SyncManager syncManager = SyncManagerImp.build(config.streamingEnabled(), splitFetcherProvider, segmentFetcher, config.authServiceURL(), httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(config));
229+
final SyncManager syncManager = SyncManagerImp.build(config.streamingEnabled(), splitSynchronizationTask, splitFetcher, segmentFetcher, splitCache, config.authServiceURL(), httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(config));
231230
syncManager.start();
232231

232+
// Evaluator
233+
final Evaluator evaluator = new EvaluatorImp(splitCache);
234+
233235
destroyer = new Runnable() {
234236
public void run() {
235237
_log.info("Shutdown called for split");
236238
try {
237239
segmentFetcher.close();
238240
_log.info("Successful shutdown of segment fetchers");
239-
splitFetcherProvider.close();
241+
splitSynchronizationTask.close();
240242
_log.info("Successful shutdown of splits");
241243
impressionsManager.close();
242244
_log.info("Successful shutdown of impressions manager");
@@ -266,19 +268,15 @@ public void run() {
266268
});
267269
}
268270

269-
270-
SplitFetcher splitFetcher = splitFetcherProvider.getFetcher();
271-
Evaluator evaluator = new EvaluatorImp(gates, splitFetcher);
272-
273271
_client = new SplitClientImpl(this,
274-
splitFetcher,
272+
splitCache,
275273
impressionsManager,
276274
cachedFireAndForgetMetrics,
277275
eventClient,
278276
config,
279277
gates,
280278
evaluator);
281-
_manager = new SplitManagerImpl(splitFetcherProvider.getFetcher(), config, gates);
279+
_manager = new SplitManagerImpl(splitCache, config, gates);
282280
}
283281

284282
private static int findPollingPeriod(Random rand, int max) {

client/src/main/java/io/split/client/SplitManagerImpl.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,19 @@
44
import io.split.client.api.SplitView;
55
import io.split.client.dtos.Partition;
66
import io.split.engine.SDKReadinessGates;
7+
import io.split.cache.SplitCache;
78
import io.split.engine.experiments.ParsedCondition;
89
import io.split.engine.experiments.ParsedSplit;
9-
import io.split.engine.experiments.SplitFetcher;
1010
import org.slf4j.Logger;
1111
import org.slf4j.LoggerFactory;
1212

1313
import java.util.ArrayList;
14+
import java.util.Collection;
1415
import java.util.Collections;
1516
import java.util.HashSet;
1617
import java.util.List;
1718
import java.util.Set;
19+
1820
import java.util.concurrent.TimeoutException;
1921

2022
/**
@@ -24,23 +26,23 @@ public class SplitManagerImpl implements SplitManager {
2426

2527
private static final Logger _log = LoggerFactory.getLogger(SplitManagerImpl.class);
2628

27-
private final SplitFetcher _splitFetcher;
29+
private final SplitCache _splitCache;
2830
private final SplitClientConfig _config;
2931
private final SDKReadinessGates _gates;
3032

3133

32-
public SplitManagerImpl(SplitFetcher splitFetcher,
34+
public SplitManagerImpl(SplitCache splitCache,
3335
SplitClientConfig config,
3436
SDKReadinessGates gates) {
3537
_config = Preconditions.checkNotNull(config);
36-
_splitFetcher = Preconditions.checkNotNull(splitFetcher);
38+
_splitCache = Preconditions.checkNotNull(splitCache);
3739
_gates = Preconditions.checkNotNull(gates);
3840
}
3941

4042
@Override
4143
public List<SplitView> splits() {
4244
List<SplitView> result = new ArrayList<>();
43-
List<ParsedSplit> parsedSplits = _splitFetcher.fetchAll();
45+
Collection<ParsedSplit> parsedSplits = _splitCache.getAll();
4446
for (ParsedSplit split : parsedSplits) {
4547
result.add(toSplitView(split));
4648
}
@@ -57,7 +59,7 @@ public SplitView split(String featureName) {
5759
_log.error("split: you passed an empty split name, split name must be a non-empty string");
5860
return null;
5961
}
60-
ParsedSplit parsedSplit = _splitFetcher.fetch(featureName);
62+
ParsedSplit parsedSplit = _splitCache.get(featureName);
6163
if (parsedSplit == null) {
6264
if (_gates.isSDKReadyNow()) {
6365
_log.warn("split: you passed \"" + featureName + "\" that does not exist in this environment, " +
@@ -71,7 +73,7 @@ public SplitView split(String featureName) {
7173
@Override
7274
public List<String> splitNames() {
7375
List<String> result = new ArrayList<>();
74-
List<ParsedSplit> parsedSplits = _splitFetcher.fetchAll();
76+
Collection<ParsedSplit> parsedSplits = _splitCache.getAll();
7577
for (ParsedSplit split : parsedSplits) {
7678
result.add(split.feature());
7779
}

client/src/main/java/io/split/client/jmx/SplitJmxMonitor.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package io.split.client.jmx;
22

33
import io.split.client.SplitClient;
4+
import io.split.cache.SplitCache;
45
import io.split.engine.experiments.SplitFetcher;
56
import io.split.engine.segments.SegmentFetcher;
67
import org.slf4j.Logger;
78
import org.slf4j.LoggerFactory;
89

10+
import static com.google.common.base.Preconditions.checkNotNull;
11+
912
/**
1013
* Created by patricioe on 1/18/16.
1114
*/
@@ -15,12 +18,14 @@ public class SplitJmxMonitor implements SplitJmxMonitorMBean {
1518

1619
private final SplitClient _client;
1720
private final SplitFetcher _featureFetcher;
21+
private final SplitCache _splitCache;
1822
private final SegmentFetcher _segmentFetcher;
1923

20-
public SplitJmxMonitor(SplitClient splitClient, SplitFetcher fetcher, SegmentFetcher segmentFetcher) {
21-
_client = splitClient;
22-
_featureFetcher = fetcher;
23-
_segmentFetcher = segmentFetcher;
24+
public SplitJmxMonitor(SplitClient splitClient, SplitFetcher featureFetcher, SplitCache splitCache, SegmentFetcher segmentFetcher) {
25+
_client = checkNotNull(splitClient);
26+
_featureFetcher = checkNotNull(featureFetcher);
27+
_splitCache = checkNotNull(splitCache);
28+
_segmentFetcher = checkNotNull(segmentFetcher);
2429
}
2530

2631
@Override
@@ -44,7 +49,7 @@ public String getTreatment(String key, String featureName) {
4449

4550
@Override
4651
public String fetchDefinition(String featureName) {
47-
return _featureFetcher.fetch(featureName).toString();
52+
return _splitCache.get(featureName).toString();
4853
}
4954

5055
@Override

client/src/main/java/io/split/engine/common/SyncManagerImp.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
import com.google.common.annotations.VisibleForTesting;
44
import com.google.common.util.concurrent.ThreadFactoryBuilder;
5-
import io.split.engine.experiments.RefreshableSplitFetcherProvider;
5+
import io.split.cache.SplitCache;
6+
import io.split.engine.experiments.SplitFetcherImp;
7+
import io.split.engine.experiments.SplitSynchronizationTask;
68
import io.split.engine.segments.RefreshableSegmentFetcher;
79
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
810
import org.slf4j.Logger;
@@ -44,15 +46,17 @@ public class SyncManagerImp implements SyncManager {
4446
}
4547

4648
public static SyncManagerImp build(boolean streamingEnabledConfig,
47-
RefreshableSplitFetcherProvider refreshableSplitFetcherProvider,
49+
SplitSynchronizationTask splitSynchronizationTask,
50+
SplitFetcherImp splitFetcher,
4851
RefreshableSegmentFetcher segmentFetcher,
52+
SplitCache splitCache,
4953
String authUrl,
5054
CloseableHttpClient httpClient,
5155
String streamingServiceUrl,
5256
int authRetryBackOffBase,
5357
CloseableHttpClient sseHttpClient) {
5458
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
55-
Synchronizer synchronizer = new SynchronizerImp(refreshableSplitFetcherProvider, segmentFetcher);
59+
Synchronizer synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentFetcher, splitCache);
5660
PushManager pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, authRetryBackOffBase, pushMessages, sseHttpClient);
5761
return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages);
5862
}

0 commit comments

Comments
 (0)