Skip to content

Commit 8168949

Browse files
author
Istemi Ekin Akkus
committed
go back to ThreadPoolServer in QueueService and DataLayerService
1 parent c2a25d3 commit 8168949

File tree

2 files changed

+20
-23
lines changed

2 files changed

+20
-23
lines changed

DataLayerService/src/main/java/org/microfunctions/data_layer/DataLayerServer.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@
3737

3838
import org.apache.logging.log4j.LogManager;
3939
import org.apache.logging.log4j.Logger;
40-
4140
import org.apache.thrift.TException;
4241
import org.apache.thrift.protocol.TCompactProtocol;
4342
import org.apache.thrift.server.TServer;
43+
import org.apache.thrift.server.TThreadPoolServer;
4444
import org.apache.thrift.server.TThreadedSelectorServer;
4545
import org.apache.thrift.transport.TFramedTransport;
46-
import org.apache.thrift.transport.TNonblockingServerSocket;
47-
import org.apache.thrift.transport.TNonblockingServerTransport;
46+
import org.apache.thrift.transport.TServerSocket;
47+
import org.apache.thrift.transport.TServerTransport;
4848
import org.apache.thrift.transport.TTransportException;
4949
import org.microfunctions.data_layer.DataLayerService.Iface;
5050

@@ -66,8 +66,7 @@ public class DataLayerServer implements Iface, Callable<Object> {
6666
private static final List<AbstractMap.SimpleEntry<String, Integer>> NO_KEYSPACES = new ArrayList<AbstractMap.SimpleEntry<String, Integer>>(0);
6767
private static final List<String> NO_KEYS = new ArrayList<String>(0);
6868

69-
public static final int DEFAULT_SELECTOR_THREADS = Math.max(2, 2 * Runtime.getRuntime().availableProcessors());
70-
public static final int DEFAULT_WORKER_THREADS = Math.max(4, 4 * Runtime.getRuntime().availableProcessors());
69+
public static final int DEFAULT_MAX_WORKER_THREADS = Integer.MAX_VALUE;
7170
public static final int DEFAULT_CLIENT_TIMEOUT = 0;
7271
public static final int DEFAULT_MAX_FRAME_LENGTH = Integer.MAX_VALUE;
7372

@@ -1626,14 +1625,13 @@ public boolean updateTableTypeCache(String action, String table, String tableTyp
16261625

16271626

16281627
public void start(InetSocketAddress bindAddr) throws TTransportException {
1629-
TNonblockingServerTransport transport = new TNonblockingServerSocket(bindAddr, DEFAULT_CLIENT_TIMEOUT);
1630-
TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(transport)
1628+
TServerTransport transport = new TServerSocket(bindAddr, DEFAULT_CLIENT_TIMEOUT);
1629+
TThreadPoolServer.Args args = new TThreadPoolServer.Args(transport)
16311630
.transportFactory(new TFramedTransport.Factory(DEFAULT_MAX_FRAME_LENGTH))
16321631
.protocolFactory(new TCompactProtocol.Factory())
16331632
.processor(new DataLayerService.Processor<Iface>(this))
1634-
.selectorThreads(DEFAULT_SELECTOR_THREADS)
1635-
.workerThreads(DEFAULT_WORKER_THREADS);
1636-
server = new TThreadedSelectorServer(args);
1633+
.maxWorkerThreads(DEFAULT_MAX_WORKER_THREADS);
1634+
server = new TThreadPoolServer(args);
16371635

16381636
LOGGER.info("Listening on "+bindAddr);
16391637
server.serve();

QueueService/src/main/java/org/microfunctions/queue/local/LocalQueueServer.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,17 @@
2424
import org.apache.thrift.TException;
2525
import org.apache.thrift.protocol.TCompactProtocol;
2626
import org.apache.thrift.server.TServer;
27+
import org.apache.thrift.server.TThreadPoolServer;
2728
import org.apache.thrift.server.TThreadedSelectorServer;
2829
import org.apache.thrift.transport.TFramedTransport;
29-
import org.apache.thrift.transport.TNonblockingServerSocket;
30-
import org.apache.thrift.transport.TNonblockingServerTransport;
30+
import org.apache.thrift.transport.TServerSocket;
31+
import org.apache.thrift.transport.TServerTransport;
3132
import org.apache.thrift.transport.TTransportException;
3233
import org.microfunctions.queue.local.LocalQueueService.Iface;
3334

3435
public class LocalQueueServer implements Iface, Runnable {
3536

36-
public static final int DEFAULT_SELECTOR_THREADS = Math.max(2, 2 * Runtime.getRuntime().availableProcessors());
37-
public static final int DEFAULT_WORKER_THREADS = Math.max(4, 4 * Runtime.getRuntime().availableProcessors());
37+
public static final int DEFAULT_MAX_WORKER_THREADS = Integer.MAX_VALUE;
3838
public static final int DEFAULT_CLIENT_TIMEOUT = 0;
3939
public static final int DEFAULT_MAX_FRAME_LENGTH = Integer.MAX_VALUE;
4040

@@ -116,26 +116,25 @@ public long freeMemory() throws TException {
116116
return Runtime.getRuntime().maxMemory() - Runtime.getRuntime().totalMemory() + Runtime.getRuntime().freeMemory();
117117
}
118118

119-
public void start (InetSocketAddress bindAddr, int nSelectorThreads, int nWorkerThreads, int clientTimeout) throws TTransportException {
120-
TNonblockingServerTransport transport = new TNonblockingServerSocket(bindAddr, clientTimeout);
121-
TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(transport)
119+
public void start (InetSocketAddress bindAddr, int maxWorkerThreads, int clientTimeout) throws TTransportException {
120+
TServerTransport transport = new TServerSocket(bindAddr, clientTimeout);
121+
TThreadPoolServer.Args args = new TThreadPoolServer.Args(transport)
122122
.transportFactory(new TFramedTransport.Factory(DEFAULT_MAX_FRAME_LENGTH))
123123
.protocolFactory(new TCompactProtocol.Factory())
124124
.processor(new LocalQueueService.Processor<Iface>(this))
125-
.selectorThreads(nSelectorThreads)
126-
.workerThreads(nWorkerThreads);
127-
server = new TThreadedSelectorServer(args);
125+
.maxWorkerThreads(maxWorkerThreads);
126+
server = new TThreadPoolServer(args);
128127

129128
LOGGER.info("Starting local queue...");
130129
server.serve();
131130
}
132131

133-
public void start (InetSocketAddress bindAddr, int nSelectorThreads, int nWorkerThreads) throws TTransportException {
134-
this.start(bindAddr, nSelectorThreads, nWorkerThreads, DEFAULT_CLIENT_TIMEOUT);
132+
public void start (InetSocketAddress bindAddr, int maxWorkerThreads) throws TTransportException {
133+
this.start(bindAddr, maxWorkerThreads, DEFAULT_CLIENT_TIMEOUT);
135134
}
136135

137136
public void start (InetSocketAddress bindAddr) throws TTransportException {
138-
this.start(bindAddr, DEFAULT_SELECTOR_THREADS, DEFAULT_WORKER_THREADS, DEFAULT_CLIENT_TIMEOUT);
137+
this.start(bindAddr, DEFAULT_MAX_WORKER_THREADS, DEFAULT_CLIENT_TIMEOUT);
139138
}
140139

141140
public void stop () {

0 commit comments

Comments
 (0)