Skip to content

Commit 86e6d62

Browse files
Integrating cosmos diagnostics with open telemetry tracer (Azure#22202)
* Diagnostic integration with open telemetry
* adding test case for query metrics in tracer
* incremental change
* Adding API to pass threshold for diagnostics
* Correcting API name
* Documentation improvement
* resolving comments
* Adding pkrange and pkrange id both for querymetrics key
* resolving comments
* Using new core addEvent API with context param
* build error fix
* resolving comments
* fixing SpotBugs issues
* fixing test case
* fixing test for emulator
1 parent 29b09c2 commit 86e6d62

17 files changed

+1035
-272
lines changed

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.azure.cosmos.implementation.throughputControl.config.ThroughputControlGroupFactory;
2929
import com.azure.cosmos.implementation.throughputControl.config.GlobalThroughputControlGroup;
3030
import com.azure.cosmos.implementation.throughputControl.config.LocalThroughputControlGroup;
31-
import com.azure.cosmos.implementation.throughputControl.config.ThroughputControlGroupFactory;
3231
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
3332
import com.azure.cosmos.models.CosmosConflictProperties;
3433
import com.azure.cosmos.models.CosmosContainerProperties;
@@ -302,7 +301,8 @@ private <T> Mono<CosmosItemResponse<T>> createItemInternal(T item, CosmosItemReq
302301
database.getClient(),
303302
ModelBridgeInternal.getConsistencyLevel(options),
304303
OperationType.Create,
305-
ResourceType.Document);
304+
ResourceType.Document,
305+
options.getThresholdForDiagnosticsOnTracer());
306306
}
307307

308308
private <T> Mono<CosmosItemResponse<T>> createItemInternal(T item, CosmosItemRequestOptions options) {
@@ -403,6 +403,7 @@ <T> CosmosPagedFlux<T> readAllItems(CosmosQueryRequestOptions options, Class<T>
403403
pagedFluxOptions.setTracerAndTelemetryInformation(this.readAllItemsSpanName, database.getId(),
404404
this.getId(), OperationType.ReadFeed, ResourceType.Document, this.getDatabase().getClient());
405405
setContinuationTokenAndMaxItemCount(pagedFluxOptions, options);
406+
pagedFluxOptions.setThresholdForDiagnosticsOnTracer(options.getThresholdForDiagnosticsOnTracer());
406407
return getDatabase().getDocClientWrapper().readDocuments(getLink(), options).map(
407408
response -> prepareFeedResponse(response, false, classType));
408409
});
@@ -546,6 +547,7 @@ <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryItemsInternalFu
546547
pagedFluxOptions.setTracerAndTelemetryInformation(spanName, database.getId(),
547548
this.getId(), OperationType.Query, ResourceType.Document, this.getDatabase().getClient());
548549
setContinuationTokenAndMaxItemCount(pagedFluxOptions, cosmosQueryRequestOptions);
550+
pagedFluxOptions.setThresholdForDiagnosticsOnTracer(cosmosQueryRequestOptions.getThresholdForDiagnosticsOnTracer());
549551

550552
return getDatabase().getDocClientWrapper()
551553
.queryDocuments(CosmosAsyncContainer.this.getLink(), sqlQuerySpec, cosmosQueryRequestOptions)
@@ -562,6 +564,7 @@ <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryItemsInternalFu
562564
pagedFluxOptions.setTracerAndTelemetryInformation(spanName, database.getId(),
563565
this.getId(), OperationType.Query, ResourceType.Document, this.getDatabase().getClient());
564566
setContinuationTokenAndMaxItemCount(pagedFluxOptions, cosmosQueryRequestOptions);
567+
pagedFluxOptions.setThresholdForDiagnosticsOnTracer(cosmosQueryRequestOptions.getThresholdForDiagnosticsOnTracer());
565568

