Skip to content

Commit 79f8032

Browse files
committed
Add a new configuration kafka.client.propagation.extraction.enabled to make this behavior optional
1 parent 9cf2fd6 commit 79f8032

File tree

6 files changed

+80
-4
lines changed

6 files changed

+80
-4
lines changed

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,11 @@ public static AgentScope onEnter(
127127
ClusterIdHolder.set(clusterId);
128128
}
129129

130-
// Try to extract existing trace context from record headers
130+
// Try to extract existing trace context from record headers if enabled
131131
final AgentSpanContext extractedContext =
132-
extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);
132+
Config.get().isKafkaClientPropagationExtractionEnabled()
133+
? extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER)
134+
: null;
133135

134136
final AgentSpan localActiveSpan = activeSpan();
135137

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,6 +1024,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
10241024

10251025
def "test producer extracts and uses existing trace context from record headers"() {
10261026
setup:
1027+
injectSysConfig("kafka.client.propagation.extraction.enabled", "true")
10271028
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
10281029
def producer = new KafkaProducer<>(senderProps)
10291030

@@ -1052,6 +1053,35 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
10521053
producer?.close()
10531054
}
10541055

1056+
def "test producer ignores existing trace context when extraction is disabled"() {
1057+
setup:
1058+
injectSysConfig("kafka.client.propagation.extraction.enabled", "false")
1059+
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
1060+
def producer = new KafkaProducer<>(senderProps)
1061+
1062+
def existingTraceId = 1234567890123456L
1063+
def existingSpanId = 9876543210987654L
1064+
def headers = new RecordHeaders()
1065+
headers.add(new RecordHeader("x-datadog-trace-id",
1066+
String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8)))
1067+
headers.add(new RecordHeader("x-datadog-parent-id",
1068+
String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8)))
1069+
1070+
when:
1071+
def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction-disabled", headers)
1072+
producer.send(record).get()
1073+
1074+
then:
1075+
TEST_WRITER.waitForTraces(1)
1076+
def producedSpan = TEST_WRITER[0][0]
1077+
// Verify the span did NOT use the extracted context as parent
1078+
producedSpan.traceId.toLong() != existingTraceId
1079+
producedSpan.parentId != existingSpanId
1080+
1081+
cleanup:
1082+
producer?.close()
1083+
}
1084+
10551085
def containerProperties() {
10561086
try {
10571087
// Different class names for test and latestDepTest.

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,11 @@ public static AgentScope onEnter(
4848
ClusterIdHolder.set(clusterId);
4949
}
5050

51-
// Try to extract existing trace context from record headers
51+
// Try to extract existing trace context from record headers if enabled
5252
final AgentSpanContext extractedContext =
53-
extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);
53+
Config.get().isKafkaClientPropagationExtractionEnabled()
54+
? extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER)
55+
: null;
5456

