Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
import static datadog.trace.api.datastreams.DataStreamsTags.createWithClusterId;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
Expand All @@ -33,6 +34,7 @@
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
Expand Down Expand Up @@ -76,6 +78,7 @@ public String[] helperClassNames() {
packageName + ".KafkaDecorator",
packageName + ".TextMapInjectAdapterInterface",
packageName + ".TextMapInjectAdapter",
packageName + ".TextMapExtractAdapter",
packageName + ".NoopTextMapInjectAdapter",
packageName + ".KafkaProducerCallback",
"datadog.trace.instrumentation.kafka_common.StreamingContext",
Expand Down Expand Up @@ -125,12 +128,26 @@ public static AgentScope onEnter(
ClusterIdHolder.set(clusterId);
}

final AgentSpan parent = activeSpan();
final AgentSpan span = startSpan(KAFKA_PRODUCE);
// Try to extract existing trace context from record headers
final AgentSpanContext extractedContext =
extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);

final AgentSpan localActiveSpan = activeSpan();

final AgentSpan span;
final AgentSpan callbackParentSpan;

if (extractedContext != null) {
span = startSpan(KAFKA_PRODUCE, extractedContext);
callbackParentSpan = span;
} else {
span = startSpan(KAFKA_PRODUCE);
callbackParentSpan = localActiveSpan;
}
PRODUCER_DECORATE.afterStart(span);
PRODUCER_DECORATE.onProduce(span, record, producerConfig);

callback = new KafkaProducerCallback(callback, parent, span, clusterId);
callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId);

if (record.value() == null) {
span.setTag(InstrumentationTags.TOMBSTONE, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.serialization.StringSerializer

import java.nio.charset.StandardCharsets
import org.junit.Rule
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
Expand Down Expand Up @@ -1013,6 +1017,36 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
"true" | true
}

def "test producer extracts and uses existing trace context from record headers"() {
  setup:
  // Plain Kafka producer pointed at the embedded broker.
  def props = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
  def kafkaProducer = new KafkaProducer<>(props)

  // Pre-populate Datadog propagation headers so the producer instrumentation
  // has an upstream context to continue instead of starting a fresh trace.
  long upstreamTraceId = 1234567890123456L
  long upstreamSpanId = 9876543210987654L
  def recordHeaders = new RecordHeaders()
  recordHeaders.add(new RecordHeader("x-datadog-trace-id",
    Long.toString(upstreamTraceId).getBytes(StandardCharsets.UTF_8)))
  recordHeaders.add(new RecordHeader("x-datadog-parent-id",
    Long.toString(upstreamSpanId).getBytes(StandardCharsets.UTF_8)))

  when:
  def producerRecord = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", recordHeaders)
  kafkaProducer.send(producerRecord).get()

  then:
  TEST_WRITER.waitForTraces(1)
  def producedSpan = TEST_WRITER[0][0]
  // The produce span must continue the trace carried in the record headers...
  producedSpan.traceId.toLong() == upstreamTraceId
  producedSpan.parentId == upstreamSpanId
  // ...while still being a brand-new span rather than a replay of the parent.
  producedSpan.spanId != upstreamSpanId

  cleanup:
  kafkaProducer?.close()
}

def containerProperties() {
try {
// Different class names for test and latestDepTest.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public String[] helperClassNames() {
packageName + ".KafkaDecorator",
packageName + ".TextMapInjectAdapterInterface",
packageName + ".TextMapInjectAdapter",
packageName + ".TextMapExtractAdapter",
packageName + ".NoopTextMapInjectAdapter",
packageName + ".KafkaProducerCallback",
"datadog.trace.instrumentation.kafka_common.StreamingContext",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
import static datadog.trace.api.datastreams.DataStreamsTags.create;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
Expand All @@ -21,6 +22,7 @@
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
import net.bytebuddy.asm.Advice;
Expand All @@ -46,12 +48,26 @@ public static AgentScope onEnter(
ClusterIdHolder.set(clusterId);
}

final AgentSpan parent = activeSpan();
final AgentSpan span = startSpan(KAFKA_PRODUCE);
// Try to extract existing trace context from record headers
final AgentSpanContext extractedContext =
extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);

final AgentSpan localActiveSpan = activeSpan();

final AgentSpan span;
final AgentSpan callbackParentSpan;

if (extractedContext != null) {
span = startSpan(KAFKA_PRODUCE, extractedContext);
callbackParentSpan = span;
} else {
span = startSpan(KAFKA_PRODUCE);
callbackParentSpan = localActiveSpan;
}
PRODUCER_DECORATE.afterStart(span);
PRODUCER_DECORATE.onProduce(span, record, producerConfig);

callback = new KafkaProducerCallback(callback, parent, span, clusterId);
callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId);

if (record.value() == null) {
span.setTag(InstrumentationTags.TOMBSTONE, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.serialization.StringSerializer

import java.nio.charset.StandardCharsets
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
Expand Down Expand Up @@ -853,6 +857,36 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
"true" | true
}

def "test producer extracts and uses existing trace context from record headers"() {
  setup:
  // Plain Kafka producer pointed at the embedded broker.
  def props = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
  def kafkaProducer = new KafkaProducer<>(props)

  // Pre-populate Datadog propagation headers so the producer instrumentation
  // has an upstream context to continue instead of starting a fresh trace.
  long upstreamTraceId = 1234567890123456L
  long upstreamSpanId = 9876543210987654L
  def recordHeaders = new RecordHeaders()
  recordHeaders.add(new RecordHeader("x-datadog-trace-id",
    Long.toString(upstreamTraceId).getBytes(StandardCharsets.UTF_8)))
  recordHeaders.add(new RecordHeader("x-datadog-parent-id",
    Long.toString(upstreamSpanId).getBytes(StandardCharsets.UTF_8)))

  when:
  def producerRecord = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", recordHeaders)
  kafkaProducer.send(producerRecord).get()

  then:
  TEST_WRITER.waitForTraces(1)
  def producedSpan = TEST_WRITER[0][0]
  // The produce span must continue the trace carried in the record headers...
  producedSpan.traceId.toLong() == upstreamTraceId
  producedSpan.parentId == upstreamSpanId
  // ...while still being a brand-new span rather than a replay of the parent.
  producedSpan.spanId != upstreamSpanId

  cleanup:
  kafkaProducer?.close()
}

def producerSpan(
TraceAssert trace,
Map<String, ?> config,
Expand Down