Skip to content

Commit ce08d80

Browse files
Pulling develompent's changes
1 parent e10449f commit ce08d80

File tree

4 files changed

+7
-6
lines changed

4 files changed

+7
-6
lines changed

client/src/main/java/io/split/engine/common/PushManagerImp.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ public static PushManagerImp build(Synchronizer synchronizer,
6262
String streamingUrl,
6363
String authUrl,
6464
CloseableHttpClient httpClient,
65-
int authRetryBackOffBase,
6665
LinkedBlockingQueue<PushManager.Status> statusMessages,
6766
CloseableHttpClient sseHttpClient) {
6867
SplitsWorker splitsWorker = new SplitsWorkerImp(synchronizer);
@@ -104,7 +103,6 @@ public synchronized void stop() {
104103

105104
@Override
106105
public synchronized void scheduleConnectionReset() {
107-
_expirationTime = 120l;
108106
_log.debug(String.format("scheduleNextTokenRefresh in %s SECONDS", _expirationTime));
109107
_nextTokenRefreshTask = _scheduledExecutorService.schedule(() -> {
110108
_log.debug("Starting scheduleNextTokenRefresh ...");

client/src/main/java/io/split/engine/common/SyncManagerImp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public static SyncManagerImp build(boolean streamingEnabledConfig,
6262
SegmentCache segmentCache) {
6363
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
6464
Synchronizer synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentSynchronizationTaskImp, splitCache, segmentCache);
65-
PushManager pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, authRetryBackOffBase, pushMessages, sseHttpClient);
65+
PushManager pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, pushMessages, sseHttpClient);
6666
return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages, authRetryBackOffBase);
6767
}
6868

client/src/main/java/io/split/engine/sse/PushStatusTrackerImp.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ public void notifyStreamingReady() {
144144
@Override
145145
public void forceRetryableError() {
146146
_statusMessages.offer(PushManager.Status.STREAMING_BACKOFF);
147+
}
147148

148149
private boolean isPublishers() {
149150
for(Integer publisher : regions.values()) {

client/src/main/java/io/split/engine/sse/client/SSEClient.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,11 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {
131131
_statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
132132
return;
133133
} catch (IOException exc) { // Other type of connection error
134-
_log.info(String.format("SSE connection ended abruptly: %s. Retrying", exc.getMessage()));
135-
_statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
136-
return;
134+
if(!_forcedStop.get()) {
135+
_log.debug(String.format("SSE connection ended abruptly: %s. Retying", exc.getMessage()));
136+
_statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
137+
return;
138+
}
137139
}
138140
}
139141
} catch (Exception e) { // Any other error non related to the connection disables streaming altogether

0 commit comments

Comments
 (0)