Skip to content

Commit 31f31ce

Browse files
authored
Merge pull request #158 from splitio/development
Development into master
2 parents 319d6ce + 38988a3 commit 31f31ce

File tree

12 files changed

+53
-51
lines changed

12 files changed

+53
-51
lines changed

client/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
CHANGES
22

3+
4.1.1 (Sep 30, 2020)
4+
- Fixed fetch retries after received an SPLIT_CHANGE.
5+
36
4.1.0 (Sep 25, 2020)
47
- Add local impressions deduping (enabled by default)
58

client/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>io.split.client</groupId>
77
<artifactId>java-client-parent</artifactId>
8-
<version>4.1.0</version>
8+
<version>4.1.1</version>
99
</parent>
1010
<artifactId>java-client</artifactId>
1111
<packaging>jar</packaging>

client/src/main/java/io/split/engine/common/SynchronizerImp.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public SynchronizerImp(RefreshableSplitFetcherProvider refreshableSplitFetcherPr
3838
@Override
3939
public void syncAll() {
4040
_syncAllScheduledExecutorService.schedule(() -> {
41-
_splitFetcher.forceRefresh();
41+
_splitFetcher.run();
4242
_segmentFetcher.forceRefreshAll();
4343
}, 0, TimeUnit.SECONDS);
4444
}
@@ -68,6 +68,7 @@ public void refreshSplits(long targetChangeNumber) {
6868
public void localKillSplit(String splitName, String defaultTreatment, long newChangeNumber) {
6969
if (newChangeNumber > _splitFetcher.changeNumber()) {
7070
_splitFetcher.killSplit(splitName, defaultTreatment, newChangeNumber);
71+
refreshSplits(newChangeNumber);
7172
}
7273
}
7374

client/src/main/java/io/split/engine/experiments/RefreshableSplitFetcher.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,23 @@ public RefreshableSplitFetcher(SplitChangeFetcher splitChangeFetcher, SplitParse
8282

8383
@Override
8484
public void forceRefresh() {
85-
run();
85+
_log.debug("Force Refresh splits starting ...");
86+
try {
87+
while (true) {
88+
long start = _changeNumber.get();
89+
runWithoutExceptionHandling();
90+
long end = _changeNumber.get();
91+
92+
if (start >= end) {
93+
break;
94+
}
95+
}
96+
} catch (InterruptedException e) {
97+
_log.warn("Interrupting split fetcher task");
98+
Thread.currentThread().interrupt();
99+
} catch (Throwable t) {
100+
_log.error("RefreshableSplitFetcher failed: " + t.getMessage());
101+
}
86102
}
87103

88104
@Override
@@ -171,8 +187,7 @@ public void runWithoutExceptionHandling() throws InterruptedException {
171187
return;
172188
}
173189

174-
if (change.since != _changeNumber.get()
175-
|| change.till < _changeNumber.get()) {
190+
if (change.since != _changeNumber.get() || change.till < _changeNumber.get()) {
176191
// some other thread may have updated the shared state. exit
177192
return;
178193
}
@@ -257,20 +272,5 @@ public void runWithoutExceptionHandling() throws InterruptedException {
257272

258273
_changeNumber.set(change.till);
259274
}
260-
261-
}
262-
263-
private List<String> collectSegmentsInUse(Split split) {
264-
List<String> result = Lists.newArrayList();
265-
for (Condition condition : split.conditions) {
266-
for (Matcher matcher : condition.matcherGroup.matchers) {
267-
if (matcher.matcherType == MatcherType.IN_SEGMENT) {
268-
if (matcher.userDefinedSegmentMatcherData != null && matcher.userDefinedSegmentMatcherData.segmentName != null) {
269-
result.add(matcher.userDefinedSegmentMatcherData.segmentName);
270-
}
271-
}
272-
}
273-
}
274-
return result;
275275
}
276276
}

client/src/main/java/io/split/engine/segments/RefreshableSegment.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,20 @@ public boolean contains(String key) {
4545

4646
@Override
4747
public void forceRefresh() {
48-
run();
48+
try {
49+
_log.debug("Force Refresh segment starting ...");
50+
while (true) {
51+
long start = _changeNumber.get();
52+
runWithoutExceptionHandling();
53+
long end = _changeNumber.get();
54+
55+
if (start >= end) {
56+
break;
57+
}
58+
}
59+
} catch (Throwable t) {
60+
_log.error("forceRefresh segment failed: " + t.getMessage());
61+
}
4962
}
5063

5164
@Override

client/src/main/java/io/split/engine/sse/EventSourceClientImp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ private void onMessage(RawEvent event) {
103103
} catch (EventParsingException ex) {
104104
_log.debug(String.format("Error parsing the event: %s. Payload: %s", ex.getMessage(), ex.getPayload()));
105105
} catch (Exception e) {
106-
_log.warn(String.format("Error onMessage: %s", e.getMessage()));
106+
_log.debug(String.format("Error onMessage: %s", e.getMessage()));
107107
}
108108
}
109109
}

client/src/test/java/io/split/SplitMockServer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,16 @@ public MockResponse dispatch(RecordedRequest request) {
4646
return new MockResponse().setBody(inputStreamToString("splits2.json"));
4747
case "/api/splitChanges?since=1585948850111":
4848
return new MockResponse().setBody(inputStreamToString("splits_killed.json"));
49+
case "/api/splitChanges?since=1585948850112":
50+
return new MockResponse().setBody("{\"splits\": [], \"since\":1585948850112, \"till\":1585948850112}");
4951
case "/api/segmentChanges/segment-test?since=-1":
5052
return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": -1,\"till\": -1}");
5153
case "/api/segmentChanges/segment3?since=-1":
5254
return new MockResponse().setBody(inputStreamToString("segment3.json"));
5355
case "/api/segmentChanges/segment3?since=1585948850110":
5456
return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": 1585948850110,\"till\": 1585948850110}");
57+
case "/api/segmentChanges/segment3?since=1585948850111":
58+
return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": 1585948850111,\"till\": 1585948850111}");
5559
case "/api/metrics/time":
5660
case "api/metrics/counter":
5761
return new MockResponse().setResponseCode(200);

