Skip to content

Commit af68d6a

Browse files
committed
added splitCache and splitFetcher refactor
1 parent 4f104b9 commit af68d6a

File tree

6 files changed

+172
-82
lines changed

6 files changed

+172
-82
lines changed

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import io.split.client.metrics.CachedMetrics;
1212
import io.split.client.metrics.FireAndForgetMetrics;
1313
import io.split.client.metrics.HttpMetrics;
14+
import io.split.engine.cache.InMemoryCacheImp;
15+
import io.split.engine.cache.SplitCache;
1416
import io.split.engine.evaluator.Evaluator;
1517
import io.split.engine.evaluator.EvaluatorImp;
1618
import io.split.engine.SDKReadinessGates;
@@ -200,7 +202,8 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
200202
// Feature Changes
201203
SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher.create(httpclient, rootTarget, uncachedFireAndForget);
202204

203-
final RefreshableSplitFetcherProvider splitFetcherProvider = new RefreshableSplitFetcherProvider(splitChangeFetcher, splitParser, findPollingPeriod(RANDOM, config.featuresRefreshRate()), gates);
205+
final SplitCache cache = new InMemoryCacheImp(-1);
206+
final RefreshableSplitFetcherProvider splitFetcherProvider = new RefreshableSplitFetcherProvider(splitChangeFetcher, splitParser, findPollingPeriod(RANDOM, config.featuresRefreshRate()), gates, cache);
204207

205208

