Skip to content

Commit 490aed4

Browse files
author
Liudmila Molkova
authored
EventHubs: Fix Span links on send (Azure#28951)
* Fix link population on send
1 parent 53150a7 commit 490aed4

File tree

5 files changed

+201
-78
lines changed

5 files changed

+201
-78
lines changed

sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
### Bugs Fixed
1010

11+
- Fixes trace context propagation issue: links to *message* spans were not populated on *send* span. ([#28951](https://github.com/Azure/azure-sdk-for-java/pull/28951))
12+
1113
### Other Changes
1214

1315
## 5.12.0 (2022-05-16)

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,13 +155,17 @@ private EventData traceMessageSpan(EventData eventData) {
155155
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_NAMESPACE_VALUE)
156156
.addData(ENTITY_PATH_KEY, this.entityPath)
157157
.addData(HOST_NAME_KEY, this.hostname);
158-
Context eventSpanContext = tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, eventContext,
158+
eventContext = tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, eventContext,
159159
ProcessKind.MESSAGE);
160-
Optional<Object> eventDiagnosticIdOptional = eventSpanContext.getData(DIAGNOSTIC_ID_KEY);
160+
Optional<Object> eventDiagnosticIdOptional = eventContext.getData(DIAGNOSTIC_ID_KEY);
161161
if (eventDiagnosticIdOptional.isPresent()) {
162162
eventData.getProperties().put(DIAGNOSTIC_ID_KEY, eventDiagnosticIdOptional.get().toString());
163-
tracerProvider.endSpan(eventSpanContext, Signal.complete());
164-
eventData.addContext(SPAN_CONTEXT_KEY, eventSpanContext);
163+
tracerProvider.endSpan(eventContext, Signal.complete());
164+
165+
Object spanContext = eventContext.getData(SPAN_CONTEXT_KEY).orElse(null);
166+
if (spanContext != null) {
167+
eventData.addContext(SPAN_CONTEXT_KEY, spanContext);
168+
}
165169
}
166170
}
167171

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -539,13 +539,21 @@ public Mono<Void> send(EventDataBatch batch) {
539539
for (int i = 0; i < batch.getEvents().size(); i++) {
540540
final EventData event = batch.getEvents().get(i);
541541
if (isTracingEnabled) {
542-
parentContext.set(event.getContext());
543542
if (i == 0) {
544-
sharedContext = tracerProvider.getSharedSpanBuilder(ClientConstants.AZ_TRACING_SERVICE_NAME,
545-
parentContext.get());
543+
sharedContext = event.getContext()
544+
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_NAMESPACE_VALUE)
545+
.addData(ENTITY_PATH_KEY, eventHubName)
546+
.addData(HOST_NAME_KEY, fullyQualifiedNamespace);
547+
548+
sharedContext = tracerProvider.getSharedSpanBuilder(ClientConstants.AZ_TRACING_SERVICE_NAME, sharedContext);
549+
tracerProvider.addSpanLinks(sharedContext);
550+
} else {
551+
// TODO (lmolkova) we need better addSpanLinks - https://github.com/Azure/azure-sdk-for-java/issues/28953
552+
Object eventSpanContext = event.getContext().getData(SPAN_CONTEXT_KEY).orElse(Context.NONE);
553+
tracerProvider.addSpanLinks(sharedContext.addData(SPAN_CONTEXT_KEY, eventSpanContext));
546554
}
547-
tracerProvider.addSpanLinks(sharedContext.addData(SPAN_CONTEXT_KEY, event.getContext()));
548555
}
556+
549557
final Message message = messageSerializer.serialize(event);
550558

551559
if (!CoreUtils.isNullOrEmpty(partitionKey)) {
@@ -559,14 +567,8 @@ public Mono<Void> send(EventDataBatch batch) {
559567
}
560568

561569
if (isTracingEnabled) {
562-
final Context finalSharedContext = sharedContext == null
563-
? Context.NONE
564-
: sharedContext
565-
.addData(ENTITY_PATH_KEY, eventHubName)
566-
.addData(HOST_NAME_KEY, fullyQualifiedNamespace)
567-
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_NAMESPACE_VALUE);
568570
// Start send span and store updated context
569-
parentContext.set(tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, finalSharedContext, ProcessKind.SEND));
571+
parentContext.set(tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, sharedContext, ProcessKind.SEND));
570572
}
571573

572574
final Mono<Void> sendMessage = getSendLink(batch.getPartitionId())

sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java

Lines changed: 99 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -58,22 +58,26 @@
5858
import java.util.concurrent.Semaphore;
5959
import java.util.concurrent.TimeUnit;
6060
import java.util.concurrent.atomic.AtomicInteger;
61+
import java.util.concurrent.atomic.AtomicReference;
6162

