Skip to content

Commit 82282ea

Browse files
PlugaruTPerfectSlayer
authored andcommitted
Extract trace context from Kafka producer record headers
Allow Kafka producers to continue existing traces by extracting trace context from record headers and using it as parent for the produce span. This enables distributed tracing when messages are forwarded between services with pre-existing context.
1 parent c8bb444 commit 82282ea

File tree

2 files changed

+55
-1
lines changed

2 files changed

+55
-1
lines changed

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
88
import static datadog.trace.api.datastreams.DataStreamsTags.createWithClusterId;
99
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
10+
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
1011
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
1112
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
1213
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
@@ -33,6 +34,7 @@
3334
import datadog.trace.bootstrap.InstrumentationContext;
3435
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
3536
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
37+
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
3638
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
3739
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
3840
import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
@@ -125,8 +127,18 @@ public static AgentScope onEnter(
125127
ClusterIdHolder.set(clusterId);
126128
}
127129

130+
// Try to extract existing trace context from record headers
131+
final AgentSpanContext extractedContext =
132+
extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);
133+
128134
final AgentSpan parent = activeSpan();
129-
final AgentSpan span = startSpan(KAFKA_PRODUCE);
135+
// Use extracted context as parent if available, otherwise use activeSpan
136+
final AgentSpan span;
137+
if (extractedContext != null) {
138+
span = startSpan(KAFKA_PRODUCE, extractedContext);
139+
} else {
140+
span = startSpan(KAFKA_PRODUCE);
141+
}
130142
PRODUCER_DECORATE.afterStart(span);
131143
PRODUCER_DECORATE.onProduce(span, record, producerConfig);
132144

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@ import org.apache.kafka.clients.producer.ProducerConfig
2525
import org.apache.kafka.clients.producer.ProducerRecord
2626
import org.apache.kafka.clients.producer.RecordMetadata
2727
import org.apache.kafka.common.TopicPartition
28+
import org.apache.kafka.common.header.internals.RecordHeader
29+
import org.apache.kafka.common.header.internals.RecordHeaders
2830
import org.apache.kafka.common.serialization.StringSerializer
31+
32+
import java.nio.charset.StandardCharsets
2933
import org.junit.Rule
3034
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
3135
import org.springframework.kafka.core.DefaultKafkaProducerFactory
@@ -1018,6 +1022,44 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
10181022
"true" | true
10191023
}
10201024

1025+
def "test producer extracts and uses existing trace context from record headers"() {
1026+
setup:
1027+
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
1028+
def producer = new KafkaProducer<>(senderProps)
1029+
1030+
// Create a trace context to inject into headers (simulating a message with existing context)
1031+
def traceId = 1234567890123456L
1032+
def spanId = 9876543210987654L
1033+
def headers = new RecordHeaders()
1034+
headers.add(new RecordHeader("x-datadog-trace-id",
1035+
String.valueOf(traceId).getBytes(StandardCharsets.UTF_8)))
1036+
headers.add(new RecordHeader("x-datadog-parent-id",
1037+
String.valueOf(spanId).getBytes(StandardCharsets.UTF_8)))
1038+
1039+
when:
1040+
// Send a message with pre-existing trace context in headers
1041+
def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers)
1042+
producer.send(record).get()
1043+
1044+
then:
1045+
// Verify that a produce span was created that used the extracted context
1046+
assertTraces(1) {
1047+
trace(1) {
1048+
producerSpan(it, senderProps, null, false)
1049+
// Verify the span used the extracted context as parent
1050+
def producedSpan = TEST_WRITER[0][0]
1051+
assert producedSpan.context().traceId == traceId
1052+
assert producedSpan.context().parentId == spanId
1053+
// Verify a NEW span was created (not reusing the extracted span ID)
1054+
assert producedSpan.context().spanId != spanId
1055+
assert producedSpan.context().spanId != 0
1056+
}
1057+
}
1058+
1059+
cleanup:
1060+
producer?.close()
1061+
}
1062+
10211063
def containerProperties() {
10221064
try {
10231065
// Different class names for test and latestDepTest.

0 commit comments

Comments
 (0)