11package io .split .engine .sse .client ;
22
33import com .google .common .base .Strings ;
4+ import com .google .common .util .concurrent .ThreadFactoryBuilder ;
45import org .apache .hc .client5 .http .classic .methods .HttpGet ;
5- import org .apache .hc .client5 .http .config .RequestConfig ;
66import org .apache .hc .client5 .http .impl .classic .CloseableHttpClient ;
77import org .apache .hc .client5 .http .impl .classic .CloseableHttpResponse ;
8- import org .apache .hc .client5 .http .impl .classic .HttpClients ;
9- import org .apache .hc .client5 .http .impl .io .PoolingHttpClientConnectionManager ;
10- import org .apache .hc .core5 .util .Timeout ;
118import org .slf4j .Logger ;
129import org .slf4j .LoggerFactory ;
1310
1916import java .net .SocketException ;
2017import java .net .URI ;
2118import java .util .concurrent .CountDownLatch ;
19+ import java .util .concurrent .ExecutorService ;
20+ import java .util .concurrent .Executors ;
2221import java .util .concurrent .TimeUnit ;
2322import java .util .concurrent .atomic .AtomicReference ;
2423import java .util .function .Function ;
@@ -45,11 +44,16 @@ private enum ConnectionState {
4544 private final static long CONNECT_TIMEOUT = 30000 ;
4645 private static final Logger _log = LoggerFactory .getLogger (SSEClient .class );
4746
47+ private final ExecutorService _connectionExecutor = Executors .newSingleThreadExecutor (new ThreadFactoryBuilder ()
48+ .setDaemon (true )
49+ .setNameFormat ("SPLIT-SSEConnection-%d" )
50+ .build ());
4851 private final CloseableHttpClient _client ;
4952 private final Function <RawEvent , Void > _eventCallback ;
5053 private final Function <StatusMessage , Void > _statusCallback ;
5154 private final AtomicReference <ConnectionState > _state = new AtomicReference <>(ConnectionState .CLOSED );
5255 private final AtomicReference <CloseableHttpResponse > _ongoingResponse = new AtomicReference <>();
56+ private final AtomicReference <HttpGet > _ongoingRequest = new AtomicReference <>();
5357
5458 public SSEClient (Function <RawEvent , Void > eventCallback ,
5559 Function <StatusMessage , Void > statusCallback ,
@@ -61,23 +65,21 @@ public SSEClient(Function<RawEvent, Void> eventCallback,
6165
6266 public synchronized boolean open (URI uri ) {
6367 if (isOpen ()) {
64- _log .debug ("SSEClient already open." );
68+ _log .info ("SSEClient already open." );
6569 return false ;
6670 }
6771
6872 _statusCallback .apply (StatusMessage .INITIALIZATION_IN_PROGRESS );
6973
7074 CountDownLatch signal = new CountDownLatch (1 );
71- Thread thread = new Thread (() -> connectAndLoop (uri , signal ));
72- thread .setDaemon (true );
73- thread .start ();
75+ _connectionExecutor .submit (() -> connectAndLoop (uri , signal ));
7476 try {
7577 if (!signal .await (CONNECT_TIMEOUT , TimeUnit .SECONDS )) {
7678 return false ;
77- };
79+ }
7880 } catch (InterruptedException e ) {
7981 Thread .currentThread ().interrupt ();
80- _log .debug (e .getMessage ());
82+ _log .info (e .getMessage ());
8183 return false ;
8284 }
8385 return isOpen ();
@@ -91,9 +93,10 @@ public synchronized void close() {
9193 if (_state .compareAndSet (ConnectionState .OPEN , ConnectionState .CLOSED )) {
9294 if (_ongoingResponse .get () != null ) {
9395 try {
96+ _ongoingRequest .get ().abort ();
9497 _ongoingResponse .get ().close ();
9598 } catch (IOException e ) {
96- _log .debug (String .format ("Error closing SSEClient: %s" , e .getMessage ()));
99+ _log .info (String .format ("Error closing SSEClient: %s" , e .getMessage ()));
97100 }
98101 }
99102 }
@@ -124,7 +127,7 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {
124127 _statusCallback .apply (StatusMessage .RETRYABLE_ERROR );
125128 return ;
126129 } catch (IOException exc ) { // Other type of connection error
127- _log .debug ( exc .getMessage ());
130+ _log .info ( String . format ( "SSE connection ended abruptly: %s. Retying" , exc .getMessage () ));
128131 _statusCallback .apply (StatusMessage .RETRYABLE_ERROR );
129132 return ;
130133 }
@@ -145,17 +148,18 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {
145148 }
146149
147150 private boolean establishConnection (URI uri , CountDownLatch signal ) {
148- HttpGet request = new HttpGet (uri );
151+
152+ _ongoingRequest .set (new HttpGet (uri ));
149153
150154 try {
151- _ongoingResponse .set (_client .execute (request ));
155+ _ongoingResponse .set (_client .execute (_ongoingRequest . get () ));
152156 if (_ongoingResponse .get ().getCode () != 200 ) {
153157 return false ;
154158 }
155159 _state .set (ConnectionState .OPEN );
156160 _statusCallback .apply (StatusMessage .CONNECTED );
157161 } catch (IOException exc ) {
158- _log .debug (String .format ("Error establishConnection: %s" , exc ));
162+ _log .error (String .format ("Error establishConnection: %s" , exc ));
159163 return false ;
160164 } finally {
161165 signal .countDown ();
0 commit comments