Skip to content

Commit 5e9b05d

Browse files
committed
Link the parent span to the extracted context for KafkaProducerCallback
1 parent e2294d3 commit 5e9b05d

File tree

2 files changed

+18
-22
lines changed

2 files changed

+18
-22
lines changed

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,18 +131,22 @@ public static AgentScope onEnter(
131131
final AgentSpanContext extractedContext =
132132
extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);
133133

134-
final AgentSpan parent = activeSpan();
135-
// Use extracted context as parent if available, otherwise use activeSpan
134+
final AgentSpan localActiveSpan = activeSpan();
135+
136136
final AgentSpan span;
137+
final AgentSpan callbackParentSpan;
138+
137139
if (extractedContext != null) {
138140
span = startSpan(KAFKA_PRODUCE, extractedContext);
141+
callbackParentSpan = span;
139142
} else {
140143
span = startSpan(KAFKA_PRODUCE);
144+
callbackParentSpan = localActiveSpan;
141145
}
142146
PRODUCER_DECORATE.afterStart(span);
143147
PRODUCER_DECORATE.onProduce(span, record, producerConfig);
144148

145-
callback = new KafkaProducerCallback(callback, parent, span, clusterId);
149+
callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId);
146150

147151
if (record.value() == null) {
148152
span.setTag(InstrumentationTags.TOMBSTONE, true);

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1027,34 +1027,26 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
10271027
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
10281028
def producer = new KafkaProducer<>(senderProps)
10291029

1030-
// Create a trace context to inject into headers (simulating a message with existing context)
1031-
def traceId = 1234567890123456L
1032-
def spanId = 9876543210987654L
1030+
def existingTraceId = 1234567890123456L
1031+
def existingSpanId = 9876543210987654L
10331032
def headers = new RecordHeaders()
10341033
headers.add(new RecordHeader("x-datadog-trace-id",
1035-
String.valueOf(traceId).getBytes(StandardCharsets.UTF_8)))
1034+
String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8)))
10361035
headers.add(new RecordHeader("x-datadog-parent-id",
1037-
String.valueOf(spanId).getBytes(StandardCharsets.UTF_8)))
1036+
String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8)))
10381037

10391038
when:
1040-
// Send a message with pre-existing trace context in headers
10411039
def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers)
10421040
producer.send(record).get()
10431041

10441042
then:
1045-
// Verify that a produce span was created that used the extracted context
1046-
assertTraces(1) {
1047-
trace(1) {
1048-
producerSpan(it, senderProps, null, false)
1049-
// Verify the span used the extracted context as parent
1050-
def producedSpan = TEST_WRITER[0][0]
1051-
assert producedSpan.context().traceId == traceId
1052-
assert producedSpan.context().parentId == spanId
1053-
// Verify a NEW span was created (not reusing the extracted span ID)
1054-
assert producedSpan.context().spanId != spanId
1055-
assert producedSpan.context().spanId != 0
1056-
}
1057-
}
1043+
TEST_WRITER.waitForTraces(1)
1044+
def producedSpan = TEST_WRITER[0][0]
1045+
// Verify the span used the extracted context as parent
1046+
producedSpan.traceId.toLong() == existingTraceId
1047+
producedSpan.parentId == existingSpanId
1048+
// Verify a new span was created (not reusing the extracted span ID)
1049+
producedSpan.spanId != existingSpanId
10581050

10591051
cleanup:
10601052
producer?.close()

0 commit comments

Comments
 (0)