6263
import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
6364
import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
6465
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
6566
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
66-
import static com.azure.core.util.tracing.Tracer.PARENT_SPAN_KEY;
67+
import static com.azure.core.util.tracing.Tracer.PARENT_TRACE_CONTEXT_KEY;
6768
import static com.azure.core.util.tracing.Tracer.SPAN_BUILDER_KEY;
69+
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
6870
import static com.azure.messaging.eventhubs.implementation.ClientConstants.AZ_NAMESPACE_VALUE;
6971
import static java.nio.charset.StandardCharsets.UTF_8;
7072
import static org.junit.jupiter.api.Assertions.assertEquals;
73+
import static org.junit.jupiter.api.Assertions.assertFalse;
7174
import static org.mockito.ArgumentMatchers.any;
7275
import static org.mockito.ArgumentMatchers.anyList;
7376
import static org.mockito.ArgumentMatchers.anyString;
7477
import static org.mockito.ArgumentMatchers.argThat;
7578
import static org.mockito.ArgumentMatchers.eq;
7679
import static org.mockito.ArgumentMatchers.isNull;
80+
import static org.mockito.Mockito.doAnswer;
7781
import static org.mockito.Mockito.mock;
7882
import static org.mockito.Mockito.never;
7983
import static org.mockito.Mockito.times;
@@ -318,60 +322,69 @@ void partitionProducerCannotSendWithPartitionKey() {
318322
*/
319323
@Test
320324
void sendStartSpanSingleMessage() {
325+
final Flux<EventData> testData = Flux.just(
326+
new EventData(TEST_CONTENTS.getBytes(UTF_8)));
327+
final SendOptions sendOptions = new SendOptions();
328+
321329
// Arrange
322330
final Tracer tracer1 = mock(Tracer.class);
323331
final List<Tracer> tracers = Collections.singletonList(tracer1);
324332
TracerProvider tracerProvider = new TracerProvider(tracers);
325-
final Flux<EventData> testData = Flux.just(
326-
new EventData(TEST_CONTENTS.getBytes(UTF_8)),
327-
new EventData(TEST_CONTENTS.getBytes(UTF_8)));
328333

329-
final String partitionId = "my-partition-id";
330-
final SendOptions sendOptions = new SendOptions()
331-
.setPartitionId(partitionId);
332334
final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
333335
connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(),
334336
false, onClientClosed);
335337

336-
when(connection.createSendLink(
337-
argThat(name -> name.endsWith(partitionId)), argThat(name -> name.endsWith(partitionId)), eq(retryOptions)))
338+
when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), any()))
338339
.thenReturn(Mono.just(sendLink));
339-
340+
when(sendLink.getHostname()).thenReturn(HOSTNAME);
341+
when(sendLink.getEntityPath()).thenReturn(EVENT_HUB_NAME);
340342
when(sendLink.send(anyList())).thenReturn(Mono.empty());
343+
when(sendLink.send(any(Message.class))).thenReturn(Mono.empty());
341344

342345
when(tracer1.start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND))).thenAnswer(
343346
invocation -> {
344347
Context passed = invocation.getArgument(1, Context.class);
345348
assertEquals(passed.getData(AZ_TRACING_NAMESPACE_KEY).get(), AZ_NAMESPACE_VALUE);
346-
return passed.addData(PARENT_SPAN_KEY, "value");
349+
return passed.addData(PARENT_TRACE_CONTEXT_KEY, "value");
347350
}
348351
);
349-
350352
when(tracer1.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE))).thenAnswer(
351353
invocation -> {
352354
Context passed = invocation.getArgument(1, Context.class);
353355
assertEquals(passed.getData(AZ_TRACING_NAMESPACE_KEY).get(), AZ_NAMESPACE_VALUE);
354-
return passed.addData(PARENT_SPAN_KEY, "value").addData(DIAGNOSTIC_ID_KEY, "value2");
356+
return passed.addData(PARENT_TRACE_CONTEXT_KEY, "value")
357+
.addData(DIAGNOSTIC_ID_KEY, "diag-id")
358+
.addData(SPAN_CONTEXT_KEY, "span-context");
355359
}
356360
);
357-
358361
when(tracer1.getSharedSpanBuilder(eq("EventHubs.send"), any())).thenAnswer(
359362
invocation -> {
360363
Context passed = invocation.getArgument(1, Context.class);
361-
return passed.addData(SPAN_BUILDER_KEY, "value");
364+
return passed.addData(SPAN_BUILDER_KEY, "span-builder");
362365
}
363366
);
364367

