Skip to content

Commit 43debcb

Browse files
authored
Merge pull request #180 from splitio/decouple-segment-storage
[Lucas] Decouple segment storage
2 parents a68dee1 + da9d743 commit 43debcb

31 files changed

+939
-867
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package io.split.cache;
2+
3+
import java.util.List;
4+
5+
/**
6+
* Memory for segments
7+
* @author lucasecheverz
8+
*/
9+
public interface SegmentCache {
10+
11+
/**
12+
* update segment
13+
* @param segmentName
14+
* @param toAdd
15+
* @param toRemove
16+
*/
17+
void updateSegment(String segmentName, List<String> toAdd, List<String> toRemove) ;
18+
19+
/**
20+
* evaluates if a key belongs to a segment
21+
* @param segmentName
22+
* @param key
23+
* @return
24+
*/
25+
boolean isInSegment(String segmentName, String key);
26+
27+
/**
28+
* update the changeNumber of a segment
29+
* @param segmentName
30+
* @param changeNumber
31+
*/
32+
void setChangeNumber(String segmentName, long changeNumber);
33+
34+
/**
35+
* returns the changeNumber of a segment
36+
* @param segmentName
37+
* @return
38+
*/
39+
long getChangeNumber(String segmentName);
40+
41+
/**
42+
* clear all segments
43+
*/
44+
void clear();
45+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package io.split.cache;
2+
3+
import com.google.common.collect.Maps;
4+
import io.split.engine.segments.SegmentImp;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
import java.util.List;
9+
import java.util.concurrent.ConcurrentMap;
10+
11+
/**
12+
* InMemoryCache Implementation
13+
* @author lucasecheverz
14+
*/
15+
public class SegmentCacheInMemoryImpl implements SegmentCache {
16+
private static final Logger _log = LoggerFactory.getLogger(SegmentCacheInMemoryImpl.class);
17+
private static final long DEFAULT_CHANGE_NUMBER = -1l;
18+
private final ConcurrentMap<String, SegmentImp> _segments = Maps.newConcurrentMap();
19+
20+
@Override
21+
public void updateSegment(String segmentName, List<String> toAdd, List<String> toRemove) {
22+
if(_segments.get(segmentName) == null){
23+
_segments.put(segmentName, new SegmentImp(DEFAULT_CHANGE_NUMBER, segmentName,toAdd));
24+
}
25+
26+
_segments.get(segmentName).update(toAdd,toRemove);
27+
}
28+
29+
@Override
30+
public boolean isInSegment(String segmentName, String key) {
31+
SegmentImp segmentImp = _segments.get(segmentName);
32+
if(segmentImp == null){
33+
_log.error("Segment " + segmentName + "Not found.");
34+
return false;
35+
}
36+
return segmentImp.contains(key);
37+
}
38+
39+
@Override
40+
public void setChangeNumber(String segmentName, long changeNumber) {
41+
if(_segments.get(segmentName) == null){
42+
_log.error("Segment " + segmentName + "Not found.");
43+
return ;
44+
}
45+
_segments.get(segmentName).setChangeNumber(changeNumber);
46+
}
47+
48+
@Override
49+
public long getChangeNumber(String segmentName) {
50+
SegmentImp segmentImp = _segments.get(segmentName);
51+
if(segmentImp == null){
52+
_log.error("Segment " + segmentName + "Not found.");
53+
return DEFAULT_CHANGE_NUMBER;
54+
}
55+
return segmentImp.getChangeNumber();
56+
}
57+
58+
@Override
59+
public void clear() {
60+
_segments.clear();
61+
}
62+
}

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import io.split.engine.experiments.SplitSynchronizationTask;
2323
import io.split.engine.experiments.SplitChangeFetcher;
2424
import io.split.engine.experiments.SplitParser;
25-
import io.split.engine.segments.RefreshableSegmentFetcher;
2625
import io.split.engine.segments.SegmentChangeFetcher;
26+
import io.split.cache.SegmentCache;
27+
import io.split.cache.SegmentCacheInMemoryImpl;
28+
import io.split.engine.segments.SegmentSynchronizationTaskImp;
2729
import io.split.integrations.IntegrationsConfig;
2830
import org.apache.hc.client5.http.auth.AuthScope;
2931
import org.apache.hc.client5.http.auth.Credentials;
@@ -106,12 +108,15 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
106108

107109
// Segments
108110
SegmentChangeFetcher segmentChangeFetcher = HttpSegmentChangeFetcher.create(httpclient, rootTarget, uncachedFireAndForget);
109-
final RefreshableSegmentFetcher segmentFetcher = new RefreshableSegmentFetcher(segmentChangeFetcher,
111+
//This segmentCache is for inMemory Storage (the only one supported by java-client for the moment
112+
SegmentCache segmentCache = new SegmentCacheInMemoryImpl();
113+
final SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp(segmentChangeFetcher,
110114
findPollingPeriod(RANDOM, config.segmentsRefreshRate()),
111115
config.numThreadsForSegmentFetch(),
112-
gates);
116+
gates,
117+
segmentCache);
113118

114-
SplitParser splitParser = new SplitParser(segmentFetcher);
119+
SplitParser splitParser = new SplitParser(segmentSynchronizationTaskImp, segmentCache);
115120

116121
// Feature Changes
117122
SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher.create(httpclient, rootTarget, uncachedFireAndForget);
@@ -141,7 +146,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
141146
final EventClient eventClient = EventClientImpl.create(httpclient, eventsRootTarget, config.eventsQueueSize(), config.eventFlushIntervalInMillis(), config.waitBeforeShutdown());
142147

143148
// SyncManager
144-
final SyncManager syncManager = SyncManagerImp.build(config.streamingEnabled(), splitSynchronizationTask, splitFetcher, segmentFetcher, splitCache, config.authServiceURL(), httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(config));
149+
final SyncManager syncManager = SyncManagerImp.build(config.streamingEnabled(), splitSynchronizationTask, splitFetcher, segmentSynchronizationTaskImp, splitCache, config.authServiceURL(), httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(config), segmentCache);
145150
syncManager.start();
146151

147152
// Evaluator
@@ -151,7 +156,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
151156
public void run() {
152157
_log.info("Shutdown called for split");
153158
try {
154-
segmentFetcher.close();
159+
segmentSynchronizationTaskImp.close();
155160
_log.info("Successful shutdown of segment fetchers");
156161
splitSynchronizationTask.close();
157162
_log.info("Successful shutdown of splits");

client/src/main/java/io/split/client/jmx/SplitJmxMonitor.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package io.split.client.jmx;
22

3-
import io.split.client.SplitClient;
3+
import io.split.cache.SegmentCache;
44
import io.split.cache.SplitCache;
5+
import io.split.client.SplitClient;
56
import io.split.engine.experiments.SplitFetcher;
67
import io.split.engine.segments.SegmentFetcher;
8+
import io.split.engine.segments.SegmentSynchronizationTask;
79
import org.slf4j.Logger;
810
import org.slf4j.LoggerFactory;
911

@@ -19,13 +21,15 @@ public class SplitJmxMonitor implements SplitJmxMonitorMBean {
1921
private final SplitClient _client;
2022
private final SplitFetcher _featureFetcher;
2123
private final SplitCache _splitCache;
22-
private final SegmentFetcher _segmentFetcher;
24+
private final SegmentSynchronizationTask _segmentSynchronizationTask;
25+
private SegmentCache _segmentCache;
2326

24-
public SplitJmxMonitor(SplitClient splitClient, SplitFetcher featureFetcher, SplitCache splitCache, SegmentFetcher segmentFetcher) {
27+
public SplitJmxMonitor(SplitClient splitClient, SplitFetcher featureFetcher, SplitCache splitCache, SegmentSynchronizationTask segmentSynchronizationTask, SegmentCache segmentCache) {
2528
_client = checkNotNull(splitClient);
2629
_featureFetcher = checkNotNull(featureFetcher);
2730
_splitCache = checkNotNull(splitCache);
28-
_segmentFetcher = checkNotNull(segmentFetcher);
31+
_segmentSynchronizationTask = checkNotNull(segmentSynchronizationTask);
32+
_segmentCache = checkNotNull(segmentCache);
2933
}
3034

3135
@Override
@@ -37,7 +41,15 @@ public boolean forceSyncFeatures() {
3741

3842
@Override
3943
public boolean forceSyncSegment(String segmentName) {
40-
_segmentFetcher.segment(segmentName).forceRefresh();
44+
SegmentFetcher fetcher = _segmentSynchronizationTask.getFetcher(segmentName);
45+
try{
46+
fetcher.fetch();
47+
}
48+
//We are sure this will never happen because getFetcher firts initiate the segment. This try/catch is for safe only.
49+
catch (NullPointerException np){
50+
throw new NullPointerException();
51+
}
52+
4153
_log.info("Segment " + segmentName + " successfully refreshed via JMX");
4254
return true;
4355
}
@@ -54,6 +66,6 @@ public String fetchDefinition(String featureName) {
5466

5567
@Override
5668
public boolean isKeyInSegment(String key, String segmentName) {
57-
return _segmentFetcher.segment(segmentName).contains(key);
69+
return _segmentCache.isInSegment(segmentName, key);
5870
}
5971
}

client/src/main/java/io/split/engine/common/SyncManagerImp.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22

33
import com.google.common.annotations.VisibleForTesting;
44
import com.google.common.util.concurrent.ThreadFactoryBuilder;
5+
import io.split.cache.SegmentCache;
56
import io.split.cache.SplitCache;
67
import io.split.engine.experiments.SplitFetcherImp;
78
import io.split.engine.experiments.SplitSynchronizationTask;
8-
import io.split.engine.segments.RefreshableSegmentFetcher;
9+
import io.split.engine.segments.SegmentSynchronizationTaskImp;
910
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
1011
import org.slf4j.Logger;
1112
import org.slf4j.LoggerFactory;
@@ -46,17 +47,18 @@ public class SyncManagerImp implements SyncManager {
4647
}
4748

4849
public static SyncManagerImp build(boolean streamingEnabledConfig,
49-
SplitSynchronizationTask splitSynchronizationTask,
50-
SplitFetcherImp splitFetcher,
51-
RefreshableSegmentFetcher segmentFetcher,
52-
SplitCache splitCache,
53-
String authUrl,
54-
CloseableHttpClient httpClient,
55-
String streamingServiceUrl,
56-
int authRetryBackOffBase,
57-
CloseableHttpClient sseHttpClient) {
50+
SplitSynchronizationTask splitSynchronizationTask,
51+
SplitFetcherImp splitFetcher,
52+
SegmentSynchronizationTaskImp segmentSynchronizationTaskImp,
53+
SplitCache splitCache,
54+
String authUrl,
55+
CloseableHttpClient httpClient,
56+
String streamingServiceUrl,
57+
int authRetryBackOffBase,
58+
CloseableHttpClient sseHttpClient,
59+
SegmentCache segmentCache) {
5860
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
59-
Synchronizer synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentFetcher, splitCache);
61+
Synchronizer synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentSynchronizationTaskImp, splitCache, segmentCache);
6062
PushManager pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, authRetryBackOffBase, pushMessages, sseHttpClient);
6163
return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages);
6264
}

client/src/main/java/io/split/engine/common/SynchronizerImp.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package io.split.engine.common;
22

33
import com.google.common.util.concurrent.ThreadFactoryBuilder;
4-
4+
import io.split.cache.SegmentCache;
55
import io.split.cache.SplitCache;
66
import io.split.engine.experiments.SplitFetcherImp;
77
import io.split.engine.experiments.SplitSynchronizationTask;
8-
import io.split.engine.segments.RefreshableSegmentFetcher;
8+
import io.split.engine.segments.SegmentFetcher;
9+
import io.split.engine.segments.SegmentSynchronizationTask;
910
import org.slf4j.Logger;
1011
import org.slf4j.LoggerFactory;
1112

@@ -21,18 +22,21 @@ public class SynchronizerImp implements Synchronizer {
2122

2223
private final SplitSynchronizationTask _splitSynchronizationTask;
2324
private final SplitFetcherImp _splitFetcher;
24-
private final RefreshableSegmentFetcher _segmentFetcher;
25+
private final SegmentSynchronizationTask _segmentSynchronizationTaskImp;
2526
private final ScheduledExecutorService _syncAllScheduledExecutorService;
2627
private final SplitCache _splitCache;
28+
private final SegmentCache _segmentCache;
2729

2830
public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask,
2931
SplitFetcherImp splitFetcher,
30-
RefreshableSegmentFetcher segmentFetcher,
31-
SplitCache splitCache) {
32+
SegmentSynchronizationTask segmentSynchronizationTaskImp,
33+
SplitCache splitCache,
34+
SegmentCache segmentCache) {
3235
_splitSynchronizationTask = checkNotNull(splitSynchronizationTask);
3336
_splitFetcher = checkNotNull(splitFetcher);
34-
_segmentFetcher = checkNotNull(segmentFetcher);
37+
_segmentSynchronizationTaskImp = checkNotNull(segmentSynchronizationTaskImp);
3538
_splitCache = checkNotNull(splitCache);
39+
_segmentCache = checkNotNull(segmentCache);
3640

3741
ThreadFactory splitsThreadFactory = new ThreadFactoryBuilder()
3842
.setDaemon(true)
@@ -45,22 +49,22 @@ public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask,
4549
public void syncAll() {
4650
_syncAllScheduledExecutorService.schedule(() -> {
4751
_splitFetcher.run();
48-
_segmentFetcher.forceRefreshAll();
52+
_segmentSynchronizationTaskImp.run();
4953
}, 0, TimeUnit.SECONDS);
5054
}
5155

5256
@Override
5357
public void startPeriodicFetching() {
5458
_log.debug("Starting Periodic Fetching ...");
5559
_splitSynchronizationTask.startPeriodicFetching();
56-
_segmentFetcher.startPeriodicFetching();
60+
_segmentSynchronizationTaskImp.startPeriodicFetching();
5761
}
5862

5963
@Override
6064
public void stopPeriodicFetching() {
6165
_log.debug("Stop Periodic Fetching ...");
6266
_splitSynchronizationTask.stop();
63-
_segmentFetcher.stop();
67+
_segmentSynchronizationTaskImp.stop();
6468
}
6569

6670
@Override
@@ -80,8 +84,15 @@ public void localKillSplit(String splitName, String defaultTreatment, long newCh
8084

8185
@Override
8286
public void refreshSegment(String segmentName, long changeNumber) {
83-
if (changeNumber > _segmentFetcher.getChangeNumber(segmentName)) {
84-
_segmentFetcher.forceRefresh(segmentName);
87+
if (changeNumber > _segmentCache.getChangeNumber(segmentName)) {
88+
SegmentFetcher fetcher = _segmentSynchronizationTaskImp.getFetcher(segmentName);
89+
try{
90+
fetcher.fetch();
91+
}
92+
//We are sure this will never happen because getFetcher firts initiate the segment. This try/catch is for safe only.
93+
catch (NullPointerException np){
94+
throw new NullPointerException();
95+
}
8596
}
8697
}
8798
}

client/src/main/java/io/split/engine/experiments/SplitParser.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.split.engine.experiments;
22

33
import com.google.common.collect.Lists;
4+
import io.split.cache.SegmentCache;
45
import io.split.client.dtos.Condition;
56
import io.split.client.dtos.Matcher;
67
import io.split.client.dtos.MatcherGroup;
@@ -26,8 +27,7 @@
2627
import io.split.engine.matchers.strings.RegularExpressionMatcher;
2728
import io.split.engine.matchers.strings.StartsWithAnyOfMatcher;
2829
import io.split.engine.matchers.strings.WhitelistMatcher;
29-
import io.split.engine.segments.Segment;
30-
import io.split.engine.segments.SegmentFetcher;
30+
import io.split.engine.segments.SegmentSynchronizationTask;
3131
import org.slf4j.Logger;
3232
import org.slf4j.LoggerFactory;
3333

@@ -46,11 +46,13 @@ public final class SplitParser {
4646
public static final int CONDITIONS_UPPER_LIMIT = 50;
4747
private static final Logger _log = LoggerFactory.getLogger(SplitParser.class);
4848

49-
private SegmentFetcher _segmentFetcher;
49+
private final SegmentSynchronizationTask _segmentSynchronizationTask;
50+
private final SegmentCache _segmentCache;
5051

51-
public SplitParser(SegmentFetcher segmentFetcher) {
52-
_segmentFetcher = segmentFetcher;
53-
checkNotNull(_segmentFetcher);
52+
public SplitParser(SegmentSynchronizationTask segmentSynchronizationTaskImp,
53+
SegmentCache segmentCache) {
54+
_segmentSynchronizationTask = checkNotNull(segmentSynchronizationTaskImp);
55+
_segmentCache = checkNotNull(segmentCache);
5456
}
5557

5658
public ParsedSplit parse(Split split) {
@@ -106,8 +108,9 @@ private AttributeMatcher toMatcher(Matcher matcher) {
106108
break;
107109
case IN_SEGMENT:
108110
checkNotNull(matcher.userDefinedSegmentMatcherData);
109-
Segment segment = _segmentFetcher.segment(matcher.userDefinedSegmentMatcherData.segmentName);
110-
delegate = new UserDefinedSegmentMatcher(segment);
111+
String segmentName = matcher.userDefinedSegmentMatcherData.segmentName;
112+
_segmentSynchronizationTask.initializeSegment(segmentName);
113+
delegate = new UserDefinedSegmentMatcher(_segmentCache, segmentName);
111114
break;
112115
case WHITELIST:
113116
checkNotNull(matcher.whitelistMatcherData);

0 commit comments

Comments
 (0)