4040import org .apache .thrift .TException ;
4141import org .apache .thrift .protocol .TCompactProtocol ;
4242import org .apache .thrift .server .TServer ;
43- import org .apache .thrift .server .TThreadPoolServer ;
4443import org .apache .thrift .server .TThreadedSelectorServer ;
4544import org .apache .thrift .transport .TFramedTransport ;
46- import org .apache .thrift .transport .TServerSocket ;
47- import org .apache .thrift .transport .TServerTransport ;
45+ import org .apache .thrift .transport .TNonblockingServerSocket ;
46+ import org .apache .thrift .transport .TNonblockingServerTransport ;
4847import org .apache .thrift .transport .TTransportException ;
4948import org .microfunctions .data_layer .DataLayerService .Iface ;
5049
@@ -66,7 +65,8 @@ public class DataLayerServer implements Iface, Callable<Object> {
6665 private static final List <AbstractMap .SimpleEntry <String , Integer >> NO_KEYSPACES = new ArrayList <AbstractMap .SimpleEntry <String , Integer >>(0 );
6766 private static final List <String > NO_KEYS = new ArrayList <String >(0 );
6867
69- public static final int DEFAULT_MAX_WORKER_THREADS = Integer .MAX_VALUE ;
68+ public static final int DEFAULT_SELECTOR_THREADS = Math .max (2 , 2 * Runtime .getRuntime ().availableProcessors ());
69+ public static final int DEFAULT_WORKER_THREADS = Math .max (4 , 4 * Runtime .getRuntime ().availableProcessors ());
7070 public static final int DEFAULT_CLIENT_TIMEOUT = 0 ;
7171 public static final int DEFAULT_MAX_FRAME_LENGTH = Integer .MAX_VALUE ;
7272
@@ -1625,13 +1625,14 @@ public boolean updateTableTypeCache(String action, String table, String tableTyp
16251625
16261626
16271627 public void start (InetSocketAddress bindAddr ) throws TTransportException {
1628- TServerTransport transport = new TServerSocket (bindAddr , DEFAULT_CLIENT_TIMEOUT );
1629- TThreadPoolServer .Args args = new TThreadPoolServer .Args (transport )
1628+ TNonblockingServerTransport transport = new TNonblockingServerSocket (bindAddr , DEFAULT_CLIENT_TIMEOUT );
1629+ TThreadedSelectorServer .Args args = new TThreadedSelectorServer .Args (transport )
16301630 .transportFactory (new TFramedTransport .Factory (DEFAULT_MAX_FRAME_LENGTH ))
16311631 .protocolFactory (new TCompactProtocol .Factory ())
16321632 .processor (new DataLayerService .Processor <Iface >(this ))
1633- .maxWorkerThreads (DEFAULT_MAX_WORKER_THREADS );
1634- server = new TThreadPoolServer (args );
1633+ .selectorThreads (DEFAULT_SELECTOR_THREADS )
1634+ .workerThreads (DEFAULT_WORKER_THREADS );
1635+ server = new TThreadedSelectorServer (args );
16351636
16361637 LOGGER .info ("Listening on " +bindAddr );
16371638 server .serve ();
0 commit comments