Skip to content

Commit 8fed0a3

Browse files
PlugaruTygree
authored andcommitted
Add a new configuration kafka.client.propagation.extraction.enabled to make this behavior optional
1 parent a8cb36a commit 8fed0a3

File tree

6 files changed

+80
-4
lines changed

6 files changed

+80
-4
lines changed

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,11 @@ public static AgentScope onEnter(
127127
ClusterIdHolder.set(clusterId);
128128
}
129129

130-
// Try to extract existing trace context from record headers
130+
// Try to extract existing trace context from record headers if enabled
131131
final AgentSpanContext extractedContext =
132-
extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);
132+
Config.get().isKafkaClientPropagationExtractionEnabled()
133+
? extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER)
134+
: null;
133135

134136
final AgentSpan localActiveSpan = activeSpan();
135137

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1019,6 +1019,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
10191019

10201020
def "test producer extracts and uses existing trace context from record headers"() {
10211021
setup:
1022+
injectSysConfig("kafka.client.propagation.extraction.enabled", "true")
10221023
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
10231024
def producer = new KafkaProducer<>(senderProps)
10241025

@@ -1047,6 +1048,35 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
10471048
producer?.close()
10481049
}
10491050

1051+
def "test producer ignores existing trace context when extraction is disabled"() {
1052+
setup:
1053+
injectSysConfig("kafka.client.propagation.extraction.enabled", "false")
1054+
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
1055+
def producer = new KafkaProducer<>(senderProps)
1056+
1057+
def existingTraceId = 1234567890123456L
1058+
def existingSpanId = 9876543210987654L
1059+
def headers = new RecordHeaders()
1060+
headers.add(new RecordHeader("x-datadog-trace-id",
1061+
String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8)))
1062+
headers.add(new RecordHeader("x-datadog-parent-id",
1063+
String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8)))
1064+
1065+
when:
1066+
def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction-disabled", headers)
1067+
producer.send(record).get()
1068+
1069+
then:
1070+
TEST_WRITER.waitForTraces(1)
1071+
def producedSpan = TEST_WRITER[0][0]
1072+
// Verify the span did NOT use the extracted context as parent
1073+
producedSpan.traceId.toLong() != existingTraceId
1074+
producedSpan.parentId != existingSpanId
1075+
1076+
cleanup:
1077+
producer?.close()
1078+
}
1079+
10501080
def containerProperties() {
10511081
try {
10521082
// Different class names for test and latestDepTest.

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,11 @@ public static AgentScope onEnter(
4848
ClusterIdHolder.set(clusterId);
4949
}
5050

51-
// Try to extract existing trace context from record headers
51+
// Try to extract existing trace context from record headers if enabled
5252
final AgentSpanContext extractedContext =
53-
extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);
53+
Config.get().isKafkaClientPropagationExtractionEnabled()
54+
? extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER)
55+
: null;
5456

