Skip to content

Commit 78c5807

Browse files
Merge branch 'segment-storage-mauro' of github.com:splitio/java-client into decouple-segment-storage
2 parents 9842ab5 + 0c98764 commit 78c5807

File tree

10 files changed

+442
-42
lines changed

10 files changed

+442
-42
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package io.split.cache;
2+
3+
import com.google.common.collect.Maps;
4+
import io.split.engine.segments.SegmentImpMauro;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
import java.util.List;
9+
import java.util.concurrent.ConcurrentMap;
10+
11+
public class SegmentCacheInMemoryImpMauro implements SegmentCache {
12+
private static final Logger _log = LoggerFactory.getLogger(SegmentCacheInMemoryImpl.class);
13+
private static final long DEFAULT_CHANGE_NUMBER = -1l;
14+
15+
private final ConcurrentMap<String, SegmentImpMauro> _segmentFetchers = Maps.newConcurrentMap();
16+
17+
@Override
18+
public void updateSegment(String segmentName, List<String> toAdd, List<String> toRemove) {
19+
if(_segmentFetchers.get(segmentName) == null){
20+
_segmentFetchers.put(segmentName, new SegmentImpMauro(DEFAULT_CHANGE_NUMBER, segmentName,toAdd));
21+
}
22+
23+
_segmentFetchers.get(segmentName).update(toAdd,toRemove);
24+
}
25+
26+
@Override
27+
public boolean isInSegment(String segmentName, String key) {
28+
if(_segmentFetchers.get(segmentName) == null){
29+
_log.error("Segment " + segmentName + "Not founded.");
30+
return false;
31+
}
32+
return _segmentFetchers.get(segmentName).contains(key);
33+
}
34+
35+
@Override
36+
public void setChangeNumber(String segmentName, long changeNumber) {
37+
if(_segmentFetchers.get(segmentName) != null){
38+
_segmentFetchers.get(segmentName).setChangeNumber(changeNumber);
39+
}
40+
else{
41+
_log.error("Segment " + segmentName + "Not founded.");
42+
}
43+
}
44+
45+
@Override
46+
public long getChangeNumber(String segmentName) {
47+
if(_segmentFetchers.get(segmentName) == null){
48+
_log.error("Segment " + segmentName + "Not founded.");
49+
return DEFAULT_CHANGE_NUMBER;
50+
}
51+
return _segmentFetchers.get(segmentName).getChangeNumber();
52+
}
53+
54+
@Override
55+
public void clear() {
56+
_segmentFetchers.clear();
57+
}
58+
}

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.split.engine.segments.SegmentChangeFetcher;
2727
import io.split.cache.SegmentCache;
2828
import io.split.cache.SegmentCacheInMemoryImpl;
29+
import io.split.engine.segments.SegmentSynchronizationTaskMauro;
2930
import io.split.integrations.IntegrationsConfig;
3031
import org.apache.hc.client5.http.auth.AuthScope;
3132
import org.apache.hc.client5.http.auth.Credentials;
@@ -194,13 +195,13 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
194195
SegmentChangeFetcher segmentChangeFetcher = HttpSegmentChangeFetcher.create(httpclient, rootTarget, uncachedFireAndForget);
195196
//This segmentCache is for inMemory Storage (the only one supported by java-client for the moment
196197
SegmentCache segmentCache = new SegmentCacheInMemoryImpl();
197-
final RefreshableSegmentFetcher segmentFetcher = new RefreshableSegmentFetcher(segmentChangeFetcher,
198+
final SegmentSynchronizationTaskMauro segmentSynchronizationTaskMauro = new SegmentSynchronizationTaskMauro(segmentChangeFetcher,
198199
findPollingPeriod(RANDOM, config.segmentsRefreshRate()),
199200
config.numThreadsForSegmentFetch(),
200201
gates,
201202
segmentCache);
202203

203-
SplitParser splitParser = new SplitParser(segmentFetcher);
204+
SplitParser splitParser = new SplitParser(segmentSynchronizationTaskMauro, segmentCache);
204205

205206
// Feature Changes
206207
SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher.create(httpclient, rootTarget, uncachedFireAndForget);
@@ -230,7 +231,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
230231
final EventClient eventClient = EventClientImpl.create(httpclient, eventsRootTarget, config.eventsQueueSize(), config.eventFlushIntervalInMillis(), config.waitBeforeShutdown());
231232

232233
// SyncManager
233-
final SyncManager syncManager = SyncManagerImp.build(config.streamingEnabled(), splitSynchronizationTask, splitFetcher, segmentFetcher, splitCache, config.authServiceURL(), httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(config));
234+
final SyncManager syncManager = SyncManagerImp.build(config.streamingEnabled(), splitSynchronizationTask, splitFetcher, segmentSynchronizationTaskMauro, splitCache, config.authServiceURL(), httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(config), segmentCache);
234235
syncManager.start();
235236

236237
// Evaluator
@@ -240,7 +241,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
240241
public void run() {
241242
_log.info("Shutdown called for split");
242243
try {
243-
segmentFetcher.close();
244+
segmentSynchronizationTaskMauro.close();
244245
_log.info("Successful shutdown of segment fetchers");
245246
splitSynchronizationTask.close();
246247
_log.info("Successful shutdown of splits");

client/src/main/java/io/split/client/jmx/SplitJmxMonitor.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package io.split.client.jmx;
22

3+
import io.split.cache.SegmentCache;
34
import io.split.client.SplitClient;
45
import io.split.cache.SplitCache;
56
import io.split.engine.experiments.SplitFetcher;
67
import io.split.engine.segments.SegmentFetcher;
8+
import io.split.engine.segments.SegmentFetcherImpMauro;
9+
import io.split.engine.segments.SegmentSynchronizationTaskMauro;
710
import org.slf4j.Logger;
811
import org.slf4j.LoggerFactory;
912

@@ -19,13 +22,16 @@ public class SplitJmxMonitor implements SplitJmxMonitorMBean {
1922
private final SplitClient _client;
2023
private final SplitFetcher _featureFetcher;
2124
private final SplitCache _splitCache;
22-
private final SegmentFetcher _segmentFetcher;
25+
//private final SegmentFetcher _segmentFetcher;
26+
private final SegmentSynchronizationTaskMauro _segmentSynchronizationTaskMauro;
27+
private SegmentCache _segmentCache;
2328

24-
public SplitJmxMonitor(SplitClient splitClient, SplitFetcher featureFetcher, SplitCache splitCache, SegmentFetcher segmentFetcher) {
29+
public SplitJmxMonitor(SplitClient splitClient, SplitFetcher featureFetcher, SplitCache splitCache, SegmentFetcher segmentFetcher, SegmentSynchronizationTaskMauro segmentSynchronizationTaskMauro) {
2530
_client = checkNotNull(splitClient);
2631
_featureFetcher = checkNotNull(featureFetcher);
2732
_splitCache = checkNotNull(splitCache);
28-
_segmentFetcher = checkNotNull(segmentFetcher);
33+
//_segmentFetcher = checkNotNull(segmentFetcher);
34+
_segmentSynchronizationTaskMauro = segmentSynchronizationTaskMauro;
2935
}
3036

3137
@Override
@@ -37,7 +43,10 @@ public boolean forceSyncFeatures() {
3743

3844
@Override
3945
public boolean forceSyncSegment(String segmentName) {
40-
_segmentFetcher.segment(segmentName).forceRefresh();
46+
//_segmentFetcher.segment(segmentName).forceRefresh();
47+
SegmentFetcherImpMauro fetcher = _segmentSynchronizationTaskMauro.getFetcher(segmentName);
48+
fetcher.fetch();
49+
4150
_log.info("Segment " + segmentName + " successfully refreshed via JMX");
4251
return true;
4352
}
@@ -54,6 +63,7 @@ public String fetchDefinition(String featureName) {
5463

5564
@Override
5665
public boolean isKeyInSegment(String key, String segmentName) {
57-
return _segmentFetcher.segment(segmentName).contains(key);
66+
return _segmentCache.isInSegment(segmentName, key);
67+
//return _segmentFetcher.segment(segmentName).contains(key);
5868
}
5969
}

client/src/main/java/io/split/engine/common/SyncManagerImp.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22

33
import com.google.common.annotations.VisibleForTesting;
44
import com.google.common.util.concurrent.ThreadFactoryBuilder;
5+
import io.split.cache.SegmentCache;
56
import io.split.cache.SplitCache;
67
import io.split.engine.experiments.SplitFetcherImp;
78
import io.split.engine.experiments.SplitSynchronizationTask;
89
import io.split.engine.segments.RefreshableSegmentFetcher;
10+
import io.split.engine.segments.SegmentSynchronizationTaskMauro;
911
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
1012
import org.slf4j.Logger;
1113
import org.slf4j.LoggerFactory;
@@ -46,17 +48,18 @@ public class SyncManagerImp implements SyncManager {
4648
}
4749

4850
public static SyncManagerImp build(boolean streamingEnabledConfig,
49-
SplitSynchronizationTask splitSynchronizationTask,
50-
SplitFetcherImp splitFetcher,
51-
RefreshableSegmentFetcher segmentFetcher,
52-
SplitCache splitCache,
53-
String authUrl,
54-
CloseableHttpClient httpClient,
55-
String streamingServiceUrl,
56-
int authRetryBackOffBase,
57-
CloseableHttpClient sseHttpClient) {
51+
SplitSynchronizationTask splitSynchronizationTask,
52+
SplitFetcherImp splitFetcher,
53+
SegmentSynchronizationTaskMauro segmentSynchronizationTaskMauro,
54+
SplitCache splitCache,
55+
String authUrl,
56+
CloseableHttpClient httpClient,
57+
String streamingServiceUrl,
58+
int authRetryBackOffBase,
59+
CloseableHttpClient sseHttpClient,
60+
SegmentCache segmentCache) {
5861
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
59-
Synchronizer synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentFetcher, splitCache);
62+
Synchronizer synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentSynchronizationTaskMauro, splitCache, segmentCache);
6063
PushManager pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, authRetryBackOffBase, pushMessages, sseHttpClient);
6164
return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages);
6265
}

client/src/main/java/io/split/engine/common/SynchronizerImp.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22

33
import com.google.common.util.concurrent.ThreadFactoryBuilder;
44

5+
import io.split.cache.SegmentCache;
56
import io.split.cache.SplitCache;
67
import io.split.engine.experiments.SplitFetcherImp;
78
import io.split.engine.experiments.SplitSynchronizationTask;
8-
import io.split.engine.segments.RefreshableSegmentFetcher;
9+
import io.split.engine.segments.SegmentFetcherImpMauro;
10+
import io.split.engine.segments.SegmentSynchronizationTaskMauro;
911
import org.slf4j.Logger;
1012
import org.slf4j.LoggerFactory;
1113

@@ -21,18 +23,21 @@ public class SynchronizerImp implements Synchronizer {
2123

2224
private final SplitSynchronizationTask _splitSynchronizationTask;
2325
private final SplitFetcherImp _splitFetcher;
24-
private final RefreshableSegmentFetcher _segmentFetcher;
26+
private final SegmentSynchronizationTaskMauro _segmentSynchronizationTaskMauro;
2527
private final ScheduledExecutorService _syncAllScheduledExecutorService;
2628
private final SplitCache _splitCache;
29+
private final SegmentCache _segmentCache;
2730

2831
public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask,
2932
SplitFetcherImp splitFetcher,
30-
RefreshableSegmentFetcher segmentFetcher,
31-
SplitCache splitCache) {
33+
SegmentSynchronizationTaskMauro segmentSynchronizationTaskMauro,
34+
SplitCache splitCache,
35+
SegmentCache segmentCache) {
3236
_splitSynchronizationTask = checkNotNull(splitSynchronizationTask);
3337
_splitFetcher = checkNotNull(splitFetcher);
34-
_segmentFetcher = checkNotNull(segmentFetcher);
38+
_segmentSynchronizationTaskMauro = checkNotNull(segmentSynchronizationTaskMauro);
3539
_splitCache = checkNotNull(splitCache);
40+
_segmentCache = checkNotNull(segmentCache);
3641

3742
ThreadFactory splitsThreadFactory = new ThreadFactoryBuilder()
3843
.setDaemon(true)
@@ -45,22 +50,22 @@ public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask,
4550
public void syncAll() {
4651
_syncAllScheduledExecutorService.schedule(() -> {
4752
_splitFetcher.run();
48-
_segmentFetcher.forceRefreshAll();
53+
_segmentSynchronizationTaskMauro.run();
4954
}, 0, TimeUnit.SECONDS);
5055
}
5156

5257
@Override
5358
public void startPeriodicFetching() {
5459
_log.debug("Starting Periodic Fetching ...");
5560
_splitSynchronizationTask.startPeriodicFetching();
56-
_segmentFetcher.startPeriodicFetching();
61+
_segmentSynchronizationTaskMauro.startPeriodicFetching();
5762
}
5863

5964
@Override
6065
public void stopPeriodicFetching() {
6166
_log.debug("Stop Periodic Fetching ...");
6267
_splitSynchronizationTask.stop();
63-
_segmentFetcher.stop();
68+
_segmentSynchronizationTaskMauro.stop();
6469
}
6570

6671
@Override
@@ -80,8 +85,9 @@ public void localKillSplit(String splitName, String defaultTreatment, long newCh
8085

8186
@Override
8287
public void refreshSegment(String segmentName, long changeNumber) {
83-
if (changeNumber > _segmentFetcher.getChangeNumber(segmentName)) {
84-
_segmentFetcher.forceRefresh(segmentName);
88+
if (changeNumber > _segmentCache.getChangeNumber(segmentName)) {
89+
SegmentFetcherImpMauro fetcher = _segmentSynchronizationTaskMauro.getFetcher(segmentName);
90+
fetcher.fetch();
8591
}
8692
}
8793
}

client/src/main/java/io/split/engine/experiments/SplitParser.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.split.engine.experiments;
22

33
import com.google.common.collect.Lists;
4+
import io.split.cache.SegmentCache;
45
import io.split.client.dtos.Condition;
56
import io.split.client.dtos.Matcher;
67
import io.split.client.dtos.MatcherGroup;
@@ -28,6 +29,7 @@
2829
import io.split.engine.matchers.strings.WhitelistMatcher;
2930
import io.split.engine.segments.Segment;
3031
import io.split.engine.segments.SegmentFetcher;
32+
import io.split.engine.segments.SegmentSynchronizationTaskMauro;
3133
import org.slf4j.Logger;
3234
import org.slf4j.LoggerFactory;
3335

@@ -46,11 +48,13 @@ public final class SplitParser {
4648
public static final int CONDITIONS_UPPER_LIMIT = 50;
4749
private static final Logger _log = LoggerFactory.getLogger(SplitParser.class);
4850

49-
private SegmentFetcher _segmentFetcher;
51+
private final SegmentSynchronizationTaskMauro _segmentSynchronizationTaskMauro;
52+
private final SegmentCache _segmentCache;
5053

51-
public SplitParser(SegmentFetcher segmentFetcher) {
52-
_segmentFetcher = segmentFetcher;
53-
checkNotNull(_segmentFetcher);
54+
public SplitParser(SegmentSynchronizationTaskMauro segmentSynchronizationTaskMauro,
55+
SegmentCache segmentCache) {
56+
_segmentSynchronizationTaskMauro = checkNotNull(segmentSynchronizationTaskMauro);
57+
_segmentCache = checkNotNull(segmentCache);
5458
}
5559

5660
public ParsedSplit parse(Split split) {
@@ -106,8 +110,9 @@ private AttributeMatcher toMatcher(Matcher matcher) {
106110
break;
107111
case IN_SEGMENT:
108112
checkNotNull(matcher.userDefinedSegmentMatcherData);
109-
Segment segment = _segmentFetcher.segment(matcher.userDefinedSegmentMatcherData.segmentName);
110-
delegate = new UserDefinedSegmentMatcher(segment);
113+
String segmentName = matcher.userDefinedSegmentMatcherData.segmentName;
114+
_segmentSynchronizationTaskMauro.initializeSegment(segmentName);
115+
delegate = new UserDefinedSegmentMatcher(_segmentCache, segmentName);
111116
break;
112117
case WHITELIST:
113118
checkNotNull(matcher.whitelistMatcherData);

client/src/main/java/io/split/engine/matchers/UserDefinedSegmentMatcher.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.split.engine.matchers;
22

3+
import io.split.cache.SegmentCache;
34
import io.split.engine.evaluator.Evaluator;
45
import io.split.engine.segments.Segment;
56

@@ -16,13 +17,11 @@
1617
*/
1718
public class UserDefinedSegmentMatcher implements Matcher {
1819
private final String _segmentName;
19-
private final Segment _segment;
20+
private final SegmentCache _segmentCache;
2021

21-
public UserDefinedSegmentMatcher(Segment segment) {
22-
checkNotNull(segment);
23-
_segmentName = segment.segmentName();
24-
_segment = segment;
25-
checkNotNull(_segmentName);
22+
public UserDefinedSegmentMatcher(SegmentCache segmentCache, String segmentName) {
23+
_segmentCache = checkNotNull(segmentCache);
24+
_segmentName = checkNotNull(segmentName);
2625
}
2726

2827

@@ -31,7 +30,8 @@ public boolean match(Object matchValue, String bucketingKey, Map<String, Object>
3130
if (!(matchValue instanceof String)) {
3231
return false;
3332
}
34-
return _segment.contains((String) matchValue);
33+
34+
return _segmentCache.isInSegment(_segmentName, (String) matchValue);
3535
}
3636

3737
@Override

0 commit comments

Comments
 (0)