From e2294d3c73e4665cf7144b4e2a641d63d4830eda Mon Sep 17 00:00:00 2001
From: Tudor Plugaru
Date: Sun, 23 Nov 2025 17:30:01 +0200
Subject: [PATCH 1/4] Extract trace context from Kafka producer record headers

Allow Kafka producers to continue existing traces by extracting trace
context from record headers and using it as parent for the produce
span. This enables distributed tracing when messages are forwarded
between services with pre-existing context.
---
 .../KafkaProducerInstrumentation.java      | 14 ++++++-
 .../test/groovy/KafkaClientTestBase.groovy | 42 +++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
index 2dc6059f7fc..bce8e7d4308 100644
--- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
+++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
@@ -7,6 +7,7 @@
 import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
 import static datadog.trace.api.datastreams.DataStreamsTags.createWithClusterId;
 import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
+import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
 import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
 import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
 import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
@@ -33,6 +34,7 @@
 import datadog.trace.bootstrap.InstrumentationContext;
 import datadog.trace.bootstrap.instrumentation.api.AgentScope;
 import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
 import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
 import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
 import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
@@ -125,8 +127,18 @@ public static AgentScope onEnter(
         ClusterIdHolder.set(clusterId);
       }
 
+      // Try to extract existing trace context from record headers
+      final AgentSpanContext extractedContext =
+          extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);
+
       final AgentSpan parent = activeSpan();
-      final AgentSpan span = startSpan(KAFKA_PRODUCE);
+      // Use extracted context as parent if available, otherwise use activeSpan
+      final AgentSpan span;
+      if (extractedContext != null) {
+        span = startSpan(KAFKA_PRODUCE, extractedContext);
+      } else {
+        span = startSpan(KAFKA_PRODUCE);
+      }
 
       PRODUCER_DECORATE.afterStart(span);
       PRODUCER_DECORATE.onProduce(span, record, producerConfig);
diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy
index 686c550a8b6..b2eaa1fff06 100644
--- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy
+++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy
@@ -25,7 +25,11 @@ import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.header.internals.RecordHeaders
 import org.apache.kafka.common.serialization.StringSerializer
+
+import java.nio.charset.StandardCharsets
 import org.junit.Rule
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory
 import org.springframework.kafka.core.DefaultKafkaProducerFactory
@@ -1018,6 +1022,44 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
     "true" | true
   }
 
+  def "test producer extracts and uses existing trace context from record headers"() {
+    setup:
+    def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
+    def producer = new KafkaProducer<>(senderProps)
+
+    // Create a trace context to inject into headers (simulating a message with existing context)
+    def traceId = 1234567890123456L
+    def spanId = 9876543210987654L
+    def headers = new RecordHeaders()
+    headers.add(new RecordHeader("x-datadog-trace-id",
+      String.valueOf(traceId).getBytes(StandardCharsets.UTF_8)))
+    headers.add(new RecordHeader("x-datadog-parent-id",
+      String.valueOf(spanId).getBytes(StandardCharsets.UTF_8)))
+
+    when:
+    // Send a message with pre-existing trace context in headers
+    def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers)
+    producer.send(record).get()
+
+    then:
+    // Verify that a produce span was created that used the extracted context
+    assertTraces(1) {
+      trace(1) {
+        producerSpan(it, senderProps, null, false)
+        // Verify the span used the extracted context as parent
+        def producedSpan = TEST_WRITER[0][0]
+        assert producedSpan.context().traceId == traceId
+        assert producedSpan.context().parentId == spanId
+        // Verify a NEW span was created (not reusing the extracted span ID)
+        assert producedSpan.context().spanId != spanId
+        assert producedSpan.context().spanId != 0
+      }
+    }
+
+    cleanup:
+    producer?.close()
+  }
+
   def containerProperties() {
     try {
       // Different class names for test and latestDepTest.

From 5e9b05dbe22c327082f8ec3cbd015115713ffb02 Mon Sep 17 00:00:00 2001
From: Tudor Plugaru
Date: Fri, 28 Nov 2025 10:49:22 +0200
Subject: [PATCH 2/4] Link the parent span to the extracted context for KafkaProducerCallback

---
 .../KafkaProducerInstrumentation.java      | 10 +++++--
 .../test/groovy/KafkaClientTestBase.groovy | 30 +++++++------------
 2 files changed, 18 insertions(+), 22 deletions(-)

diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
index bce8e7d4308..e781ae2dbae 100644
--- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
+++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
@@ -131,18 +131,22 @@ public static AgentScope onEnter(
       final AgentSpanContext extractedContext =
           extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);
 
-      final AgentSpan parent = activeSpan();
-      // Use extracted context as parent if available, otherwise use activeSpan
+      final AgentSpan localActiveSpan = activeSpan();
+
       final AgentSpan span;
+      final AgentSpan callbackParentSpan;
+
       if (extractedContext != null) {
         span = startSpan(KAFKA_PRODUCE, extractedContext);
+        callbackParentSpan = span;
       } else {
         span = startSpan(KAFKA_PRODUCE);
+        callbackParentSpan = localActiveSpan;
       }
 
       PRODUCER_DECORATE.afterStart(span);
       PRODUCER_DECORATE.onProduce(span, record, producerConfig);
-      callback = new KafkaProducerCallback(callback, parent, span, clusterId);
+      callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId);
 
       if (record.value() == null) {
         span.setTag(InstrumentationTags.TOMBSTONE, true);
diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy
index b2eaa1fff06..1cad2e7c24b 100644
--- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy
+++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy
@@ -1027,34 +1027,26 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
     def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
     def producer = new KafkaProducer<>(senderProps)
 
-    // Create a trace context to inject into headers (simulating a message with existing context)
-    def traceId = 1234567890123456L
-    def spanId = 9876543210987654L
+    def existingTraceId = 1234567890123456L
+    def existingSpanId = 9876543210987654L
     def headers = new RecordHeaders()
     headers.add(new RecordHeader("x-datadog-trace-id",
-      String.valueOf(traceId).getBytes(StandardCharsets.UTF_8)))
+      String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8)))
     headers.add(new RecordHeader("x-datadog-parent-id",
-      String.valueOf(spanId).getBytes(StandardCharsets.UTF_8)))
+      String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8)))
 
     when:
-    // Send a message with pre-existing trace context in headers
     def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers)
     producer.send(record).get()
 
     then:
-    // Verify that a produce span was created that used the extracted context
-    assertTraces(1) {
-      trace(1) {
-        producerSpan(it, senderProps, null, false)
-        // Verify the span used the extracted context as parent
-        def producedSpan = TEST_WRITER[0][0]
-        assert producedSpan.context().traceId == traceId
-        assert producedSpan.context().parentId == spanId
-        // Verify a NEW span was created (not reusing the extracted span ID)
-        assert producedSpan.context().spanId != spanId
-        assert producedSpan.context().spanId != 0
-      }
-    }
+    TEST_WRITER.waitForTraces(1)
+    def producedSpan = TEST_WRITER[0][0]
+    // Verify the span used the extracted context as parent
+    producedSpan.traceId.toLong() == existingTraceId
+    producedSpan.parentId == existingSpanId
+    // Verify a new span was created (not reusing the extracted span ID)
+    producedSpan.spanId != existingSpanId
 
     cleanup:
     producer?.close()

From 9cf2fd62f0d1cfb245a948355bef81761e0a9f52 Mon Sep 17 00:00:00 2001
From: Tudor Plugaru
Date: Thu, 4 Dec 2025 14:30:16 +0200
Subject: [PATCH 3/4] Port the change to Kafka Clients 3.8+

---
 .../kafka_clients38/ProducerAdvice.java    | 22 +++++++++++++++++++---
 .../test/groovy/KafkaClientTestBase.groovy | 34 +++++++++++++++++++
 2 files changed, 53 insertions(+), 3 deletions(-)

diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java
index be6bdc2500b..b64ccd6d06c 100644
--- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java
+++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java
@@ -5,6 +5,7 @@
 import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
 import static datadog.trace.api.datastreams.DataStreamsTags.create;
 import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
+import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
 import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
 import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
 import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
@@ -21,6 +22,7 @@
 import datadog.trace.bootstrap.InstrumentationContext;
 import datadog.trace.bootstrap.instrumentation.api.AgentScope;
 import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
 import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
 import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
 import net.bytebuddy.asm.Advice;
@@ -46,12 +48,26 @@ public static AgentScope onEnter(
       ClusterIdHolder.set(clusterId);
     }
 
-    final AgentSpan parent = activeSpan();
-    final AgentSpan span = startSpan(KAFKA_PRODUCE);
+    // Try to extract existing trace context from record headers
+    final AgentSpanContext extractedContext =
+        extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);
+
+    final AgentSpan localActiveSpan = activeSpan();
+
+    final AgentSpan span;
+    final AgentSpan callbackParentSpan;
+
+    if (extractedContext != null) {
+      span = startSpan(KAFKA_PRODUCE, extractedContext);
+      callbackParentSpan = span;
+    } else {
+      span = startSpan(KAFKA_PRODUCE);
+      callbackParentSpan = localActiveSpan;
+    }
 
     PRODUCER_DECORATE.afterStart(span);
     PRODUCER_DECORATE.onProduce(span, record, producerConfig);
-    callback = new KafkaProducerCallback(callback, parent, span, clusterId);
+    callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId);
 
     if (record.value() == null) {
       span.setTag(InstrumentationTags.TOMBSTONE, true);
diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy
index c0a2c9c2625..e889cf2ac93 100644
--- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy
+++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy
@@ -18,7 +18,11 @@ import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.header.internals.RecordHeaders
 import org.apache.kafka.common.serialization.StringSerializer
+
+import java.nio.charset.StandardCharsets
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory
 import org.springframework.kafka.core.DefaultKafkaProducerFactory
 import org.springframework.kafka.core.KafkaTemplate
@@ -853,6 +857,36 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
     "true" | true
   }
 
+  def "test producer extracts and uses existing trace context from record headers"() {
+    setup:
+    def senderProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
+    def producer = new KafkaProducer<>(senderProps)
+
+    def existingTraceId = 1234567890123456L
+    def existingSpanId = 9876543210987654L
+    def headers = new RecordHeaders()
+    headers.add(new RecordHeader("x-datadog-trace-id",
+      String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8)))
+    headers.add(new RecordHeader("x-datadog-parent-id",
+      String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8)))
+
+    when:
+    def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers)
+    producer.send(record).get()
+
+    then:
+    TEST_WRITER.waitForTraces(1)
+    def producedSpan = TEST_WRITER[0][0]
+    // Verify the span used the extracted context as parent
+    producedSpan.traceId.toLong() == existingTraceId
+    producedSpan.parentId == existingSpanId
+    // Verify a new span was created (not reusing the extracted span ID)
+    producedSpan.spanId != existingSpanId
+
+    cleanup:
+    producer?.close()
+  }
+
   def producerSpan(
     TraceAssert trace,
     Map config,

From 973db4ef1492335bd2ed30530c06a02561e4fbc2 Mon Sep 17 00:00:00 2001
From: Tudor Plugaru
Date: Fri, 5 Dec 2025 17:18:29 +0200
Subject: [PATCH 4/4] Add TextMapExtractAdapter to Kafka producer instrumentation

---
 .../kafka_clients/KafkaProducerInstrumentation.java   | 1 +
 .../kafka_clients38/KafkaProducerInstrumentation.java | 1 +
 2 files changed, 2 insertions(+)

diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
index e781ae2dbae..341b4f654da 100644
--- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
+++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
@@ -78,6 +78,7 @@ public String[] helperClassNames() {
       packageName + ".KafkaDecorator",
       packageName + ".TextMapInjectAdapterInterface",
       packageName + ".TextMapInjectAdapter",
+      packageName + ".TextMapExtractAdapter",
       packageName + ".NoopTextMapInjectAdapter",
       packageName + ".KafkaProducerCallback",
       "datadog.trace.instrumentation.kafka_common.StreamingContext",
diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java
index a06469fb494..2e40f6b67e3 100644
--- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java
+++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java
@@ -38,6 +38,7 @@ public String[] helperClassNames() {
       packageName + ".KafkaDecorator",
       packageName + ".TextMapInjectAdapterInterface",
      packageName + ".TextMapInjectAdapter",
+      packageName + ".TextMapExtractAdapter",
       packageName + ".NoopTextMapInjectAdapter",
       packageName + ".KafkaProducerCallback",
       "datadog.trace.instrumentation.kafka_common.StreamingContext",
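Illustrative sketch, not part of the patch series: the forwarding scenario these changes target, under assumed placeholder names (an "orders" input topic, an "orders-out" output topic, a "forwarder" group id, and localhost bootstrap servers). A service that republishes a consumed record while preserving its headers keeps the upstream x-datadog-trace-id / x-datadog-parent-id entries in place, so the instrumented producer can extract that context and parent its produce span on it; when no such headers are present, the advice still falls back to the active span, leaving existing behaviour unchanged.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical forwarding service; topic names, group id and brokers are placeholders.
public class ForwardingService {
  public static void main(String[] args) {
    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "forwarder");
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
      consumer.subscribe(Collections.singletonList("orders"));
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> in : records) {
          // Republish the payload together with its original headers; the x-datadog-*
          // entries carry the upstream trace context that the instrumented producer
          // extracts and uses as the parent of its produce span.
          producer.send(new ProducerRecord<>("orders-out", null, in.key(), in.value(), in.headers()));
        }
      }
    }
  }
}

This mirrors what the new Spock test exercises with hand-built RecordHeaders, just from the application side rather than from the test writer.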