Skip to content

Commit 3e400b4

Browse files
PlugaruTygree
authored andcommitted
Extract trace context from Kafka producer record headers
Allow Kafka producers to continue existing traces by extracting trace context from record headers and using it as parent for the produce span. This enables distributed tracing when messages are forwarded between services with pre-existing context.
1 parent a2f3c7a commit 3e400b4

File tree

2 files changed

+55
-1
lines changed

2 files changed

+55
-1
lines changed

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
88
import static datadog.trace.api.datastreams.DataStreamsTags.createWithClusterId;
99
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
10+
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
1011
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
1112
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
1213
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
@@ -33,6 +34,7 @@
3334
import datadog.trace.bootstrap.InstrumentationContext;
3435
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
3536
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
37+
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
3638
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
3739
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
3840
import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
@@ -125,8 +127,18 @@ public static AgentScope onEnter(
125127
ClusterIdHolder.set(clusterId);
126128
}
127129

130+
// Try to extract existing trace context from record headers
131+
final AgentSpanContext extractedContext =
132+
extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);
133+
128134
final AgentSpan parent = activeSpan();
129-
final AgentSpan span = startSpan(KAFKA_PRODUCE);
135+
// Use extracted context as parent if available, otherwise use activeSpan
136+
final AgentSpan span;
137+
if (extractedContext != null) {
138+
span = startSpan(KAFKA_PRODUCE, extractedContext);
139+
} else {
140+
span = startSpan(KAFKA_PRODUCE);
141+
}
130142
PRODUCER_DECORATE.afterStart(span);
131143
PRODUCER_DECORATE.onProduce(span, record, producerConfig);
132144

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@ import org.apache.kafka.clients.producer.ProducerConfig
2525
import org.apache.kafka.clients.producer.ProducerRecord
2626
import org.apache.kafka.clients.producer.RecordMetadata
2727
import org.apache.kafka.common.TopicPartition
28+
import org.apache.kafka.common.header.internals.RecordHeader
29+
import org.apache.kafka.common.header.internals.RecordHeaders
2830
import org.apache.kafka.common.serialization.StringSerializer
31+
32+
import java.nio.charset.StandardCharsets
2933
import org.junit.Rule
3034
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
3135
import org.springframework.kafka.core.DefaultKafkaProducerFactory
@@ -1013,6 +1017,44 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
10131017
"true" | true
10141018
}
10151019

1020+
def "test producer extracts and uses existing trace context from record headers"() {
1021+
setup:
1022+
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
1023+
def producer = new KafkaProducer<>(senderProps)
1024+
1025+
// Create a trace context to inject into headers (simulating a message with existing context)
1026+
def traceId = 1234567890123456L
1027+
def spanId = 9876543210987654L
1028+
def headers = new RecordHeaders()
1029+
headers.add(new RecordHeader("x-datadog-trace-id",
1030+
String.valueOf(traceId).getBytes(StandardCharsets.UTF_8)))
1031+
headers.add(new RecordHeader("x-datadog-parent-id",
1032+
String.valueOf(spanId).getBytes(StandardCharsets.UTF_8)))
1033+
1034+
when:
1035+
// Send a message with pre-existing trace context in headers
1036+
def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers)
1037+
producer.send(record).get()
1038+
1039+
then:
1040+
// Verify that a produce span was created that used the extracted context
1041+
assertTraces(1) {
1042+
trace(1) {
1043+
producerSpan(it, senderProps, null, false)
1044+
// Verify the span used the extracted context as parent
1045+
def producedSpan = TEST_WRITER[0][0]
1046+
assert producedSpan.context().traceId == traceId
1047+
assert producedSpan.context().parentId == spanId
1048+
// Verify a NEW span was created (not reusing the extracted span ID)
1049+
assert producedSpan.context().spanId != spanId
1050+
assert producedSpan.context().spanId != 0
1051+
}
1052+
}
1053+
1054+
cleanup:
1055+
producer?.close()
1056+
}
1057+
10161058
def containerProperties() {
10171059
try {
10181060
// Different class names for test and latestDepTest.

0 commit comments

Comments
 (0)