5557
final AgentSpan localActiveSpan = activeSpan();
5658

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -859,6 +859,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
859859
860860
def "test producer extracts and uses existing trace context from record headers"() {
861861
setup:
862+
injectSysConfig("kafka.client.propagation.extraction.enabled", "true")
862863
def senderProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
863864
def producer = new KafkaProducer<>(senderProps)
864865
@@ -887,6 +888,35 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
887888
producer?.close()
888889
}
889890
891+
def "test producer ignores existing trace context when extraction is disabled"() {
892+
setup:
893+
injectSysConfig("kafka.client.propagation.extraction.enabled", "false")
894+
def senderProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
895+
def producer = new KafkaProducer<>(senderProps)
896+
897+
def existingTraceId = 1234567890123456L
898+
def existingSpanId = 9876543210987654L
899+
def headers = new RecordHeaders()
900+
headers.add(new RecordHeader("x-datadog-trace-id",
901+
String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8)))
902+
headers.add(new RecordHeader("x-datadog-parent-id",
903+
String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8)))
904+
905+
when:
906+
def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction-disabled", headers)
907+
producer.send(record).get()
908+
909+
then:
910+
TEST_WRITER.waitForTraces(1)
911+
def producedSpan = TEST_WRITER[0][0]
912+
// Verify the span did NOT use the extracted context as parent
913+
producedSpan.traceId.toLong() != existingTraceId
914+
producedSpan.parentId != existingSpanId
915+
916+
cleanup:
917+
producer?.close()
918+
}
919+
890920
def producerSpan(
891921
TraceAssert trace,
892922
Map<String, ?> config,

dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ public final class TraceInstrumentationConfig {
109109
"kafka.client.propagation.disabled.topics";
110110
public static final String KAFKA_CLIENT_BASE64_DECODING_ENABLED =
111111
"kafka.client.base64.decoding.enabled";
112+
public static final String KAFKA_CLIENT_PROPAGATION_EXTRACTION_ENABLED =
113+
"kafka.client.propagation.extraction.enabled";
112114

113115
public static final String JMS_PROPAGATION_DISABLED_TOPICS = "jms.propagation.disabled.topics";
114116
public static final String JMS_PROPAGATION_DISABLED_QUEUES = "jms.propagation.disabled.queues";

internal-api/src/main/java/datadog/trace/api/Config.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,7 @@
562562
import static datadog.trace.api.config.TraceInstrumentationConfig.JMS_UNACKNOWLEDGED_MAX_AGE;
563563
import static datadog.trace.api.config.TraceInstrumentationConfig.KAFKA_CLIENT_BASE64_DECODING_ENABLED;
564564
import static datadog.trace.api.config.TraceInstrumentationConfig.KAFKA_CLIENT_PROPAGATION_DISABLED_TOPICS;
565+
import static datadog.trace.api.config.TraceInstrumentationConfig.KAFKA_CLIENT_PROPAGATION_EXTRACTION_ENABLED;
565566
import static datadog.trace.api.config.TraceInstrumentationConfig.LOGS_INJECTION;
566567
import static datadog.trace.api.config.TraceInstrumentationConfig.LOGS_INJECTION_ENABLED;
567568
import static datadog.trace.api.config.TraceInstrumentationConfig.MESSAGE_BROKER_SPLIT_BY_DESTINATION;
@@ -1160,6 +1161,7 @@ public static String getHostName() {
11601161
private final boolean kafkaClientPropagationEnabled;
11611162
private final Set<String> kafkaClientPropagationDisabledTopics;
11621163
private final boolean kafkaClientBase64DecodingEnabled;
1164+
private final boolean kafkaClientPropagationExtractionEnabled;
11631165

11641166
private final boolean jmsPropagationEnabled;
11651167
private final Set<String> jmsPropagationDisabledTopics;
@@ -2645,6 +2647,8 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment())
26452647
tryMakeImmutableSet(configProvider.getList(KAFKA_CLIENT_PROPAGATION_DISABLED_TOPICS));
26462648
kafkaClientBase64DecodingEnabled =
26472649
configProvider.getBoolean(KAFKA_CLIENT_BASE64_DECODING_ENABLED, false);
2650+
kafkaClientPropagationExtractionEnabled =
2651+
configProvider.getBoolean(KAFKA_CLIENT_PROPAGATION_EXTRACTION_ENABLED, false);
26482652
jmsPropagationEnabled = isPropagationEnabled(true, "jms");
26492653
jmsPropagationDisabledTopics =
26502654
tryMakeImmutableSet(configProvider.getList(JMS_PROPAGATION_DISABLED_TOPICS));
@@ -4480,6 +4484,10 @@ public boolean isKafkaClientBase64DecodingEnabled() {
44804484
return kafkaClientBase64DecodingEnabled;
44814485
}
44824486

4487+
public boolean isKafkaClientPropagationExtractionEnabled() {
4488+
return kafkaClientPropagationExtractionEnabled;
4489+
}
4490+
44834491
public boolean isRabbitPropagationEnabled() {
44844492
return rabbitPropagationEnabled;
44854493
}
@@ -6022,6 +6030,8 @@ public String toString() {
60226030
+ kafkaClientPropagationDisabledTopics
60236031
+ ", kafkaClientBase64DecodingEnabled="
60246032
+ kafkaClientBase64DecodingEnabled
6033+
+ ", kafkaClientPropagationExtractionEnabled="
6034+
+ kafkaClientPropagationExtractionEnabled
60256035
+ ", jmsPropagationEnabled="
60266036
+ jmsPropagationEnabled
60276037
+ ", jmsPropagationDisabledTopics="

0 commit comments

Comments
 (0)