Skip to content

Commit 1551fcf

Browse files
committed
sseClient improvements
1 parent 8b4db01 commit 1551fcf

File tree

2 files changed

+17
-13
lines changed

2 files changed

+17
-13
lines changed

client/src/main/java/io/split/engine/sse/PushStatusTrackerImp.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public class PushStatusTrackerImp implements PushStatusTracker {
1717
private static final Logger _log = LoggerFactory.getLogger(PushStatusTracker.class);
1818

1919
private final AtomicBoolean _publishersOnline = new AtomicBoolean(true);
20-
private final AtomicReference<SSEClient.StatusMessage> _sseStatus = new AtomicReference<>(SSEClient.StatusMessage.DISCONNECTED);
20+
private final AtomicReference<SSEClient.StatusMessage> _sseStatus = new AtomicReference<>(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS);
2121
private final AtomicReference<ControlType> _backendStatus = new AtomicReference<>(ControlType.STREAMING_RESUMED);
2222
private final LinkedBlockingQueue<PushManager.Status> _statusMessages;
2323

@@ -27,7 +27,7 @@ public PushStatusTrackerImp(LinkedBlockingQueue<PushManager.Status> statusMessag
2727

2828
public synchronized void reset() {
2929
_publishersOnline.set(true);
30-
_sseStatus.set(SSEClient.StatusMessage.DISCONNECTED);
30+
_sseStatus.set(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS);
3131
_backendStatus.set(ControlType.STREAMING_RESUMED);
3232
}
3333

@@ -37,7 +37,7 @@ public void handleSseStatus(SSEClient.StatusMessage newStatus) {
3737
_log.debug(String.format("handleSseStatus current status: %s", _sseStatus.get().toString()));
3838
switch(newStatus) {
3939
case CONNECTED:
40-
if (_sseStatus.compareAndSet(SSEClient.StatusMessage.DISCONNECTED, SSEClient.StatusMessage.CONNECTED)
40+
if (_sseStatus.compareAndSet(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS, SSEClient.StatusMessage.CONNECTED)
4141
|| _sseStatus.compareAndSet(SSEClient.StatusMessage.RETRYABLE_ERROR, SSEClient.StatusMessage.CONNECTED)) {
4242
_statusMessages.offer(PushManager.Status.STREAMING_READY);
4343
}
@@ -53,7 +53,7 @@ public void handleSseStatus(SSEClient.StatusMessage newStatus) {
5353
_statusMessages.offer(PushManager.Status.STREAMING_OFF);
5454
}
5555
break;
56-
case DISCONNECTED: // Restore initial status
56+
case INITIALIZATION_IN_PROGRESS: // Restore initial status
5757
reset();
5858
break;
5959
}
@@ -114,7 +114,7 @@ public void handleIncomingAblyError(ErrorNotification notification) {
114114
public synchronized void forcePushDisable() {
115115
_log.debug("forcePushDisable");
116116
_publishersOnline.set(false);
117-
_sseStatus.set(SSEClient.StatusMessage.DISCONNECTED);
117+
_sseStatus.set(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS);
118118
_backendStatus.set(ControlType.STREAMING_DISABLED);
119119
_statusMessages.offer(PushManager.Status.STREAMING_OFF);
120120
}

client/src/main/java/io/split/engine/sse/client/SSEClient.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.split.engine.sse.client;
22

33
import com.google.common.base.Strings;
4-
import io.split.engine.sse.EventSourceClient;
4+
import io.split.engine.sse.exceptions.EventParsingException;
55
import org.apache.http.client.config.RequestConfig;
66
import org.apache.http.client.methods.CloseableHttpResponse;
77
import org.apache.http.client.methods.HttpGet;
@@ -32,7 +32,7 @@ public enum StatusMessage {
3232
CONNECTED,
3333
RETRYABLE_ERROR,
3434
NONRETRYABLE_ERROR,
35-
DISCONNECTED
35+
INITIALIZATION_IN_PROGRESS
3636
}
3737

3838
private enum ConnectionState {
@@ -63,6 +63,8 @@ public synchronized boolean open(URI uri) {
6363
return false;
6464
}
6565

66+
_statusCallback.apply(StatusMessage.INITIALIZATION_IN_PROGRESS);
67+
6668
CountDownLatch signal = new CountDownLatch(1);
6769
Thread thread = new Thread(() -> connectAndLoop(uri, signal));
6870
thread.setDaemon(true);
@@ -109,29 +111,31 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {
109111
handleMessage(readMessageAsString(reader));
110112
} catch (EOFException exc) {
111113
// This is when ably closes the connection on their end. IE: an invalid or expired token.
112-
// Evaluate if we should send the DISCONNECTED event or not.
113-
_statusCallback.apply(StatusMessage.DISCONNECTED);
114+
_statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
114115
return;
115116
} catch (SocketTimeoutException exc) { // KeepAlive expired
116117
_statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
117118
} catch (SocketException exc) { // Connection closed by us
118119
if ("Socket closed".equals(exc.getMessage())) {
119-
_statusCallback.apply(StatusMessage.DISCONNECTED);
120+
_statusCallback.apply(StatusMessage.NONRETRYABLE_ERROR);
120121
return;
121122
}
123+
122124
throw exc; // If it's not a socket closed (caused by us), rethrow the exception
123125
}
124126
}
125-
} catch (IOException e) {
126-
_log.warn(e.getMessage());
127+
} catch (Exception e) {
128+
_log.error(e.getMessage());
127129
_statusCallback.apply(StatusMessage.NONRETRYABLE_ERROR);
128-
} finally {
130+
}
131+
finally {
129132
try {
130133
_ongoingResponse.get().close();
131134
} catch (IOException e) {
132135
_log.warn(e.getMessage());
133136
}
134137

138+
_state.set(ConnectionState.CLOSED);
135139
_log.warn("SSEClient finished.");
136140
}
137141
}

0 commit comments

Comments
 (0)