Skip to content

Commit 92fa05b

Browse files
Adding reactor netty timeline to query plan calls (Azure#22817)
* Adding reactor timeline to query plan * Changing gatewayRequestTimeline variable name in ClientSideRequestStatistics * decoupling time connection created and connection aquired * Adding request time line check in test case
1 parent 384f6ce commit 92fa05b

File tree

12 files changed

+146
-40
lines changed

12 files changed

+146
-40
lines changed

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -539,9 +539,9 @@ public static ClientSideRequestStatistics getClientSideRequestStatics(CosmosDiag
539539
}
540540

541541
@Warning(value = INTERNAL_USE_ONLY_WARNING)
542-
public static void setTransportClientRequestTimelineOnDiagnostics(CosmosDiagnostics cosmosDiagnostics,
543-
RequestTimeline requestTimeline) {
544-
cosmosDiagnostics.clientSideRequestStatistics().setTransportClientRequestTimeline(requestTimeline);
542+
public static void setGatewayRequestTimelineOnDiagnostics(CosmosDiagnostics cosmosDiagnostics,
543+
RequestTimeline requestTimeline) {
544+
cosmosDiagnostics.clientSideRequestStatistics().setGatewayRequestTimeline(requestTimeline);
545545
}
546546

547547
@Warning(value = INTERNAL_USE_ONLY_WARNING)

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public class ClientSideRequestStatistics {
4242
private Set<URI> regionsContacted;
4343
private RetryContext retryContext;
4444
private GatewayStatistics gatewayStatistics;
45-
private RequestTimeline transportRequestTimeline;
45+
private RequestTimeline gatewayRequestTimeline;
4646
private MetadataDiagnosticsContext metadataDiagnosticsContext;
4747
private SerializationDiagnosticsContext serializationDiagnosticsContext;
4848

@@ -144,13 +144,17 @@ public void recordGatewayResponse(
144144
} else if (exception != null) {
145145
this.gatewayStatistics.statusCode = exception.getStatusCode();
146146
this.gatewayStatistics.subStatusCode = exception.getSubStatusCode();
147-
this.gatewayStatistics.requestTimeline = this.transportRequestTimeline;
147+
this.gatewayStatistics.requestTimeline = this.gatewayRequestTimeline;
148148
}
149149
}
150150
}
151151

152-
public void setTransportClientRequestTimeline(RequestTimeline transportRequestTimeline) {
153-
this.transportRequestTimeline = transportRequestTimeline;
152+
public void setGatewayRequestTimeline(RequestTimeline transportRequestTimeline) {
153+
this.gatewayRequestTimeline = transportRequestTimeline;
154+
}
155+
156+
public RequestTimeline getGatewayRequestTimeline() {
157+
return this.gatewayRequestTimeline;
154158
}
155159

156160
public String recordAddressResolutionStart(URI targetEndpoint) {

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/FeedResponseDiagnostics.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,17 @@ public String toString() {
6565
.append(EQUALS)
6666
.append(Duration.between(diagnosticsContext.getStartTimeUTC(),
6767
diagnosticsContext.getEndTimeUTC()).toMillis()).append(System.lineSeparator());
68+
if (diagnosticsContext.getRequestTimeline() != null) {
69+
try {
70+
stringBuilder.append(QUERY_PLAN + SPACE + "RequestTimeline ")
71+
.append(EQUALS)
72+
.append(Utils.getSimpleObjectMapper().writeValueAsString(diagnosticsContext.getRequestTimeline()))
73+
.append(System.lineSeparator())
74+
.append(System.lineSeparator());
75+
} catch (JsonProcessingException e) {
76+
LOGGER.error("Error while parsing diagnostics ", e);
77+
}
78+
}
6879
}
6980
}
7081

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceResponse.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
99
import com.azure.cosmos.implementation.directconnectivity.Address;
1010
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
11-
import com.azure.cosmos.models.ModelBridgeInternal;
1211
import com.fasterxml.jackson.databind.JsonNode;
1312
import com.fasterxml.jackson.databind.node.ArrayNode;
1413
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -29,6 +28,7 @@ public class RxDocumentServiceResponse {
2928
private final int statusCode;
3029
private final Map<String, String> headersMap;
3130
private final StoreResponse storeResponse;
31+
private RequestTimeline gatewayHttpRequestTimeline;
3232

3333
public RxDocumentServiceResponse(DiagnosticsClientContext diagnosticsClientContext, StoreResponse response) {
3434
String[] headerNames = response.getResponseHeaderNames();
@@ -48,6 +48,12 @@ public RxDocumentServiceResponse(DiagnosticsClientContext diagnosticsClientConte
4848
this.diagnosticsClientContext = diagnosticsClientContext;
4949
}
5050

51+
public RxDocumentServiceResponse(DiagnosticsClientContext diagnosticsClientContext, StoreResponse response,
52+
RequestTimeline gatewayHttpRequestTimeline) {
53+
this(diagnosticsClientContext, response);
54+
this.gatewayHttpRequestTimeline = gatewayHttpRequestTimeline;
55+
}
56+
5157
public static <T extends Resource> String getResourceKey(Class<T> c) {
5258
if (c.equals(Conflict.class)) {
5359
return InternalConstants.ResourceKeys.CONFLICTS;
@@ -96,6 +102,10 @@ public String getResponseBodyAsString() {
96102
return Utils.utf8StringFromOrNull(this.getResponseBodyAsByteArray());
97103
}
98104

105+
public RequestTimeline getGatewayHttpRequestTimeline() {
106+
return gatewayHttpRequestTimeline;
107+
}
108+
99109
public <T extends Resource> T getResource(Class<T> c) {
100110
String responseBody = this.getResponseBodyAsString();
101111
if (StringUtils.isEmpty(responseBody))

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,7 @@ public Mono<RxDocumentServiceResponse> performRequestInternal(RxDocumentServiceR
186186
}
187187

188188
Mono<HttpResponse> httpResponseMono = this.httpClient.send(httpRequest, responseTimeout);
189-
190-
return toDocumentServiceResponse(httpResponseMono, request);
189+
return toDocumentServiceResponse(httpResponseMono, request, httpRequest);
191190

192191
} catch (Exception e) {
193192
return Mono.error(e);
@@ -267,7 +266,8 @@ private String ensureSlashPrefixed(String path) {
267266
* @return {@link Mono}
268267
*/
269268
private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpResponse> httpResponseMono,
270-
RxDocumentServiceRequest request) {
269+
RxDocumentServiceRequest request,
270+
HttpRequest httpRequest) {
271271

272272
return httpResponseMono.flatMap(httpResponse -> {
273273

@@ -285,7 +285,7 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
285285
ReactorNettyRequestRecord reactorNettyRequestRecord = httpResponse.request().reactorNettyRequestRecord();
286286
if (reactorNettyRequestRecord != null) {
287287
reactorNettyRequestRecord.setTimeCompleted(Instant.now());
288-
BridgeInternal.setTransportClientRequestTimelineOnDiagnostics(request.requestContext.cosmosDiagnostics,
288+
BridgeInternal.setGatewayRequestTimelineOnDiagnostics(request.requestContext.cosmosDiagnostics,
289289
reactorNettyRequestRecord.takeTimelineSnapshot());
290290
}
291291

@@ -306,8 +306,15 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
306306
})
307307
.single();
308308

309-
}).map(rsp -> new RxDocumentServiceResponse(this.clientContext, rsp))
310-
.onErrorResume(throwable -> {
309+
}).map(rsp -> {
310+
if (httpRequest.reactorNettyRequestRecord() != null) {
311+
return new RxDocumentServiceResponse(this.clientContext, rsp,
312+
httpRequest.reactorNettyRequestRecord().takeTimelineSnapshot());
313+
314+
} else {
315+
return new RxDocumentServiceResponse(this.clientContext, rsp);
316+
}
317+
}).onErrorResume(throwable -> {
311318
Throwable unwrappedException = reactor.core.Exceptions.unwrap(throwable);
312319
if (!(unwrappedException instanceof Exception)) {
313320
// fatal error
@@ -335,6 +342,11 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
335342
}
336343

337344
if (request.requestContext.cosmosDiagnostics != null) {
345+
if (BridgeInternal.getClientSideRequestStatics(request.requestContext.cosmosDiagnostics).getGatewayRequestTimeline() == null && httpRequest.reactorNettyRequestRecord() != null) {
346+
BridgeInternal.setGatewayRequestTimelineOnDiagnostics(request.requestContext.cosmosDiagnostics,
347+
httpRequest.reactorNettyRequestRecord().takeTimelineSnapshot());
348+
}
349+
338350
BridgeInternal.recordGatewayResponse(request.requestContext.cosmosDiagnostics, request, null, dce);
339351
BridgeInternal.setCosmosDiagnostics(dce, request.requestContext.cosmosDiagnostics);
340352
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ private static ConnectionObserver getConnectionObserver() {
219219
return (conn, state) -> {
220220
Instant time = Instant.now();
221221

222-
if (state.equals(HttpClientState.CONNECTED) || state.equals(HttpClientState.ACQUIRED)) {
222+
if (state.equals(HttpClientState.CONNECTED)) {
223223
if (conn instanceof ConnectionObserver) {
224224
ConnectionObserver observer = (ConnectionObserver) conn;
225225
ReactorNettyRequestRecord requestRecord =
@@ -229,6 +229,16 @@ private static ConnectionObserver getConnectionObserver() {
229229
}
230230
requestRecord.setTimeConnected(time);
231231
}
232+
}else if (state.equals(HttpClientState.ACQUIRED)) {
233+
if (conn instanceof ConnectionObserver) {
234+
ConnectionObserver observer = (ConnectionObserver) conn;
235+
ReactorNettyRequestRecord requestRecord =
236+
observer.currentContext().getOrDefault(REACTOR_NETTY_REQUEST_RECORD_KEY, null);
237+
if (requestRecord == null) {
238+
throw new IllegalStateException("ReactorNettyRequestRecord not found in context");
239+
}
240+
requestRecord.setTimeAcquired(time);
241+
}
232242
} else if (state.equals(HttpClientState.CONFIGURED)) {
233243
if (conn instanceof HttpClientRequest) {
234244
HttpClientRequest httpClientRequest = (HttpClientRequest) conn;

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyRequestRecord.java

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public final class ReactorNettyRequestRecord {
2525

2626
private volatile Instant timeCreated;
2727
private volatile Instant timeConnected;
28+
private volatile Instant timeAcquired;
2829
private volatile Instant timeConfigured;
2930
private volatile Instant timeSent;
3031
private volatile Instant timeReceived;
@@ -46,6 +47,14 @@ public Instant timeConnected() {
4647
return this.timeConnected;
4748
}
4849

50+
/**
51+
* Gets connection acquired instant.
52+
* @return timeAcquired
53+
*/
54+
public Instant timeAcquired() {
55+
return timeAcquired;
56+
}
57+
4958
/**
5059
* Get connection configured instant.
5160
* @return timeConfigured
@@ -94,6 +103,14 @@ public void setTimeConnected(Instant timeConnected) {
94103
this.timeConnected = timeConnected;
95104
}
96105

106+
/**
107+
* Sets connection acquired instant.
108+
* @param timeAcquired
109+
*/
110+
public void setTimeAcquired(Instant timeAcquired) {
111+
this.timeAcquired = timeAcquired;
112+
}
113+
97114
/**
98115
* Sets connection configured instant.
99116
* @param timeConfigured
@@ -135,23 +152,38 @@ public RequestTimeline takeTimelineSnapshot() {
135152
Instant now = Instant.now();
136153

137154
Instant timeCreated = this.timeCreated();
155+
Instant timeAcquired = this.timeAcquired();
138156
Instant timeConnected = this.timeConnected();
139157
Instant timeConfigured = this.timeConfigured();
140158
Instant timeSent = this.timeSent();
141159
Instant timeReceived = this.timeReceived();
142160
Instant timeCompleted = this.timeCompleted();
143161
Instant timeCompletedOrNow = timeCompleted == null ? now : timeCompleted;
144162

145-
return RequestTimeline.of(
146-
new RequestTimeline.Event("connectionCreated",
147-
timeCreated, timeConnected() == null ? timeCompletedOrNow : timeConnected),
148-
new RequestTimeline.Event("connectionConfigured",
149-
timeConnected, timeConfigured == null ? timeCompletedOrNow : timeConfigured),
150-
new RequestTimeline.Event("requestSent",
151-
timeConfigured, timeSent == null ? timeCompletedOrNow : timeSent),
152-
new RequestTimeline.Event("transitTime",
153-
timeSent, timeReceived == null ? timeCompletedOrNow : timeReceived),
154-
new RequestTimeline.Event("received",
155-
timeReceived, timeCompletedOrNow));
163+
if (this.timeConnected() != null) {
164+
return RequestTimeline.of(
165+
new RequestTimeline.Event("connectionCreated",
166+
timeCreated, timeConnected() == null ? timeCompletedOrNow : timeConnected),
167+
new RequestTimeline.Event("connectionConfigured",
168+
timeConnected, timeConfigured == null ? timeCompletedOrNow : timeConfigured),
169+
new RequestTimeline.Event("requestSent",
170+
timeConfigured, timeSent == null ? timeCompletedOrNow : timeSent),
171+
new RequestTimeline.Event("transitTime",
172+
timeSent, timeReceived == null ? timeCompletedOrNow : timeReceived),
173+
new RequestTimeline.Event("received",
174+
timeReceived, timeCompletedOrNow));
175+
} else {
176+
return RequestTimeline.of(
177+
new RequestTimeline.Event("connectionAcquired",
178+
timeCreated, timeAcquired() == null ? timeCompletedOrNow : timeAcquired),
179+
new RequestTimeline.Event("connectionConfigured",
180+
timeAcquired, timeConfigured == null ? timeCompletedOrNow : timeConfigured),
181+
new RequestTimeline.Event("requestSent",
182+
timeConfigured, timeSent == null ? timeCompletedOrNow : timeSent),
183+
new RequestTimeline.Event("transitTime",
184+
timeSent, timeReceived == null ? timeCompletedOrNow : timeReceived),
185+
new RequestTimeline.Event("received",
186+
timeReceived, timeCompletedOrNow));
187+
}
156188
}
157189
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/DocumentQueryExecutionContextFactory.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,28 @@
22
// Licensed under the MIT License.
33
package com.azure.cosmos.implementation.query;
44

5-
import com.azure.cosmos.implementation.BadRequestException;
65
import com.azure.cosmos.BridgeInternal;
6+
import com.azure.cosmos.implementation.BadRequestException;
77
import com.azure.cosmos.implementation.DiagnosticsClientContext;
8-
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
9-
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
10-
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
11-
import com.azure.cosmos.models.CosmosQueryRequestOptions;
12-
import com.azure.cosmos.models.FeedRange;
13-
import com.azure.cosmos.models.PartitionKey;
14-
import com.azure.cosmos.implementation.Resource;
15-
import com.azure.cosmos.models.SqlQuerySpec;
168
import com.azure.cosmos.implementation.DocumentCollection;
179
import com.azure.cosmos.implementation.OperationType;
1810
import com.azure.cosmos.implementation.PartitionKeyRange;
11+
import com.azure.cosmos.implementation.Resource;
1912
import com.azure.cosmos.implementation.ResourceType;
2013
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
2114
import com.azure.cosmos.implementation.Utils;
15+
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
16+
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
2217
import com.azure.cosmos.implementation.caches.RxCollectionCache;
18+
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
19+
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
2320
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
2421
import com.azure.cosmos.implementation.routing.Range;
22+
import com.azure.cosmos.models.CosmosQueryRequestOptions;
23+
import com.azure.cosmos.models.FeedRange;
2524
import com.azure.cosmos.models.ModelBridgeInternal;
26-
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
25+
import com.azure.cosmos.models.PartitionKey;
26+
import com.azure.cosmos.models.SqlQuerySpec;
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929
import reactor.core.publisher.Flux;
@@ -134,7 +134,8 @@ private static <T extends Resource> Mono<Pair<List<Range<String>>, QueryInfo>> g
134134
QueryInfo queryInfo =
135135
partitionedQueryExecutionInfo.getQueryInfo();
136136
queryInfo.setQueryPlanDiagnosticsContext(new QueryInfo.QueryPlanDiagnosticsContext(planFetchStartTime,
137-
planFetchEndTime));
137+
planFetchEndTime,
138+
partitionedQueryExecutionInfo.getQueryPlanRequestTimeline()));
138139
List<Range<String>> queryRanges =
139140
partitionedQueryExecutionInfo.getQueryRanges();
140141

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/PartitionedQueryExecutionInfo.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
package com.azure.cosmos.implementation.query;
55

6+
import com.azure.cosmos.implementation.RequestTimeline;
67
import com.azure.cosmos.implementation.routing.Range;
78
import com.azure.cosmos.BridgeInternal;
89
import com.azure.cosmos.implementation.JsonSerializable;
@@ -20,6 +21,7 @@ public final class PartitionedQueryExecutionInfo extends JsonSerializable {
2021

2122
private QueryInfo queryInfo;
2223
private List<Range<String>> queryRanges;
24+
private RequestTimeline queryPlanRequestTimeline;
2325

2426
PartitionedQueryExecutionInfo(QueryInfo queryInfo, List<Range<String>> queryRanges) {
2527
this.queryInfo = queryInfo;
@@ -30,8 +32,9 @@ public final class PartitionedQueryExecutionInfo extends JsonSerializable {
3032
Constants.PartitionedQueryExecutionInfo.VERSION_1);
3133
}
3234

33-
public PartitionedQueryExecutionInfo(byte[] bytes) {
35+
public PartitionedQueryExecutionInfo(byte[] bytes, RequestTimeline queryPlanRequestTimeline) {
3436
super(bytes);
37+
this.queryPlanRequestTimeline = queryPlanRequestTimeline;
3538
}
3639

3740
public PartitionedQueryExecutionInfo(String jsonString) {
@@ -54,6 +57,10 @@ public List<Range<String>> getQueryRanges() {
5457
PartitionedQueryExecutionInfoInternal.QUERY_RANGES_PROPERTY, QUERY_RANGES_CLASS));
5558
}
5659

60+
public RequestTimeline getQueryPlanRequestTimeline() {
61+
return queryPlanRequestTimeline;
62+
}
63+
5764
@Override
5865
public boolean equals(Object o) {
5966
return super.equals(o);

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryInfo.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
package com.azure.cosmos.implementation.query;
55

6+
import com.azure.cosmos.implementation.RequestTimeline;
67
import com.azure.cosmos.implementation.DiagnosticsInstantSerializer;
78
import com.azure.cosmos.implementation.query.aggregation.AggregateOperator;
89
import com.azure.cosmos.implementation.JsonSerializable;
@@ -178,18 +179,29 @@ public static final class QueryPlanDiagnosticsContext {
178179
private volatile Instant startTimeUTC;
179180
@JsonSerialize(using = DiagnosticsInstantSerializer.class)
180181
private volatile Instant endTimeUTC;
182+
private volatile RequestTimeline requestTimeline;
181183
public QueryPlanDiagnosticsContext(Instant startTimeUTC, Instant endTimeUTC) {
182184
this.startTimeUTC = startTimeUTC;
183185
this.endTimeUTC = endTimeUTC;
184186
}
185187

188+
public QueryPlanDiagnosticsContext(Instant startTimeUTC, Instant endTimeUTC, RequestTimeline requestTimeline) {
189+
this.startTimeUTC = startTimeUTC;
190+
this.endTimeUTC = endTimeUTC;
191+
this.requestTimeline = requestTimeline;
192+
}
193+
186194
public Instant getStartTimeUTC() {
187195
return startTimeUTC;
188196
}
189197

190198
public Instant getEndTimeUTC() {
191199
return endTimeUTC;
192200
}
201+
202+
public RequestTimeline getRequestTimeline() {
203+
return requestTimeline;
204+
}
193205
}
194206

195207
@Override

0 commit comments

Comments
 (0)