Skip to content

Commit b3217be

Browse files
committed
updated approach
1 parent 36a22c6 commit b3217be

File tree

6 files changed

+112
-106
lines changed

6 files changed

+112
-106
lines changed

client/src/main/java/io/split/engine/common/SynchronizerImp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public SynchronizerImp(RefreshableSplitFetcherProvider refreshableSplitFetcherPr
3838
@Override
3939
public void syncAll() {
4040
_syncAllScheduledExecutorService.schedule(() -> {
41-
_splitFetcher.forceRefresh();
41+
_splitFetcher.run();
4242
_segmentFetcher.forceRefreshAll();
4343
}, 0, TimeUnit.SECONDS);
4444
}

client/src/main/java/io/split/engine/experiments/RefreshableSplitFetcher.java

Lines changed: 91 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,23 @@ public RefreshableSplitFetcher(SplitChangeFetcher splitChangeFetcher, SplitParse
8282

8383
@Override
8484
public void forceRefresh() {
85-
run();
85+
_log.debug("Force Refresh splits starting ...");
86+
try {
87+
while (true) {
88+
long start = _changeNumber.get();
89+
runWithoutExceptionHandling();
90+
long end = _changeNumber.get();
91+
92+
if (start >= end) {
93+
break;
94+
}
95+
}
96+
} catch (InterruptedException e) {
97+
_log.warn("Interrupting split fetcher task");
98+
Thread.currentThread().interrupt();
99+
} catch (Throwable t) {
100+
_log.error("RefreshableSplitFetcher failed: " + t.getMessage());
101+
}
86102
}
87103

88104
@Override
@@ -160,103 +176,101 @@ public void run() {
160176
}
161177

162178
public void runWithoutExceptionHandling() throws InterruptedException {
163-
while (true) {
164-
SplitChange change = _splitChangeFetcher.fetch(_changeNumber.get());
179+
SplitChange change = _splitChangeFetcher.fetch(_changeNumber.get());
165180

166-
if (change == null) {
167-
throw new IllegalStateException("SplitChange was null");
168-
}
181+
if (change == null) {
182+
throw new IllegalStateException("SplitChange was null");
183+
}
169184

170-
if (change.till == _changeNumber.get()) {
171-
// no change.
172-
break;
173-
}
185+
if (change.till == _changeNumber.get()) {
186+
// no change.
187+
return;
188+
}
174189

175-
if (change.since != _changeNumber.get() || change.till < _changeNumber.get()) {
190+
if (change.since != _changeNumber.get() || change.till < _changeNumber.get()) {
191+
// some other thread may have updated the shared state. exit
192+
return;
193+
}
194+
195+
if (change.splits.isEmpty()) {
196+
// there are no changes. weird!
197+
_changeNumber.set(change.till);
198+
return;
199+
}
200+
201+
synchronized (_lock) {
202+
// check state one more time.
203+
if (change.since != _changeNumber.get()
204+
|| change.till < _changeNumber.get()) {
176205
// some other thread may have updated the shared state. exit
177-
break;
206+
return;
178207
}
179208

180-
if (change.splits.isEmpty()) {
181-
// there are no changes. weird!
182-
_changeNumber.set(change.till);
183-
break;
184-
}
209+
Set<String> toRemove = Sets.newHashSet();
210+
Map<String, ParsedSplit> toAdd = Maps.newHashMap();
211+
List<String> trafficTypeNamesToRemove = Lists.newArrayList();
212+
List<String> trafficTypeNamesToAdd = Lists.newArrayList();
185213

186-
synchronized (_lock) {
187-
// check state one more time.
188-
if (change.since != _changeNumber.get()
189-
|| change.till < _changeNumber.get()) {
190-
// some other thread may have updated the shared state. exit
191-
break;
214+
for (Split split : change.splits) {
215+
if (Thread.currentThread().isInterrupted()) {
216+
throw new InterruptedException();
192217
}
193218

194-
Set<String> toRemove = Sets.newHashSet();
195-
Map<String, ParsedSplit> toAdd = Maps.newHashMap();
196-
List<String> trafficTypeNamesToRemove = Lists.newArrayList();
197-
List<String> trafficTypeNamesToAdd = Lists.newArrayList();
198-
199-
for (Split split : change.splits) {
200-
if (Thread.currentThread().isInterrupted()) {
201-
throw new InterruptedException();
202-
}
203-
204-
if (split.status != Status.ACTIVE) {
205-
// archive.
206-
toRemove.add(split.name);
207-
if (split.trafficTypeName != null) {
208-
trafficTypeNamesToRemove.add(split.trafficTypeName);
209-
}
210-
continue;
211-
}
212-
213-
ParsedSplit parsedSplit = _parser.parse(split);
214-
if (parsedSplit == null) {
215-
_log.info("We could not parse the experiment definition for: " + split.name + " so we are removing it completely to be careful");
216-
toRemove.add(split.name);
217-
if (split.trafficTypeName != null) {
218-
trafficTypeNamesToRemove.add(split.trafficTypeName);
219-
}
220-
continue;
221-
}
222-
223-
toAdd.put(split.name, parsedSplit);
224-
225-
// If the split already exists, this is either an update, or the split has been
226-
// deleted and recreated (possibly with a different traffic type).
227-
// If it's an update, the traffic type should NOT be increased.
228-
// If it's deleted & recreated, the old one should be decreased and the new one increased.
229-
// To handle both cases, we simply delete the old one if the split is present.
230-
// The new one is always increased.
231-
ParsedSplit current = _concurrentMap.get(split.name);
232-
if (current != null && current.trafficTypeName() != null) {
233-
trafficTypeNamesToRemove.add(current.trafficTypeName());
219+
if (split.status != Status.ACTIVE) {
220+
// archive.
221+
toRemove.add(split.name);
222+
if (split.trafficTypeName != null) {
223+
trafficTypeNamesToRemove.add(split.trafficTypeName);
234224
}
225+
continue;
226+
}
235227

228+
ParsedSplit parsedSplit = _parser.parse(split);
229+
if (parsedSplit == null) {
230+
_log.info("We could not parse the experiment definition for: " + split.name + " so we are removing it completely to be careful");
231+
toRemove.add(split.name);
236232
if (split.trafficTypeName != null) {
237-
trafficTypeNamesToAdd.add(split.trafficTypeName);
233+
trafficTypeNamesToRemove.add(split.trafficTypeName);
238234
}
235+
continue;
239236
}
240237

241-
_concurrentMap.putAll(toAdd);
242-
_concurrentTrafficTypeNameSet.addAll(trafficTypeNamesToAdd);
243-
//removeAll does not work here, since it wont remove all the occurrences, just one
244-
Multisets.removeOccurrences(_concurrentTrafficTypeNameSet, trafficTypeNamesToRemove);
245-
246-
for (String remove : toRemove) {
247-
_concurrentMap.remove(remove);
238+
toAdd.put(split.name, parsedSplit);
239+
240+
// If the split already exists, this is either an update, or the split has been
241+
// deleted and recreated (possibly with a different traffic type).
242+
// If it's an update, the traffic type should NOT be increased.
243+
// If it's deleted & recreated, the old one should be decreased and the new one increased.
244+
// To handle both cases, we simply delete the old one if the split is present.
245+
// The new one is always increased.
246+
ParsedSplit current = _concurrentMap.get(split.name);
247+
if (current != null && current.trafficTypeName() != null) {
248+
trafficTypeNamesToRemove.add(current.trafficTypeName());
248249
}
249250

250-
if (!toAdd.isEmpty()) {
251-
_log.debug("Updated features: " + toAdd.keySet());
251+
if (split.trafficTypeName != null) {
252+
trafficTypeNamesToAdd.add(split.trafficTypeName);
252253
}
254+
}
253255

254-
if (!toRemove.isEmpty()) {
255-
_log.debug("Deleted features: " + toRemove);
256-
}
256+
_concurrentMap.putAll(toAdd);
257+
_concurrentTrafficTypeNameSet.addAll(trafficTypeNamesToAdd);
258+
//removeAll does not work here, since it wont remove all the occurrences, just one
259+
Multisets.removeOccurrences(_concurrentTrafficTypeNameSet, trafficTypeNamesToRemove);
257260

258-
_changeNumber.set(change.till);
261+
for (String remove : toRemove) {
262+
_concurrentMap.remove(remove);
259263
}
264+
265+
if (!toAdd.isEmpty()) {
266+
_log.debug("Updated features: " + toAdd.keySet());
267+
}
268+
269+
if (!toRemove.isEmpty()) {
270+
_log.debug("Deleted features: " + toRemove);
271+
}
272+
273+
_changeNumber.set(change.till);
260274
}
261275
}
262276
}

client/src/main/java/io/split/engine/segments/RefreshableSegment.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,20 @@ public boolean contains(String key) {
4545

4646
@Override
4747
public void forceRefresh() {
48-
run();
48+
try {
49+
_log.debug("Force Refresh segment starting ...");
50+
while (true) {
51+
long start = _changeNumber.get();
52+
runWithoutExceptionHandling();
53+
long end = _changeNumber.get();
54+
55+
if (start >= end) {
56+
break;
57+
}
58+
}
59+
} catch (Throwable t) {
60+
_log.error("forceRefresh segment failed: " + t.getMessage());
61+
}
4962
}
5063

5164
@Override

client/src/test/java/io/split/SplitMockServer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,14 @@ public MockResponse dispatch(RecordedRequest request) {
4646
return new MockResponse().setBody(inputStreamToString("splits2.json"));
4747
case "/api/splitChanges?since=1585948850111":
4848
return new MockResponse().setBody(inputStreamToString("splits_killed.json"));
49+
case "/api/splitChanges?since=1585948850112":
50+
return new MockResponse().setBody("{\"splits\": [], \"since\":1585948850112, \"till\":1585948850112}");
4951
case "/api/segmentChanges/segment-test?since=-1":
5052
return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": -1,\"till\": -1}");
5153
case "/api/segmentChanges/segment3?since=-1":
5254
return new MockResponse().setBody(inputStreamToString("segment3.json"));
5355
case "/api/segmentChanges/segment3?since=1585948850110":
54-
return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": 1585948850110,\"till\": 1585948850111}");
56+
return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": 1585948850110,\"till\": 1585948850110}");
5557
case "/api/segmentChanges/segment3?since=1585948850111":
5658
return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": 1585948850111,\"till\": 1585948850111}");
5759
case "/api/metrics/time":

client/src/test/java/io/split/client/SplitClientIntegrationTest.java

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ public void splitClientMultiFactory() throws IOException, TimeoutException, Inte
402402

403403
Awaitility.await()
404404
.atMost(50L, TimeUnit.SECONDS)
405-
.until(() -> "after_notification_received".equals(client1.getTreatment("admin", "push_test")));
405+
.until(() -> "split_killed".equals(client1.getTreatment("admin", "push_test")));
406406

407407
Awaitility.await()
408408
.atMost(50L, TimeUnit.SECONDS)
@@ -420,30 +420,7 @@ public void splitClientMultiFactory() throws IOException, TimeoutException, Inte
420420

421421
Awaitility.await()
422422
.atMost(50L, TimeUnit.SECONDS)
423-
.until(() -> "after_notification_received".equals(client1.getTreatment("admin", "push_test")));
424-
425-
Awaitility.await()
426-
.atMost(50L, TimeUnit.SECONDS)
427-
.until(() -> "on_whitelist".equals(client2.getTreatment("admin", "push_test")));
428-
429-
Awaitility.await()
430-
.atMost(50L, TimeUnit.SECONDS)
431-
.until(() -> "after_notification_received".equals(client3.getTreatment("admin", "push_test")));
432-
433-
Awaitility.await()
434-
.atMost(50L, TimeUnit.SECONDS)
435-
.until(() -> "on_whitelist".equals(client4.getTreatment("admin", "push_test")));
436-
437-
OutboundSseEvent sseEventSplitUpdate3 = new OutboundEvent
438-
.Builder()
439-
.name("message")
440-
.data("{\"id\":\"22\",\"clientId\":\"22\",\"timestamp\":1592590436082,\"encoding\":\"json\",\"channel\":\"xxxx_xxxx_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1585948850112}\"}")
441-
.build();
442-
eventQueue3.push(sseEventSplitUpdate3);
443-
444-
Awaitility.await()
445-
.atMost(50L, TimeUnit.SECONDS)
446-
.until(() -> "after_notification_received".equals(client1.getTreatment("admin", "push_test")));
423+
.until(() -> "split_killed".equals(client1.getTreatment("admin", "push_test")));
447424

448425
Awaitility.await()
449426
.atMost(50L, TimeUnit.SECONDS)

client/src/test/java/io/split/engine/common/SynchronizerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public void syncAll() throws InterruptedException {
2525
synchronizer.syncAll();
2626

2727
Thread.sleep(100);
28-
Mockito.verify(splitFetcher, Mockito.times(1)).forceRefresh();
28+
Mockito.verify(splitFetcher, Mockito.times(1)).run();
2929
Mockito.verify(segmentFetcher, Mockito.times(1)).forceRefreshAll();
3030
}
3131

0 commit comments

Comments
 (0)