Skip to content

Commit e172230

Browse files
author
Liudmila Molkova
authored
Instrument sender and change checkpoint counter to duration (Azure#32813)
* Instrument sender and change chckpoint counter to duration
1 parent 513712d commit e172230

File tree

5 files changed

+92
-62
lines changed

5 files changed

+92
-62
lines changed

sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66

77
### Breaking Changes
88

9+
- Remove `messaging.eventhubs.checkpoints` counter and replace it with `messaging.eventhubs.checkpoint.duration`
10+
histogram that can be used to count checkpoint calls.
11+
912
### Bugs Fixed
1013

1114
### Other Changes

sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@
2424
import reactor.core.publisher.Mono;
2525

2626
import java.nio.ByteBuffer;
27+
import java.time.Instant;
2728
import java.util.HashMap;
2829
import java.util.List;
2930
import java.util.Map;
3031
import java.util.Objects;
3132
import java.util.concurrent.ConcurrentHashMap;
33+
import java.util.concurrent.atomic.AtomicReference;
3234
import java.util.function.Function;
3335

3436
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -265,19 +267,30 @@ public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
265267
metadata.put(OFFSET, offset);
266268
BlobAsyncClient blobAsyncClient = blobClients.get(blobName);
267269

268-
return blobAsyncClient.exists().flatMap(exists -> {
270+
Mono<Void> response = blobAsyncClient.exists().flatMap(exists -> {
269271
if (exists) {
270272
return blobAsyncClient.setMetadata(metadata);
271273
} else {
272274
return blobAsyncClient.getBlockBlobAsyncClient().uploadWithResponse(Flux.just(UPLOAD_DATA), 0, null,
273275
metadata, null, null, null).then();
274276
}
275-
})
276-
.doOnEach(signal -> {
277-
if (signal.isOnComplete() || signal.isOnError()) {
278-
metricsHelper.reportCheckpoint(checkpoint, blobName, !signal.hasError());
279-
}
280277
});
278+
return reportMetrics(response, checkpoint, blobName);
279+
}
280+
281+
private Mono<Void> reportMetrics(Mono<Void> checkpointMono, Checkpoint checkpoint, String blobName) {
282+
AtomicReference<Instant> startTime = metricsHelper.isCheckpointDurationEnabled() ? new AtomicReference<>() : null;
283+
return checkpointMono
284+
.doOnEach(signal -> {
285+
if (signal.isOnComplete() || signal.isOnError()) {
286+
metricsHelper.reportCheckpoint(checkpoint, blobName, !signal.hasError(), startTime != null ? startTime.get() : null);
287+
}
288+
})
289+
.doOnSubscribe(ignored -> {
290+
if (startTime != null) {
291+
startTime.set(Instant.now());
292+
}
293+
});
281294
}
282295

283296
private String getBlobPrefix(String fullyQualifiedNamespace, String eventHubName, String consumerGroupName,

sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelper.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88
import com.azure.core.util.MetricsOptions;
99
import com.azure.core.util.TelemetryAttributes;
1010
import com.azure.core.util.logging.ClientLogger;
11-
import com.azure.core.util.metrics.LongCounter;
11+
import com.azure.core.util.metrics.DoubleHistogram;
1212
import com.azure.core.util.metrics.LongGauge;
1313
import com.azure.core.util.metrics.Meter;
1414
import com.azure.core.util.metrics.MeterProvider;
1515
import com.azure.messaging.eventhubs.models.Checkpoint;
1616

17+
import java.time.Duration;
18+
import java.time.Instant;
1719
import java.util.HashMap;
1820
import java.util.Map;
1921
import java.util.concurrent.ConcurrentHashMap;
@@ -58,7 +60,7 @@ final class MetricsHelper {
5860

5961
private final Meter meter;
6062
private final LongGauge lastSequenceNumber;
61-
private final LongCounter checkpointCounter;
63+
private final DoubleHistogram checkpointDuration;
6264
private final boolean isEnabled;
6365

6466
MetricsHelper(MetricsOptions metricsOptions, MeterProvider meterProvider) {
@@ -72,15 +74,19 @@ final class MetricsHelper {
7274

7375
if (isEnabled) {
7476
this.lastSequenceNumber = this.meter.createLongGauge("messaging.eventhubs.checkpoint.sequence_number", "Last successfully checkpointed sequence number.", "seqNo");
75-
this.checkpointCounter = this.meter.createLongCounter("messaging.eventhubs.checkpoints", "Number of checkpoints.", null);
77+
this.checkpointDuration = this.meter.createDoubleHistogram("messaging.eventhubs.checkpoint.duration", "Duration of checkpoint call.", "ms");
7678
} else {
7779
this.lastSequenceNumber = null;
78-
this.checkpointCounter = null;
80+
this.checkpointDuration = null;
7981
}
8082
}
8183

82-
void reportCheckpoint(Checkpoint checkpoint, String attributesId, boolean success) {
83-
if (!isEnabled || !(lastSequenceNumber.isEnabled() && checkpointCounter.isEnabled())) {
84+
boolean isCheckpointDurationEnabled() {
85+
return isEnabled && checkpointDuration.isEnabled();
86+
}
87+
88+
void reportCheckpoint(Checkpoint checkpoint, String attributesId, boolean success, Instant startTime) {
89+
if (!isEnabled || !(lastSequenceNumber.isEnabled() && checkpointDuration.isEnabled())) {
8490
return;
8591
}
8692

@@ -93,15 +99,18 @@ void reportCheckpoint(Checkpoint checkpoint, String attributesId, boolean succes
9399
updateCurrentValue(attributesId, checkpoint);
94100
}
95101

96-
if (checkpointCounter.isEnabled()) {
97-
TelemetryAttributes attributes = null;
102+
if (checkpointDuration.isEnabled()) {
103+
TelemetryAttributes attributes;
98104
if (success) {
99105
attributes = getOrCreate(checkpointSuccess, attributesId, checkpoint, "ok");
100106
} else {
101107
attributes = getOrCreate(checkpointFailure, attributesId, checkpoint, "error");
102108
}
109+
103110
if (attributes != null) {
104-
checkpointCounter.add(1, attributes, Context.NONE);
111+
if (checkpointDuration.isEnabled()) {
112+
checkpointDuration.record(Duration.between(startTime, Instant.now()).toMillis(), attributes, Context.NONE);
113+
}
105114
}
106115
}
107116
}

sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/MetricsHelperTests.java

Lines changed: 50 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33

44
package com.azure.messaging.eventhubs.checkpointstore.blob;
55

6-
import com.azure.core.test.utils.metrics.TestCounter;
76
import com.azure.core.test.utils.metrics.TestGauge;
7+
import com.azure.core.test.utils.metrics.TestHistogram;
88
import com.azure.core.test.utils.metrics.TestMeasurement;
99
import com.azure.core.test.utils.metrics.TestMeter;
1010
import com.azure.core.util.MetricsOptions;
@@ -16,6 +16,8 @@
1616
import org.junit.jupiter.api.parallel.ExecutionMode;
1717
import org.junit.jupiter.api.parallel.Isolated;
1818

19+
import java.time.Instant;
20+
import java.time.temporal.ChronoUnit;
1921
import java.util.HashMap;
2022
import java.util.List;
2123
import java.util.Map;
@@ -59,12 +61,12 @@ public void testUpdateDisabledMetrics() {
5961

6062

6163
MetricsHelper helper = new MetricsHelper(new MetricsOptions(), testProvider);
62-
helper.reportCheckpoint(checkpoint, "ns/eh/ch/0", true);
64+
helper.reportCheckpoint(checkpoint, "ns/eh/ch/0", true, null);
6365

6466
verify(meter, atLeastOnce()).isEnabled();
6567
verify(meter, never()).createAttributes(anyMap());
6668
verify(meter, never()).createLongGauge(any(), any(), any());
67-
verify(meter, never()).createLongCounter(any(), any(), any());
69+
verify(meter, never()).createDoubleHistogram(any(), any(), any());
6870
}
6971

7072
@Test
@@ -80,11 +82,11 @@ public void testUpdateDisabledMetricsViaOptions() {
8082
Meter meter = mock(Meter.class);
8183
TestMeterProvider testProvider = new TestMeterProvider((lib, ver, opts) -> meter);
8284
MetricsHelper helper = new MetricsHelper(new MetricsOptions().setEnabled(false), testProvider);
83-
helper.reportCheckpoint(checkpoint, "ns/eh/cg/0", true);
85+
helper.reportCheckpoint(checkpoint, "ns/eh/cg/0", true, null);
8486

8587
verify(meter, never()).createAttributes(anyMap());
8688
verify(meter, never()).createLongGauge(any(), any(), any());
87-
verify(meter, never()).createLongCounter(any(), any(), any());
89+
verify(meter, never()).createDoubleHistogram(any(), any(), any());
8890
}
8991

9092
@Test
@@ -105,7 +107,8 @@ public void testUpdateEnabledMetrics() {
105107
});
106108

107109
MetricsHelper helper = new MetricsHelper(new MetricsOptions(), testProvider);
108-
helper.reportCheckpoint(checkpoint, "ns/eh/cg/0", true);
110+
Instant startTime = Instant.now().minus(10, ChronoUnit.SECONDS);
111+
helper.reportCheckpoint(checkpoint, "ns/eh/cg/0", true, startTime);
109112

110113
assertTrue(meter.getGauges().containsKey("messaging.eventhubs.checkpoint.sequence_number"));
111114
TestGauge seqNo = meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number");
@@ -119,15 +122,14 @@ public void testUpdateEnabledMetrics() {
119122
assertEquals(2L, seqNoMeasurement.getValue());
120123
assertCommonAttributes(checkpoint, seqNoMeasurement.getAttributes());
121124

122-
assertTrue(meter.getCounters().containsKey("messaging.eventhubs.checkpoints"));
123-
TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints");
124-
assertEquals(1, checkpoints.getMeasurements().size());
125-
TestMeasurement<Long> checkpointMeasurements = checkpoints.getMeasurements().get(0);
126-
assertEquals(1, checkpointMeasurements.getValue());
127-
assertStatusAttributes(checkpoint, "ok", checkpointMeasurements.getAttributes());
125+
assertTrue(meter.getHistograms().containsKey("messaging.eventhubs.checkpoint.duration"));
126+
TestHistogram checkpointDuration = meter.getHistograms().get("messaging.eventhubs.checkpoint.duration");
127+
assertEquals(1, checkpointDuration.getMeasurements().size());
128+
TestMeasurement<Double> durationMeasurements = checkpointDuration.getMeasurements().get(0);
129+
assertEquals(10000d, durationMeasurements.getValue(), 1000d);
130+
assertStatusAttributes(checkpoint, "ok", durationMeasurements.getAttributes());
128131
}
129132

130-
131133
@Test
132134
public void testUpdateEnabledMetricsFailure() {
133135
Checkpoint checkpoint = new Checkpoint()
@@ -139,16 +141,17 @@ public void testUpdateEnabledMetricsFailure() {
139141
.setOffset(100L);
140142

141143
TestMeter meter = new TestMeter();
144+
Instant startTime = Instant.now().minus(1, ChronoUnit.SECONDS);
142145
MetricsHelper helper = new MetricsHelper(new MetricsOptions(), new TestMeterProvider((lib, ver, opts) -> meter));
143-
helper.reportCheckpoint(checkpoint, "ns/eh/cg/0", false);
146+
helper.reportCheckpoint(checkpoint, "ns/eh/cg/0", false, startTime);
144147

145148
// sequence number is only reported for successful checkpoints
146149
assertEquals(0, meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number").getSubscriptions().size());
147150

148-
TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints");
149-
TestMeasurement<Long> checkpointMeasurements = checkpoints.getMeasurements().get(0);
150-
assertEquals(1, checkpointMeasurements.getValue());
151-
assertStatusAttributes(checkpoint, "error", checkpointMeasurements.getAttributes());
151+
TestHistogram checkpointDuration = meter.getHistograms().get("messaging.eventhubs.checkpoint.duration");
152+
TestMeasurement<Double> durationMeasurements = checkpointDuration.getMeasurements().get(0);
153+
assertEquals(1000d, durationMeasurements.getValue(), 1000d);
154+
assertStatusAttributes(checkpoint, "error", durationMeasurements.getAttributes());
152155
}
153156

154157
@Test
@@ -161,15 +164,16 @@ public void testUpdateEnabledMetricsNullSeqNo() {
161164
.setOffset(100L);
162165

163166
TestMeter meter = new TestMeter();
167+
Instant startTime = Instant.now();
164168
MetricsHelper helper = new MetricsHelper(new MetricsOptions(), new TestMeterProvider((lib, ver, opts) -> meter));
165-
helper.reportCheckpoint(checkpoint, "ns/eh/cg/0", true);
169+
helper.reportCheckpoint(checkpoint, "ns/eh/cg/0", true, startTime);
166170

167171
assertEquals(0, meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number").getSubscriptions().size());
168172

169-
TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints");
170-
TestMeasurement<Long> checkpointMeasurements = checkpoints.getMeasurements().get(0);
171-
assertEquals(1, checkpointMeasurements.getValue());
172-
assertStatusAttributes(checkpoint, "ok", checkpointMeasurements.getAttributes());
173+
TestHistogram checkpointDuration = meter.getHistograms().get("messaging.eventhubs.checkpoint.duration");
174+
TestMeasurement<Double> durationMeasurements = checkpointDuration.getMeasurements().get(0);
175+
assertEquals(0d, durationMeasurements.getValue(), 1000d);
176+
assertStatusAttributes(checkpoint, "ok", durationMeasurements.getAttributes());
173177
}
174178

175179
@Test
@@ -186,7 +190,7 @@ public void testUpdateEnabledMetricsTooManyAttributes() {
186190
.collect(Collectors.toList());
187191

188192
MetricsHelper helper = new MetricsHelper(new MetricsOptions(), new TestMeterProvider((lib, ver, opts) -> meter));
189-
checkpoints.forEach(ch -> helper.reportCheckpoint(ch, "ns/eh/cg/" + ch.getPartitionId(), true));
193+
checkpoints.forEach(ch -> helper.reportCheckpoint(ch, "ns/eh/cg/" + ch.getPartitionId(), true, Instant.now()));
190194

191195
List<TestGauge.Subscription> subscriptions = meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number").getSubscriptions();
192196
assertEquals(MAX_ATTRIBUTES_SETS, subscriptions.size());
@@ -201,14 +205,14 @@ public void testUpdateEnabledMetricsTooManyAttributes() {
201205
i[0]++;
202206
});
203207

204-
TestCounter checkpointCounter = meter.getCounters().get("messaging.eventhubs.checkpoints");
205-
assertEquals(MAX_ATTRIBUTES_SETS, checkpointCounter.getMeasurements().size());
208+
TestHistogram durationHistogram = meter.getHistograms().get("messaging.eventhubs.checkpoint.duration");
209+
assertEquals(MAX_ATTRIBUTES_SETS, durationHistogram.getMeasurements().size());
206210

207-
final int[] j = {0};
208-
checkpointCounter.getMeasurements().forEach(m -> {
209-
assertEquals(1, m.getValue());
210-
assertStatusAttributes(checkpoints.get(j[0]), "ok", m.getAttributes());
211-
j[0]++;
211+
final int[] k = {0};
212+
durationHistogram.getMeasurements().forEach(m -> {
213+
assertEquals(0d, m.getValue(), 1000d);
214+
assertStatusAttributes(checkpoints.get(k[0]), "ok", m.getAttributes());
215+
k[0]++;
212216
});
213217
}
214218

@@ -232,8 +236,8 @@ public void testUpdateEnabledMetricsMultipleMeasurements() {
232236

233237
TestMeter meter = new TestMeter();
234238
MetricsHelper helper = new MetricsHelper(new MetricsOptions(), new TestMeterProvider((lib, ver, opts) -> meter));
235-
helper.reportCheckpoint(checkpoint1, "ns/eh/cg/0", true);
236-
helper.reportCheckpoint(checkpoint2, "ns/eh/cg/0", true);
239+
helper.reportCheckpoint(checkpoint1, "ns/eh/cg/0", true, Instant.now().minus(10, ChronoUnit.SECONDS));
240+
helper.reportCheckpoint(checkpoint2, "ns/eh/cg/0", true, Instant.now());
237241

238242
TestGauge seqNo = meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number");
239243
TestGauge.Subscription subs = seqNo.getSubscriptions().get(0);
@@ -242,12 +246,12 @@ public void testUpdateEnabledMetricsMultipleMeasurements() {
242246
TestMeasurement<Long> seqNoMeasurement = subs.getMeasurements().get(0);
243247
assertEquals(42L, seqNoMeasurement.getValue());
244248

245-
TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints");
246-
assertEquals(2, checkpoints.getMeasurements().size());
249+
TestHistogram duration = meter.getHistograms().get("messaging.eventhubs.checkpoint.duration");
250+
assertEquals(2, duration.getMeasurements().size());
247251

248-
assertEquals(1, checkpoints.getMeasurements().get(0).getValue());
249-
assertEquals(1, checkpoints.getMeasurements().get(1).getValue());
250-
assertStatusAttributes(checkpoint2, "ok", checkpoints.getMeasurements().get(1).getAttributes());
252+
assertEquals(10000d, duration.getMeasurements().get(0).getValue(), 1000d);
253+
assertEquals(0d, duration.getMeasurements().get(1).getValue(), 1000d);
254+
assertStatusAttributes(checkpoint2, "ok", duration.getMeasurements().get(1).getAttributes());
251255
}
252256

253257
@Test
@@ -271,8 +275,8 @@ public void testUpdateEnabledMetricsMultipleHubs() {
271275
TestMeter meter = new TestMeter();
272276
MetricsHelper helper = new MetricsHelper(new MetricsOptions(), new TestMeterProvider((lib, ver, opts) -> meter));
273277

274-
helper.reportCheckpoint(checkpoint1, "ns/eh1/cg/0", true);
275-
helper.reportCheckpoint(checkpoint2, "ns/eh2/cg/0", true);
278+
helper.reportCheckpoint(checkpoint1, "ns/eh1/cg/0", true, Instant.now());
279+
helper.reportCheckpoint(checkpoint2, "ns/eh2/cg/0", true, Instant.now());
276280

277281
TestGauge seqNo = meter.getGauges().get("messaging.eventhubs.checkpoint.sequence_number");
278282
assertEquals(2, seqNo.getSubscriptions().size());
@@ -289,13 +293,13 @@ public void testUpdateEnabledMetricsMultipleHubs() {
289293
assertEquals(42L, seqNoMeasurement2.getValue());
290294
assertCommonAttributes(checkpoint2, seqNoMeasurement2.getAttributes());
291295

292-
TestCounter checkpoints = meter.getCounters().get("messaging.eventhubs.checkpoints");
293-
assertEquals(2, checkpoints.getMeasurements().size());
296+
TestHistogram duration = meter.getHistograms().get("messaging.eventhubs.checkpoint.duration");
297+
assertEquals(2, duration.getMeasurements().size());
294298

295-
assertEquals(1, checkpoints.getMeasurements().get(0).getValue());
296-
assertStatusAttributes(checkpoint1, "ok", checkpoints.getMeasurements().get(0).getAttributes());
297-
assertEquals(1, checkpoints.getMeasurements().get(1).getValue());
298-
assertStatusAttributes(checkpoint2, "ok", checkpoints.getMeasurements().get(1).getAttributes());
299+
assertEquals(0d, duration.getMeasurements().get(0).getValue(), 1000d);
300+
assertStatusAttributes(checkpoint1, "ok", duration.getMeasurements().get(0).getAttributes());
301+
assertEquals(0d, duration.getMeasurements().get(1).getValue(), 1000d);
302+
assertStatusAttributes(checkpoint2, "ok", duration.getMeasurements().get(1).getAttributes());
299303
}
300304

301305

sdk/eventhubs/azure-messaging-eventhubs-stress/templates/job.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ spec:
1919
source $ENV_FILE &&
2020
export CONTAINER_NAME=sender &&
2121
export APPLICATIONINSIGHTS_ROLE_NAME=sender &&
22-
java "org.springframework.boot.loader.JarLauncher" \
22+
java -javaagent:BOOT-INF/classes/applicationinsights-agent-3.4.1.jar \
23+
"org.springframework.boot.loader.JarLauncher" \
2324
--TEST_CLASS=EventSender
2425
{{- include "stress-test-addons.container-env" . | nindent 6 }}
2526
resources:

0 commit comments

Comments
 (0)