77import io .split .engine .sse .dtos .ControlType ;
88import io .split .engine .sse .dtos .ErrorNotification ;
99import io .split .engine .sse .dtos .OccupancyNotification ;
10+ import io .split .telemetry .domain .StreamingEvent ;
11+ import io .split .telemetry .domain .enums .StreamEventsEnum ;
12+ import io .split .telemetry .storage .TelemetryRuntimeProducer ;
1013import org .slf4j .Logger ;
1114import org .slf4j .LoggerFactory ;
1215
16+ import java .util .TimeZone ;
1317import java .util .concurrent .ConcurrentMap ;
1418import java .util .concurrent .LinkedBlockingQueue ;
1519import java .util .concurrent .atomic .AtomicBoolean ;
1620import java .util .concurrent .atomic .AtomicReference ;
1721
22+ import static com .google .common .base .Preconditions .checkNotNull ;
23+
1824public class PushStatusTrackerImp implements PushStatusTracker {
1925 private static final Logger _log = LoggerFactory .getLogger (PushStatusTracker .class );
26+ private static final String CONTROL_PRI_CHANNEL = "control_pri" ;
27+ private static final String CONTROL_SEC_CHANNEL = "control_sec" ;
28+ private static final int STREAMING_DISABLED = 0 ;
29+ private static final int STREAMING_ENABLED = 1 ;
30+ private static final int STREAMING_PAUSED = 2 ;
2031
2132 private final AtomicBoolean _publishersOnline = new AtomicBoolean (true );
2233 private final AtomicReference <SSEClient .StatusMessage > _sseStatus = new AtomicReference <>(SSEClient .StatusMessage .INITIALIZATION_IN_PROGRESS );
2334 private final AtomicReference <ControlType > _backendStatus = new AtomicReference <>(ControlType .STREAMING_RESUMED );
2435 private final LinkedBlockingQueue <PushManager .Status > _statusMessages ;
2536 private final ConcurrentMap <String , Integer > regions = Maps .newConcurrentMap ();
2637
27- public PushStatusTrackerImp (LinkedBlockingQueue <PushManager .Status > statusMessages ) {
38+ private final TelemetryRuntimeProducer _telemetryRuntimeProducer ;
39+
40+ public PushStatusTrackerImp (LinkedBlockingQueue <PushManager .Status > statusMessages , TelemetryRuntimeProducer telemetryRuntimeProducer ) {
2841 _statusMessages = statusMessages ;
42+ _telemetryRuntimeProducer = checkNotNull (telemetryRuntimeProducer );
2943 }
3044
3145 private synchronized void reset () {
@@ -42,6 +56,8 @@ public void handleSseStatus(SSEClient.StatusMessage newStatus) {
4256 case FIRST_EVENT :
4357 if (SSEClient .StatusMessage .CONNECTED .equals (_sseStatus .get ())) {
4458 _statusMessages .offer (PushManager .Status .STREAMING_READY );
59+ _telemetryRuntimeProducer .recordStreamingEvents (new StreamingEvent (StreamEventsEnum .STREAMING_STATUS .get_type (), STREAMING_ENABLED , System .currentTimeMillis ()));
60+ _telemetryRuntimeProducer .recordStreamingEvents (new StreamingEvent (StreamEventsEnum .CONNECTION_ESTABLISHED .get_type (),0l , System .currentTimeMillis ()));
4561 }
4662 case CONNECTED :
4763 _sseStatus .compareAndSet (SSEClient .StatusMessage .INITIALIZATION_IN_PROGRESS , SSEClient .StatusMessage .CONNECTED );
@@ -85,12 +101,14 @@ public void handleIncomingControlEvent(ControlNotification controlNotification)
85101 }
86102 break ;
87103 case STREAMING_PAUSED :
104+ _telemetryRuntimeProducer .recordStreamingEvents (new StreamingEvent (StreamEventsEnum .STREAMING_STATUS .get_type (), STREAMING_PAUSED , System .currentTimeMillis ()));
88105 if (_backendStatus .compareAndSet (ControlType .STREAMING_RESUMED , ControlType .STREAMING_PAUSED ) && _publishersOnline .get ()) {
89106 // If there are no publishers online, the STREAMING_DOWN message should have already been sent
90107 _statusMessages .offer (PushManager .Status .STREAMING_DOWN );
91108 }
92109 break ;
93110 case STREAMING_DISABLED :
111+ _telemetryRuntimeProducer .recordStreamingEvents (new StreamingEvent (StreamEventsEnum .STREAMING_STATUS .get_type (), STREAMING_DISABLED , System .currentTimeMillis ()));
94112 _backendStatus .set (ControlType .STREAMING_DISABLED );
95113 _statusMessages .offer (PushManager .Status .STREAMING_OFF );
96114 break ;
@@ -102,6 +120,7 @@ public void handleIncomingOccupancyEvent(OccupancyNotification occupancyNotifica
102120 _log .debug (String .format ("handleIncomingOccupancyEvent: publishers=%d" , occupancyNotification .getMetrics ().getPublishers ()));
103121
104122 int publishers = occupancyNotification .getMetrics ().getPublishers ();
123+ recordTelemetryOcuppancy (occupancyNotification , publishers );
105124 regions .put (occupancyNotification .getChannel (), publishers );
106125 boolean isPublishers = isPublishers ();
107126 if (!isPublishers && _publishersOnline .compareAndSet (true , false ) && _backendStatus .get ().equals (ControlType .STREAMING_RESUMED )) {
@@ -114,7 +133,7 @@ public void handleIncomingOccupancyEvent(OccupancyNotification occupancyNotifica
114133 @ Override
115134 public void handleIncomingAblyError (ErrorNotification notification ) {
116135 _log .debug (String .format ("handleIncomingAblyError: %s" , notification .getMessage ()));
117-
136+ _telemetryRuntimeProducer . recordStreamingEvents ( new StreamingEvent ( StreamEventsEnum . ABLY_ERROR . get_type (), notification . getCode (), System . currentTimeMillis ()));
118137 if (_backendStatus .get ().equals (ControlType .STREAMING_DISABLED )) {
119138 return ; // Ignore
120139 }
@@ -145,4 +164,14 @@ private boolean isPublishers() {
145164 }
146165 return false ;
147166 }
167+
168+ private void recordTelemetryOcuppancy (OccupancyNotification occupancyNotification , int publishers ) {
169+ if (CONTROL_PRI_CHANNEL .equals (occupancyNotification .getChannel ())) {
170+ _telemetryRuntimeProducer .recordStreamingEvents (new StreamingEvent (StreamEventsEnum .OCCUPANCY_PRI .get_type (), publishers , System .currentTimeMillis ()));
171+ }
172+ else if (CONTROL_SEC_CHANNEL .equals (occupancyNotification .getChannel ())){
173+ _telemetryRuntimeProducer .recordStreamingEvents (new StreamingEvent (StreamEventsEnum .OCCUPANCY_SEC .get_type (), publishers , System .currentTimeMillis ()));
174+ }
175+
176+ }
148177}
0 commit comments