Skip to content

Commit 3f3c0fc

Browse files
committed
fixed fetchers retry until changeNumber > cahce.ChangeNumber
1 parent 9feea18 commit 3f3c0fc

File tree

5 files changed

+58
-7
lines changed

5 files changed

+58
-7
lines changed

client/src/main/java/io/split/engine/common/SynchronizerImp.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ public void stopPeriodicFetching() {
5858
}
5959

6060
@Override
61-
public void refreshSplits(long targetChangeNumber) {
62-
if (targetChangeNumber > _splitFetcher.changeNumber()) {
61+
public synchronized void refreshSplits(long targetChangeNumber) {
62+
while (targetChangeNumber > _splitFetcher.changeNumber()) {
6363
_splitFetcher.forceRefresh();
6464
}
6565
}
@@ -68,12 +68,13 @@ public void refreshSplits(long targetChangeNumber) {
6868
public void localKillSplit(String splitName, String defaultTreatment, long newChangeNumber) {
6969
if (newChangeNumber > _splitFetcher.changeNumber()) {
7070
_splitFetcher.killSplit(splitName, defaultTreatment, newChangeNumber);
71+
refreshSplits(newChangeNumber);
7172
}
7273
}
7374

7475
@Override
75-
public void refreshSegment(String segmentName, long changeNumber) {
76-
if (changeNumber > _segmentFetcher.getChangeNumber(segmentName)) {
76+
public synchronized void refreshSegment(String segmentName, long changeNumber) {
77+
while (changeNumber > _segmentFetcher.getChangeNumber(segmentName)) {
7778
_segmentFetcher.forceRefresh(segmentName);
7879
}
7980
}

client/src/main/java/io/split/engine/sse/EventSourceClientImp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ private void onMessage(RawEvent event) {
103103
} catch (EventParsingException ex) {
104104
_log.debug(String.format("Error parsing the event: %s. Payload: %s", ex.getMessage(), ex.getPayload()));
105105
} catch (Exception e) {
106-
_log.warn(String.format("Error onMessage: %s", e.getMessage()));
106+
_log.debug(String.format("Error onMessage: %s", e.getMessage()));
107107
}
108108
}
109109
}

client/src/test/java/io/split/SplitMockServer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ public MockResponse dispatch(RecordedRequest request) {
5151
case "/api/segmentChanges/segment3?since=-1":
5252
return new MockResponse().setBody(inputStreamToString("segment3.json"));
5353
case "/api/segmentChanges/segment3?since=1585948850110":
54-
return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": 1585948850110,\"till\": 1585948850110}");
54+
return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": 1585948850110,\"till\": 1585948850111}");
55+
case "/api/segmentChanges/segment3?since=1585948850111":
56+
return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": 1585948850111,\"till\": 1585948850111}");
5557
case "/api/metrics/time":
5658
case "api/metrics/counter":
5759
return new MockResponse().setResponseCode(200);

client/src/test/java/io/split/engine/common/SynchronizerTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package io.split.engine.common;
22

3+
import io.split.client.HttpSegmentChangeFetcher;
4+
import io.split.engine.SDKReadinessGates;
35
import io.split.engine.experiments.RefreshableSplitFetcher;
46
import io.split.engine.experiments.RefreshableSplitFetcherProvider;
57
import io.split.engine.segments.RefreshableSegmentFetcher;
8+
import io.split.engine.segments.SegmentChangeFetcher;
9+
import org.apache.http.impl.client.CloseableHttpClient;
610
import org.junit.Test;
711
import org.mockito.Mockito;
812

@@ -56,4 +60,48 @@ public void stopPeriodicFetching() {
5660
Mockito.verify(refreshableSplitFetcherProvider, Mockito.times(1)).stop();
5761
Mockito.verify(segmentFetcher, Mockito.times(1)).stop();
5862
}
63+
64+
@Test
65+
public void refreshSplits() {
66+
RefreshableSplitFetcherProvider refreshableSplitFetcherProvider = Mockito.mock(RefreshableSplitFetcherProvider.class);
67+
RefreshableSegmentFetcher segmentFetcher = Mockito.mock(RefreshableSegmentFetcher.class);
68+
RefreshableSplitFetcher splitFetcher = Mockito.mock(RefreshableSplitFetcher.class);
69+
70+
Mockito.when(refreshableSplitFetcherProvider.getFetcher())
71+
.thenReturn(splitFetcher);
72+
73+
Mockito.when(splitFetcher.changeNumber())
74+
.thenReturn(450L)
75+
.thenReturn(460L)
76+
.thenReturn(470L)
77+
.thenReturn(480L)
78+
.thenReturn(500L);
79+
80+
Synchronizer synchronizer = new SynchronizerImp(refreshableSplitFetcherProvider, segmentFetcher);
81+
synchronizer.refreshSplits(500);
82+
83+
Mockito.verify(splitFetcher, Mockito.times(4)).forceRefresh();
84+
}
85+
86+
@Test
87+
public void refreshSegment() {
88+
RefreshableSplitFetcherProvider refreshableSplitFetcherProvider = Mockito.mock(RefreshableSplitFetcherProvider.class);
89+
RefreshableSegmentFetcher segmentFetcher = Mockito.mock(RefreshableSegmentFetcher.class);
90+
RefreshableSplitFetcher splitFetcher = Mockito.mock(RefreshableSplitFetcher.class);
91+
92+
Mockito.when(refreshableSplitFetcherProvider.getFetcher())
93+
.thenReturn(splitFetcher);
94+
95+
Mockito.when(segmentFetcher.getChangeNumber("segment-name"))
96+
.thenReturn(450L)
97+
.thenReturn(460L)
98+
.thenReturn(470L)
99+
.thenReturn(480L)
100+
.thenReturn(500L);
101+
102+
Synchronizer synchronizer = new SynchronizerImp(refreshableSplitFetcherProvider, segmentFetcher);
103+
synchronizer.refreshSegment("segment-name", 500);
104+
105+
Mockito.verify(segmentFetcher, Mockito.times(4)).forceRefresh("segment-name");
106+
}
59107
}

client/src/test/java/io/split/engine/sse/workers/SplitsWorkerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public void killShouldTriggerFetch() {
6060
}
6161

6262
@Test
63-
public void messagesNotProcesedWhenWorkerStopped() throws InterruptedException {
63+
public void messagesNotProcessedWhenWorkerStopped() throws InterruptedException {
6464
Synchronizer syncMock = Mockito.mock(Synchronizer.class);
6565
SplitsWorker splitsWorker = new SplitsWorkerImp(syncMock);
6666
splitsWorker.start();

0 commit comments

Comments
 (0)