Skip to content

Commit cc25ae0

Browse files
Decouple of segment storage. Segment Cache implementation.
1 parent f3d0ddb commit cc25ae0

File tree

11 files changed

+365
-56
lines changed

11 files changed

+365
-56
lines changed

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import io.split.engine.experiments.SplitParser;
2525
import io.split.engine.segments.RefreshableSegmentFetcher;
2626
import io.split.engine.segments.SegmentChangeFetcher;
27+
import io.split.engine.segments.storage.SegmentCache;
28+
import io.split.engine.segments.storage.SegmentCacheInMemoryImpl;
2729
import io.split.integrations.IntegrationsConfig;
2830
import org.apache.hc.client5.http.auth.AuthScope;
2931
import org.apache.hc.client5.http.auth.Credentials;
@@ -191,11 +193,13 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
191193

192194
// Segments
193195
SegmentChangeFetcher segmentChangeFetcher = HttpSegmentChangeFetcher.create(httpclient, rootTarget, uncachedFireAndForget);
196+
//This segmentCache is for inMemory Storage (the only one supported by java-client for the moment
197+
SegmentCache segmentCache = new SegmentCacheInMemoryImpl();
194198
final RefreshableSegmentFetcher segmentFetcher = new RefreshableSegmentFetcher(segmentChangeFetcher,
195199
findPollingPeriod(RANDOM, config.segmentsRefreshRate()),
196200
config.numThreadsForSegmentFetch(),
197-
gates);
198-
201+
gates,
202+
segmentCache);
199203

200204
SplitParser splitParser = new SplitParser(segmentFetcher);
201205

client/src/main/java/io/split/engine/segments/RefreshableSegment.java

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@
22

33
import io.split.client.dtos.SegmentChange;
44
import io.split.engine.SDKReadinessGates;
5+
import io.split.engine.segments.storage.SegmentCache;
6+
import io.split.engine.segments.storage.SegmentCacheInMemoryImpl;
7+
import org.checkerframework.checker.signedness.qual.SignednessGlb;
58
import org.slf4j.Logger;
69
import org.slf4j.LoggerFactory;
710