368+
doAnswer(
369+
invocation -> {
370+
Context passed = invocation.getArgument(0, Context.class);
371+
assertEquals("span-builder", passed.getData(SPAN_BUILDER_KEY).orElseGet(null));
372+
assertEquals("span-context", passed.getData(SPAN_CONTEXT_KEY).orElseGet(null));
373+
return null;
374+
}).when(tracer1).addLink(any());
375+
365376
// Act
366377
StepVerifier.create(asyncProducer.send(testData, sendOptions))
367378
.verifyComplete();
368379

369-
// Assert
380+
//Assert
370381
verify(tracer1, times(1))
371382
.start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND));
372-
verify(tracer1, times(2))
383+
verify(tracer1, times(1))
373384
.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE));
374-
verify(tracer1, times(3)).end(eq("success"), isNull(), any());
385+
verify(tracer1, times(2)).end(eq("success"), isNull(), any());
386+
verify(tracer1, times(1)).getSharedSpanBuilder(eq("EventHubs.send"), any());
387+
verify(tracer1, times(1)).addLink(any());
375388

376389
verifyNoInteractions(onClientClosed);
377390
}
@@ -391,29 +404,25 @@ void sendMessageRetrySpanTest() {
391404
tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed);
392405

393406
final String failureKey = "fail";
394-
final EventData testData = new EventData("test");
407+
final EventData testData = new EventData("test")
408+
.addContext(SPAN_CONTEXT_KEY, "span-context");
395409
testData.getProperties().put(failureKey, "true");
396410

397411
when(tracer1.start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND))).thenAnswer(
398412
invocation -> {
399413
Context passed = invocation.getArgument(1, Context.class);
400-
assertEquals(passed.getData(AZ_TRACING_NAMESPACE_KEY).get(), AZ_NAMESPACE_VALUE);
401-
return passed.addData(PARENT_SPAN_KEY, "value").addData(HOST_NAME_KEY, "value2");
402-
}
403-
);
404-
405-
when(tracer1.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE))).thenAnswer(
406-
invocation -> {
407-
Context passed = invocation.getArgument(1, Context.class);
408-
assertEquals(passed.getData(AZ_TRACING_NAMESPACE_KEY).get(), AZ_NAMESPACE_VALUE);
409-
return passed.addData(PARENT_SPAN_KEY, "value").addData(DIAGNOSTIC_ID_KEY, "value2");
414+
assertEquals(AZ_NAMESPACE_VALUE, passed.getData(AZ_TRACING_NAMESPACE_KEY).get());
415+
assertEquals(HOSTNAME, passed.getData(HOST_NAME_KEY).get());
416+
assertEquals(EVENT_HUB_NAME, passed.getData(ENTITY_PATH_KEY).get());
417+
return passed.addData(PARENT_TRACE_CONTEXT_KEY, "trace-context");
410418
}
411419
);
412420

413421
when(tracer1.getSharedSpanBuilder(eq("EventHubs.send"), any())).thenAnswer(
414422
invocation -> {
415423
Context passed = invocation.getArgument(1, Context.class);
416-
return passed.addData(SPAN_BUILDER_KEY, "value");
424+
assertEquals("span-context", passed.getData("span-context").orElseGet(null));
425+
return passed.addData(SPAN_BUILDER_KEY, "span-builder");
417426
}
418427
);
419428

@@ -432,14 +441,15 @@ void sendMessageRetrySpanTest() {
432441
.thenReturn(Mono.empty());
433442

434443
producer.send(testData).block();
444+
assertFalse(testData.getProperties().containsKey(DIAGNOSTIC_ID_KEY));
435445

436446
//Assert
437-
verify(tracer1, times(1))
438-
.start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND));
439-
verify(tracer1, times(1))
440-
.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE));
447+
verify(tracer1, times(1)).start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND));
448+
verify(tracer1, never()).start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE));
441449
verify(tracer1, times(1)).addLink(any());
442-
verify(tracer1, times(2)).end(eq("success"), isNull(), any());
450+
verify(tracer1, times(1)).getSharedSpanBuilder(eq("EventHubs.send"), any());
451+
verify(tracer1, times(1)).end(eq("success"), isNull(), any());
452+
443453

444454
verifyNoMoreInteractions(onClientClosed);
445455
}
@@ -529,37 +539,75 @@ void startMessageSpansOnCreateBatch() {
529539
final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
530540
connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(),
531541
false, onClientClosed);
532-
final AmqpSendLink link = mock(AmqpSendLink.class);
533-
534-
when(link.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES));
535-
when(link.getHostname()).thenReturn(HOSTNAME);
536-
when(link.getEntityPath()).thenReturn(ENTITY_PATH);
537542

