From f36f33aed1ed55033ff1640ed49d1f722b53ddf8 Mon Sep 17 00:00:00 2001 From: HTHou Date: Fri, 19 Dec 2025 09:52:32 +0800 Subject: [PATCH 1/4] Add a new TNonblockingSSLSocket --- .../iotdb/rpc/TNonblockingSSLSocket.java | 412 ++++++++++++++++++ .../rpc/TNonblockingTransportWrapper.java | 4 +- 2 files changed, 414 insertions(+), 2 deletions(-) create mode 100644 iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSSLSocket.java diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSSLSocket.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSSLSocket.java new file mode 100644 index 000000000000..e11b22bcc3a6 --- /dev/null +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSSLSocket.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rpc; + +import org.apache.thrift.transport.TNonblockingSocket; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLEngineResult.HandshakeStatus; +import javax.net.ssl.SSLException; +import javax.net.ssl.TrustManagerFactory; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.security.KeyStore; + +/** Transport for use with async client. */ +public class TNonblockingSSLSocket extends TNonblockingSocket { + + private static final Logger LOGGER = + LoggerFactory.getLogger(TNonblockingSSLSocket.class.getName()); + + private final SSLEngine sslEngine_; + + private final ByteBuffer appUnwrap; + private final ByteBuffer netUnwrap; + + private final ByteBuffer appWrap; + private final ByteBuffer netWrap; + + private ByteBuffer decodedBytes; + + private boolean isHandshakeCompleted; + + private SelectionKey selectionKey; + + public TNonblockingSSLSocket( + String host, + int port, + int timeout, + String keystore, + String keystorePassword, + String truststore, + String truststorePassword) + throws TTransportException, IOException { + this( + host, + port, + timeout, + createSSLContext(keystore, keystorePassword, truststore, truststorePassword)); + } + + private static SSLContext createSSLContext( + String keystore, String keystorePassword, String truststore, String truststorePassword) + throws TTransportException { + SSLContext ctx; + InputStream in = null; + InputStream is = null; + + try { + ctx = SSLContext.getInstance("TLS"); + TrustManagerFactory tmf = null; + KeyManagerFactory kmf = null; + + if (truststore != null) { + tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + KeyStore ts = KeyStore.getInstance("JKS"); + in = getStoreAsStream(truststore); + ts.load(in, (truststorePassword != null ? truststorePassword.toCharArray() : null)); + tmf.init(ts); + } + + if (keystore != null) { + kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + KeyStore ks = KeyStore.getInstance("JKS"); + is = getStoreAsStream(keystore); + ks.load(is, keystorePassword.toCharArray()); + kmf.init(ks, keystorePassword.toCharArray()); + } + + if (keystore != null && truststore != null) { + ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + } else if (keystore != null) { + ctx.init(kmf.getKeyManagers(), null, null); + } else { + ctx.init(null, tmf.getTrustManagers(), null); + } + + } catch (Exception e) { + throw new TTransportException( + TTransportException.NOT_OPEN, "Error creating the transport", e); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException e) { + LOGGER.warn("Unable to close stream", e); + } + } + if (is != null) { + try { + is.close(); + } catch (IOException e) { + LOGGER.warn("Unable to close stream", e); + } + } + } + + return ctx; + } + + private static InputStream getStoreAsStream(String store) throws IOException { + try { + return new FileInputStream(store); + } catch (FileNotFoundException e) { + } + + InputStream storeStream = null; + try { + storeStream = new URL(store).openStream(); + if (storeStream != null) { + return storeStream; + } + } catch (MalformedURLException e) { + } + + storeStream = Thread.currentThread().getContextClassLoader().getResourceAsStream(store); + + if (storeStream != null) { + return storeStream; + } else { + throw new IOException("Could not load file: " + store); + } + } + + protected TNonblockingSSLSocket(String host, int port, int timeout, SSLContext sslContext) + throws IOException, TTransportException { + super(host, port, timeout); + sslEngine_ = sslContext.createSSLEngine(host, port); + sslEngine_.setUseClientMode(true); + + int appBufferSize = sslEngine_.getSession().getApplicationBufferSize(); + int netBufferSize = sslEngine_.getSession().getPacketBufferSize(); + appUnwrap = ByteBuffer.allocate(appBufferSize); + netUnwrap = ByteBuffer.allocate(netBufferSize); + appWrap = ByteBuffer.allocate(appBufferSize); + netWrap = ByteBuffer.allocate(netBufferSize); + decodedBytes = ByteBuffer.allocate(appBufferSize); + decodedBytes.flip(); + isHandshakeCompleted = false; + } + + /** + * Register the new SocketChannel with our Selector, indicating we'd like to be notified when it's + * ready for I/O. + * + * @param selector + * @return the selection key for this socket. + */ + public SelectionKey registerSelector(Selector selector, int interests) throws IOException { + selectionKey = super.registerSelector(selector, interests); + return selectionKey; + } + + /** Checks whether the socket is connected. */ + public boolean isOpen() { + // isConnected() does not return false after close(), but isOpen() does + return super.isOpen() && isHandshakeCompleted; + } + + /** Do not call, the implementation provides its own lazy non-blocking connect. */ + public void open() throws TTransportException { + throw new RuntimeException("open() is not implemented for TNonblockingSSLSocket"); + } + + /** Perform a nonblocking read into buffer. */ + public int read(ByteBuffer buffer) throws TTransportException { + int numBytes = buffer.limit(); + while (decodedBytes.remaining() < numBytes) { + HandshakeStatus hs = sslEngine_.getHandshakeStatus(); + if (hs == HandshakeStatus.FINISHED) + throw new TTransportException( + TTransportException.UNKNOWN, "Read operation is terminated. Handshake is completed"); + try { + if (doUnwrap() == -1) { + throw new IOException("Unable to read " + numBytes + " bytes"); + } + } catch (IOException exc) { + throw new TTransportException(TTransportException.UNKNOWN, exc.getMessage()); + } + if (appUnwrap.position() > 0) { + int t; + appUnwrap.flip(); + if (decodedBytes.position() > 0) decodedBytes.flip(); + t = appUnwrap.limit() + decodedBytes.limit(); + byte[] tmpBuffer = new byte[t]; + decodedBytes.get(tmpBuffer, 0, decodedBytes.remaining()); + appUnwrap.get(tmpBuffer, 0, appUnwrap.remaining()); + if (appUnwrap.position() > 0) { + appUnwrap.clear(); + appUnwrap.flip(); + appUnwrap.compact(); + } + decodedBytes = ByteBuffer.wrap(tmpBuffer); + } + } + byte[] b = new byte[numBytes]; + decodedBytes.get(b, 0, numBytes); + if (decodedBytes.position() > 0) { + decodedBytes.compact(); + decodedBytes.flip(); + } + buffer.put(b); + selectionKey.interestOps(SelectionKey.OP_WRITE); + return numBytes; + } + + /** Perform a nonblocking write of the data in buffer; */ + public int write(ByteBuffer buffer) throws TTransportException { + int numBytes = 0; + + if (buffer.position() > 0) buffer.flip(); + + int nTransfer; + int num; + while (buffer.remaining() != 0) { + nTransfer = Math.min(appWrap.remaining(), buffer.remaining()); + if (nTransfer > 0) { + appWrap.put(buffer.array(), buffer.arrayOffset() + buffer.position(), nTransfer); + buffer.position(buffer.position() + nTransfer); + } + + try { + num = doWrap(); + } catch (IOException iox) { + throw new TTransportException(TTransportException.UNKNOWN, iox); + } + if (num < 0) { + LOGGER.error("Failed while writing. Probably server is down"); + return -1; + } + numBytes += num; + } + return numBytes; + } + + /** Closes the socket. */ + public void close() { + sslEngine_.closeOutbound(); + super.close(); + } + + /** {@inheritDoc} */ + public boolean startConnect() throws IOException { + if (this.isOpen()) { + return true; + } + sslEngine_.beginHandshake(); + return super.startConnect() && doHandShake(); + } + + /** {@inheritDoc} */ + public boolean finishConnect() throws IOException { + return super.finishConnect() && doHandShake(); + } + + private synchronized boolean doHandShake() throws IOException { + LOGGER.debug("Handshake is started"); + while (true) { + HandshakeStatus hs = sslEngine_.getHandshakeStatus(); + switch (hs) { + case NEED_UNWRAP: + if (doUnwrap() == -1) { + LOGGER.error("Unexpected. Handshake failed abruptly during unwrap"); + return false; + } + break; + case NEED_WRAP: + if (doWrap() == -1) { + LOGGER.error("Unexpected. Handshake failed abruptly during wrap"); + return false; + } + break; + case NEED_TASK: + if (!doTask()) { + LOGGER.error("Unexpected. Handshake failed abruptly during task"); + return false; + } + break; + case FINISHED: + case NOT_HANDSHAKING: + isHandshakeCompleted = true; + return true; + default: + LOGGER.error("Unknown handshake status. Handshake failed"); + return false; + } + } + } + + private synchronized boolean doTask() { + Runnable runnable; + while ((runnable = sslEngine_.getDelegatedTask()) != null) { + runnable.run(); + } + HandshakeStatus hs = sslEngine_.getHandshakeStatus(); + if (hs == HandshakeStatus.NEED_TASK) { + try { + throw new TTransportException( + TTransportException.UNKNOWN, "handshake shouldn't need additional tasks"); + } catch (TTransportException e) { + return false; + } + } + return true; + } + + private synchronized int doUnwrap() throws IOException { + int num = getSocketChannel().read(netUnwrap); + if (num < 0) { + LOGGER.error("Failed during read operation. Probably server is down"); + return -1; + } + SSLEngineResult unwrapResult; + + try { + netUnwrap.flip(); + unwrapResult = sslEngine_.unwrap(netUnwrap, appUnwrap); + netUnwrap.compact(); + } catch (SSLException ex) { + LOGGER.error(ex.getMessage()); + throw ex; + } + + switch (unwrapResult.getStatus()) { + case OK: + if (appUnwrap.position() > 0) { + appUnwrap.flip(); + appUnwrap.compact(); + } + break; + case CLOSED: + return -1; + case BUFFER_OVERFLOW: + throw new IllegalStateException("Failed to unwrap"); + case BUFFER_UNDERFLOW: + break; + } + return num; + } + + private synchronized int doWrap() throws IOException { + int num = 0; + SSLEngineResult wrapResult; + try { + appWrap.flip(); + wrapResult = sslEngine_.wrap(appWrap, netWrap); + appWrap.compact(); + } catch (SSLException exc) { + LOGGER.error(exc.getMessage()); + throw exc; + } + + switch (wrapResult.getStatus()) { + case OK: + if (netWrap.position() > 0) { + netWrap.flip(); + num = getSocketChannel().write(netWrap); + netWrap.compact(); + } + break; + case BUFFER_UNDERFLOW: + // try again later + break; + case BUFFER_OVERFLOW: + throw new IllegalStateException("Failed to wrap"); + case CLOSED: + LOGGER.error("SSL session is closed"); + return -1; + } + return num; + } +} diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingTransportWrapper.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingTransportWrapper.java index 40cf543fb0a3..d985542b17b5 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingTransportWrapper.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingTransportWrapper.java @@ -68,9 +68,9 @@ public static TNonblockingTransport wrap( String trustStorePath, String trustStorePwd) { try { - return new NettyTNonblockingTransport( + return new TNonblockingSSLSocket( host, port, timeout, keyStorePath, keyStorePwd, trustStorePath, trustStorePwd); - } catch (TTransportException e) { + } catch (Exception e) { // never happen return null; } From 09a87b26e4d344fe13b39630fcdfa8c5ea6a339a Mon Sep 17 00:00:00 2001 From: HTHou Date: Fri, 19 Dec 2025 17:29:10 +0800 Subject: [PATCH 2/4] remove netty base TNonblockingTransport --- iotdb-client/service-rpc/pom.xml | 16 - .../iotdb/rpc/NettyTNonblockingTransport.java | 624 ------------------ 2 files changed, 640 deletions(-) delete mode 100644 iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonblockingTransport.java diff --git a/iotdb-client/service-rpc/pom.xml b/iotdb-client/service-rpc/pom.xml index 4ca56f496a83..013fde31d5c4 100644 --- a/iotdb-client/service-rpc/pom.xml +++ b/iotdb-client/service-rpc/pom.xml @@ -84,22 +84,6 @@ org.slf4j slf4j-api - - io.netty - netty-common - - - io.netty - netty-buffer - - - io.netty - netty-transport - - - io.netty - netty-handler - diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonblockingTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonblockingTransport.java deleted file mode 100644 index 549e1bb2e0b2..000000000000 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonblockingTransport.java +++ /dev/null @@ -1,624 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.rpc; - -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslContextBuilder; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.concurrent.GenericFutureListener; -import org.apache.thrift.TConfiguration; -import org.apache.thrift.transport.TNonblockingTransport; -import org.apache.thrift.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.TrustManagerFactory; - -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.file.AccessDeniedException; -import java.security.KeyStore; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * A non-blocking Thrift transport implementation using Netty for asynchronous I/O. Integrates with - * Thrift's TAsyncClientManager using a dummy local SocketChannel for selector events. Supports - * SSL/TLS for secure communication. - */ -public class NettyTNonblockingTransport extends TNonblockingTransport { - - private static final Logger logger = LoggerFactory.getLogger(NettyTNonblockingTransport.class); - - private final String host; - private final int port; - private final int connectTimeoutMs; - private final long sslHandshakeTimeoutMs; - private final EventLoopGroup eventLoopGroup; - private final Bootstrap bootstrap; - private volatile Channel channel; - private final AtomicBoolean connected = new AtomicBoolean(false); - private final AtomicBoolean connecting = new AtomicBoolean(false); - private final CompletableFuture listenerFuture = new CompletableFuture<>(); - private final LinkedBlockingQueue readQueue = new LinkedBlockingQueue<>(); - - // SSL configuration - private final String keystorePath; - private final String keystorePassword; - private final String truststorePath; - private final String truststorePassword; - - // Dummy local socket for selector integration - private ServerSocketChannel dummyServer; - private java.nio.channels.SocketChannel dummyClient; - private java.nio.channels.SocketChannel dummyServerAccepted; - private int dummyPort; - private Selector selector; // Stored for wakeup if needed - - public NettyTNonblockingTransport( - String host, - int port, - int connectTimeoutMs, - String keystorePath, - String keystorePassword, - String truststorePath, - String truststorePassword) - throws TTransportException { - super(new TConfiguration()); - this.host = host; - this.port = port; - this.connectTimeoutMs = connectTimeoutMs; - this.sslHandshakeTimeoutMs = connectTimeoutMs; - this.eventLoopGroup = new NioEventLoopGroup(); - this.bootstrap = new Bootstrap(); - this.keystorePath = keystorePath; - this.keystorePassword = keystorePassword; - this.truststorePath = truststorePath; - this.truststorePassword = truststorePassword; - initDummyChannels(); - initBootstrap(); - } - - /** Initializes dummy local channels for selector event simulation. */ - private void initDummyChannels() throws TTransportException { - try { - dummyServer = ServerSocketChannel.open(); - dummyServer.configureBlocking(false); - dummyServer.bind(new InetSocketAddress("localhost", 0)); - dummyPort = dummyServer.socket().getLocalPort(); - if (logger.isDebugEnabled()) { - logger.debug("Dummy server bound to localhost:{}", dummyPort); - } - dummyClient = java.nio.channels.SocketChannel.open(); - dummyClient.configureBlocking(false); - } catch (IOException e) { - throw new TTransportException("Failed to initialize dummy channels", e); - } - } - - private void initBootstrap() { - bootstrap - .group(eventLoopGroup) - .channel(NioSocketChannel.class) - .option(ChannelOption.TCP_NODELAY, true) - .option(ChannelOption.SO_KEEPALIVE, true) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs) - .handler( - new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel ch) throws Exception { - if (logger.isDebugEnabled()) { - logger.debug("Initializing channel for {}:{}", host, port); - } - ChannelPipeline pipeline = ch.pipeline(); - SslContext sslContext = createSslContext(); - SslHandler sslHandler = sslContext.newHandler(ch.alloc(), host, port); - sslHandler.setHandshakeTimeoutMillis(sslHandshakeTimeoutMs); - // set this for avoiding error log on the server side - sslHandler.setCloseNotifyReadTimeoutMillis(100); - - pipeline.addLast("ssl", sslHandler); - sslHandler - .handshakeFuture() - .addListener( - future -> { - if (future.isSuccess()) { - if (logger.isDebugEnabled()) { - logger.debug( - "SSL handshake completed successfully for {}:{}", host, port); - } - } else { - if (future.cause().getMessage() != null - && !future - .cause() - .getMessage() - .contains("SslHandler removed before handshake completed")) { - logger.warn( - "SSL handshake failed for {}:{}", host, port, future.cause()); - } else if (logger.isDebugEnabled()) { - logger.debug( - "SSL handshake failed for {}:{}", host, port, future.cause()); - } - } - }); - pipeline.addLast("handler", new NettyTransportHandler()); - } - }); - } - - private SslContext createSslContext() throws Exception { - SslContextBuilder sslContextBuilder = SslContextBuilder.forClient(); - - if (keystorePath != null && keystorePassword != null) { - KeyStore keyStore = KeyStore.getInstance("JKS"); - try (FileInputStream fis = new FileInputStream(keystorePath)) { - keyStore.load(fis, keystorePassword.toCharArray()); - } catch (AccessDeniedException e) { - throw new AccessDeniedException("Failed to load keystore file"); - } catch (FileNotFoundException e) { - throw new FileNotFoundException("keystore file not found"); - } - KeyManagerFactory kmf = - KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); - kmf.init(keyStore, keystorePassword.toCharArray()); - sslContextBuilder.keyManager(kmf); - } - - if (truststorePath != null && truststorePassword != null) { - KeyStore trustStore = KeyStore.getInstance("JKS"); - try (FileInputStream fis = new FileInputStream(truststorePath)) { - trustStore.load(fis, truststorePassword.toCharArray()); - } catch (AccessDeniedException e) { - throw new AccessDeniedException("Failed to load truststore file"); - } catch (FileNotFoundException e) { - throw new FileNotFoundException("truststore file not found"); - } - TrustManagerFactory tmf = - TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); - tmf.init(trustStore); - sslContextBuilder.trustManager(tmf); - } - - return sslContextBuilder.build(); - } - - @Override - public boolean isOpen() { - synchronized (this) { - return connected.get() || (channel != null && channel.isActive()); - } - } - - @Override - public void open() throws TTransportException { - throw new TTransportException( - TTransportException.NOT_OPEN, "open() is not implemented; use startConnect() instead"); - } - - @Override - public int read(ByteBuffer buffer) throws TTransportException { - - if (!isOpen()) { - if (logger.isDebugEnabled()) { - logger.debug("Transport not open for ByteBuffer read"); - } - throw new TTransportException(TTransportException.NOT_OPEN, "Transport not open"); - } - - ByteBuf byteBuf = null; - try { - byteBuf = readQueue.peek(); - if (byteBuf == null) { - if (logger.isDebugEnabled()) { - logger.debug("No data available for ByteBuffer read"); - } - return 0; - } - - int available = Math.min(buffer.remaining(), byteBuf.readableBytes()); - if (available > 0) { - buffer.put(byteBuf.nioBuffer(byteBuf.readerIndex(), available)); - byteBuf.readerIndex(byteBuf.readerIndex() + available); - if (logger.isDebugEnabled()) { - logger.debug( - "Read {} bytes into ByteBuffer, remaining space: {}", available, buffer.remaining()); - } - } - - if (byteBuf.readableBytes() > 0) { - if (logger.isDebugEnabled()) { - logger.debug("ByteBuf remaining {} bytes", byteBuf.readableBytes()); - } - // set null to avoid release in finally block - byteBuf = null; - } else { - readQueue.poll(); - } - - // Drain dummy channel to clear OP_READ - ByteBuffer discard = ByteBuffer.allocate(16); - try { - while (dummyClient.read(discard) > 0) { - discard.clear(); - } - } catch (IOException e) { - if (logger.isDebugEnabled()) { - logger.debug("Failed to drain dummy channel", e); - } - if (channel == null || !channel.isActive()) { - throw new TTransportException(TTransportException.END_OF_FILE, "Connection reset"); - } - } - // Trigger OP_READ on dummy by writing dummy byte - ByteBuffer dummyByte = ByteBuffer.wrap(new byte[1]); - dummyServerAccepted.write(dummyByte); - // Wakeup selector if needed - if (selector != null) { - selector.wakeup(); - } - - return available; - } catch (Throwable e) { - if (channel == null || !channel.isActive()) { - throw new TTransportException(TTransportException.END_OF_FILE, "Connection reset"); - } - throw new TTransportException(TTransportException.UNKNOWN, "Read failed", e); - } finally { - if (byteBuf != null) { - byteBuf.release(); - } - } - } - - @Override - public int read(byte[] buf, int off, int len) throws TTransportException { - if (!isOpen()) { - if (logger.isDebugEnabled()) { - logger.debug("Transport not open for read"); - } - throw new TTransportException(TTransportException.NOT_OPEN, "Transport not open"); - } - - try { - ByteBuffer buffer = ByteBuffer.wrap(buf, off, len); - return read(buffer); - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug("Read failed: {}", e.getMessage()); - } - throw new TTransportException(TTransportException.UNKNOWN, "Read failed", e); - } - } - - @Override - public int write(ByteBuffer buffer) throws TTransportException { - if (!isOpen()) { - if (logger.isDebugEnabled()) { - logger.debug("Transport not open for ByteBuffer write"); - } - throw new TTransportException(TTransportException.NOT_OPEN, "Transport not open"); - } - - int remaining = buffer.remaining(); - if (remaining == 0) { - return 0; - } - - if (logger.isDebugEnabled()) { - logger.debug("Writing {} bytes from ByteBuffer", remaining); - } - - synchronized (this) { - ByteBuf byteBuf = Unpooled.wrappedBuffer(buffer); - try { - ChannelFuture future = channel.writeAndFlush(byteBuf); - buffer.position(buffer.position() + remaining); - future.addListener( - (GenericFutureListener) - future1 -> { - if (future1.isSuccess()) { - if (logger.isDebugEnabled()) { - logger.debug("ByteBuffer write completed successfully: {} bytes", remaining); - } - } else { - if (logger.isDebugEnabled()) { - logger.debug("ByteBuffer write failed: {}", future1.cause().getMessage()); - } - } - }); - return remaining; - } catch (Throwable e) { - byteBuf.release(); - if (channel == null || !channel.isActive()) { - throw new TTransportException(TTransportException.END_OF_FILE, "Broken pipe"); - } - throw new TTransportException(TTransportException.UNKNOWN, e); - } - } - } - - @Override - public void write(byte[] buf, int off, int len) throws TTransportException { - if (!isOpen()) { - if (logger.isDebugEnabled()) { - logger.debug("Transport not open for write"); - } - throw new TTransportException(TTransportException.NOT_OPEN, "Transport not open"); - } - - ByteBuffer buffer = ByteBuffer.wrap(buf, off, len); - write(buffer); - } - - @Override - public void flush() throws TTransportException { - if (!isOpen()) { - if (logger.isDebugEnabled()) { - logger.debug("Transport not open for flush"); - } - throw new TTransportException(TTransportException.NOT_OPEN, "Transport not open"); - } - synchronized (this) { - channel.flush(); - if (logger.isDebugEnabled()) { - logger.debug("Flushed channel"); - } - } - } - - @Override - public void close() { - synchronized (this) { - connected.set(false); - if (channel != null) { - channel.close(); - channel = null; - if (logger.isDebugEnabled()) { - logger.debug("Channel closed for {}:{}", host, port); - } - } - try { - if (dummyClient != null) { - dummyClient.close(); - } - if (dummyServerAccepted != null) { - dummyServerAccepted.close(); - } - if (dummyServer != null) { - dummyServer.close(); - } - } catch (IOException e) { - if (logger.isDebugEnabled()) { - logger.debug("Failed to close dummy channels", e); - } - } - eventLoopGroup.shutdownGracefully(); - if (logger.isDebugEnabled()) { - logger.debug("EventLoopGroup shutdown initiated"); - } - } - } - - @Override - public boolean startConnect() { - if (connected.get() || connecting.get()) { - if (logger.isDebugEnabled()) { - logger.debug("Connection already started or established for {}:{}", host, port); - } - return connected.get(); - } - - if (!connecting.compareAndSet(false, true)) { - if (logger.isDebugEnabled()) { - logger.debug("Concurrent connection attempt detected for {}:{}", host, port); - } - return false; - } - - if (logger.isDebugEnabled()) { - logger.debug("Starting connection to {}:{}", host, port); - } - - try { - // Initiate dummy connect, it will pend until acceptance - dummyClient.connect(new InetSocketAddress("localhost", dummyPort)); - - // Initiate Netty connect - ChannelFuture future = bootstrap.connect(host, port); - future.addListener( - (GenericFutureListener) - future1 -> { - synchronized (this) { - if (future1.isSuccess()) { - if (logger.isDebugEnabled()) { - logger.debug("Connection established successfully to {}:{}", host, port); - } - channel = future1.channel(); - connected.set(true); - // Now accept the dummy connection to complete it - try { - dummyServerAccepted = dummyServer.accept(); - if (dummyServerAccepted != null) { - dummyServerAccepted.configureBlocking(false); - if (logger.isDebugEnabled()) { - logger.debug("Dummy server accepted connection"); - } - // Wakeup selector to detect OP_CONNECT - if (selector != null) { - selector.wakeup(); - } - } - listenerFuture.complete(null); - } catch (IOException e) { - if (logger.isDebugEnabled()) { - logger.debug("Failed to accept dummy connection", e); - } - listenerFuture.completeExceptionally(e); - } - } else { - if (logger.isDebugEnabled()) { - logger.debug( - "Connection failed to {}:{}: {}", - host, - port, - future1.cause().getMessage()); - } - listenerFuture.completeExceptionally(future1.cause()); - } - connecting.set(false); - } - }); - return false; // Return false to indicate pending connect for dummy - } catch (Exception e) { - connecting.set(false); - listenerFuture.completeExceptionally(e); - return false; - } - } - - @Override - public boolean finishConnect() throws IOException { - try { - listenerFuture.get(); - } catch (Throwable e) { - throw new IOException(e); - } - synchronized (this) { - boolean dummyFinished = dummyClient.finishConnect(); - boolean isConnected = connected.get() && dummyFinished; - if (logger.isDebugEnabled()) { - logger.debug( - "finishConnect called, netty connected: {}, dummy finished: {}", - connected.get(), - dummyFinished); - } - return isConnected; - } - } - - @Override - public SelectionKey registerSelector(Selector selector, int interests) throws IOException { - synchronized (this) { - this.selector = selector; - return dummyClient.register(selector, interests); - } - } - - @Override - public String toString() { - return "[remote: " + getRemoteAddress() + ", local: " + getLocalAddress() + "]"; - } - - public SocketAddress getRemoteAddress() { - return channel != null ? channel.remoteAddress() : null; - } - - public SocketAddress getLocalAddress() { - return channel != null ? channel.localAddress() : null; - } - - private class NettyTransportHandler extends ChannelInboundHandlerAdapter { - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - if (logger.isDebugEnabled()) { - logger.debug("Channel active: {}", ctx.channel().remoteAddress()); - } - super.channelActive(ctx); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg instanceof ByteBuf) { - ByteBuf byteBuf = (ByteBuf) msg; - if (logger.isDebugEnabled()) { - logger.debug("Received {} bytes", byteBuf.readableBytes()); - } - try { - synchronized (this) { - readQueue.offer(byteBuf.retain()); - // Trigger OP_READ on dummy by writing dummy byte - if (dummyServerAccepted != null) { - ByteBuffer dummyByte = ByteBuffer.wrap(new byte[1]); - dummyServerAccepted.write(dummyByte); - } - // Wakeup selector if needed - if (selector != null) { - selector.wakeup(); - } - } - } finally { - byteBuf.release(); - } - } - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - if (logger.isDebugEnabled()) { - logger.debug("Channel inactive: {}", ctx.channel().remoteAddress()); - } - synchronized (this) { - if (dummyServerAccepted != null) { - dummyServerAccepted.close(); - dummyServerAccepted = null; - } - if (channel != null) { - channel.close(); - channel = null; - } - } - super.channelInactive(ctx); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - if (logger.isDebugEnabled()) { - logger.debug("Channel exception: {}", cause.getMessage()); - } - synchronized (this) { - ctx.close(); - } - } - } -} From e9bff96883b5b6f83153bc27bc829d5c98958f86 Mon Sep 17 00:00:00 2001 From: HTHou Date: Mon, 22 Dec 2025 18:00:30 +0800 Subject: [PATCH 3/4] fix some review --- .../iotdb/rpc/TNonblockingSSLSocket.java | 44 +++++++------------ 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSSLSocket.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSSLSocket.java index e11b22bcc3a6..a4530599fabb 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSSLSocket.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSSLSocket.java @@ -179,37 +179,31 @@ protected TNonblockingSSLSocket(String host, int port, int timeout, SSLContext s isHandshakeCompleted = false; } - /** - * Register the new SocketChannel with our Selector, indicating we'd like to be notified when it's - * ready for I/O. - * - * @param selector - * @return the selection key for this socket. - */ + /** {@inheritDoc} */ + @Override public SelectionKey registerSelector(Selector selector, int interests) throws IOException { selectionKey = super.registerSelector(selector, interests); return selectionKey; } - /** Checks whether the socket is connected. */ + /** {@inheritDoc} */ + @Override public boolean isOpen() { // isConnected() does not return false after close(), but isOpen() does return super.isOpen() && isHandshakeCompleted; } - /** Do not call, the implementation provides its own lazy non-blocking connect. */ + /** {@inheritDoc} */ + @Override public void open() throws TTransportException { throw new RuntimeException("open() is not implemented for TNonblockingSSLSocket"); } - /** Perform a nonblocking read into buffer. */ - public int read(ByteBuffer buffer) throws TTransportException { + /** {@inheritDoc} */ + @Override + public synchronized int read(ByteBuffer buffer) throws TTransportException { int numBytes = buffer.limit(); while (decodedBytes.remaining() < numBytes) { - HandshakeStatus hs = sslEngine_.getHandshakeStatus(); - if (hs == HandshakeStatus.FINISHED) - throw new TTransportException( - TTransportException.UNKNOWN, "Read operation is terminated. Handshake is completed"); try { if (doUnwrap() == -1) { throw new IOException("Unable to read " + numBytes + " bytes"); @@ -244,8 +238,9 @@ public int read(ByteBuffer buffer) throws TTransportException { return numBytes; } - /** Perform a nonblocking write of the data in buffer; */ - public int write(ByteBuffer buffer) throws TTransportException { + /** {@inheritDoc} */ + @Override + public synchronized int write(ByteBuffer buffer) throws TTransportException { int numBytes = 0; if (buffer.position() > 0) buffer.flip(); @@ -273,13 +268,15 @@ public int write(ByteBuffer buffer) throws TTransportException { return numBytes; } - /** Closes the socket. */ + /** {@inheritDoc} */ + @Override public void close() { sslEngine_.closeOutbound(); super.close(); } /** {@inheritDoc} */ + @Override public boolean startConnect() throws IOException { if (this.isOpen()) { return true; @@ -289,6 +286,7 @@ public boolean startConnect() throws IOException { } /** {@inheritDoc} */ + @Override public boolean finishConnect() throws IOException { return super.finishConnect() && doHandShake(); } @@ -333,15 +331,7 @@ private synchronized boolean doTask() { runnable.run(); } HandshakeStatus hs = sslEngine_.getHandshakeStatus(); - if (hs == HandshakeStatus.NEED_TASK) { - try { - throw new TTransportException( - TTransportException.UNKNOWN, "handshake shouldn't need additional tasks"); - } catch (TTransportException e) { - return false; - } - } - return true; + return hs != HandshakeStatus.NEED_TASK; } private synchronized int doUnwrap() throws IOException { From 58bd0236382a598a18a5d540e8256527fb0581fc Mon Sep 17 00:00:00 2001 From: HTHou Date: Mon, 22 Dec 2025 21:51:04 +0800 Subject: [PATCH 4/4] opt code --- .../iotdb/rpc/TNonblockingSSLSocket.java | 47 +++++++++---------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSSLSocket.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSSLSocket.java index a4530599fabb..29f22f95a85f 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSSLSocket.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSSLSocket.java @@ -202,7 +202,7 @@ public void open() throws TTransportException { /** {@inheritDoc} */ @Override public synchronized int read(ByteBuffer buffer) throws TTransportException { - int numBytes = buffer.limit(); + int numBytes = buffer.remaining(); while (decodedBytes.remaining() < numBytes) { try { if (doUnwrap() == -1) { @@ -211,29 +211,25 @@ public synchronized int read(ByteBuffer buffer) throws TTransportException { } catch (IOException exc) { throw new TTransportException(TTransportException.UNKNOWN, exc.getMessage()); } - if (appUnwrap.position() > 0) { - int t; + if (appUnwrap.hasRemaining() + || (decodedBytes.position() > 0 && decodedBytes.flip().hasRemaining())) { appUnwrap.flip(); - if (decodedBytes.position() > 0) decodedBytes.flip(); - t = appUnwrap.limit() + decodedBytes.limit(); - byte[] tmpBuffer = new byte[t]; - decodedBytes.get(tmpBuffer, 0, decodedBytes.remaining()); - appUnwrap.get(tmpBuffer, 0, appUnwrap.remaining()); - if (appUnwrap.position() > 0) { - appUnwrap.clear(); - appUnwrap.flip(); - appUnwrap.compact(); - } - decodedBytes = ByteBuffer.wrap(tmpBuffer); + decodedBytes.flip(); + + ByteBuffer tempBuffer = + ByteBuffer.allocate(appUnwrap.remaining() + decodedBytes.remaining()); + tempBuffer.put(decodedBytes); + tempBuffer.put(appUnwrap); + tempBuffer.flip(); + + decodedBytes = tempBuffer; + appUnwrap.clear(); } } - byte[] b = new byte[numBytes]; - decodedBytes.get(b, 0, numBytes); - if (decodedBytes.position() > 0) { - decodedBytes.compact(); - decodedBytes.flip(); - } - buffer.put(b); + int oldLimit = decodedBytes.limit(); + decodedBytes.limit(decodedBytes.position() + numBytes); + buffer.put(decodedBytes); + decodedBytes.limit(oldLimit); selectionKey.interestOps(SelectionKey.OP_WRITE); return numBytes; } @@ -247,11 +243,10 @@ public synchronized int write(ByteBuffer buffer) throws TTransportException { int nTransfer; int num; - while (buffer.remaining() != 0) { + while (buffer.hasRemaining()) { nTransfer = Math.min(appWrap.remaining(), buffer.remaining()); if (nTransfer > 0) { - appWrap.put(buffer.array(), buffer.arrayOffset() + buffer.position(), nTransfer); - buffer.position(buffer.position() + nTransfer); + appWrap.put(buffer); } try { @@ -374,7 +369,7 @@ private synchronized int doWrap() throws IOException { try { appWrap.flip(); wrapResult = sslEngine_.wrap(appWrap, netWrap); - appWrap.compact(); + appWrap.clear(); } catch (SSLException exc) { LOGGER.error(exc.getMessage()); throw exc; @@ -385,7 +380,7 @@ private synchronized int doWrap() throws IOException { if (netWrap.position() > 0) { netWrap.flip(); num = getSocketChannel().write(netWrap); - netWrap.compact(); + netWrap.clear(); } break; case BUFFER_UNDERFLOW: