diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 2dc6059f7fc..341b4f654da 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -7,6 +7,7 @@ import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; import static datadog.trace.api.datastreams.DataStreamsTags.createWithClusterId; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN; +import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; @@ -33,6 +34,7 @@ import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; import datadog.trace.instrumentation.kafka_common.ClusterIdHolder; @@ -76,6 +78,7 @@ public String[] helperClassNames() { packageName + ".KafkaDecorator", packageName + ".TextMapInjectAdapterInterface", packageName + ".TextMapInjectAdapter", + packageName + ".TextMapExtractAdapter", packageName + ".NoopTextMapInjectAdapter", packageName + ".KafkaProducerCallback", "datadog.trace.instrumentation.kafka_common.StreamingContext", @@ -125,12 +128,26 @@ public static AgentScope onEnter( ClusterIdHolder.set(clusterId); } - final AgentSpan parent = activeSpan(); - final AgentSpan span = startSpan(KAFKA_PRODUCE); + // Try to extract existing trace context from record headers + final AgentSpanContext extractedContext = + extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER); + + final AgentSpan localActiveSpan = activeSpan(); + + final AgentSpan span; + final AgentSpan callbackParentSpan; + + if (extractedContext != null) { + span = startSpan(KAFKA_PRODUCE, extractedContext); + callbackParentSpan = span; + } else { + span = startSpan(KAFKA_PRODUCE); + callbackParentSpan = localActiveSpan; + } PRODUCER_DECORATE.afterStart(span); PRODUCER_DECORATE.onProduce(span, record, producerConfig); - callback = new KafkaProducerCallback(callback, parent, span, clusterId); + callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId); if (record.value() == null) { span.setTag(InstrumentationTags.TOMBSTONE, true); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index 4f3b0324a7e..bd9df3a4101 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -25,7 +25,11 @@ import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.RecordMetadata import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.header.internals.RecordHeaders import org.apache.kafka.common.serialization.StringSerializer + +import java.nio.charset.StandardCharsets import org.junit.Rule import org.springframework.kafka.core.DefaultKafkaConsumerFactory import org.springframework.kafka.core.DefaultKafkaProducerFactory @@ -1013,6 +1017,36 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { "true" | true } + def "test producer extracts and uses existing trace context from record headers"() { + setup: + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producer = new KafkaProducer<>(senderProps) + + def existingTraceId = 1234567890123456L + def existingSpanId = 9876543210987654L + def headers = new RecordHeaders() + headers.add(new RecordHeader("x-datadog-trace-id", + String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8))) + headers.add(new RecordHeader("x-datadog-parent-id", + String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8))) + + when: + def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers) + producer.send(record).get() + + then: + TEST_WRITER.waitForTraces(1) + def producedSpan = TEST_WRITER[0][0] + // Verify the span used the extracted context as parent + producedSpan.traceId.toLong() == existingTraceId + producedSpan.parentId == existingSpanId + // Verify a new span was created (not reusing the extracted span ID) + producedSpan.spanId != existingSpanId + + cleanup: + producer?.close() + } + def containerProperties() { try { // Different class names for test and latestDepTest. diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java index a06469fb494..2e40f6b67e3 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java @@ -38,6 +38,7 @@ public String[] helperClassNames() { packageName + ".KafkaDecorator", packageName + ".TextMapInjectAdapterInterface", packageName + ".TextMapInjectAdapter", + packageName + ".TextMapExtractAdapter", packageName + ".NoopTextMapInjectAdapter", packageName + ".KafkaProducerCallback", "datadog.trace.instrumentation.kafka_common.StreamingContext", diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java index be6bdc2500b..b64ccd6d06c 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java @@ -5,6 +5,7 @@ import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN; +import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; @@ -21,6 +22,7 @@ import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; import datadog.trace.instrumentation.kafka_common.ClusterIdHolder; import net.bytebuddy.asm.Advice; @@ -46,12 +48,26 @@ public static AgentScope onEnter( ClusterIdHolder.set(clusterId); } - final AgentSpan parent = activeSpan(); - final AgentSpan span = startSpan(KAFKA_PRODUCE); + // Try to extract existing trace context from record headers + final AgentSpanContext extractedContext = + extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER); + + final AgentSpan localActiveSpan = activeSpan(); + + final AgentSpan span; + final AgentSpan callbackParentSpan; + + if (extractedContext != null) { + span = startSpan(KAFKA_PRODUCE, extractedContext); + callbackParentSpan = span; + } else { + span = startSpan(KAFKA_PRODUCE); + callbackParentSpan = localActiveSpan; + } PRODUCER_DECORATE.afterStart(span); PRODUCER_DECORATE.onProduce(span, record, producerConfig); - callback = new KafkaProducerCallback(callback, parent, span, clusterId); + callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId); if (record.value() == null) { span.setTag(InstrumentationTags.TOMBSTONE, true); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy index c0a2c9c2625..e889cf2ac93 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy @@ -18,7 +18,11 @@ import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.RecordMetadata import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.header.internals.RecordHeaders import org.apache.kafka.common.serialization.StringSerializer + +import java.nio.charset.StandardCharsets import org.springframework.kafka.core.DefaultKafkaConsumerFactory import org.springframework.kafka.core.DefaultKafkaProducerFactory import org.springframework.kafka.core.KafkaTemplate @@ -853,6 +857,36 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { "true" | true } + def "test producer extracts and uses existing trace context from record headers"() { + setup: + def senderProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString()) + def producer = new KafkaProducer<>(senderProps) + + def existingTraceId = 1234567890123456L + def existingSpanId = 9876543210987654L + def headers = new RecordHeaders() + headers.add(new RecordHeader("x-datadog-trace-id", + String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8))) + headers.add(new RecordHeader("x-datadog-parent-id", + String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8))) + + when: + def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers) + producer.send(record).get() + + then: + TEST_WRITER.waitForTraces(1) + def producedSpan = TEST_WRITER[0][0] + // Verify the span used the extracted context as parent + producedSpan.traceId.toLong() == existingTraceId + producedSpan.parentId == existingSpanId + // Verify a new span was created (not reusing the extracted span ID) + producedSpan.spanId != existingSpanId + + cleanup: + producer?.close() + } + def producerSpan( TraceAssert trace, Map config,