Skip to content

Commit 5885efd

Browse files
authored
Add tracing support for Service Bus processor (Azure#17684)
* Add tracing support for SB processor * Make addContext packag-private * Resolve merge conflict
1 parent 57c79dd commit 5885efd

File tree

5 files changed

+200
-3
lines changed

5 files changed

+200
-3
lines changed

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -646,9 +646,10 @@ public final class ServiceBusSessionProcessorClientBuilder {
646646

647647
private ServiceBusSessionProcessorClientBuilder() {
648648
sessionReceiverClientBuilder = new ServiceBusSessionReceiverClientBuilder();
649-
processorClientOptions = new ServiceBusProcessorClientOptions();
649+
processorClientOptions = new ServiceBusProcessorClientOptions()
650+
.setMaxConcurrentCalls(1)
651+
.setTracerProvider(tracerProvider);
650652
sessionReceiverClientBuilder.maxConcurrentSessions(1);
651-
processorClientOptions.setMaxConcurrentCalls(1);
652653
}
653654

654655
/**
@@ -1101,7 +1102,9 @@ public final class ServiceBusProcessorClientBuilder {
11011102

11021103
private ServiceBusProcessorClientBuilder() {
11031104
serviceBusReceiverClientBuilder = new ServiceBusReceiverClientBuilder();
1104-
processorClientOptions = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1);
1105+
processorClientOptions = new ServiceBusProcessorClientOptions()
1106+
.setMaxConcurrentCalls(1)
1107+
.setTracerProvider(tracerProvider);
11051108
}
11061109

11071110
/**

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,38 @@
33

44
package com.azure.messaging.servicebus;
55

6+
import com.azure.core.amqp.implementation.TracerProvider;
7+
import com.azure.core.util.Context;
68
import com.azure.core.util.logging.ClientLogger;
9+
import com.azure.core.util.tracing.ProcessKind;
710
import com.azure.messaging.servicebus.implementation.models.ServiceBusProcessorClientOptions;
811
import org.reactivestreams.Subscriber;
912
import org.reactivestreams.Subscription;
13+
import reactor.core.publisher.Signal;
1014
import reactor.core.scheduler.Schedulers;
1115

16+
import java.io.Closeable;
17+
import java.io.IOException;
18+
import java.util.Locale;
1219
import java.util.Objects;
20+
import java.util.Optional;
1321
import java.util.concurrent.Executors;
1422
import java.util.concurrent.ScheduledExecutorService;
1523
import java.util.concurrent.TimeUnit;
1624
import java.util.concurrent.atomic.AtomicBoolean;
1725
import java.util.concurrent.atomic.AtomicReference;
1826
import java.util.function.Consumer;
1927

28+
import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
29+
import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
30+
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
31+
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
32+
import static com.azure.core.util.tracing.Tracer.MESSAGE_ENQUEUED_TIME;
33+
import static com.azure.core.util.tracing.Tracer.SCOPE_KEY;
34+
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
35+
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE;
36+
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME;
37+
2038
/**
2139
* The processor client for processing Service Bus messages. {@link ServiceBusProcessorClient
2240
* ServiceBusProcessorClients} provides a push-based mechanism that invokes the message processing callback when a
@@ -44,6 +62,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
4462
private final AtomicReference<Subscription> receiverSubscription = new AtomicReference<>();
4563
private final AtomicReference<ServiceBusReceiverAsyncClient> asyncClient = new AtomicReference<>();
4664
private final AtomicBoolean isRunning = new AtomicBoolean();
65+
private final TracerProvider tracerProvider;
4766
private ScheduledExecutorService scheduledExecutor;
4867

4968
/**
@@ -65,6 +84,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
6584
this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");
6685
this.asyncClient.set(sessionReceiverBuilder.buildAsyncClientForProcessor());
6786
this.receiverBuilder = null;
87+
this.tracerProvider = processorOptions.getTracerProvider();
6888
}
6989

7090
/**
@@ -84,6 +104,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
84104
this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");
85105
this.asyncClient.set(receiverBuilder.buildAsyncClient());
86106
this.sessionReceiverBuilder = null;
107+
this.tracerProvider = processorOptions.getTracerProvider();
87108
}
88109

89110
/**
@@ -164,12 +185,22 @@ public void onNext(ServiceBusMessageContext serviceBusMessageContext) {
164185
if (serviceBusMessageContext.hasError()) {
165186
handleError(serviceBusMessageContext.getThrowable());
166187
} else {
188+
Context processSpanContext = null;
167189
try {
168190
ServiceBusReceivedMessageContext serviceBusReceivedMessageContext =
169191
new ServiceBusReceivedMessageContext(receiverClient, serviceBusMessageContext);
192+
193+
processSpanContext =
194+
startProcessTracingSpan(serviceBusMessageContext.getMessage(),
195+
receiverClient.getEntityPath(), receiverClient.getFullyQualifiedNamespace());
196+
if (processSpanContext.getData(SPAN_CONTEXT_KEY).isPresent()) {
197+
serviceBusMessageContext.getMessage().addContext(SPAN_CONTEXT_KEY, processSpanContext);
198+
}
170199
processMessage.accept(serviceBusReceivedMessageContext);
200+
endProcessTracingSpan(processSpanContext, Signal.complete());
171201
} catch (Exception ex) {
172202
handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK));
203+
endProcessTracingSpan(processSpanContext, Signal.error(ex));
173204
if (!processorOptions.isDisableAutoComplete()) {
174205
logger.warning("Error when processing message. Abandoning message.", ex);
175206
abandonMessage(serviceBusMessageContext, receiverClient);
@@ -201,6 +232,54 @@ public void onComplete() {
201232
});
202233
}
203234

235+
private void endProcessTracingSpan(Context processSpanContext, Signal<Void> signal) {
236+
if (processSpanContext == null) {
237+
return;
238+
}
239+
240+
Optional<Object> spanScope = processSpanContext.getData(SCOPE_KEY);
241+
// Disposes of the scope when the trace span closes.
242+
if (!spanScope.isPresent() || !tracerProvider.isEnabled()) {
243+
return;
244+
}
245+
if (spanScope.get() instanceof Closeable) {
246+
Closeable close = (Closeable) processSpanContext.getData(SCOPE_KEY).get();
247+
try {
248+
close.close();
249+
tracerProvider.endSpan(processSpanContext, signal);
250+
} catch (IOException ioException) {
251+
logger.error("endTracingSpan().close() failed with an error %s", ioException);
252+
}
253+
254+
} else {
255+
logger.warning(String.format(Locale.US,
256+
"Process span scope type is not of type Closeable, but type: %s. Not closing the scope and span",
257+
spanScope.get() != null ? spanScope.getClass() : "null"));
258+
}
259+
}
260+
261+
private Context startProcessTracingSpan(ServiceBusReceivedMessage receivedMessage, String entityPath,
262+
String fullyQualifiedNamespace) {
263+
264+
Object diagnosticId = receivedMessage.getApplicationProperties().get(DIAGNOSTIC_ID_KEY);
265+
if (diagnosticId == null || !tracerProvider.isEnabled()) {
266+
return Context.NONE;
267+
}
268+
269+
Context spanContext = tracerProvider.extractContext(diagnosticId.toString(), Context.NONE);
270+
271+
spanContext = spanContext
272+
.addData(ENTITY_PATH_KEY, entityPath)
273+
.addData(HOST_NAME_KEY, fullyQualifiedNamespace)
274+
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);
275+
spanContext = receivedMessage.getEnqueuedTime() == null
276+
? spanContext
277+
: spanContext.addData(MESSAGE_ENQUEUED_TIME,
278+
receivedMessage.getEnqueuedTime().toInstant().getEpochSecond());
279+
280+
return tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, spanContext, ProcessKind.PROCESS);
281+
}
282+
204283
private void abandonMessage(ServiceBusMessageContext serviceBusMessageContext,
205284
ServiceBusReceiverAsyncClient receiverClient) {
206285
try {

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.azure.core.amqp.models.AmqpMessageBodyType;
1111
import com.azure.core.amqp.models.AmqpMessageId;
1212
import com.azure.core.experimental.util.BinaryData;
13+
import com.azure.core.util.Context;
1314
import com.azure.core.util.logging.ClientLogger;
1415
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
1516

@@ -40,10 +41,12 @@ public final class ServiceBusReceivedMessage {
4041
private final AmqpAnnotatedMessage amqpAnnotatedMessage;
4142
private UUID lockToken;
4243
private boolean isSettled = false;
44+
private Context context;
4345

4446
ServiceBusReceivedMessage(BinaryData body) {
4547
Objects.requireNonNull(body, "'body' cannot be null.");
4648
amqpAnnotatedMessage = new AmqpAnnotatedMessage(AmqpMessageBody.fromData(body.toBytes()));
49+
context = Context.NONE;
4750
}
4851

4952
/**
@@ -438,6 +441,22 @@ public String getTo() {
438441
return to;
439442
}
440443

444+
/**
445+
* Adds a new key value pair to the existing context on Message.
446+
*
447+
* @param key The key for this context object
448+
* @param value The value for this context object.
449+
*
450+
* @return The updated {@link ServiceBusMessage}.
451+
* @throws NullPointerException if {@code key} or {@code value} is null.
452+
*/
453+
ServiceBusReceivedMessage addContext(String key, Object value) {
454+
Objects.requireNonNull(key, "The 'key' parameter cannot be null.");
455+
Objects.requireNonNull(value, "The 'value' parameter cannot be null.");
456+
this.context = context.addData(key, value);
457+
return this;
458+
}
459+
441460
/**
442461
* Gets whether the message has been settled.
443462
*

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/models/ServiceBusProcessorClientOptions.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
package com.azure.messaging.servicebus.implementation.models;
55

6+
import com.azure.core.amqp.implementation.TracerProvider;
67
import com.azure.core.annotation.Fluent;
78
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
89

@@ -15,6 +16,8 @@ public final class ServiceBusProcessorClientOptions {
1516
private int maxConcurrentCalls = 1;
1617
private boolean disableAutoComplete;
1718

19+
private TracerProvider tracerProvider;
20+
1821
/**
1922
* Returns true if the auto-complete and auto-abandon feature is disabled.
2023
* @return true if the auto-complete and auto-abandon feature is disabled.
@@ -50,4 +53,24 @@ public ServiceBusProcessorClientOptions setMaxConcurrentCalls(int maxConcurrentC
5053
this.maxConcurrentCalls = maxConcurrentCalls;
5154
return this;
5255
}
56+
57+
/**
58+
* Returns the {@link TracerProvider} instance that is used in {@link ServiceBusProcessorClient}.
59+
*
60+
* @return The {@link TracerProvider} instance that is used in {@link ServiceBusProcessorClient}.
61+
*/
62+
public TracerProvider getTracerProvider() {
63+
return tracerProvider;
64+
}
65+
66+
/**
67+
* Sets the {@link TracerProvider} instance to use in {@link ServiceBusProcessorClient}.
68+
*
69+
* @param tracerProvider The {@link TracerProvider} instance to use in {@link ServiceBusProcessorClient}.
70+
* @return The updated instance of {@link ServiceBusProcessorClientOptions}.
71+
*/
72+
public ServiceBusProcessorClientOptions setTracerProvider(TracerProvider tracerProvider) {
73+
this.tracerProvider = tracerProvider;
74+
return this;
75+
}
5376
}

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,38 @@
33

44
package com.azure.messaging.servicebus;
55

6+
import com.azure.core.amqp.implementation.TracerProvider;
67
import com.azure.core.experimental.util.BinaryData;
8+
import com.azure.core.util.Context;
9+
import com.azure.core.util.tracing.ProcessKind;
10+
import com.azure.core.util.tracing.Tracer;
711
import com.azure.messaging.servicebus.implementation.models.ServiceBusProcessorClientOptions;
812
import org.junit.jupiter.api.Assertions;
913
import org.junit.jupiter.api.Test;
1014
import reactor.core.publisher.Flux;
1115
import reactor.core.publisher.FluxSink;
1216
import reactor.core.publisher.Mono;
1317

18+
import java.io.Closeable;
19+
import java.time.OffsetDateTime;
1420
import java.util.ArrayList;
21+
import java.util.Collections;
1522
import java.util.List;
1623
import java.util.concurrent.CountDownLatch;
1724
import java.util.concurrent.TimeUnit;
1825
import java.util.concurrent.atomic.AtomicBoolean;
1926
import java.util.concurrent.atomic.AtomicInteger;
2027
import java.util.concurrent.atomic.AtomicReference;
2128

29+
import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
30+
import static com.azure.core.util.tracing.Tracer.MESSAGE_ENQUEUED_TIME;
31+
import static com.azure.core.util.tracing.Tracer.PARENT_SPAN_KEY;
32+
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
2233
import static org.junit.jupiter.api.Assertions.assertEquals;
2334
import static org.junit.jupiter.api.Assertions.assertTrue;
2435
import static org.mockito.ArgumentMatchers.any;
36+
import static org.mockito.ArgumentMatchers.eq;
37+
import static org.mockito.ArgumentMatchers.isNull;
2538
import static org.mockito.Mockito.doNothing;
2639
import static org.mockito.Mockito.mock;
2740
import static org.mockito.Mockito.never;
@@ -319,6 +332,66 @@ public void testUserMessageHandlerErrorWithAutoCompleteDisabled() throws Interru
319332
verify(asyncClient, never()).abandon(any(ServiceBusReceivedMessage.class));
320333
}
321334

