11package io .split .engine .sse ;
22
3+ import com .google .common .collect .Maps ;
34import io .split .engine .common .PushManager ;
45import io .split .engine .sse .client .SSEClient ;
56import io .split .engine .sse .dtos .ControlNotification ;
910import org .slf4j .Logger ;
1011import org .slf4j .LoggerFactory ;
1112
13+ import java .util .concurrent .ConcurrentMap ;
1214import java .util .concurrent .LinkedBlockingQueue ;
1315import java .util .concurrent .atomic .AtomicBoolean ;
1416import java .util .concurrent .atomic .AtomicReference ;
@@ -20,6 +22,7 @@ public class PushStatusTrackerImp implements PushStatusTracker {
2022 private final AtomicReference <SSEClient .StatusMessage > _sseStatus = new AtomicReference <>(SSEClient .StatusMessage .INITIALIZATION_IN_PROGRESS );
2123 private final AtomicReference <ControlType > _backendStatus = new AtomicReference <>(ControlType .STREAMING_RESUMED );
2224 private final LinkedBlockingQueue <PushManager .Status > _statusMessages ;
25+ private final ConcurrentMap <String , Integer > regions = Maps .newConcurrentMap ();
2326
2427 public PushStatusTrackerImp (LinkedBlockingQueue <PushManager .Status > statusMessages ) {
2528 _statusMessages = statusMessages ;
@@ -98,9 +101,11 @@ public void handleIncomingOccupancyEvent(OccupancyNotification occupancyNotifica
98101 _log .debug (String .format ("handleIncomingOccupancyEvent: publishers=%d" , occupancyNotification .getMetrics ().getPublishers ()));
99102
100103 int publishers = occupancyNotification .getMetrics ().getPublishers ();
101- if (publishers <= 0 && _publishersOnline .compareAndSet (true , false ) && _backendStatus .get ().equals (ControlType .STREAMING_RESUMED )) {
104+ regions .put (occupancyNotification .getChannel (), publishers );
105+ boolean isPublishers = isPublishers ();
106+ if (!isPublishers && _publishersOnline .compareAndSet (true , false ) && _backendStatus .get ().equals (ControlType .STREAMING_RESUMED )) {
102107 _statusMessages .offer (PushManager .Status .STREAMING_DOWN );
103- } else if (publishers >= 1 && _publishersOnline .compareAndSet (false , true ) && _backendStatus .get ().equals (ControlType .STREAMING_RESUMED )) {
108+ } else if (isPublishers && _publishersOnline .compareAndSet (false , true ) && _backendStatus .get ().equals (ControlType .STREAMING_RESUMED )) {
104109 _statusMessages .offer (PushManager .Status .STREAMING_READY );
105110 }
106111 }
@@ -114,6 +119,7 @@ public void handleIncomingAblyError(ErrorNotification notification) {
114119 }
115120 if (notification .getCode () >= 40140 && notification .getCode () <= 40149 ) {
116121 _statusMessages .offer (PushManager .Status .STREAMING_BACKOFF );
122+ return ;
117123 }
118124 if (notification .getCode () >= 40000 && notification .getCode () <= 49999 ) {
119125 _statusMessages .offer (PushManager .Status .STREAMING_OFF );
@@ -129,4 +135,13 @@ public synchronized void forcePushDisable() {
129135 _backendStatus .set (ControlType .STREAMING_DISABLED );
130136 _statusMessages .offer (PushManager .Status .STREAMING_OFF );
131137 }
138+
139+ private boolean isPublishers () {
140+ for (Integer publisher : regions .values ()) {
141+ if (publisher > 0 ) {
142+ return true ;
143+ }
144+ }
145+ return false ;
146+ }
132147}
0 commit comments