Skip to content

Commit c2a25d3

Browse files
committed
changed DataLayerService's server type from TThreadPoolServer to TThreadedSelectorServer for memory efficiency
1 parent 9c72483 commit c2a25d3

File tree

1 file changed

+14
-11
lines changed

1 file changed

+14
-11
lines changed

DataLayerService/src/main/java/org/microfunctions/data_layer/DataLayerServer.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,10 @@
4141
import org.apache.thrift.TException;
4242
import org.apache.thrift.protocol.TCompactProtocol;
4343
import org.apache.thrift.server.TServer;
44-
import org.apache.thrift.server.TThreadPoolServer;
44+
import org.apache.thrift.server.TThreadedSelectorServer;
4545
import org.apache.thrift.transport.TFramedTransport;
46-
import org.apache.thrift.transport.TServerSocket;
47-
import org.apache.thrift.transport.TServerTransport;
46+
import org.apache.thrift.transport.TNonblockingServerSocket;
47+
import org.apache.thrift.transport.TNonblockingServerTransport;
4848
import org.apache.thrift.transport.TTransportException;
4949
import org.microfunctions.data_layer.DataLayerService.Iface;
5050

@@ -66,7 +66,8 @@ public class DataLayerServer implements Iface, Callable<Object> {
6666
private static final List<AbstractMap.SimpleEntry<String, Integer>> NO_KEYSPACES = new ArrayList<AbstractMap.SimpleEntry<String, Integer>>(0);
6767
private static final List<String> NO_KEYS = new ArrayList<String>(0);
6868

69-
public static final int DEFAULT_MAX_WORKER_THREADS = Integer.MAX_VALUE;
69+
public static final int DEFAULT_SELECTOR_THREADS = Math.max(2, 2 * Runtime.getRuntime().availableProcessors());
70+
public static final int DEFAULT_WORKER_THREADS = Math.max(4, 4 * Runtime.getRuntime().availableProcessors());
7071
public static final int DEFAULT_CLIENT_TIMEOUT = 0;
7172
public static final int DEFAULT_MAX_FRAME_LENGTH = Integer.MAX_VALUE;
7273

@@ -1625,13 +1626,15 @@ public boolean updateTableTypeCache(String action, String table, String tableTyp
16251626

16261627

16271628
public void start(InetSocketAddress bindAddr) throws TTransportException {
1628-
TServerTransport transport = new TServerSocket(bindAddr, DEFAULT_CLIENT_TIMEOUT);
1629-
TThreadPoolServer.Args args = new TThreadPoolServer.Args(transport)
1630-
.transportFactory(new TFramedTransport.Factory(DEFAULT_MAX_FRAME_LENGTH))
1631-
.protocolFactory(new TCompactProtocol.Factory())
1632-
.processor(new DataLayerService.Processor<Iface>(this))
1633-
.maxWorkerThreads(DEFAULT_MAX_WORKER_THREADS);
1634-
server = new TThreadPoolServer(args);
1629+
TNonblockingServerTransport transport = new TNonblockingServerSocket(bindAddr, DEFAULT_CLIENT_TIMEOUT);
1630+
TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(transport)
1631+
.transportFactory(new TFramedTransport.Factory(DEFAULT_MAX_FRAME_LENGTH))
1632+
.protocolFactory(new TCompactProtocol.Factory())
1633+
.processor(new DataLayerService.Processor<Iface>(this))
1634+
.selectorThreads(DEFAULT_SELECTOR_THREADS)
1635+
.workerThreads(DEFAULT_WORKER_THREADS);
1636+
server = new TThreadedSelectorServer(args);
1637+
16351638
LOGGER.info("Listening on "+bindAddr);
16361639
server.serve();
16371640
}

0 commit comments

Comments
 (0)