11+
import java.util.ArrayList;
812
import java.util.Collections;
913
import java.util.List;
1014
import java.util.Set;
@@ -23,10 +27,9 @@ public class RefreshableSegment implements Runnable, Segment {
2327

2428
private final String _segmentName;
2529
private final SegmentChangeFetcher _segmentChangeFetcher;
26-
private final AtomicLong _changeNumber;
30+
private final SegmentCache _segmentCache;
2731
private final SDKReadinessGates _gates;
2832

29-
private Set<String> _concurrentKeySet = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
3033
private final Object _lock = new Object();
3134

3235
@Override
@@ -36,21 +39,21 @@ public String segmentName() {
3639

3740
@Override
3841
public boolean contains(String key) {
39-
return _concurrentKeySet.contains(key);
42+
return _segmentCache.isInSegment(_segmentName, key);
4043
}
4144

42-
/*package private*/ Set<String> fetch() {
45+
/*package private*/ /*Set<String> fetch() {
4346
return Collections.unmodifiableSet(_concurrentKeySet);
44-
}
47+
}*/
4548

4649
@Override
4750
public void forceRefresh() {
4851
try {
4952
_log.debug("Force Refresh segment starting ...");
5053
while (true) {
51-
long start = _changeNumber.get();
54+
long start = _segmentCache.getChangeNumber(_segmentName);
5255
runWithoutExceptionHandling();
53-
long end = _changeNumber.get();
56+
long end = _segmentCache.getChangeNumber(_segmentName);
5457

5558
if (start >= end) {
5659
break;
@@ -63,23 +66,24 @@ public void forceRefresh() {
6366

6467
@Override
6568
public long changeNumber() {
66-
return _changeNumber.get();
69+
return _segmentCache.getChangeNumber(_segmentName);
6770
}
6871

69-
public static RefreshableSegment create(String segmentName, SegmentChangeFetcher segmentChangeFetcher, SDKReadinessGates gates) {
70-
return new RefreshableSegment(segmentName, segmentChangeFetcher, -1L, gates);
72+
public static RefreshableSegment create(String segmentName, SegmentChangeFetcher segmentChangeFetcher, SDKReadinessGates gates, SegmentCache segmentCache) {
73+
return new RefreshableSegment(segmentName, segmentChangeFetcher, -1L, gates, segmentCache);
7174
}
7275

7376

74-
public RefreshableSegment(String segmentName, SegmentChangeFetcher segmentChangeFetcher, long changeNumber, SDKReadinessGates gates) {
77+
public RefreshableSegment(String segmentName, SegmentChangeFetcher segmentChangeFetcher, long changeNumber, SDKReadinessGates gates, SegmentCache segmentCache) {
7578
_segmentName = segmentName;
7679
_segmentChangeFetcher = segmentChangeFetcher;
77-
_changeNumber = new AtomicLong(changeNumber);
80+
_segmentCache = segmentCache;
7881
_gates = gates;
7982

8083
checkNotNull(_segmentChangeFetcher);
8184
checkNotNull(_segmentName);
8285
checkNotNull(_gates);
86+
checkNotNull(_segmentCache);
8387
}
8488

8589
@Override
@@ -88,11 +92,11 @@ public void run() {
8892
// Do this again in case the previous call errored out.
8993
_gates.registerSegment(_segmentName);
9094
while (true) {
91-
long start = _changeNumber.get();
95+
long start = _segmentCache.getChangeNumber(_segmentName);
9296
runWithoutExceptionHandling();
93-
long end = _changeNumber.get();
97+
long end = _segmentCache.getChangeNumber(_segmentName);
9498
if (_log.isDebugEnabled()) {
95-
_log.debug(_segmentName + " segment fetch before: " + start + ", after: " + _changeNumber.get() + " size: " + _concurrentKeySet.size());
99+
_log.debug(_segmentName + " segment fetch before: " + start + ", after: " + _segmentCache.getChangeNumber(_segmentName) /*+ " size: " + _concurrentKeySet.size()*/);
96100
}
97101
if (start >= end) {
98102
break;
@@ -110,55 +114,49 @@ public void run() {
110114
}
111115

112116
private void runWithoutExceptionHandling() {
113-
SegmentChange change = _segmentChangeFetcher.fetch(_segmentName, _changeNumber.get());
117+
SegmentChange change = _segmentChangeFetcher.fetch(_segmentName, _segmentCache.getChangeNumber(_segmentName));
114118

115119
if (change == null) {
116120
throw new IllegalStateException("SegmentChange was null");
117121
}
118122

119-
if (change.till == _changeNumber.get()) {
123+
if (change.till == _segmentCache.getChangeNumber(_segmentName)) {
120124
// no change.
121125
return;
122126
}
123127

124-
if (change.since != _changeNumber.get()
125-
|| change.since < _changeNumber.get()) {
128+
if (change.since != _segmentCache.getChangeNumber(_segmentName)
129+
|| change.since < _segmentCache.getChangeNumber(_segmentName)) {
126130
// some other thread may have updated the shared state. exit
127131
return;
128132
}
129133

130134

131135
if (change.added.isEmpty() && change.removed.isEmpty()) {
132136
// there are no changes. weird!
133-
_changeNumber.set(change.till);
137+
_segmentCache.setChangeNumber(_segmentName,change.till);
134138
return;
135139
}
136140

137141
synchronized (_lock) {
138142
// check state one more time.
139-
if (change.since != _changeNumber.get()
140-
|| change.till < _changeNumber.get()) {
143+
if (change.since != _segmentCache.getChangeNumber(_segmentName)
144+
|| change.till < _segmentCache.getChangeNumber(_segmentName)) {
141145
// some other thread may have updated the shared state. exit
142146
return;
143147
}
144-
145-
for (String added : change.added) {
146-
_concurrentKeySet.add(added);
147-
}
148+
//updateSegment(sn, toadd, tormv, chngN)
149+
_segmentCache.updateSegment(_segmentName,change.added, change.removed);
148150

149151
if (!change.added.isEmpty()) {
150152
_log.info(_segmentName + " added keys: " + summarize(change.added));
151153
}
152154

153-
for (String removed : change.removed) {
154-
_concurrentKeySet.remove(removed);
155-
}
156-
157155
if (!change.removed.isEmpty()) {
158156
_log.info(_segmentName + " removed keys: " + summarize(change.removed));
159157
}
160158

161-
_changeNumber.set(change.till);
159+
_segmentCache.setChangeNumber(_segmentName,change.till);
162160
}
163161
}
164162

client/src/main/java/io/split/engine/segments/RefreshableSegmentFetcher.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import com.google.common.collect.Maps;
44
import com.google.common.util.concurrent.ThreadFactoryBuilder;
55
import io.split.engine.SDKReadinessGates;
6+
import io.split.engine.cache.InMemoryCacheImp;
7+
import io.split.engine.segments.storage.SegmentCache;
68
import org.slf4j.Logger;
79
import org.slf4j.LoggerFactory;
810

@@ -33,12 +35,14 @@ public class RefreshableSegmentFetcher implements Closeable, SegmentFetcher, Run
3335
private final AtomicBoolean _running;
3436
private final Object _lock = new Object();
3537
private final ConcurrentMap<String, RefreshableSegment> _segmentFetchers = Maps.newConcurrentMap();
38+
private final SegmentCache _segmentCache;
3639
private final SDKReadinessGates _gates;
3740
private final ScheduledExecutorService _scheduledExecutorService;
3841

3942
private ScheduledFuture<?> _scheduledFuture;
4043

41-
public RefreshableSegmentFetcher(SegmentChangeFetcher segmentChangeFetcher, long refreshEveryNSeconds, int numThreads, SDKReadinessGates gates) {
44+
public RefreshableSegmentFetcher(SegmentChangeFetcher segmentChangeFetcher, long refreshEveryNSeconds, int numThreads, SDKReadinessGates gates,
45+
SegmentCache segmentCache) {
4246
_segmentChangeFetcher = segmentChangeFetcher;
4347
checkNotNull(_segmentChangeFetcher);
4448

@@ -56,6 +60,8 @@ public RefreshableSegmentFetcher(SegmentChangeFetcher segmentChangeFetcher, long
5660
_scheduledExecutorService = Executors.newScheduledThreadPool(numThreads, threadFactory);
5761

5862
_running = new AtomicBoolean(false);
63+
64+
_segmentCache = segmentCache;
5965
}
6066

6167
public RefreshableSegment segment(String segmentName) {
@@ -79,7 +85,7 @@ public RefreshableSegment segment(String segmentName) {
7985
_log.error("Unable to register segment " + segmentName);
8086
// We will try again inside the RefreshableSegment.
8187
}
82-
segment = RefreshableSegment.create(segmentName, _segmentChangeFetcher, _gates);
88+
segment = RefreshableSegment.create(segmentName, _segmentChangeFetcher, _gates, _segmentCache);
8389

8490
if (_running.get()) {
8591
_scheduledExecutorService.submit(segment);
@@ -93,13 +99,7 @@ public RefreshableSegment segment(String segmentName) {
9399

94100
@Override
95101
public long getChangeNumber(String segmentName) {
96-
RefreshableSegment segment = _segmentFetchers.get(segmentName);
97-
98-
if (segment == null) {
99-
return -1;
100-
}
101-
102-
return segment.changeNumber();
102+
return _segmentCache.getChangeNumber(segmentName);
103103
}
104104

105105
@Override
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package io.split.engine.segments;
2+
3+
import java.util.Collections;
4+
import java.util.List;
5+
import java.util.Set;
6+
import java.util.concurrent.ConcurrentHashMap;
7+
import java.util.concurrent.atomic.AtomicLong;
8+
9+
/**
10+
* Segment Implementation
11+
* @author lucasecheverz
12+
*/
13+
import static com.google.common.base.Preconditions.checkNotNull;
14+
15+
public class SegmentImplementation implements Segment{
16+
17+
private final String _segmentName;
18+
private final AtomicLong _changeNumber;
19+
private Set<String> _concurrentKeySet = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
20+
21+
public SegmentImplementation(long changeNumber, String segmentName){
22+
_segmentName = segmentName;
23+
_changeNumber = new AtomicLong(changeNumber);
24+
25+
checkNotNull(_segmentName);
26+
}
27+
28+
public SegmentImplementation(long changeNumber, String segmentName, List<String> keys){
29+
this(changeNumber,segmentName);
30+
_concurrentKeySet.addAll(keys);
31+
}
32+
33+
@Override
34+
public String segmentName() {
35+
return _segmentName;
36+
}
37+
38+
@Override
39+
public boolean contains(String key) {
40+
return _concurrentKeySet.contains(key);
41+
}
42+
43+
@Override
44+
public void forceRefresh() {
45+
return;
46+
}
47+
48+
@Override
49+
public long changeNumber() {
50+
return _changeNumber.get();
51+
}
52+
53+
public void setChangeNumber(long changeNumber){
54+
_changeNumber.set(changeNumber);
55+
}
56+
57+
public void updateSegment(List<String> toAdd, List<String> toRemove){
58+
_concurrentKeySet.removeAll(toRemove);
59+
_concurrentKeySet.addAll(toAdd);
60+
}
61+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package io.split.engine.segments.storage;
2+
3+
import java.util.List;
4+
5+
/**
6+
* Memory for segments
7+
* @author lucasecheverz
8+
*/
9+
public interface SegmentCache {
10+
11+
/**
12+
* update segment
13+
* @param segmentName
14+
* @param toAdd
15+
* @param toRemove
16+
*/
17+
void updateSegment(String segmentName, List<String> toAdd, List<String> toRemove) ;
18+
19+
/**
20+
* evaluates if a key belongs to a segment
21+
* @param segmentName
22+
* @param key
23+
* @return
24+
*/
25+
boolean isInSegment(String segmentName, String key);
26+
27+
/**
28+
* update the changeNumber of a segment
29+
* @param segmentName
30+
* @param changeNumber
31+
*/
32+
void setChangeNumber(String segmentName, long changeNumber);
33+
34+
/**
35+
* returns the changeNumber of a segment
36+
* @param segmentName
37+
* @return
38+
*/
39+
long getChangeNumber(String segmentName);
40+
41+
/**
42+
* clear all segments
43+
*/
44+
void clear();
45+
}

0 commit comments

Comments
 (0)