Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
import static datadog.trace.api.datastreams.DataStreamsTags.createWithClusterId;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
Expand All @@ -33,6 +34,7 @@
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
Expand Down Expand Up @@ -125,8 +127,18 @@ public static AgentScope onEnter(
ClusterIdHolder.set(clusterId);
}

// Try to extract existing trace context from record headers
final AgentSpanContext extractedContext =
extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);

final AgentSpan parent = activeSpan();
final AgentSpan span = startSpan(KAFKA_PRODUCE);
// Use extracted context as parent if available, otherwise use activeSpan
final AgentSpan span;
if (extractedContext != null) {
Copy link
Contributor

@ygree ygree Dec 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a breaking change if users expect activeSpan to take precedence. Adding a configuration option, would allow users to configure this behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually was expecting the other way around, especially coming from opentelemetry, I expected to have the context reuse the one from headers and attach new spans to the same trace and not break it.
I can do it, no problem there, but I'd say this is a bug fix rather than a feature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's done. Let me know if the name works

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I'll exclude the commit that adds the setting, then. Let's see if all the CI checks pass in the mirroring PR: #10022

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm looking into the failing test. Just found how to run the system tests locally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ygree I force pushed without the commit that added the config and also fixed the test, at least it passes on my computer 😅

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I'll pull your changes to the mirroring PR and run the CI checks.

span = startSpan(KAFKA_PRODUCE, extractedContext);
} else {
span = startSpan(KAFKA_PRODUCE);
}
PRODUCER_DECORATE.afterStart(span);
PRODUCER_DECORATE.onProduce(span, record, producerConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.serialization.StringSerializer

import java.nio.charset.StandardCharsets
import org.junit.Rule
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
Expand Down Expand Up @@ -1018,6 +1022,44 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
"true" | true
}

def "test producer extracts and uses existing trace context from record headers"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def producer = new KafkaProducer<>(senderProps)

// Create a trace context to inject into headers (simulating a message with existing context)
def traceId = 1234567890123456L
def spanId = 9876543210987654L
def headers = new RecordHeaders()
headers.add(new RecordHeader("x-datadog-trace-id",
String.valueOf(traceId).getBytes(StandardCharsets.UTF_8)))
headers.add(new RecordHeader("x-datadog-parent-id",
String.valueOf(spanId).getBytes(StandardCharsets.UTF_8)))

when:
// Send a message with pre-existing trace context in headers
def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers)
producer.send(record).get()

then:
// Verify that a produce span was created that used the extracted context
assertTraces(1) {
trace(1) {
producerSpan(it, senderProps, null, false)
// Verify the span used the extracted context as parent
def producedSpan = TEST_WRITER[0][0]
assert producedSpan.context().traceId == traceId
assert producedSpan.context().parentId == spanId
// Verify a NEW span was created (not reusing the extracted span ID)
assert producedSpan.context().spanId != spanId
assert producedSpan.context().spanId != 0
}
}

cleanup:
producer?.close()
}

def containerProperties() {
try {
// Different class names for test and latestDepTest.
Expand Down