Skip to content

Commit 4529f81

Browse files
Fixing PR comments
1 parent a81e171 commit 4529f81

File tree

4 files changed

+9
-6
lines changed

4 files changed

+9
-6
lines changed

client/src/main/java/io/split/engine/common/PushManagerImp.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.ScheduledExecutorService;
2525
import java.util.concurrent.Executors;
2626
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicLong;
2728

2829
import static com.google.common.base.Preconditions.checkNotNull;
2930

@@ -38,7 +39,7 @@ public class PushManagerImp implements PushManager {
3839

3940
private Future<?> _nextTokenRefreshTask;
4041
private final ScheduledExecutorService _scheduledExecutorService;
41-
private long _expirationTime;
42+
private AtomicLong _expirationTime;
4243

4344
@VisibleForTesting
4445
/* package private */ PushManagerImp(AuthApiClient authApiClient,
@@ -52,6 +53,7 @@ public class PushManagerImp implements PushManager {
5253
_splitsWorker = splitsWorker;
5354
_segmentWorker = segmentWorker;
5455
_pushStatusTracker = pushStatusTracker;
56+
_expirationTime = new AtomicLong();
5557
_scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
5658
.setDaemon(true)
5759
.setNameFormat("Split-SSERefreshToken-%d")
@@ -79,7 +81,7 @@ public synchronized void start() {
7981
AuthenticationResponse response = _authApiClient.Authenticate();
8082
_log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled()));
8183
if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())) {
82-
_expirationTime = response.getExpiration();
84+
_expirationTime.set(response.getExpiration());
8385
return;
8486
}
8587

@@ -108,7 +110,7 @@ public synchronized void scheduleConnectionReset() {
108110
_log.debug("Starting scheduleNextTokenRefresh ...");
109111
stop();
110112
start();
111-
}, _expirationTime, TimeUnit.SECONDS);
113+
}, _expirationTime.get(), TimeUnit.SECONDS);
112114
}
113115

114116
private boolean startSse(String token, String channels) {

client/src/main/java/io/split/engine/common/SyncManagerImp.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ private void startPollingMode() {
122122
_pushManager.stopWorkers();
123123
_pushManager.stop();
124124
Thread.sleep(howLong);
125+
_incomingPushStatus.clear();
125126
_pushManager.start();
126127
break;
127128
case STREAMING_OFF:

client/src/main/java/io/split/engine/sse/EventSourceClientImp.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
public class EventSourceClientImp implements EventSourceClient {
2222
private static final Logger _log = LoggerFactory.getLogger(EventSourceClient.class);
2323
private static final String ERROR = "error";
24+
private static final String MESSAGE = "message";
2425

2526
private final String _baseStreamingUrl;
2627
private final NotificationParser _notificationParser;
@@ -101,7 +102,7 @@ private void onMessage(RawEvent event) {
101102
if (payload.length() > 0) {
102103
_log.debug(String.format("Payload received: %s", payload));
103104
switch (type) {
104-
case "message":
105+
case MESSAGE:
105106
_notificationProcessor.process(_notificationParser.parseMessage(payload));
106107
break;
107108
case ERROR:

client/src/main/java/io/split/engine/sse/PushStatusTrackerImp.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ public void handleSseStatus(SSEClient.StatusMessage newStatus) {
4141
switch(newStatus) {
4242
case CONNECTED:
4343
if (_sseStatus.compareAndSet(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS, SSEClient.StatusMessage.CONNECTED)
44-
|| _sseStatus.compareAndSet(SSEClient.StatusMessage.RETRYABLE_ERROR, SSEClient.StatusMessage.CONNECTED)) {
45-
//_statusMessages.offer(PushManager.Status.STREAMING_READY); //desactivar TODO
44+
|| _sseStatus.compareAndSet(SSEClient.StatusMessage.RETRYABLE_ERROR, SSEClient.StatusMessage.CONNECTED)) {
4645
}
4746
break;
4847
case RETRYABLE_ERROR:

0 commit comments

Comments
 (0)