538543
// EC is the prefix they use when creating a link that sends to the service round-robin.
539544
when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), eq(retryOptions)))
540-
.thenReturn(Mono.just(link));
545+
.thenReturn(Mono.just(sendLink));
546+
when(sendLink.getHostname()).thenReturn(HOSTNAME);
547+
when(sendLink.getEntityPath()).thenReturn(EVENT_HUB_NAME);
548+
when(sendLink.send(anyList())).thenReturn(Mono.empty());
549+
when(sendLink.send(any(Message.class))).thenReturn(Mono.empty());
541550

551+
final AtomicReference<Integer> eventInd = new AtomicReference<>(0);
542552
when(tracer1.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE))).thenAnswer(
543553
invocation -> {
544554
Context passed = invocation.getArgument(1, Context.class);
545-
assertEquals(passed.getData(AZ_TRACING_NAMESPACE_KEY).get(), AZ_NAMESPACE_VALUE);
546-
assertEquals(passed.getData(ENTITY_PATH_KEY).get(), ENTITY_PATH);
547-
assertEquals(passed.getData(HOST_NAME_KEY).get(), HOSTNAME);
555+
return passed.addData(PARENT_TRACE_CONTEXT_KEY, "span")
556+
.addData(DIAGNOSTIC_ID_KEY, eventInd.get().toString())
557+
.addData(SPAN_CONTEXT_KEY, eventInd.get());
558+
}
559+
);
560+
561+
when(tracer1.getSharedSpanBuilder(eq("EventHubs.send"), any())).thenAnswer(
562+
invocation -> {
563+
Context passed = invocation.getArgument(1, Context.class);
564+
assertEquals(0, passed.getData(SPAN_CONTEXT_KEY).orElseGet(null));
565+
return passed.addData(SPAN_BUILDER_KEY, "span-builder");
566+
}
567+
);
548568

549-
return passed.addData(PARENT_SPAN_KEY, "value").addData(DIAGNOSTIC_ID_KEY, "value2");
569+
final AtomicReference<Integer> linkNumber = new AtomicReference<>(0);
570+
doAnswer(
571+
invocation -> {
572+
Context passed = invocation.getArgument(0, Context.class);
573+
assertEquals("span-builder", passed.getData(SPAN_BUILDER_KEY).orElseGet(null));
574+
assertEquals(linkNumber.get(), passed.getData(SPAN_CONTEXT_KEY).orElseGet(null));
575+
linkNumber.set(linkNumber.get() + 1);
576+
return null;
577+
}).when(tracer1).addLink(any());
578+
579+
when(tracer1.start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND))).thenAnswer(
580+
invocation -> {
581+
Context passed = invocation.getArgument(1, Context.class);
582+
assertEquals(EVENT_HUB_NAME, passed.getData(ENTITY_PATH_KEY).orElseGet(null));
583+
assertEquals(HOSTNAME, passed.getData(HOST_NAME_KEY).orElseGet(null));
584+
assertEquals(AZ_NAMESPACE_VALUE, passed.getData(AZ_TRACING_NAMESPACE_KEY).orElseGet(null));
585+
return passed.addData(PARENT_TRACE_CONTEXT_KEY, "span");
550586
}
551587
);
552588

553589
// Act & Assert
554-
StepVerifier.create(asyncProducer.createBatch())
555-
.assertNext(batch -> {
556-
Assertions.assertTrue(batch.tryAdd(new EventData("Hello World".getBytes(UTF_8))));
557-
})
590+
StepVerifier.create(asyncProducer.createBatch()
591+
.flatMap(batch -> {
592+
final EventData data0 = new EventData("Hello World".getBytes(UTF_8));
593+
Assertions.assertTrue(batch.tryAdd(data0));
594+
assertEquals("0", data0.getProperties().get(DIAGNOSTIC_ID_KEY));
595+
596+
eventInd.set(1);
597+
final EventData data1 = new EventData("Hello World".getBytes(UTF_8));
598+
Assertions.assertTrue(batch.tryAdd(data1));
599+
assertEquals("1", data1.getProperties().get(DIAGNOSTIC_ID_KEY));
600+
return asyncProducer.send(batch);
601+
}))
558602
.verifyComplete();
559603

560-
verify(tracer1, times(1))
604+
verify(tracer1, times(2))
561605
.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE));
562-
verify(tracer1, times(1)).end(eq("success"), isNull(), any());
606+
verify(tracer1, times(1)).getSharedSpanBuilder(eq("EventHubs.send"), any());
607+
verify(tracer1, times(2)).addLink(any());
608+
verify(tracer1, times(1)).start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND));
609+
verify(tracer1, times(2)).start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE));
610+
verify(tracer1, times(3)).end(eq("success"), isNull(), any());
563611

564612
verifyNoInteractions(onClientClosed);
565613
}

0 commit comments

Comments
 (0)