client/src/test/java/io/split/client/SplitClientIntegrationTest.java

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ public void splitClientMultiFactory() throws IOException, TimeoutException, Inte
402402

403403
Awaitility.await()
404404
.atMost(50L, TimeUnit.SECONDS)
405-
.until(() -> "after_notification_received".equals(client1.getTreatment("admin", "push_test")));
405+
.until(() -> "split_killed".equals(client1.getTreatment("admin", "push_test")));
406406

407407
Awaitility.await()
408408
.atMost(50L, TimeUnit.SECONDS)
@@ -420,30 +420,7 @@ public void splitClientMultiFactory() throws IOException, TimeoutException, Inte
420420

421421
Awaitility.await()
422422
.atMost(50L, TimeUnit.SECONDS)
423-
.until(() -> "after_notification_received".equals(client1.getTreatment("admin", "push_test")));
424-
425-
Awaitility.await()
426-
.atMost(50L, TimeUnit.SECONDS)
427-
.until(() -> "on_whitelist".equals(client2.getTreatment("admin", "push_test")));
428-
429-
Awaitility.await()
430-
.atMost(50L, TimeUnit.SECONDS)
431-
.until(() -> "after_notification_received".equals(client3.getTreatment("admin", "push_test")));
432-
433-
Awaitility.await()
434-
.atMost(50L, TimeUnit.SECONDS)
435-
.until(() -> "on_whitelist".equals(client4.getTreatment("admin", "push_test")));
436-
437-
OutboundSseEvent sseEventSplitUpdate3 = new OutboundEvent
438-
.Builder()
439-
.name("message")
440-
.data("{\"id\":\"22\",\"clientId\":\"22\",\"timestamp\":1592590436082,\"encoding\":\"json\",\"channel\":\"xxxx_xxxx_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1585948850112}\"}")
441-
.build();
442-
eventQueue3.push(sseEventSplitUpdate3);
443-
444-
Awaitility.await()
445-
.atMost(50L, TimeUnit.SECONDS)
446-
.until(() -> "after_notification_received".equals(client1.getTreatment("admin", "push_test")));
423+
.until(() -> "split_killed".equals(client1.getTreatment("admin", "push_test")));
447424

448425
Awaitility.await()
449426
.atMost(50L, TimeUnit.SECONDS)

client/src/test/java/io/split/engine/common/SynchronizerTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package io.split.engine.common;
22

3+
import io.split.client.HttpSegmentChangeFetcher;
4+
import io.split.engine.SDKReadinessGates;
35
import io.split.engine.experiments.RefreshableSplitFetcher;
46
import io.split.engine.experiments.RefreshableSplitFetcherProvider;
57
import io.split.engine.segments.RefreshableSegmentFetcher;
8+
import io.split.engine.segments.SegmentChangeFetcher;
9+
import org.apache.http.impl.client.CloseableHttpClient;
610
import org.junit.Test;
711
import org.mockito.Mockito;
812

@@ -21,7 +25,7 @@ public void syncAll() throws InterruptedException {
2125
synchronizer.syncAll();
2226

2327
Thread.sleep(100);
24-
Mockito.verify(splitFetcher, Mockito.times(1)).forceRefresh();
28+
Mockito.verify(splitFetcher, Mockito.times(1)).run();
2529
Mockito.verify(segmentFetcher, Mockito.times(1)).forceRefreshAll();
2630
}
2731

client/src/test/java/io/split/engine/sse/workers/SplitsWorkerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public void killShouldTriggerFetch() {
6060
}
6161

6262
@Test
63-
public void messagesNotProcesedWhenWorkerStopped() throws InterruptedException {
63+
public void messagesNotProcessedWhenWorkerStopped() throws InterruptedException {
6464
Synchronizer syncMock = Mockito.mock(Synchronizer.class);
6565
SplitsWorker splitsWorker = new SplitsWorkerImp(syncMock);
6666
splitsWorker.start();

0 commit comments

Comments
 (0)