206209
List<ImpressionListener> impressionListeners = new ArrayList<>();
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package io.split.engine.cache;
2+
3+
import com.google.common.collect.Maps;
4+
import io.split.engine.experiments.ParsedSplit;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
import java.util.ArrayList;
9+
import java.util.Collection;
10+
import java.util.List;
11+
import java.util.Map;
12+
import java.util.concurrent.ConcurrentMap;
13+
import java.util.concurrent.atomic.AtomicLong;
14+
15+
public class InMemoryCacheImp implements SplitCache {
16+
17+
private static final Logger _log = LoggerFactory.getLogger(InMemoryCacheImp.class);
18+
19+
private final ConcurrentMap<String, ParsedSplit> _concurrentMap;
20+
private AtomicLong _changeNumber;
21+
22+
public InMemoryCacheImp(long startingChangeNumber) {
23+
_concurrentMap = Maps.newConcurrentMap();
24+
_changeNumber = new AtomicLong(startingChangeNumber);
25+
}
26+
27+
@Override
28+
public void put(ParsedSplit split) {
29+
_concurrentMap.put(split.feature(), split);
30+
}
31+
32+
@Override
33+
public void putAll(Map<String, ParsedSplit> splits) {
34+
_concurrentMap.putAll(splits);
35+
}
36+
37+
@Override
38+
public boolean remove(String name) {
39+
ParsedSplit removed = _concurrentMap.remove(name);
40+
41+
return removed != null;
42+
}
43+
44+
@Override
45+
public ParsedSplit get(String name) {
46+
return _concurrentMap.get(name);
47+
}
48+
49+
@Override
50+
public Collection<ParsedSplit> getAll() {
51+
return _concurrentMap.values();
52+
}
53+
54+
@Override
55+
public Collection<ParsedSplit> getMany(List<String> names) {
56+
List<ParsedSplit> splits = new ArrayList<>();
57+
58+
for (String name : names) {
59+
ParsedSplit split = _concurrentMap.get(name);
60+
61+
if (split != null) {
62+
splits.add(split);
63+
}
64+
}
65+
66+
return splits;
67+
}
68+
69+
@Override
70+
public long getChangeNumber() {
71+
return _changeNumber.get();
72+
}
73+
74+
@Override
75+
public void setChangeNumber(long changeNumber) {
76+
if (changeNumber < _changeNumber.get()) {
77+
_log.error("ChangeNumber for splits cache is less than previous");
78+
}
79+
80+
_changeNumber.set(changeNumber);
81+
}
82+
83+
@Override
84+
public boolean trafficTypeExists(String trafficType) {
85+
return false;
86+
}
87+
88+
@Override
89+
public void clear() {
90+
_concurrentMap.clear();
91+
}
92+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package io.split.engine.cache;
2+
3+
import io.split.engine.experiments.ParsedSplit;
4+
5+
import java.util.Collection;
6+
import java.util.List;
7+
import java.util.Map;
8+
9+
public interface SplitCache {
10+
void put(ParsedSplit split);
11+
void putAll(Map<String, ParsedSplit> splits);
12+
boolean remove(String name);
13+
ParsedSplit get(String name);
14+
Collection<ParsedSplit> getAll();
15+
Collection<ParsedSplit> getMany(List<String> names);
16+
long getChangeNumber();
17+
void setChangeNumber(long changeNumber);
18+
boolean trafficTypeExists(String trafficType);
19+
void clear();
20+
}

client/src/main/java/io/split/engine/experiments/RefreshableSplitFetcher.java

Lines changed: 30 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,18 @@
66
import com.google.common.collect.Multiset;
77
import com.google.common.collect.Multisets;
88
import com.google.common.collect.Sets;
9-
import io.split.client.dtos.Condition;
10-
import io.split.client.dtos.Matcher;
11-
import io.split.client.dtos.MatcherType;
129
import io.split.client.dtos.Split;
1310
import io.split.client.dtos.SplitChange;
1411
import io.split.client.dtos.Status;
1512
import io.split.engine.SDKReadinessGates;
13+
import io.split.engine.cache.SplitCache;
1614
import org.slf4j.Logger;
1715
import org.slf4j.LoggerFactory;
1816

1917
import java.util.Collection;
2018
import java.util.List;
2119
import java.util.Map;
2220
import java.util.Set;
23-
import java.util.concurrent.atomic.AtomicLong;
2421

2522
import static com.google.common.base.Preconditions.checkNotNull;
2623

@@ -35,9 +32,9 @@ public class RefreshableSplitFetcher implements SplitFetcher, Runnable {
3532

3633
private final SplitParser _parser;
3734
private final SplitChangeFetcher _splitChangeFetcher;
38-
private final AtomicLong _changeNumber;
39-
40-
private Map<String, ParsedSplit> _concurrentMap = Maps.newConcurrentMap();
35+
private final SplitCache _splitCache;
36+
private final SDKReadinessGates _gates;
37+
private final Object _lock = new Object();
4138

4239
/**
4340
* Contains all the traffic types that are currently being used by the splits and also the count
@@ -49,45 +46,22 @@ public class RefreshableSplitFetcher implements SplitFetcher, Runnable {
4946
* an ARCHIVED split is received, we know if we need to remove a traffic type from the multiset.
5047
*/
5148
Multiset<String> _concurrentTrafficTypeNameSet = ConcurrentHashMultiset.create();
52-
private final SDKReadinessGates _gates;
53-
54-
private final Object _lock = new Object();
5549

56-
57-
public RefreshableSplitFetcher(SplitChangeFetcher splitChangeFetcher, SplitParser parser, SDKReadinessGates gates) {
58-
this(splitChangeFetcher, parser, gates, -1);
59-
}
60-
61-
/**
62-
* This constructor is package private because it is meant primarily for unit tests
63-
* where we want to set the starting change number. All regular clients should use
64-
* the public constructor.
65-
*
66-
* @param splitChangeFetcher MUST NOT be null
67-
* @param parser MUST NOT be null
68-
* @param startingChangeNumber
69-
*/
70-
/*package private*/ RefreshableSplitFetcher(SplitChangeFetcher splitChangeFetcher,
71-
SplitParser parser,
72-
SDKReadinessGates gates,
73-
long startingChangeNumber) {
74-
_splitChangeFetcher = splitChangeFetcher;
75-
_parser = parser;
76-
_gates = gates;
77-
_changeNumber = new AtomicLong(startingChangeNumber);
78-
79-
checkNotNull(_parser);
80-
checkNotNull(_splitChangeFetcher);
50+
public RefreshableSplitFetcher(SplitChangeFetcher splitChangeFetcher, SplitParser parser, SDKReadinessGates gates, SplitCache splitCache) {
51+
_splitChangeFetcher = checkNotNull(splitChangeFetcher);
52+
_parser = checkNotNull(parser);
53+
_gates = checkNotNull(gates);
54+
_splitCache = checkNotNull(splitCache);
8155
}
8256

8357
@Override
8458
public void forceRefresh() {
8559
_log.debug("Force Refresh splits starting ...");
8660
try {
8761
while (true) {
88-
long start = _changeNumber.get();
62+
long start = _splitCache.getChangeNumber();
8963
runWithoutExceptionHandling();
90-
long end = _changeNumber.get();
64+
long end = _splitCache.getChangeNumber();
9165

9266
if (start >= end) {
9367
break;
@@ -103,13 +77,13 @@ public void forceRefresh() {
10377

10478
@Override
10579
public long changeNumber() {
106-
return _changeNumber.get();
80+
return _splitCache.getChangeNumber();
10781
}
10882

10983
@Override
11084
public void killSplit(String splitName, String defaultTreatment, long changeNumber) {
11185
synchronized (_lock) {
112-
ParsedSplit parsedSplit = _concurrentMap.get(splitName);
86+
ParsedSplit parsedSplit = _splitCache.get(splitName);
11387

11488
ParsedSplit updatedSplit = new ParsedSplit(parsedSplit.feature(),
11589
parsedSplit.seed(),
@@ -123,17 +97,17 @@ public void killSplit(String splitName, String defaultTreatment, long changeNumb
12397
parsedSplit.algo(),
12498
parsedSplit.configurations());
12599

126-
_concurrentMap.put(splitName, updatedSplit);
100+
_splitCache.put(updatedSplit);
127101
}
128102
}
129103

130104
@Override
131105
public ParsedSplit fetch(String test) {
132-
return _concurrentMap.get(test);
106+
return _splitCache.get(test);
133107
}
134108

135109
public List<ParsedSplit> fetchAll() {
136-
return Lists.newArrayList(_concurrentMap.values());
110+
return Lists.newArrayList(_splitCache.getAll());
137111
}
138112

139113
@Override
@@ -145,18 +119,18 @@ public Set<String> fetchKnownTrafficTypes() {
145119
}
146120

147121
public Collection<ParsedSplit> fetch() {
148-
return _concurrentMap.values();
122+
return _splitCache.getAll();
149123
}
150124

151125
public void clear() {
152-
_concurrentMap.clear();
126+
_splitCache.clear();
153127
_concurrentTrafficTypeNameSet.clear();
154128
}
155129

156130
@Override
157131
public void run() {
158132
_log.debug("Fetch splits starting ...");
159-
long start = _changeNumber.get();
133+
long start = _splitCache.getChangeNumber();
160134
try {
161135
runWithoutExceptionHandling();
162136
_gates.splitsAreReady();
@@ -170,38 +144,38 @@ public void run() {
170144
}
171145
} finally {
172146
if (_log.isDebugEnabled()) {
173-
_log.debug("split fetch before: " + start + ", after: " + _changeNumber.get());
147+
_log.debug("split fetch before: " + start + ", after: " + _splitCache.getChangeNumber());
174148
}
175149
}
176150
}
177151

178152
public void runWithoutExceptionHandling() throws InterruptedException {
179-
SplitChange change = _splitChangeFetcher.fetch(_changeNumber.get());
153+
SplitChange change = _splitChangeFetcher.fetch(_splitCache.getChangeNumber());
180154

181155
if (change == null) {
182156
throw new IllegalStateException("SplitChange was null");
183157
}
184158

185-
if (change.till == _changeNumber.get()) {
159+
if (change.till == _splitCache.getChangeNumber()) {
186160
// no change.
187161
return;
188162
}
189163

190-
if (change.since != _changeNumber.get() || change.till < _changeNumber.get()) {
164+
if (change.since != _splitCache.getChangeNumber() || change.till < _splitCache.getChangeNumber()) {
191165
// some other thread may have updated the shared state. exit
192166
return;
193167
}
194168

195169
if (change.splits.isEmpty()) {
196170
// there are no changes. weird!
197-
_changeNumber.set(change.till);
171+
_splitCache.setChangeNumber(change.till);
198172
return;
199173
}
200174

201175
synchronized (_lock) {
202176
// check state one more time.
203-
if (change.since != _changeNumber.get()
204-
|| change.till < _changeNumber.get()) {
177+
if (change.since != _splitCache.getChangeNumber()
178+
|| change.till < _splitCache.getChangeNumber()) {
205179
// some other thread may have updated the shared state. exit
206180
return;
207181
}
@@ -243,7 +217,7 @@ public void runWithoutExceptionHandling() throws InterruptedException {
243217
// If it's deleted & recreated, the old one should be decreased and the new one increased.
244218
// To handle both cases, we simply delete the old one if the split is present.
245219
// The new one is always increased.
246-
ParsedSplit current = _concurrentMap.get(split.name);
220+
ParsedSplit current = _splitCache.get(split.name);
247221
if (current != null && current.trafficTypeName() != null) {
248222
trafficTypeNamesToRemove.add(current.trafficTypeName());
249223
}
@@ -253,13 +227,13 @@ public void runWithoutExceptionHandling() throws InterruptedException {
253227
}
254228
}
255229

256-
_concurrentMap.putAll(toAdd);
230+
_splitCache.putAll(toAdd);
257231
_concurrentTrafficTypeNameSet.addAll(trafficTypeNamesToAdd);
258232
//removeAll does not work here, since it wont remove all the occurrences, just one
259233
Multisets.removeOccurrences(_concurrentTrafficTypeNameSet, trafficTypeNamesToRemove);
260234

261235
for (String remove : toRemove) {
262-
_concurrentMap.remove(remove);
236+
_splitCache.remove(remove);
263237
}
264238

265239
if (!toAdd.isEmpty()) {
@@ -270,7 +244,7 @@ public void runWithoutExceptionHandling() throws InterruptedException {
270244
_log.debug("Deleted features: " + toRemove);
271245
}
272246

273-
_changeNumber.set(change.till);
247+
_splitCache.setChangeNumber(change.till);
274248
}
275249
}
276250
}

0 commit comments

Comments
 (0)