11package io .split .engine .sse .client ;
22
33import com .google .common .base .Strings ;
4+ import io .split .engine .sse .EventSourceClient ;
45import org .apache .http .client .config .RequestConfig ;
56import org .apache .http .client .methods .CloseableHttpResponse ;
67import org .apache .http .client .methods .HttpGet ;
78import org .apache .http .impl .client .CloseableHttpClient ;
89import org .apache .http .impl .client .HttpClients ;
910import org .apache .http .impl .conn .PoolingHttpClientConnectionManager ;
11+ import org .slf4j .Logger ;
12+ import org .slf4j .LoggerFactory ;
1013
1114import java .io .BufferedReader ;
1215import java .io .EOFException ;
@@ -40,6 +43,7 @@ private enum ConnectionState {
4043 private final static String KEEP_ALIVE_PAYLOAD = ":keepalive\n " ;
4144 private final static Integer CONNECT_TIMEOUT = 30000 ;
4245 private final static Integer SOCKET_TIMEOUT = 70000 ;
46+ private static final Logger _log = LoggerFactory .getLogger (SSEClient .class );
4347
4448 private final CloseableHttpClient _client ;
4549 private final Function <RawEvent , Void > _eventCallback ;
@@ -55,7 +59,7 @@ public SSEClient(Function<RawEvent, Void> eventCallback, Function<StatusMessage,
5559
5660 public synchronized boolean open (URI uri ) {
5761 if (isOpen ()) {
58- // TODO: Log:
62+ _log . warn ( "SSEClient already open." );
5963 return false ;
6064 }
6165
@@ -66,7 +70,7 @@ public synchronized boolean open(URI uri) {
6670 try {
6771 signal .await (CONNECT_TIMEOUT , TimeUnit .SECONDS );
6872 } catch (InterruptedException e ) {
69- // TODO: Log
73+ _log . warn ( e . getMessage ());
7074 return false ;
7175 }
7276 return isOpen ();
@@ -82,7 +86,7 @@ public synchronized void close() {
8286 try {
8387 _ongoingResponse .get ().close ();
8488 } catch (IOException e ) {
85- // TODO: Log
89+ _log . warn ( String . format ( "Error closing SSEClient: %s" , e . getMessage ()));
8690 }
8791 }
8892 }
@@ -99,6 +103,7 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {
99103 try {
100104 final InputStream stream = _ongoingResponse .get ().getEntity ().getContent ();
101105 final BufferedReader reader = new BufferedReader (new InputStreamReader (stream ));
106+
102107 while (isOpen () && !Thread .currentThread ().isInterrupted ()) {
103108 try {
104109 handleMessage (readMessageAsString (reader ));
@@ -118,14 +123,16 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {
118123 }
119124 }
120125 } catch (IOException e ) {
121- // TODO: Log
126+ _log . warn ( e . getMessage ());
122127 _statusCallback .apply (StatusMessage .NONRETRYABLE_ERROR );
123- } finally {
128+ } finally {
124129 try {
125130 _ongoingResponse .get ().close ();
126131 } catch (IOException e ) {
127- // TODO: Log
132+ _log . warn ( e . getMessage ());
128133 }
134+
135+ _log .warn ("SSEClient finished." );
129136 }
130137 }
131138
@@ -140,7 +147,7 @@ private boolean establishConnection(URI uri, CountDownLatch signal) {
140147 _state .set (ConnectionState .OPEN );
141148 _statusCallback .apply (StatusMessage .CONNECTED );
142149 } catch (IOException exc ) {
143- // TODO
150+ _log . warn ( String . format ( "Error establishConnection: %s" , exc . getMessage ()));
144151 exc .printStackTrace ();
145152 return false ;
146153 } finally {
@@ -180,8 +187,8 @@ private static CloseableHttpClient buildHttpClient() {
180187 }
181188
182189 private void handleMessage (String message ) {
183- if (KEEP_ALIVE_PAYLOAD .equals (message )) {
184- // TODO: Log keepalive in debug mode?
190+ if (Strings . isNullOrEmpty ( message ) || KEEP_ALIVE_PAYLOAD .equals (message )) {
191+ _log . debug ( "Keep Alive event" );
185192 return ;
186193 }
187194
0 commit comments