566569
return sqlQuerySpecMono.flux()
567570
.flatMap(sqlQuerySpec -> getDatabase().getDocClientWrapper()
@@ -973,7 +976,7 @@ public <T> CosmosPagedFlux<T> readAllItems(
973976
Class<T> classType) {
974977
final CosmosQueryRequestOptions requestOptions = options == null ? new CosmosQueryRequestOptions() : options;
975978
requestOptions.setPartitionKey(partitionKey);
976-
979+
977980
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
978981
pagedFluxOptions.setTracerAndTelemetryInformation(this.readAllItemsSpanName, database.getId(),
979982
this.getId(), OperationType.ReadFeed, ResourceType.Document, this.getDatabase().getClient());
@@ -1301,7 +1304,8 @@ private Mono<CosmosItemResponse<Object>> deleteItemInternal(
13011304
database.getClient(),
13021305
requestOptions.getConsistencyLevel(),
13031306
OperationType.Delete,
1304-
ResourceType.Document);
1307+
ResourceType.Document,
1308+
requestOptions.getThresholdForDiagnosticsOnTracer());
13051309
}
13061310

13071311
private <T> Mono<CosmosItemResponse<T>> replaceItemInternal(
@@ -1327,7 +1331,8 @@ private <T> Mono<CosmosItemResponse<T>> replaceItemInternal(
13271331
database.getClient(),
13281332
ModelBridgeInternal.getConsistencyLevel(options),
13291333
OperationType.Replace,
1330-
ResourceType.Document);
1334+
ResourceType.Document,
1335+
options.getThresholdForDiagnosticsOnTracer());
13311336
}
13321337

13331338
private <T> Mono<CosmosItemResponse<T>> patchItemInternal(
@@ -1354,7 +1359,8 @@ private <T> Mono<CosmosItemResponse<T>> patchItemInternal(
13541359
database.getClient(),
13551360
ModelBridgeInternal.getConsistencyLevel(options),
13561361
OperationType.Patch,
1357-
ResourceType.Document);
1362+
ResourceType.Document,
1363+
options.getThresholdForDiagnosticsOnTracer());
13581364
}
13591365

13601366
private <T> Mono<CosmosItemResponse<T>> upsertItemInternal(T item, CosmosItemRequestOptions options, Context context) {
@@ -1378,7 +1384,8 @@ private <T> Mono<CosmosItemResponse<T>> upsertItemInternal(T item, CosmosItemReq
13781384
database.getClient(),
13791385
ModelBridgeInternal.getConsistencyLevel(options),
13801386
OperationType.Upsert,
1381-
ResourceType.Document);
1387+
ResourceType.Document,
1388+
options.getThresholdForDiagnosticsOnTracer());
13821389
}
13831390

13841391
private <T> Mono<CosmosItemResponse<T>> readItemInternal(
@@ -1401,7 +1408,8 @@ private <T> Mono<CosmosItemResponse<T>> readItemInternal(
14011408
database.getClient(),
14021409
requestOptions.getConsistencyLevel(),
14031410
OperationType.Read,
1404-
ResourceType.Document);
1411+
ResourceType.Document,
1412+
requestOptions.getThresholdForDiagnosticsOnTracer());
14051413
}
14061414

