@@ -33,8 +33,8 @@ public synchronized void reset() {
3333
3434 @ Override
3535 public void handleSseStatus (SSEClient .StatusMessage newStatus ) {
36- _log .debug (String .format ("handleSseStatus new status: %s" , newStatus .toString ()));
37- _log . debug ( String . format ( "handleSseStatus current status: %s" , _sseStatus . get (). toString ()));
36+ _log .debug (String .format ("Current status: %s. New status: %s" , _sseStatus . get (). toString () , newStatus .toString ()));
37+
3838 switch (newStatus ) {
3939 case CONNECTED :
4040 if (_sseStatus .compareAndSet (SSEClient .StatusMessage .INITIALIZATION_IN_PROGRESS , SSEClient .StatusMessage .CONNECTED )
@@ -53,6 +53,13 @@ public void handleSseStatus(SSEClient.StatusMessage newStatus) {
5353 _statusMessages .offer (PushManager .Status .STREAMING_OFF );
5454 }
5555 break ;
56+ case FORCED_STOP :
57+ if (_sseStatus .compareAndSet (SSEClient .StatusMessage .CONNECTED , SSEClient .StatusMessage .FORCED_STOP )
58+ || _sseStatus .compareAndSet (SSEClient .StatusMessage .RETRYABLE_ERROR , SSEClient .StatusMessage .FORCED_STOP )
59+ || _sseStatus .compareAndSet (SSEClient .StatusMessage .INITIALIZATION_IN_PROGRESS , SSEClient .StatusMessage .FORCED_STOP )) {
60+ _statusMessages .offer (PushManager .Status .STREAMING_DOWN );
61+ }
62+ break ;
5663 case INITIALIZATION_IN_PROGRESS : // Restore initial status
5764 reset ();
5865 break ;
@@ -62,6 +69,7 @@ public void handleSseStatus(SSEClient.StatusMessage newStatus) {
6269 @ Override
6370 public void handleIncomingControlEvent (ControlNotification controlNotification ) {
6471 _log .debug (String .format ("handleIncomingOccupancyEvent: %s" , controlNotification .getControlType ()));
72+
6573 if (_backendStatus .get ().equals (ControlType .STREAMING_DISABLED )) {
6674 return ;
6775 }
@@ -88,6 +96,7 @@ public void handleIncomingControlEvent(ControlNotification controlNotification)
8896 @ Override
8997 public void handleIncomingOccupancyEvent (OccupancyNotification occupancyNotification ) {
9098 _log .debug (String .format ("handleIncomingOccupancyEvent: publishers=%d" , occupancyNotification .getMetrics ().getPublishers ()));
99+
91100 int publishers = occupancyNotification .getMetrics ().getPublishers ();
92101 if (publishers <= 0 && _publishersOnline .compareAndSet (true , false ) && _backendStatus .get ().equals (ControlType .STREAMING_RESUMED )) {
93102 _statusMessages .offer (PushManager .Status .STREAMING_DOWN );
@@ -99,6 +108,7 @@ public void handleIncomingOccupancyEvent(OccupancyNotification occupancyNotifica
99108 @ Override
100109 public void handleIncomingAblyError (ErrorNotification notification ) {
101110 _log .debug (String .format ("handleIncomingAblyError: %s" , notification .getMessage ()));
111+
102112 if (_backendStatus .get ().equals (ControlType .STREAMING_DISABLED )) {
103113 return ; // Ignore
104114 }
@@ -113,6 +123,7 @@ public void handleIncomingAblyError(ErrorNotification notification) {
113123 @ Override
114124 public synchronized void forcePushDisable () {
115125 _log .debug ("forcePushDisable" );
126+
116127 _publishersOnline .set (false );
117128 _sseStatus .set (SSEClient .StatusMessage .INITIALIZATION_IN_PROGRESS );
118129 _backendStatus .set (ControlType .STREAMING_DISABLED );
0 commit comments