Skip to content

Commit 0b57206

Browse files
PR comments fixed
1 parent 2330cff commit 0b57206

File tree

11 files changed

+120
-86
lines changed

11 files changed

+120
-86
lines changed

client/src/main/java/io/split/cache/SegmentCacheInMemoryImpl.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ public class SegmentCacheInMemoryImpl implements SegmentCache {
1717
private static final long DEFAULT_CHANGE_NUMBER = -1l;
1818
private final ConcurrentMap<String, SegmentImp> _segments = Maps.newConcurrentMap();
1919

20-
public SegmentCacheInMemoryImpl(){};
21-
2220
@Override
2321
public void updateSegment(String segmentName, List<String> toAdd, List<String> toRemove) {
2422
if(_segments.get(segmentName) == null){
@@ -30,11 +28,12 @@ public void updateSegment(String segmentName, List<String> toAdd, List<String> t
3028

3129
@Override
3230
public boolean isInSegment(String segmentName, String key) {
33-
if(_segments.get(segmentName) == null){
34-
_log.error("Segment " + segmentName + "Not founded.");
31+
SegmentImp segmentImp = _segments.get(segmentName);
32+
if(segmentImp == null){
33+
_log.error("Segment " + segmentName + "Not found.");
3534
return false;
3635
}
37-
return _segments.get(segmentName).contains(key);
36+
return segmentImp.contains(key);
3837
}
3938

4039
@Override
@@ -43,17 +42,18 @@ public void setChangeNumber(String segmentName, long changeNumber) {
4342
_segments.get(segmentName).setChangeNumber(changeNumber);
4443
}
4544
else{
46-
_log.error("Segment " + segmentName + "Not founded.");
45+
_log.error("Segment " + segmentName + "Not found.");
4746
}
4847
}
4948

5049
@Override
5150
public long getChangeNumber(String segmentName) {
52-
if(_segments.get(segmentName) == null){
53-
_log.error("Segment " + segmentName + "Not founded.");
51+
SegmentImp segmentImp = _segments.get(segmentName);
52+
if(segmentImp == null){
53+
_log.error("Segment " + segmentName + "Not found.");
5454
return DEFAULT_CHANGE_NUMBER;
5555
}
56-
return _segments.get(segmentName).getChangeNumber();
56+
return segmentImp.getChangeNumber();
5757
}
5858

5959
@Override

client/src/main/java/io/split/client/jmx/SplitJmxMonitor.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
package io.split.client.jmx;
22

33
import io.split.cache.SegmentCache;
4-
import io.split.client.SplitClient;
54
import io.split.cache.SplitCache;
5+
import io.split.client.SplitClient;
66
import io.split.engine.experiments.SplitFetcher;
77
import io.split.engine.segments.SegmentFetcher;
8-
import io.split.engine.segments.SegmentFetcherImp;
9-
import io.split.engine.segments.SegmentSynchronizationTaskImp;
8+
import io.split.engine.segments.SegmentSynchronizationTask;
109
import org.slf4j.Logger;
1110
import org.slf4j.LoggerFactory;
1211

@@ -22,16 +21,15 @@ public class SplitJmxMonitor implements SplitJmxMonitorMBean {
2221
private final SplitClient _client;
2322
private final SplitFetcher _featureFetcher;
2423
private final SplitCache _splitCache;
25-
//private final SegmentFetcher _segmentFetcher;
26-
private final SegmentSynchronizationTaskImp _segmentSynchronizationTaskImp;
24+
private final SegmentSynchronizationTask _segmentSynchronizationTask;
2725
private SegmentCache _segmentCache;
2826

29-
public SplitJmxMonitor(SplitClient splitClient, SplitFetcher featureFetcher, SplitCache splitCache, SegmentFetcher segmentFetcher, SegmentSynchronizationTaskImp segmentSynchronizationTaskImp) {
27+
public SplitJmxMonitor(SplitClient splitClient, SplitFetcher featureFetcher, SplitCache splitCache, SegmentFetcher segmentFetcher, SegmentSynchronizationTask segmentSynchronizationTask, SegmentCache segmentCache) {
3028
_client = checkNotNull(splitClient);
3129
_featureFetcher = checkNotNull(featureFetcher);
3230
_splitCache = checkNotNull(splitCache);
33-
//_segmentFetcher = checkNotNull(segmentFetcher);
34-
_segmentSynchronizationTaskImp = segmentSynchronizationTaskImp;
31+
_segmentSynchronizationTask = checkNotNull(segmentSynchronizationTask);
32+
_segmentCache = checkNotNull(segmentCache);
3533
}
3634

3735
@Override
@@ -43,8 +41,14 @@ public boolean forceSyncFeatures() {
4341

4442
@Override
4543
public boolean forceSyncSegment(String segmentName) {
46-
SegmentFetcher fetcher = _segmentSynchronizationTaskImp.getFetcher(segmentName);
47-
fetcher.fetch();
44+
SegmentFetcher fetcher = _segmentSynchronizationTask.getFetcher(segmentName);
45+
try{
46+
fetcher.forceRefresh();
47+
}
48+
//We are sure this will never happen because getFetcher firts initiate the segment. This try/catch is for safe only.
49+
catch (NullPointerException np){
50+
throw new NullPointerException();
51+
}
4852

4953
_log.info("Segment " + segmentName + " successfully refreshed via JMX");
5054
return true;
@@ -63,6 +67,5 @@ public String fetchDefinition(String featureName) {
6367
@Override
6468
public boolean isKeyInSegment(String key, String segmentName) {
6569
return _segmentCache.isInSegment(segmentName, key);
66-
//return _segmentFetcher.segment(segmentName).contains(key);
6770
}
6871
}

client/src/main/java/io/split/engine/common/SynchronizerImp.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,13 @@ public void localKillSplit(String splitName, String defaultTreatment, long newCh
8989
public void refreshSegment(String segmentName, long changeNumber) {
9090
if (changeNumber > _segmentCache.getChangeNumber(segmentName)) {
9191
SegmentFetcher fetcher = _segmentSynchronizationTaskImp.getFetcher(segmentName);
92-
fetcher.fetch();
92+
try{
93+
fetcher.forceRefresh();
94+
}
95+
//We are sure this will never happen because getFetcher firts initiate the segment. This try/catch is for safe only.
96+
catch (NullPointerException np){
97+
throw new NullPointerException();
98+
}
9399
}
94100
}
95101
}

client/src/main/java/io/split/engine/experiments/SplitParser.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import io.split.engine.matchers.strings.StartsWithAnyOfMatcher;
2929
import io.split.engine.matchers.strings.WhitelistMatcher;
3030
import io.split.engine.segments.SegmentSynchronizationTask;
31-
import io.split.engine.segments.SegmentSynchronizationTaskImp;
3231
import org.slf4j.Logger;
3332
import org.slf4j.LoggerFactory;
3433

@@ -47,12 +46,12 @@ public final class SplitParser {
4746
public static final int CONDITIONS_UPPER_LIMIT = 50;
4847
private static final Logger _log = LoggerFactory.getLogger(SplitParser.class);
4948

50-
private final SegmentSynchronizationTask _segmentSynchronizationTaskImp;
49+
private final SegmentSynchronizationTask _segmentSynchronizationTask;
5150
private final SegmentCache _segmentCache;
5251

5352
public SplitParser(SegmentSynchronizationTask segmentSynchronizationTaskImp,
5453
SegmentCache segmentCache) {
55-
_segmentSynchronizationTaskImp = checkNotNull(segmentSynchronizationTaskImp);
54+
_segmentSynchronizationTask = checkNotNull(segmentSynchronizationTaskImp);
5655
_segmentCache = checkNotNull(segmentCache);
5756
}
5857

@@ -110,7 +109,7 @@ private AttributeMatcher toMatcher(Matcher matcher) {
110109
case IN_SEGMENT:
111110
checkNotNull(matcher.userDefinedSegmentMatcherData);
112111
String segmentName = matcher.userDefinedSegmentMatcherData.segmentName;
113-
_segmentSynchronizationTaskImp.initializeSegment(segmentName);
112+
_segmentSynchronizationTask.initializeSegment(segmentName);
114113
delegate = new UserDefinedSegmentMatcher(_segmentCache, segmentName);
115114
break;
116115
case WHITELIST:

client/src/main/java/io/split/engine/segments/SegmentFetcher.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,9 @@ public interface SegmentFetcher {
88
* fetch
99
*/
1010
void fetch();
11+
12+
/**
13+
* forceRefresh
14+
*/
15+
void forceRefresh();
1116
}

client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,23 @@ public void run() {
4040
fetch();
4141
}
4242

43+
public void forceRefresh(){
44+
try {
45+
callLoopRun(false);
46+
} catch (Throwable t) {
47+
_log.error("RefreshableSegmentFetcher failed: " + t.getMessage());
48+
if (_log.isDebugEnabled()) {
49+
_log.debug("Reason:", t);
50+
}
51+
}
52+
}
53+
4354
@Override
4455
public void fetch() {
4556
try {
4657
// Do this again in case the previous call errored out.
4758
_gates.registerSegment(_segmentName);
48-
while (true) {
49-
long start = _segmentCache.getChangeNumber(_segmentName);
50-
runWithoutExceptionHandling();
51-
long end = _segmentCache.getChangeNumber(_segmentName);
52-
if (_log.isDebugEnabled()) {
53-
_log.debug(_segmentName + " segment fetch before: " + start + ", after: " + _segmentCache.getChangeNumber(_segmentName) /*+ " size: " + _concurrentKeySet.size()*/);
54-
}
55-
if (start >= end) {
56-
break;
57-
}
58-
}
59+
callLoopRun(true);
5960

6061
_gates.segmentIsReady(_segmentName);
6162

@@ -134,5 +135,17 @@ private String summarize(List<String> changes) {
134135
return bldr.toString();
135136
}
136137

137-
138+
private void callLoopRun(boolean isFetch){
139+
while (true) {
140+
long start = _segmentCache.getChangeNumber(_segmentName);
141+
runWithoutExceptionHandling();
142+
long end = _segmentCache.getChangeNumber(_segmentName);
143+
if (isFetch && _log.isDebugEnabled()) {
144+
_log.debug(_segmentName + " segment fetch before: " + start + ", after: " + _segmentCache.getChangeNumber(_segmentName) /*+ " size: " + _concurrentKeySet.size()*/);
145+
}
146+
if (start >= end) {
147+
break;
148+
}
149+
}
150+
}
138151
}

client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,21 @@
77
import org.slf4j.Logger;
88
import org.slf4j.LoggerFactory;
99

10+
import java.io.Closeable;
1011
import java.util.List;
11-
import java.util.concurrent.*;
12+
import java.util.concurrent.ConcurrentMap;
13+
import java.util.concurrent.ScheduledExecutorService;
14+
import java.util.concurrent.ScheduledFuture;
15+
import java.util.concurrent.ThreadFactory;
1216
import java.util.concurrent.atomic.AtomicBoolean;
1317
import java.util.concurrent.atomic.AtomicLong;
18+
import java.util.concurrent.Executors;
19+
import java.util.concurrent.TimeUnit;
1420

1521
import static com.google.common.base.Preconditions.checkArgument;
1622
import static com.google.common.base.Preconditions.checkNotNull;
1723

18-
public class SegmentSynchronizationTaskImp implements SegmentSynchronizationTask {
24+
public class SegmentSynchronizationTaskImp implements SegmentSynchronizationTask, Closeable {
1925
private static final Logger _log = LoggerFactory.getLogger(SegmentSynchronizationTaskImp.class);
2026

2127
private final SegmentChangeFetcher _segmentChangeFetcher;
@@ -30,14 +36,12 @@ public class SegmentSynchronizationTaskImp implements SegmentSynchronizationTask
3036
private ScheduledFuture<?> _scheduledFuture;
3137

3238
public SegmentSynchronizationTaskImp(SegmentChangeFetcher segmentChangeFetcher, long refreshEveryNSeconds, int numThreads, SDKReadinessGates gates, SegmentCache segmentCache) {
33-
_segmentChangeFetcher = segmentChangeFetcher;
34-
checkNotNull(_segmentChangeFetcher);
39+
_segmentChangeFetcher = checkNotNull(segmentChangeFetcher);
3540

3641
checkArgument(refreshEveryNSeconds >= 0L);
3742
_refreshEveryNSeconds = new AtomicLong(refreshEveryNSeconds);
3843

39-
_gates = gates;
40-
checkNotNull(_gates);
44+
_gates = checkNotNull(gates);
4145

4246
ThreadFactory threadFactory = new ThreadFactoryBuilder()
4347
.setDaemon(true)
@@ -48,7 +52,7 @@ public SegmentSynchronizationTaskImp(SegmentChangeFetcher segmentChangeFetcher,
4852

4953
_running = new AtomicBoolean(false);
5054

51-
_segmentCache = segmentCache;
55+
_segmentCache = checkNotNull(segmentCache);
5256
}
5357

5458
@Override
@@ -125,6 +129,7 @@ public void stop() {
125129
_log.debug("Stopped PeriodicFetching Segments ...");
126130
}
127131

132+
@Override
128133
public void close() {
129134
if (_scheduledExecutorService == null || _scheduledExecutorService.isShutdown()) {
130135
return;

client/src/test/java/io/split/engine/experiments/RefreshableSplitFetcherTest.java renamed to client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,48 @@
11
package io.split.engine.experiments;
22

33
import com.google.common.collect.Lists;
4+
import io.split.cache.InMemoryCacheImp;
5+
import io.split.cache.SegmentCache;
46
import io.split.cache.SegmentCacheInMemoryImpl;
5-
import io.split.client.dtos.Condition;
7+
import io.split.cache.SplitCache;
68
import io.split.client.dtos.Matcher;
79
import io.split.client.dtos.MatcherGroup;
810
import io.split.client.dtos.Split;
9-
import io.split.client.dtos.SplitChange;
1011
import io.split.client.dtos.Status;
12+
import io.split.client.dtos.Condition;
13+
import io.split.client.dtos.SplitChange;
1114
import io.split.engine.ConditionsTestUtil;
1215
import io.split.engine.SDKReadinessGates;
13-
import io.split.cache.InMemoryCacheImp;
14-
import io.split.cache.SplitCache;
1516
import io.split.engine.matchers.AllKeysMatcher;
1617
import io.split.engine.matchers.CombiningMatcher;
17-
import io.split.engine.segments.*;
18-
import io.split.cache.SegmentCache;
18+
import io.split.engine.segments.NoChangeSegmentChangeFetcher;
19+
import io.split.engine.segments.SegmentChangeFetcher;
20+
import io.split.engine.segments.SegmentSynchronizationTask;
21+
import io.split.engine.segments.SegmentSynchronizationTaskImp;
1922
import io.split.grammar.Treatments;
2023
import org.junit.Test;
21-
import org.mockito.Mockito;
2224
import org.slf4j.Logger;
2325
import org.slf4j.LoggerFactory;
2426

25-
import java.util.Collections;
2627
import java.util.List;
2728
import java.util.concurrent.Executors;
2829
import java.util.concurrent.ScheduledExecutorService;
2930
import java.util.concurrent.TimeUnit;
3031

31-
import static org.hamcrest.Matchers.equalTo;
32-
import static org.hamcrest.Matchers.greaterThan;
3332
import static org.hamcrest.Matchers.is;
33+
import static org.hamcrest.Matchers.equalTo;
3434
import static org.hamcrest.Matchers.not;
35+
import static org.hamcrest.Matchers.greaterThan;
3536
import static org.hamcrest.Matchers.nullValue;
36-
import static org.junit.Assert.*;
37+
import static org.junit.Assert.assertThat;
3738
import static org.mockito.Mockito.mock;
3839
import static org.mockito.Mockito.when;
3940

4041
/**
4142
* Created by adilaijaz on 5/11/15.
4243
*/
43-
public class RefreshableSplitFetcherTest {
44-
private static final Logger _log = LoggerFactory.getLogger(RefreshableSplitFetcherTest.class);
44+
public class SplitFetcherTest {
45+
private static final Logger _log = LoggerFactory.getLogger(SplitFetcherTest.class);
4546

4647
@Test
4748
public void works_when_we_start_without_any_state() throws InterruptedException {

0 commit comments

Comments
 (0)