Skip to content

Commit 5e3fa77

Browse files
Fix for RNTBDChannelPool task queue starvation (Azure#15157)
* rntbd improvements * fixed latency issue and a race condition on close * fixed race condition in connection management resulting in creating more connections, fixed a infinite loop issue * fixed compilation warning * ensure the channel is servicable * increase monitoring period * removed info debug logs * cleanup * update condition * Reacting to code review comments * improved queue pending task monitoring, fixed spotbug complain * Defense in-depth against releaseChannel race condition * cancels pending acquisition tasks which are expired * cancels pending acquisition tasks which are expired * Fixing compiler warnings * Fixing SpotBug warning * Fixing build issue * Fixing SpotBug issue Co-authored-by: Fabian Meiswinkel <fabian@meiswinkel.com>
1 parent 10d7b33 commit 5e3fa77

File tree

9 files changed

+517
-147
lines changed

9 files changed

+517
-147
lines changed

sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,11 @@ abstract class AsyncBenchmark<T> {
149149
}).flux();
150150
createDocumentObservables.add(obs);
151151
}
152-
logger.info("Finished pre-populating {} documents", cfg.getNumberOfPreCreatedDocuments());
153152
}
154153

155154
docsToRead = Flux.merge(Flux.fromIterable(createDocumentObservables), 100).collectList().block();
155+
logger.info("Finished pre-populating {} documents", cfg.getNumberOfPreCreatedDocuments());
156+
156157
init();
157158

158159
if (configuration.isEnableJvmStats()) {

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ private Options(final Builder builder) {
340340

341341
private Options(final ConnectionPolicy connectionPolicy) {
342342
this.bufferPageSize = 8192;
343-
this.connectionAcquisitionTimeout = Duration.ZERO;
343+
this.connectionAcquisitionTimeout = Duration.ofSeconds(5L);
344344
this.connectTimeout = connectionPolicy.getConnectTimeout();
345345
this.idleChannelTimeout = connectionPolicy.getIdleTcpConnectionTimeout();
346346
this.idleChannelTimerResolution = Duration.ofMillis(100);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.implementation.directconnectivity.rntbd;
5+
6+
import io.netty.channel.Channel;
7+
import io.netty.util.concurrent.Future;
8+
import io.netty.util.concurrent.GenericFutureListener;
9+
import io.netty.util.concurrent.Promise;
10+
11+
import java.util.concurrent.ExecutionException;
12+
import java.util.concurrent.TimeUnit;
13+
import java.util.concurrent.TimeoutException;
14+
15+
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
16+
17+
class ChannelPromiseWithExpiryTime implements Promise<Channel> {
18+
private final Promise<Channel> channelPromise;
19+
private final long expiryTimeInNanos;
20+
21+
public ChannelPromiseWithExpiryTime(Promise<Channel> channelPromise, long expiryTimeInNanos) {
22+
checkNotNull(channelPromise, "channelPromise must not be null");
23+
checkNotNull(expiryTimeInNanos, "expiryTimeInNanos must not be null");
24+
25+
this.channelPromise = channelPromise;
26+
this.expiryTimeInNanos = expiryTimeInNanos;
27+
}
28+
29+
public long getExpiryTimeInNanos() {
30+
return this.expiryTimeInNanos;
31+
}
32+
33+
@Override
34+
public Promise<Channel> setSuccess(Channel result) {
35+
return this.channelPromise.setSuccess(result);
36+
}
37+
38+
@Override
39+
public boolean trySuccess(Channel result) {
40+
return this.channelPromise.trySuccess(result);
41+
}
42+
43+
@Override
44+
public Promise<Channel> setFailure(Throwable cause) {
45+
return this.channelPromise.setFailure(cause);
46+
}
47+
48+
@Override
49+
public boolean tryFailure(Throwable cause) {
50+
return this.channelPromise.tryFailure(cause);
51+
}
52+
53+
@Override
54+
public boolean setUncancellable() {
55+
return this.channelPromise.setUncancellable();
56+
}
57+
58+
@Override
59+
public boolean isSuccess() {
60+
return this.channelPromise.isSuccess();
61+
}
62+
63+
@Override
64+
public boolean isCancellable() {
65+
return this.channelPromise.isCancellable();
66+
}
67+
68+
@Override
69+
public Throwable cause() {
70+
return this.channelPromise.cause();
71+
}
72+
73+
@Override
74+
public Promise<Channel> addListener(
75+
GenericFutureListener<? extends Future<? super Channel>> listener) {
76+
77+
return this.channelPromise.addListener(listener);
78+
}
79+
80+
@SafeVarargs
81+
@Override
82+
@SuppressWarnings("varargs")
83+
public final Promise<Channel> addListeners(
84+
GenericFutureListener<? extends Future<? super Channel>>... listeners) {
85+
86+
return this.channelPromise.addListeners(listeners);
87+
}
88+
89+
@Override
90+
public Promise<Channel> removeListener(
91+
GenericFutureListener<? extends Future<? super Channel>> listener) {
92+
93+
return this.channelPromise.removeListener(listener);
94+
}
95+
96+
@SafeVarargs
97+
@Override
98+
@SuppressWarnings("varargs")
99+
public final Promise<Channel> removeListeners(
100+
GenericFutureListener<? extends Future<? super Channel>>... listeners) {
101+
102+
return this.channelPromise.removeListeners(listeners);
103+
}
104+
105+
@Override
106+
public Promise<Channel> await() throws InterruptedException {
107+
return this.channelPromise.await();
108+
}
109+
110+
@Override
111+
public Promise<Channel> awaitUninterruptibly() {
112+
return this.channelPromise.awaitUninterruptibly();
113+
}
114+
115+
@Override
116+
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
117+
return this.channelPromise.await(timeout, unit);
118+
}
119+
120+
@Override
121+
public boolean await(long timeoutMillis) throws InterruptedException {
122+
return this.channelPromise.await(timeoutMillis);
123+
}
124+
125+
@Override
126+
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
127+
return this.channelPromise.awaitUninterruptibly(timeout, unit);
128+
}
129+
130+
@Override
131+
public boolean awaitUninterruptibly(long timeoutMillis) {
132+
return this.channelPromise.awaitUninterruptibly(timeoutMillis);
133+
}
134+
135+
@Override
136+
public Channel getNow() {
137+
return this.channelPromise.getNow();
138+
}
139+
140+
@Override
141+
public boolean cancel(boolean mayInterruptIfRunning) {
142+
return this.channelPromise.cancel(mayInterruptIfRunning);
143+
}
144+
145+
@Override
146+
public boolean isCancelled() {
147+
return this.channelPromise.isCancelled();
148+
}
149+
150+
@Override
151+
public boolean isDone() {
152+
return this.channelPromise.isDone();
153+
}
154+
155+
@Override
156+
public Channel get() throws InterruptedException, ExecutionException {
157+
return this.channelPromise.get();
158+
}
159+
160+
@SuppressWarnings("NullableProblems")
161+
@Override
162+
public Channel get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
163+
return this.channelPromise.get(timeout, unit);
164+
}
165+
166+
@Override
167+
public Promise<Channel> sync() throws InterruptedException {
168+
return this.channelPromise.sync();
169+
}
170+
171+
@Override
172+
public Promise<Channel> syncUninterruptibly() {
173+
return this.channelPromise.syncUninterruptibly();
174+
}
175+
}

0 commit comments

Comments
 (0)