Skip to content

Commit a8cb36a

Browse files
PlugaruTygree
authored andcommitted
Port the change to Kafka Clients 3.8+
1 parent cf29adc commit a8cb36a

File tree

2 files changed

+53
-3
lines changed

2 files changed

+53
-3
lines changed

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
66
import static datadog.trace.api.datastreams.DataStreamsTags.create;
77
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
8+
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
89
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
910
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
1011
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
@@ -21,6 +22,7 @@
2122
import datadog.trace.bootstrap.InstrumentationContext;
2223
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
2324
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
25+
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
2426
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
2527
import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
2628
import net.bytebuddy.asm.Advice;
@@ -46,12 +48,26 @@ public static AgentScope onEnter(
4648
ClusterIdHolder.set(clusterId);
4749
}
4850

49-
final AgentSpan parent = activeSpan();
50-
final AgentSpan span = startSpan(KAFKA_PRODUCE);
51+
// Try to extract existing trace context from record headers
52+
final AgentSpanContext extractedContext =
53+
extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);
54+
55+
final AgentSpan localActiveSpan = activeSpan();
56+
57+
final AgentSpan span;
58+
final AgentSpan callbackParentSpan;
59+
60+
if (extractedContext != null) {
61+
span = startSpan(KAFKA_PRODUCE, extractedContext);
62+
callbackParentSpan = span;
63+
} else {
64+
span = startSpan(KAFKA_PRODUCE);
65+
callbackParentSpan = localActiveSpan;
66+
}
5167
PRODUCER_DECORATE.afterStart(span);
5268
PRODUCER_DECORATE.onProduce(span, record, producerConfig);
5369

54-
callback = new KafkaProducerCallback(callback, parent, span, clusterId);
70+
callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId);
5571

5672
if (record.value() == null) {
5773
span.setTag(InstrumentationTags.TOMBSTONE, true);

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@ import org.apache.kafka.clients.producer.ProducerConfig
1818
import org.apache.kafka.clients.producer.ProducerRecord
1919
import org.apache.kafka.clients.producer.RecordMetadata
2020
import org.apache.kafka.common.TopicPartition
21+
import org.apache.kafka.common.header.internals.RecordHeader
22+
import org.apache.kafka.common.header.internals.RecordHeaders
2123
import org.apache.kafka.common.serialization.StringSerializer
24+
25+
import java.nio.charset.StandardCharsets
2226
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
2327
import org.springframework.kafka.core.DefaultKafkaProducerFactory
2428
import org.springframework.kafka.core.KafkaTemplate
@@ -853,6 +857,36 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
853857
"true" | true
854858
}
855859
860+
def "test producer extracts and uses existing trace context from record headers"() {
861+
setup:
862+
def senderProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
863+
def producer = new KafkaProducer<>(senderProps)
864+
865+
def existingTraceId = 1234567890123456L
866+
def existingSpanId = 9876543210987654L
867+
def headers = new RecordHeaders()
868+
headers.add(new RecordHeader("x-datadog-trace-id",
869+
String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8)))
870+
headers.add(new RecordHeader("x-datadog-parent-id",
871+
String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8)))
872+
873+
when:
874+
def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers)
875+
producer.send(record).get()
876+
877+
then:
878+
TEST_WRITER.waitForTraces(1)
879+
def producedSpan = TEST_WRITER[0][0]
880+
// Verify the span used the extracted context as parent
881+
producedSpan.traceId.toLong() == existingTraceId
882+
producedSpan.parentId == existingSpanId
883+
// Verify a new span was created (not reusing the extracted span ID)
884+
producedSpan.spanId != existingSpanId
885+
886+
cleanup:
887+
producer?.close()
888+
}
889+
856890
def producerSpan(
857891
TraceAssert trace,
858892
Map<String, ?> config,

0 commit comments

Comments
 (0)