From a4af9d57da576504d5df934ddbce9f367ea47c1c Mon Sep 17 00:00:00 2001 From: Tudor Plugaru Date: Sun, 23 Nov 2025 17:30:01 +0200 Subject: [PATCH 1/4] Extract trace context from Kafka producer record headers Allow Kafka producers to continue existing traces by extracting trace context from record headers and using it as parent for the produce span. This enables distributed tracing when messages are forwarded between services with pre-existing context. --- .../KafkaProducerInstrumentation.java | 14 ++++++- .../test/groovy/KafkaClientTestBase.groovy | 42 +++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 2dc6059f7fc..bce8e7d4308 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -7,6 +7,7 @@ import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; import static datadog.trace.api.datastreams.DataStreamsTags.createWithClusterId; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN; +import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; @@ -33,6 +34,7 @@ import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; import datadog.trace.instrumentation.kafka_common.ClusterIdHolder; @@ -125,8 +127,18 @@ public static AgentScope onEnter( ClusterIdHolder.set(clusterId); } + // Try to extract existing trace context from record headers + final AgentSpanContext extractedContext = + extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER); + final AgentSpan parent = activeSpan(); - final AgentSpan span = startSpan(KAFKA_PRODUCE); + // Use extracted context as parent if available, otherwise use activeSpan + final AgentSpan span; + if (extractedContext != null) { + span = startSpan(KAFKA_PRODUCE, extractedContext); + } else { + span = startSpan(KAFKA_PRODUCE); + } PRODUCER_DECORATE.afterStart(span); PRODUCER_DECORATE.onProduce(span, record, producerConfig); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index 4f3b0324a7e..eda4528b219 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -25,7 +25,11 @@ import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.RecordMetadata import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.header.internals.RecordHeaders import org.apache.kafka.common.serialization.StringSerializer + +import java.nio.charset.StandardCharsets import org.junit.Rule import org.springframework.kafka.core.DefaultKafkaConsumerFactory import org.springframework.kafka.core.DefaultKafkaProducerFactory @@ -1013,6 +1017,44 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { "true" | true } + def "test producer extracts and uses existing trace context from record headers"() { + setup: + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producer = new KafkaProducer<>(senderProps) + + // Create a trace context to inject into headers (simulating a message with existing context) + def traceId = 1234567890123456L + def spanId = 9876543210987654L + def headers = new RecordHeaders() + headers.add(new RecordHeader("x-datadog-trace-id", + String.valueOf(traceId).getBytes(StandardCharsets.UTF_8))) + headers.add(new RecordHeader("x-datadog-parent-id", + String.valueOf(spanId).getBytes(StandardCharsets.UTF_8))) + + when: + // Send a message with pre-existing trace context in headers + def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers) + producer.send(record).get() + + then: + // Verify that a produce span was created that used the extracted context + assertTraces(1) { + trace(1) { + producerSpan(it, senderProps, null, false) + // Verify the span used the extracted context as parent + def producedSpan = TEST_WRITER[0][0] + assert producedSpan.context().traceId == traceId + assert producedSpan.context().parentId == spanId + // Verify a NEW span was created (not reusing the extracted span ID) + assert producedSpan.context().spanId != spanId + assert producedSpan.context().spanId != 0 + } + } + + cleanup: + producer?.close() + } + def containerProperties() { try { // Different class names for test and latestDepTest. From 118c499615b3228b5628424029570a5ad52e6cda Mon Sep 17 00:00:00 2001 From: Tudor Plugaru Date: Fri, 28 Nov 2025 10:49:22 +0200 Subject: [PATCH 2/4] Link the parent span to the extracted context for KafkaProducerCallback --- .../KafkaProducerInstrumentation.java | 10 +++++-- .../test/groovy/KafkaClientTestBase.groovy | 30 +++++++------------ 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index bce8e7d4308..e781ae2dbae 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -131,18 +131,22 @@ public static AgentScope onEnter( final AgentSpanContext extractedContext = extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER); - final AgentSpan parent = activeSpan(); - // Use extracted context as parent if available, otherwise use activeSpan + final AgentSpan localActiveSpan = activeSpan(); + final AgentSpan span; + final AgentSpan callbackParentSpan; + if (extractedContext != null) { span = startSpan(KAFKA_PRODUCE, extractedContext); + callbackParentSpan = span; } else { span = startSpan(KAFKA_PRODUCE); + callbackParentSpan = localActiveSpan; } PRODUCER_DECORATE.afterStart(span); PRODUCER_DECORATE.onProduce(span, record, producerConfig); - callback = new KafkaProducerCallback(callback, parent, span, clusterId); + callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId); if (record.value() == null) { span.setTag(InstrumentationTags.TOMBSTONE, true); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index eda4528b219..bd9df3a4101 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -1022,34 +1022,26 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) def producer = new KafkaProducer<>(senderProps) - // Create a trace context to inject into headers (simulating a message with existing context) - def traceId = 1234567890123456L - def spanId = 9876543210987654L + def existingTraceId = 1234567890123456L + def existingSpanId = 9876543210987654L def headers = new RecordHeaders() headers.add(new RecordHeader("x-datadog-trace-id", - String.valueOf(traceId).getBytes(StandardCharsets.UTF_8))) + String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8))) headers.add(new RecordHeader("x-datadog-parent-id", - String.valueOf(spanId).getBytes(StandardCharsets.UTF_8))) + String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8))) when: - // Send a message with pre-existing trace context in headers def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers) producer.send(record).get() then: - // Verify that a produce span was created that used the extracted context - assertTraces(1) { - trace(1) { - producerSpan(it, senderProps, null, false) - // Verify the span used the extracted context as parent - def producedSpan = TEST_WRITER[0][0] - assert producedSpan.context().traceId == traceId - assert producedSpan.context().parentId == spanId - // Verify a NEW span was created (not reusing the extracted span ID) - assert producedSpan.context().spanId != spanId - assert producedSpan.context().spanId != 0 - } - } + TEST_WRITER.waitForTraces(1) + def producedSpan = TEST_WRITER[0][0] + // Verify the span used the extracted context as parent + producedSpan.traceId.toLong() == existingTraceId + producedSpan.parentId == existingSpanId + // Verify a new span was created (not reusing the extracted span ID) + producedSpan.spanId != existingSpanId cleanup: producer?.close() From bfccfc36120b096c42a8130529b121c4e7a06110 Mon Sep 17 00:00:00 2001 From: Tudor Plugaru Date: Thu, 4 Dec 2025 14:30:16 +0200 Subject: [PATCH 3/4] Port the change to Kafka Clients 3.8+ --- .../kafka_clients38/ProducerAdvice.java | 22 ++++++++++-- .../test/groovy/KafkaClientTestBase.groovy | 34 +++++++++++++++++++ 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java index be6bdc2500b..b64ccd6d06c 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java @@ -5,6 +5,7 @@ import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN; +import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; @@ -21,6 +22,7 @@ import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; import datadog.trace.instrumentation.kafka_common.ClusterIdHolder; import net.bytebuddy.asm.Advice; @@ -46,12 +48,26 @@ public static AgentScope onEnter( ClusterIdHolder.set(clusterId); } - final AgentSpan parent = activeSpan(); - final AgentSpan span = startSpan(KAFKA_PRODUCE); + // Try to extract existing trace context from record headers + final AgentSpanContext extractedContext = + extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER); + + final AgentSpan localActiveSpan = activeSpan(); + + final AgentSpan span; + final AgentSpan callbackParentSpan; + + if (extractedContext != null) { + span = startSpan(KAFKA_PRODUCE, extractedContext); + callbackParentSpan = span; + } else { + span = startSpan(KAFKA_PRODUCE); + callbackParentSpan = localActiveSpan; + } PRODUCER_DECORATE.afterStart(span); PRODUCER_DECORATE.onProduce(span, record, producerConfig); - callback = new KafkaProducerCallback(callback, parent, span, clusterId); + callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId); if (record.value() == null) { span.setTag(InstrumentationTags.TOMBSTONE, true); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy index c0a2c9c2625..e889cf2ac93 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy @@ -18,7 +18,11 @@ import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.RecordMetadata import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.header.internals.RecordHeaders import org.apache.kafka.common.serialization.StringSerializer + +import java.nio.charset.StandardCharsets import org.springframework.kafka.core.DefaultKafkaConsumerFactory import org.springframework.kafka.core.DefaultKafkaProducerFactory import org.springframework.kafka.core.KafkaTemplate @@ -853,6 +857,36 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { "true" | true } + def "test producer extracts and uses existing trace context from record headers"() { + setup: + def senderProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString()) + def producer = new KafkaProducer<>(senderProps) + + def existingTraceId = 1234567890123456L + def existingSpanId = 9876543210987654L + def headers = new RecordHeaders() + headers.add(new RecordHeader("x-datadog-trace-id", + String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8))) + headers.add(new RecordHeader("x-datadog-parent-id", + String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8))) + + when: + def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers) + producer.send(record).get() + + then: + TEST_WRITER.waitForTraces(1) + def producedSpan = TEST_WRITER[0][0] + // Verify the span used the extracted context as parent + producedSpan.traceId.toLong() == existingTraceId + producedSpan.parentId == existingSpanId + // Verify a new span was created (not reusing the extracted span ID) + producedSpan.spanId != existingSpanId + + cleanup: + producer?.close() + } + def producerSpan( TraceAssert trace, Map config, From 82a174513b8a288c3493a14bf8196e8a3bbfd253 Mon Sep 17 00:00:00 2001 From: Tudor Plugaru Date: Fri, 5 Dec 2025 17:18:29 +0200 Subject: [PATCH 4/4] Add TextMapExtractAdapter to Kafka producer instrumentation --- .../kafka_clients/KafkaProducerInstrumentation.java | 1 + .../kafka_clients38/KafkaProducerInstrumentation.java | 1 + 2 files changed, 2 insertions(+) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index e781ae2dbae..341b4f654da 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -78,6 +78,7 @@ public String[] helperClassNames() { packageName + ".KafkaDecorator", packageName + ".TextMapInjectAdapterInterface", packageName + ".TextMapInjectAdapter", + packageName + ".TextMapExtractAdapter", packageName + ".NoopTextMapInjectAdapter", packageName + ".KafkaProducerCallback", "datadog.trace.instrumentation.kafka_common.StreamingContext", diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java index a06469fb494..2e40f6b67e3 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java @@ -38,6 +38,7 @@ public String[] helperClassNames() { packageName + ".KafkaDecorator", packageName + ".TextMapInjectAdapterInterface", packageName + ".TextMapInjectAdapter", + packageName + ".TextMapExtractAdapter", packageName + ".NoopTextMapInjectAdapter", packageName + ".KafkaProducerCallback", "datadog.trace.instrumentation.kafka_common.StreamingContext",