Skip to content

Commit cf29adc

Browse files
PlugaruTygree
authored andcommitted
Link the parent span to the extracted context for KafkaProducerCallback
1 parent 3e400b4 commit cf29adc

File tree

2 files changed

+18
-22
lines changed

2 files changed

+18
-22
lines changed

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,18 +131,22 @@ public static AgentScope onEnter(
131131
final AgentSpanContext extractedContext =
132132
extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);
133133

134-
final AgentSpan parent = activeSpan();
135-
// Use extracted context as parent if available, otherwise use activeSpan
134+
final AgentSpan localActiveSpan = activeSpan();
135+
136136
final AgentSpan span;
137+
final AgentSpan callbackParentSpan;
138+
137139
if (extractedContext != null) {
138140
span = startSpan(KAFKA_PRODUCE, extractedContext);
141+
callbackParentSpan = span;
139142
} else {
140143
span = startSpan(KAFKA_PRODUCE);
144+
callbackParentSpan = localActiveSpan;
141145
}
142146
PRODUCER_DECORATE.afterStart(span);
143147
PRODUCER_DECORATE.onProduce(span, record, producerConfig);
144148

145-
callback = new KafkaProducerCallback(callback, parent, span, clusterId);
149+
callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId);
146150

147151
if (record.value() == null) {
148152
span.setTag(InstrumentationTags.TOMBSTONE, true);

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1022,34 +1022,26 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
10221022
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
10231023
def producer = new KafkaProducer<>(senderProps)
10241024

1025-
// Create a trace context to inject into headers (simulating a message with existing context)
1026-
def traceId = 1234567890123456L
1027-
def spanId = 9876543210987654L
1025+
def existingTraceId = 1234567890123456L
1026+
def existingSpanId = 9876543210987654L
10281027
def headers = new RecordHeaders()
10291028
headers.add(new RecordHeader("x-datadog-trace-id",
1030-
String.valueOf(traceId).getBytes(StandardCharsets.UTF_8)))
1029+
String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8)))
10311030
headers.add(new RecordHeader("x-datadog-parent-id",
1032-
String.valueOf(spanId).getBytes(StandardCharsets.UTF_8)))
1031+
String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8)))
10331032

10341033
when:
1035-
// Send a message with pre-existing trace context in headers
10361034
def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers)
10371035
producer.send(record).get()
10381036

10391037
then:
1040-
// Verify that a produce span was created that used the extracted context
1041-
assertTraces(1) {
1042-
trace(1) {
1043-
producerSpan(it, senderProps, null, false)
1044-
// Verify the span used the extracted context as parent
1045-
def producedSpan = TEST_WRITER[0][0]
1046-
assert producedSpan.context().traceId == traceId
1047-
assert producedSpan.context().parentId == spanId
1048-
// Verify a NEW span was created (not reusing the extracted span ID)
1049-
assert producedSpan.context().spanId != spanId
1050-
assert producedSpan.context().spanId != 0
1051-
}
1052-
}
1038+
TEST_WRITER.waitForTraces(1)
1039+
def producedSpan = TEST_WRITER[0][0]
1040+
// Verify the span used the extracted context as parent
1041+
producedSpan.traceId.toLong() == existingTraceId
1042+
producedSpan.parentId == existingSpanId
1043+
// Verify a new span was created (not reusing the extracted span ID)
1044+
producedSpan.spanId != existingSpanId
10531045

10541046
cleanup:
10551047
producer?.close()

0 commit comments

Comments
 (0)