14071415
Mono<CosmosContainerResponse> read(CosmosContainerRequestOptions options, Context context) {

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnostics.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public Duration getDuration() {
8080

8181
/**
8282
* Regions contacted for this request
83+
*
8384
* @return set of regions contacted for this request
8485
*/
8586
@Beta(value = Beta.SinceVersion.V4_9_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
@@ -102,7 +103,7 @@ void fillCosmosDiagnostics(ObjectNode parentNode, StringBuilder stringBuilder) {
102103
}
103104

104105
if (stringBuilder != null) {
105-
stringBuilder.append(USER_AGENT_KEY +"=").append(USER_AGENT).append(System.lineSeparator());
106+
stringBuilder.append(USER_AGENT_KEY + "=").append(USER_AGENT).append(System.lineSeparator());
106107
stringBuilder.append(feedResponseDiagnostics);
107108
}
108109
} else {
@@ -135,6 +136,15 @@ private AtomicBoolean isDiagnosticsCapturedInPagedFlux(){
135136
static {
136137
ImplementationBridgeHelpers.CosmosDiagnosticsHelper.setCosmosDiagnosticsAccessor(
137138
new ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor() {
139+
@Override
140+
public FeedResponseDiagnostics getFeedResponseDiagnostics(CosmosDiagnostics cosmosDiagnostics) {
141+
if (cosmosDiagnostics != null) {
142+
return cosmosDiagnostics.getFeedResponseDiagnostics();
143+
}
144+
145+
return null;
146+
}
147+
138148
@Override
139149
public AtomicBoolean isDiagnosticsCapturedInPagedFlux(CosmosDiagnostics cosmosDiagnostics) {
140150
return cosmosDiagnostics.isDiagnosticsCapturedInPagedFlux();

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java

Lines changed: 74 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// Licensed under the MIT License.
33
package com.azure.cosmos.implementation;
44

5-
import com.azure.cosmos.ConnectionMode;
65
import com.azure.cosmos.CosmosException;
76
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
87
import com.azure.cosmos.implementation.cpu.CpuMemoryMonitor;
@@ -30,7 +29,6 @@
3029
@JsonSerialize(using = ClientSideRequestStatistics.ClientSideRequestStatisticsSerializer.class)
3130
public class ClientSideRequestStatistics {
3231
private static final int MAX_SUPPLEMENTAL_REQUESTS_FOR_TO_STRING = 10;
33-
private final DiagnosticsClientContext clientContext;
3432
private final DiagnosticsClientContext diagnosticsClientContext;
3533

3634
private List<StoreResponseStatistics> responseStatisticsList;
@@ -50,7 +48,6 @@ public class ClientSideRequestStatistics {
5048

5149
public ClientSideRequestStatistics(DiagnosticsClientContext diagnosticsClientContext) {
5250
this.diagnosticsClientContext = diagnosticsClientContext;
53-
this.clientContext = null;
5451
this.requestStartTimeUTC = Instant.now();
5552
this.requestEndTimeUTC = Instant.now();
5653
this.responseStatisticsList = new ArrayList<>();
@@ -68,6 +65,14 @@ public Duration getDuration() {
6865
return Duration.between(requestStartTimeUTC, requestEndTimeUTC);
6966
}
7067

68+
public Instant getRequestStartTimeUTC() {
69+
return requestStartTimeUTC;
70+
}
71+
72+
public DiagnosticsClientContext getDiagnosticsClientContext() {
73+
return diagnosticsClientContext;
74+
}
75+
7176
public void recordResponse(RxDocumentServiceRequest request, StoreResult storeResult) {
7277
Objects.requireNonNull(request, "request is required and cannot be null.");
7378
Instant responseTime = Instant.now();
@@ -135,6 +140,7 @@ public void recordGatewayResponse(
135140
this.gatewayStatistics.requestCharge = storeResponse
136141
.getHeaderValue(HttpConstants.HttpHeaders.REQUEST_CHARGE);
137142
this.gatewayStatistics.requestTimeline = DirectBridgeInternal.getRequestTimeline(storeResponse);
143+
this.gatewayStatistics.partitionKeyRangeId = storeResponse.getPartitionKeyRangeId();
138144
} else if (exception != null) {
139145
this.gatewayStatistics.statusCode = exception.getStatusCode();
140146
this.gatewayStatistics.subStatusCode = exception.getSubStatusCode();
@@ -225,6 +231,22 @@ public RetryContext getRetryContext() {
225231
return retryContext;
226232
}
227233

234+
public List<StoreResponseStatistics> getResponseStatisticsList() {
235+
return responseStatisticsList;
236+
}
237+
238+
public List<StoreResponseStatistics> getSupplementalResponseStatisticsList() {
239+
return supplementalResponseStatisticsList;
240+
}
241+
242+
public Map<String, AddressResolutionStatistics> getAddressResolutionStatistics() {
243+
return addressResolutionStatistics;
244+
}
245+
246+
public GatewayStatistics getGatewayStatistics() {
247+
return gatewayStatistics;
248+
}
249+
228250
public static class StoreResponseStatistics {
229251
@JsonSerialize(using = StoreResult.StoreResultSerializer.class)
230252
StoreResult storeResult;
@@ -234,6 +256,14 @@ public static class StoreResponseStatistics {
234256
ResourceType requestResourceType;
235257
@JsonSerialize
236258
OperationType requestOperationType;
259+
260+
public StoreResult getStoreResult() {
261+
return storeResult;
262+
}
263+
264+
public Instant getRequestResponseTimeUTC() {
265+
return requestResponseTimeUTC;
266+
}
237267
}
238268

239269
private static class SystemInformation {
@@ -278,20 +308,7 @@ public void serialize(
278308
generator.writeStringField("requestStartTimeUTC", DiagnosticsInstantSerializer.fromInstant(statistics.requestStartTimeUTC));
279309
generator.writeStringField("requestEndTimeUTC", DiagnosticsInstantSerializer.fromInstant(statistics.requestEndTimeUTC));
280310
generator.writeObjectField("responseStatisticsList", statistics.responseStatisticsList);
281-
int supplementalResponseStatisticsListCount = statistics.supplementalResponseStatisticsList.size();
282-
int initialIndex =
283-
Math.max(supplementalResponseStatisticsListCount - MAX_SUPPLEMENTAL_REQUESTS_FOR_TO_STRING, 0);
284-
if (initialIndex != 0) {
285-
List<StoreResponseStatistics> subList = statistics.supplementalResponseStatisticsList
286-
.subList(initialIndex,
287-
supplementalResponseStatisticsListCount);
288-
generator.writeObjectField("supplementalResponseStatisticsList", subList);
289-
} else {
290-
generator
291-
.writeObjectField("supplementalResponseStatisticsList",
292-
statistics.supplementalResponseStatisticsList);
293-
}
294-
311+
generator.writeObjectField("supplementalResponseStatisticsList", getCappedSupplementalResponseStatisticsList(statistics.supplementalResponseStatisticsList));
295312
generator.writeObjectField("addressResolutionStatistics", statistics.addressResolutionStatistics);
296313
generator.writeObjectField("regionsContacted", statistics.regionsContacted);
297314
generator.writeObjectField("retryContext", statistics.retryContext);
@@ -300,17 +317,7 @@ public void serialize(
300317
generator.writeObjectField("gatewayStatistics", statistics.gatewayStatistics);
301318

302319
try {
303-
SystemInformation systemInformation = new SystemInformation();
304-
Runtime runtime = Runtime.getRuntime();
305-
long totalMemory = runtime.totalMemory() / 1024;
306-
long freeMemory = runtime.freeMemory() / 1024;
307-
long maxMemory = runtime.maxMemory() / 1024;
308-
systemInformation.usedMemory = totalMemory - freeMemory + " KB";
309-
systemInformation.availableMemory = (maxMemory - (totalMemory - freeMemory)) + " KB";
310-
systemInformation.availableProcessors = runtime.availableProcessors();
311-
312-
// TODO: other system related info also can be captured using a similar approach
313-
systemInformation.systemCpuLoad = CpuMemoryMonitor.getCpuLoad().toString();
320+
SystemInformation systemInformation = fetchSystemInformation();
314321
generator.writeObjectField("systemInformation", systemInformation);
315322
} catch (Exception e) {
316323
// Error while evaluating system information, do nothing
@@ -321,7 +328,20 @@ public void serialize(
321328
}
322329
}
323330

324-
private static class AddressResolutionStatistics {
331+
public static List<StoreResponseStatistics> getCappedSupplementalResponseStatisticsList(List<StoreResponseStatistics> supplementalResponseStatisticsList) {
332+
int supplementalResponseStatisticsListCount = supplementalResponseStatisticsList.size();
333+
int initialIndex =
334+
Math.max(supplementalResponseStatisticsListCount - MAX_SUPPLEMENTAL_REQUESTS_FOR_TO_STRING, 0);
335+
if (initialIndex != 0) {
336+
List<StoreResponseStatistics> subList = supplementalResponseStatisticsList
337+
.subList(initialIndex,
338+
supplementalResponseStatisticsListCount);
339+
return subList;
340+
}
341+
return supplementalResponseStatisticsList;
342+
}
343+
344+
public static class AddressResolutionStatistics {
325345
@JsonSerialize(using = DiagnosticsInstantSerializer.class)
326346
Instant startTimeUTC;
327347
@JsonSerialize(using = DiagnosticsInstantSerializer.class)
@@ -336,16 +356,21 @@ private static class AddressResolutionStatistics {
336356
// indicating background addressResolution is still inflight
337357
@JsonSerialize
338358
boolean inflightRequest = true;
359+
360+
public Instant getStartTimeUTC() {
361+
return startTimeUTC;
362+
}
339363
}
340364

341-
private static class GatewayStatistics {
365+
public static class GatewayStatistics {
342366
String sessionToken;
343367
OperationType operationType;
344368
ResourceType resourceType;
345369
int statusCode;
346370
int subStatusCode;
347371
String requestCharge;
348372
RequestTimeline requestTimeline;
373+
String partitionKeyRangeId;
349374

350375
public String getSessionToken() {
351376
return sessionToken;
@@ -374,5 +399,24 @@ public RequestTimeline getRequestTimeline() {
374399
public ResourceType getResourceType() {
375400
return resourceType;
376401
}
402+
403+
public String getPartitionKeyRangeId() {
404+
return partitionKeyRangeId;
405+
}
406+
}
407+
408+
public static SystemInformation fetchSystemInformation() {
409+
SystemInformation systemInformation = new SystemInformation();
410+
Runtime runtime = Runtime.getRuntime();
411+
long totalMemory = runtime.totalMemory() / 1024;
412+
long freeMemory = runtime.freeMemory() / 1024;
413+
long maxMemory = runtime.maxMemory() / 1024;
414+
systemInformation.usedMemory = totalMemory - freeMemory + " KB";
415+
systemInformation.availableMemory = (maxMemory - (totalMemory - freeMemory)) + " KB";
416+
systemInformation.availableProcessors = runtime.availableProcessors();
417+
418+
// TODO: other system related info also can be captured using a similar approach
419+
systemInformation.systemCpuLoad = CpuMemoryMonitor.getCpuLoad().toString();
420+
return systemInformation;
377421
}
378422
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosPagedFluxOptions.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import com.azure.cosmos.CosmosAsyncClient;
88
import com.azure.cosmos.util.CosmosPagedFlux;
99

10+
import java.time.Duration;
11+
1012
/**
1113
* Specifies paging options for Cosmos Paged Flux implementation.
1214
* @see CosmosPagedFlux
@@ -23,6 +25,7 @@ public class CosmosPagedFluxOptions {
2325
private ResourceType resourceType;
2426
private String serviceEndpoint;
2527
private CosmosAsyncClient cosmosAsyncClient;
28+
private Duration thresholdForDiagnosticsOnTracer;
2629

2730
public CosmosPagedFluxOptions() {}
2831

@@ -132,6 +135,30 @@ public String getServiceEndpoint() {
132135
return serviceEndpoint;
133136
}
134137

138+
/**
139+
* Gets the thresholdForDiagnosticsOnTracer, if latency on query operation is greater than this
140+
* diagnostics will be sent to open telemetry exporter as events in tracer span of end to end CRUD api.
141+
*
142+
* Default is 500 ms.
143+
*
144+
* @return thresholdForDiagnosticsOnTracer the latency threshold for diagnostics on tracer.
145+
*/
146+
public Duration getThresholdForDiagnosticsOnTracer() {
147+
return thresholdForDiagnosticsOnTracer;
148+
}
149+
150+
/**
151+
* Sets the thresholdForDiagnosticsOnTracer, if latency on query operation is greater than this
152+
* diagnostics will be sent to open telemetry exporter as events in tracer span of end to end CRUD api.
153+
*
154+
* Default is 500 ms.
155+
*
156+
* @param thresholdForDiagnosticsOnTracer the latency threshold for diagnostics on tracer.
157+
*/
158+
public void setThresholdForDiagnosticsOnTracer(Duration thresholdForDiagnosticsOnTracer) {
159+
this.thresholdForDiagnosticsOnTracer = thresholdForDiagnosticsOnTracer;
160+
}
161+
135162
public void setTracerInformation(TracerProvider tracerProvider, String tracerSpanName, String serviceEndpoint, String databaseId) {
136163
this.databaseId = databaseId;
137164
this.serviceEndpoint = serviceEndpoint;

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/FeedResponseDiagnostics.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public FeedResponseDiagnostics(Map<String, QueryMetrics> queryMetricsMap) {
3333
this.clientSideRequestStatisticsList = Collections.synchronizedList(new ArrayList<>());
3434
}
3535

36-
Map<String, QueryMetrics> getQueryMetricsMap() {
36+
public Map<String, QueryMetrics> getQueryMetricsMap() {
3737
return queryMetricsMap;
3838
}
3939

@@ -88,6 +88,10 @@ public void setDiagnosticsContext(QueryInfo.QueryPlanDiagnosticsContext diagnost
8888
this.diagnosticsContext = diagnosticsContext;
8989
}
9090

91+
public QueryInfo.QueryPlanDiagnosticsContext getQueryPlanDiagnosticsContext() {
92+
return diagnosticsContext;
93+
}
94+
9195
/**
9296
* Getter for property 'clientSideRequestStatisticsList'.
9397
*

0 commit comments

Comments
 (0)