Skip to content

Commit 59af044

Browse files
committed
handle more cases and add some more integration tests
1 parent d8e4608 commit 59af044

File tree

3 files changed

+60
-17
lines changed

3 files changed

+60
-17
lines changed

client/src/main/java/io/split/engine/sse/client/SSEClient.java

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import java.io.InputStream;
1717
import java.io.InputStreamReader;
1818
import java.net.SocketException;
19-
import java.net.SocketTimeoutException;
2019
import java.net.URI;
2120
import java.util.concurrent.CountDownLatch;
2221
import java.util.concurrent.TimeUnit;
@@ -40,7 +39,7 @@ private enum ConnectionState {
4039
CLOSED
4140
}
4241

43-
42+
private final static String SOCKET_CLOSED_MESSAGE = "Socket closed";
4443
private final static String KEEP_ALIVE_PAYLOAD = ":keepalive\n";
4544
private final static Integer CONNECT_TIMEOUT = 30000;
4645
private final static Integer SOCKET_TIMEOUT = 70000;
@@ -110,29 +109,21 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {
110109
while (isOpen() && !Thread.currentThread().isInterrupted()) {
111110
try {
112111
handleMessage(readMessageAsString(reader));
113-
} catch (EOFException | SocketTimeoutException exc) {
114-
// EOFException: This is when ably closes the connection on their end. IE: an invalid or expired token.
115-
// SocketTimeoutException: KeepAlive expired
116-
_log.warn(exc.getMessage());
117-
_statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
118-
return;
119112
} catch (SocketException exc) {
120113
_log.warn(exc.getMessage());
121-
122-
if ("Socket closed".equals(exc.getMessage())) {
123-
// Connection closed by us
114+
if (SOCKET_CLOSED_MESSAGE.equals(exc.getMessage())) { // Connection closed by us
124115
_statusCallback.apply(StatusMessage.FORCED_STOP);
125116
return;
126117
}
127-
128118
// Connection closed by server
129119
_statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
130-
return;
120+
} catch (IOException exc) { // Other type of connection error
121+
_log.warn(exc.getMessage());
122+
_statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
131123
}
132124
}
133-
} catch (Exception e) {
134-
_log.error(e.getMessage());
135-
125+
} catch (Exception e) { // Any other error non related to the connection disables streaming altogether
126+
_log.error(e.getMessage(), e);
136127
_statusCallback.apply(StatusMessage.NONRETRYABLE_ERROR);
137128
} finally {
138129
try {

client/src/test/java/io/split/SSEMockServer.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import org.glassfish.grizzly.utils.Pair;
55
import org.glassfish.hk2.utilities.binding.AbstractBinder;
66
import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory;
7+
import org.glassfish.jersey.media.sse.OutboundEvent;
78
import org.glassfish.jersey.server.ResourceConfig;
89
import org.jvnet.hk2.annotations.Service;
910

@@ -28,9 +29,12 @@ public class SSEMockServer {
2829
private static final String BASE_URL = "http://localhost:%d";
2930
private final SseEventQueue _queue;
3031
private final Validator _validator;
31-
private AtomicInteger _port;
32+
private final AtomicInteger _port;
3233
private HttpServer _server;
3334

35+
public static final OutboundEvent CONNECTION_CLOSED_BY_REMOTE_HOST = new OutboundEvent.Builder().comment("CCBRH").build();
36+
// public static final OutboundEvent CONNECTION_RESET_EVENT = new OutboundEvent.Builder().comment("RST").build();
37+
3438
public SSEMockServer(SseEventQueue queue, Validator validator) {
3539
_queue = queue;
3640
_validator = validator;
@@ -110,6 +114,10 @@ public void getServerSentEvents(@Context SseEventSink eventSink,
110114
while(!eventSink.isClosed()) {
111115
try {
112116
OutboundSseEvent event = _eventsToSend.pull();
117+
if (CONNECTION_CLOSED_BY_REMOTE_HOST == event) { // Comparing references, no need for .equals()
118+
eventSink.close();
119+
return;
120+
}
113121
eventSink.send(event);
114122
} catch (InterruptedException e) {
115123
break;

client/src/test/java/io/split/client/SplitClientIntegrationTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,50 @@ public void keepAlive() throws IOException, TimeoutException, InterruptedExcepti
500500
sseServer.stop();
501501
}
502502

503+
@Test
504+
public void testConnectionClosedByRemoteHostIsProperlyHandled() throws IOException, TimeoutException, InterruptedException, URISyntaxException {
505+
SplitMockServer splitServer = new SplitMockServer();
506+
SSEMockServer.SseEventQueue eventQueue = new SSEMockServer.SseEventQueue();
507+
SSEMockServer sseServer = buildSSEMockServer(eventQueue);
508+
509+
splitServer.start();
510+
sseServer.start();
511+
512+
SplitClientConfig config = buildSplitClientConfig("enabled", splitServer.getUrl(), sseServer.getPort(), true, 50);
513+
SplitFactory factory = SplitFactoryBuilder.build("fake-api-token-1", config);
514+
SplitClient client = factory.client();
515+
client.blockUntilReady();
516+
517+
String result = client.getTreatment("admin", "push_test");
518+
Assert.assertEquals("on_whitelist", result);
519+
eventQueue.push(SSEMockServer.CONNECTION_CLOSED_BY_REMOTE_HOST);
520+
Thread.sleep(1000);
521+
result = client.getTreatment("admin", "push_test");
522+
Assert.assertEquals("on_whitelist", result);
523+
}
524+
525+
@Test
526+
public void testConnectionClosedIsProperlyHandled() throws IOException, TimeoutException, InterruptedException, URISyntaxException {
527+
SplitMockServer splitServer = new SplitMockServer();
528+
SSEMockServer.SseEventQueue eventQueue = new SSEMockServer.SseEventQueue();
529+
SSEMockServer sseServer = buildSSEMockServer(eventQueue);
530+
531+
splitServer.start();
532+
sseServer.start();
533+
534+
SplitClientConfig config = buildSplitClientConfig("enabled", splitServer.getUrl(), sseServer.getPort(), true, 50);
535+
SplitFactory factory = SplitFactoryBuilder.build("fake-api-token-1", config);
536+
SplitClient client = factory.client();
537+
client.blockUntilReady();
538+
539+
String result = client.getTreatment("admin", "push_test");
540+
Assert.assertEquals("on_whitelist", result);
541+
sseServer.stop();
542+
Thread.sleep(1000);
543+
result = client.getTreatment("admin", "push_test");
544+
Assert.assertEquals("on_whitelist", result);
545+
}
546+
503547
private SSEMockServer buildSSEMockServer(SSEMockServer.SseEventQueue eventQueue) {
504548
return new SSEMockServer(eventQueue, (token, version, channel) -> {
505549
if (!"1.1".equals(version)) {

0 commit comments

Comments
 (0)