335+
@Test
336+
public void testProcessorWithTracingEnabled() throws InterruptedException {
337+
final Tracer tracer = mock(Tracer.class);
338+
final List<Tracer> tracers = Collections.singletonList(tracer);
339+
TracerProvider tracerProvider = new TracerProvider(tracers);
340+
341+
String diagnosticId = "00-08ee063508037b1719dddcbf248e30e2-1365c684eb25daed-01";
342+
343+
when(tracer.extractContext(eq(diagnosticId), any())).thenAnswer(
344+
invocation -> {
345+
Context passed = invocation.getArgument(1, Context.class);
346+
return passed.addData(SPAN_CONTEXT_KEY, "value");
347+
}
348+
);
349+
when(tracer.start(eq("ServiceBus.process"), any(), eq(ProcessKind.PROCESS))).thenAnswer(
350+
invocation -> {
351+
Context passed = invocation.getArgument(1, Context.class);
352+
assertTrue(passed.getData(MESSAGE_ENQUEUED_TIME).isPresent());
353+
return passed.addData(SPAN_CONTEXT_KEY, "value1").addData("scope", (Closeable) () -> {
354+
return;
355+
}).addData(PARENT_SPAN_KEY, "value2");
356+
}
357+
);
358+
Flux<ServiceBusMessageContext> messageFlux =
359+
Flux.create(emitter -> {
360+
for (int i = 0; i < 5; i++) {
361+
ServiceBusReceivedMessage serviceBusReceivedMessage =
362+
new ServiceBusReceivedMessage(BinaryData.fromString("hello"));
363+
serviceBusReceivedMessage.setMessageId(String.valueOf(i));
364+
serviceBusReceivedMessage.setEnqueuedTime(OffsetDateTime.now());
365+
serviceBusReceivedMessage.getApplicationProperties().put(DIAGNOSTIC_ID_KEY, diagnosticId);
366+
ServiceBusMessageContext serviceBusMessageContext =
367+
new ServiceBusMessageContext(serviceBusReceivedMessage);
368+
emitter.next(serviceBusMessageContext);
369+
}
370+
});
371+
372+
ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder = getBuilder(messageFlux);
373+
374+
AtomicInteger messageId = new AtomicInteger();
375+
CountDownLatch countDownLatch = new CountDownLatch(5);
376+
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder,
377+
messageContext -> {
378+
assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId());
379+
countDownLatch.countDown();
380+
},
381+
error -> Assertions.fail("Error occurred when receiving messages from the processor"),
382+
new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1).setTracerProvider(tracerProvider));
383+
384+
serviceBusProcessorClient.start();
385+
boolean success = countDownLatch.await(5, TimeUnit.SECONDS);
386+
serviceBusProcessorClient.close();
387+
388+
assertTrue(success, "Failed to receive all expected messages");
389+
verify(tracer, times(5)).extractContext(eq(diagnosticId), any());
390+
verify(tracer, times(5)).start(eq("ServiceBus.process"), any(), eq(ProcessKind.PROCESS));
391+
verify(tracer, times(5)).end(eq("success"), isNull(), any());
392+
393+
}
394+
322395
private ServiceBusClientBuilder.ServiceBusReceiverClientBuilder getBuilder(
323396
Flux<ServiceBusMessageContext> messageFlux) {
324397

0 commit comments

Comments
 (0)