5557
final AgentSpan localActiveSpan = activeSpan();
5658

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -859,6 +859,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
859859
860860
def "test producer extracts and uses existing trace context from record headers"() {
861861
setup:
862+
injectSysConfig("kafka.client.propagation.extraction.enabled", "true")
862863
def senderProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
863864
def producer = new KafkaProducer<>(senderProps)
864865
@@ -887,6 +888,35 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
887888
producer?.close()
888889
}
889890
891+
def "test producer ignores existing trace context when extraction is disabled"() {
892+
setup:
893+
injectSysConfig("kafka.client.propagation.extraction.enabled", "false")
894+
def senderProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
895+
def producer = new KafkaProducer<>(senderProps)
896+
897+
def existingTraceId = 1234567890123456L
898+
def existingSpanId = 9876543210987654L
899+
def headers = new RecordHeaders()
900+
headers.add(new RecordHeader("x-datadog-trace-id",
901+
String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8)))
902+
headers.add(new RecordHeader("x-datadog-parent-id",
903+
String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8)))
904+
905+
when:
906+
def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction-disabled", headers)
907+
producer.send(record).get()
908+
909+
then:
910+
TEST_WRITER.waitForTraces(1)
911+
def producedSpan = TEST_WRITER[0][0]
912+
// Verify the span did NOT use the extracted context as parent
913+
producedSpan.traceId.toLong() != existingTraceId
914+
producedSpan.parentId != existingSpanId
915+
916+
cleanup:
917+
producer?.close()
918+
}
919+
890920
def producerSpan(
891921
TraceAssert trace,
892922
Map<String, ?> config,

dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ public final class TraceInstrumentationConfig {
104104
"kafka.client.propagation.disabled.topics";
105105
public static final String KAFKA_CLIENT_BASE64_DECODING_ENABLED =
106106
"kafka.client.base64.decoding.enabled";
107+
public static final String KAFKA_CLIENT_PROPAGATION_EXTRACTION_ENABLED =
108+
"kafka.client.propagation.extraction.enabled";
107109

108110
public static final String JMS_PROPAGATION_DISABLED_TOPICS = "jms.propagation.disabled.topics";
109111
public static final String JMS_PROPAGATION_DISABLED_QUEUES = "jms.propagation.disabled.queues";

internal-api/src/main/java/datadog/trace/api/Config.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,7 @@
562562
import static datadog.trace.api.config.TraceInstrumentationConfig.JMS_UNACKNOWLEDGED_MAX_AGE;
563563
import static datadog.trace.api.config.TraceInstrumentationConfig.KAFKA_CLIENT_BASE64_DECODING_ENABLED;
564564
import static datadog.trace.api.config.TraceInstrumentationConfig.KAFKA_CLIENT_PROPAGATION_DISABLED_TOPICS;
565+
import static datadog.trace.api.config.TraceInstrumentationConfig.KAFKA_CLIENT_PROPAGATION_EXTRACTION_ENABLED;
565566
import static datadog.trace.api.config.TraceInstrumentationConfig.LOGS_INJECTION;
566567
import static datadog.trace.api.config.TraceInstrumentationConfig.LOGS_INJECTION_ENABLED;
567568
import static datadog.trace.api.config.TraceInstrumentationConfig.MESSAGE_BROKER_SPLIT_BY_DESTINATION;
@@ -1158,6 +1159,7 @@ public static String getHostName() {
11581159
private final boolean kafkaClientPropagationEnabled;
11591160
private final Set<String> kafkaClientPropagationDisabledTopics;
11601161
private final boolean kafkaClientBase64DecodingEnabled;
1162+
private final boolean kafkaClientPropagationExtractionEnabled;
11611163

11621164
private final boolean jmsPropagationEnabled;
11631165
private final Set<String> jmsPropagationDisabledTopics;
@@ -2629,6 +2631,8 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment())
26292631
tryMakeImmutableSet(configProvider.getList(KAFKA_CLIENT_PROPAGATION_DISABLED_TOPICS));
26302632
kafkaClientBase64DecodingEnabled =
26312633
configProvider.getBoolean(KAFKA_CLIENT_BASE64_DECODING_ENABLED, false);
2634+
kafkaClientPropagationExtractionEnabled =
2635+
configProvider.getBoolean(KAFKA_CLIENT_PROPAGATION_EXTRACTION_ENABLED, false);
26322636
jmsPropagationEnabled = isPropagationEnabled(true, "jms");
26332637
jmsPropagationDisabledTopics =
26342638
tryMakeImmutableSet(configProvider.getList(JMS_PROPAGATION_DISABLED_TOPICS));
@@ -4451,6 +4455,10 @@ public boolean isKafkaClientBase64DecodingEnabled() {
44514455
return kafkaClientBase64DecodingEnabled;
44524456
}
44534457

4458+
public boolean isKafkaClientPropagationExtractionEnabled() {
4459+
return kafkaClientPropagationExtractionEnabled;
4460+
}
4461+
44544462
public boolean isRabbitPropagationEnabled() {
44554463
return rabbitPropagationEnabled;
44564464
}
@@ -5952,6 +5960,8 @@ public String toString() {
59525960
+ kafkaClientPropagationDisabledTopics
59535961
+ ", kafkaClientBase64DecodingEnabled="
59545962
+ kafkaClientBase64DecodingEnabled
5963+
+ ", kafkaClientPropagationExtractionEnabled="
5964+
+ kafkaClientPropagationExtractionEnabled
59555965
+ ", jmsPropagationEnabled="
59565966
+ jmsPropagationEnabled
59575967
+ ", jmsPropagationDisabledTopics="

0 commit comments

Comments
 (0)