Skip to content

Commit af48d68

Browse files
committed
feat(java-lang): Add support for VirtualThread
Add context tracking for VirtualThread API
1 parent d1f47ff commit af48d68

File tree

7 files changed

+364
-0
lines changed

7 files changed

+364
-0
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package datadog.trace.bootstrap.instrumentation.java.lang;
2+
3+
public final class VirtualThreadHelper {
4+
public static final String VIRTUAL_THREAD_CLASS_NAME = "java.lang.VirtualThread";
5+
6+
/**
7+
* {@link datadog.trace.bootstrap.instrumentation.api.AgentScope} class name as string literal.
8+
* This is mandatory for {@link datadog.trace.bootstrap.ContextStore} API call.
9+
*/
10+
public static final String AGENT_SCOPE_CLASS_NAME =
11+
"datadog.trace.bootstrap.instrumentation.api.AgentScope";
12+
}

dd-java-agent/agent-tooling/src/main/resources/datadog/trace/agent/tooling/bytebuddy/matcher/ignored_class_name.trie

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
0 java.lang.ProcessImpl
5151
# allow Runtime instrumentation for RASP
5252
0 java.lang.Runtime
53+
# allow context tracking for VirtualThread
54+
0 java.lang.VirtualThread
5355
0 java.net.http.*
5456
0 java.net.HttpURLConnection
5557
0 java.net.Socket
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
plugins {
2+
id 'idea'
3+
}
4+
5+
apply from: "$rootDir/gradle/java.gradle"
6+
// Use slf4j-simple as default; logback has a high chance of getting stuck in a deadlock on CI.
7+
apply from: "$rootDir/gradle/slf4j-simple.gradle"
8+
9+
testJvmConstraints {
10+
minJavaVersion = JavaVersion.VERSION_21
11+
}
12+
13+
muzzle {
14+
pass {
15+
coreJdk('21')
16+
}
17+
}
18+
19+
idea {
20+
module {
21+
jdkName = '21'
22+
}
23+
}
24+
25+
// Set all compile tasks to use JDK21 but let instrumentation code targets 1.8 compatibility
26+
tasks.withType(AbstractCompile).configureEach {
27+
configureCompiler(it, 21, JavaVersion.VERSION_1_8)
28+
}
29+
30+
dependencies {
31+
testImplementation project(':dd-java-agent:instrumentation:trace-annotation')
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package datadog.trace.instrumentation.java.lang.jdk21;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
5+
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.endTaskScope;
6+
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.startTaskScope;
7+
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.AGENT_SCOPE_CLASS_NAME;
8+
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.VIRTUAL_THREAD_CLASS_NAME;
9+
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
10+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
11+
12+
import com.google.auto.service.AutoService;
13+
import datadog.environment.JavaVirtualMachine;
14+
import datadog.trace.agent.tooling.Instrumenter;
15+
import datadog.trace.agent.tooling.InstrumenterModule;
16+
import datadog.trace.bootstrap.ContextStore;
17+
import datadog.trace.bootstrap.InstrumentationContext;
18+
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
19+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
import net.bytebuddy.asm.Advice;
23+
import net.bytebuddy.asm.Advice.OnMethodEnter;
24+
import net.bytebuddy.asm.Advice.OnMethodExit;
25+
26+
/**
27+
* Instruments {@code VirtualThread} to capture active state at creation, activate it on
28+
* continuation mount, and close the scope from activation on continuation unmount.
29+
*
30+
* <p>The instrumentation uses two context stores. The first from {@link Runnable} (as {@code
31+
* VirtualThread} inherits from {@link Runnable}) to store the captured {@link State} to restore
32+
* later. It additionally stores the {@link AgentScope} to be able to close it later as activation /
33+
* close is not done around the same method (so passing the scope from {@link OnMethodEnter} /
34+
* {@link OnMethodExit} using advice return value is not possible).
35+
*
36+
* <p>Instrumenting the internal {@code VirtualThread.runContinuation()} method does not work as the
37+
* current thread is still the carrier thread and not a virtual thread. Activating the state when on
38+
* the carrier thread (ie a platform thread) would store the active context into ThreadLocal using
39+
* the platform thread as key, making the tracer unable to retrieve the stored context from the
40+
* current virtual thread (ThreadLocal will not return the value associated to the underlying
41+
* platform thread as they are considered to be different).
42+
*/
43+
@SuppressWarnings("unused")
44+
@AutoService(InstrumenterModule.class)
45+
public final class VirtualThreadInstrumentation extends InstrumenterModule.Tracing
46+
implements Instrumenter.ForBootstrap, Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
47+
48+
public VirtualThreadInstrumentation() {
49+
super("java-lang", "java-lang-21", "virtual-thread");
50+
}
51+
52+
@Override
53+
public String instrumentedType() {
54+
return VIRTUAL_THREAD_CLASS_NAME;
55+
}
56+
57+
@Override
58+
public boolean isEnabled() {
59+
return JavaVirtualMachine.isJavaVersionAtLeast(21) && super.isEnabled();
60+
}
61+
62+
@Override
63+
public Map<String, String> contextStore() {
64+
Map<String, String> contextStore = new HashMap<>();
65+
contextStore.put(Runnable.class.getName(), State.class.getName());
66+
contextStore.put(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
67+
return contextStore;
68+
}
69+
70+
@Override
71+
public void methodAdvice(MethodTransformer transformer) {
72+
transformer.applyAdvice(isConstructor(), getClass().getName() + "$Construct");
73+
transformer.applyAdvice(isMethod().and(named("mount")), getClass().getName() + "$Activate");
74+
transformer.applyAdvice(isMethod().and(named("unmount")), getClass().getName() + "$Close");
75+
}
76+
77+
public static final class Construct {
78+
@OnMethodExit(suppress = Throwable.class)
79+
public static void captureScope(@Advice.This Object virtualThread) {
80+
capture(InstrumentationContext.get(Runnable.class, State.class), (Runnable) virtualThread);
81+
}
82+
}
83+
84+
public static final class Activate {
85+
@OnMethodExit(suppress = Throwable.class)
86+
public static void activate(@Advice.This Object virtualThread) {
87+
ContextStore<Runnable, State> stateStore =
88+
InstrumentationContext.get(Runnable.class, State.class);
89+
ContextStore<Object, Object> scopeStore =
90+
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
91+
AgentScope agentScope = startTaskScope(stateStore, (Runnable) virtualThread);
92+
scopeStore.put(virtualThread, agentScope);
93+
}
94+
}
95+
96+
public static final class Close {
97+
@OnMethodEnter(suppress = Throwable.class)
98+
public static void close(@Advice.This Object virtualThread) {
99+
ContextStore<Object, Object> scopeStore =
100+
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
101+
Object agentScope = scopeStore.get(virtualThread);
102+
if (agentScope instanceof AgentScope) {
103+
endTaskScope((AgentScope) agentScope);
104+
}
105+
}
106+
}
107+
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
import datadog.trace.agent.test.InstrumentationSpecification
2+
import datadog.trace.api.Trace
3+
import datadog.trace.core.DDSpan
4+
5+
// Note: test builder x2 + test factory can be refactored but are kept simple to ease with debugging.
6+
class VirtualThreadApiTest extends InstrumentationSpecification {
7+
def "test Thread.Builder.OfVirtual - start()"() {
8+
setup:
9+
def threadBuilder = Thread.ofVirtual().name("builder - started")
10+
11+
when:
12+
new Runnable() {
13+
@Override
14+
@Trace(operationName = "parent")
15+
void run() {
16+
// this child will have a span
17+
threadBuilder.start(new JavaAsyncChild())
18+
// this child won't
19+
threadBuilder.start(new JavaAsyncChild(false, false))
20+
blockUntilChildSpansFinished(1)
21+
}
22+
}.run()
23+
24+
then:
25+
TEST_WRITER.waitForTraces(1)
26+
List<DDSpan> trace = TEST_WRITER.get(0)
27+
28+
expect:
29+
TEST_WRITER.size() == 1
30+
trace.size() == 2
31+
trace.get(0).operationName == "parent"
32+
trace.get(1).operationName == "asyncChild"
33+
trace.get(1).parentId == trace.get(0).spanId
34+
}
35+
36+
def "test Thread.Builder.OfVirtual - unstarted()"() {
37+
setup:
38+
def threadBuilder = Thread.ofVirtual().name("builder - unstarted")
39+
40+
when:
41+
new Runnable() {
42+
@Override
43+
@Trace(operationName = "parent")
44+
void run() {
45+
// this child will have a span
46+
threadBuilder.unstarted(new JavaAsyncChild()).start()
47+
// this child won't
48+
threadBuilder.unstarted(new JavaAsyncChild(false, false)).start()
49+
blockUntilChildSpansFinished(1)
50+
}
51+
}.run()
52+
53+
54+
then:
55+
TEST_WRITER.waitForTraces(1)
56+
List<DDSpan> trace = TEST_WRITER.get(0)
57+
58+
expect:
59+
TEST_WRITER.size() == 1
60+
trace.size() == 2
61+
trace.get(0).operationName == "parent"
62+
trace.get(1).operationName == "asyncChild"
63+
trace.get(1).parentId == trace.get(0).spanId
64+
}
65+
66+
def "test Thread.startVirtual()"() {
67+
when:
68+
new Runnable() {
69+
@Override
70+
@Trace(operationName = "parent")
71+
void run() {
72+
// this child will have a span
73+
Thread.startVirtualThread(new JavaAsyncChild())
74+
// this child won't
75+
Thread.startVirtualThread(new JavaAsyncChild(false, false))
76+
blockUntilChildSpansFinished(1)
77+
}
78+
}.run()
79+
80+
then:
81+
TEST_WRITER.waitForTraces(1)
82+
List<DDSpan> trace = TEST_WRITER.get(0)
83+
84+
expect:
85+
TEST_WRITER.size() == 1
86+
trace.size() == 2
87+
trace.get(0).operationName == "parent"
88+
trace.get(1).operationName == "asyncChild"
89+
trace.get(1).parentId == trace.get(0).spanId
90+
}
91+
92+
def "test virtual ThreadFactory"() {
93+
setup:
94+
def threadFactory = Thread.ofVirtual().factory()
95+
96+
when:
97+
new Runnable() {
98+
@Override
99+
@Trace(operationName = "parent")
100+
void run() {
101+
// this child will have a span
102+
threadFactory.newThread(new JavaAsyncChild()).start()
103+
// this child won't
104+
threadFactory.newThread(new JavaAsyncChild(false, false)).start()
105+
blockUntilChildSpansFinished(1)
106+
}
107+
}.run()
108+
109+
then:
110+
TEST_WRITER.waitForTraces(1)
111+
List<DDSpan> trace = TEST_WRITER.get(0)
112+
113+
expect:
114+
TEST_WRITER.size() == 1
115+
trace.size() == 2
116+
trace.get(0).operationName == "parent"
117+
trace.get(1).operationName == "asyncChild"
118+
trace.get(1).parentId == trace.get(0).spanId
119+
}
120+
121+
def "test nested virtual threads"() {
122+
setup:
123+
def threadBuilder = Thread.ofVirtual()
124+
125+
when:
126+
new Runnable() {
127+
@Trace(operationName = "parent")
128+
@Override
129+
void run() {
130+
threadBuilder.start(new Runnable() {
131+
@Trace(operationName = "child")
132+
@Override
133+
void run() {
134+
threadBuilder.start(new Runnable() {
135+
@Trace(operationName = "great-child")
136+
@Override
137+
void run() {
138+
println "complete"
139+
}
140+
})
141+
blockUntilChildSpansFinished(1)
142+
}
143+
})
144+
blockUntilChildSpansFinished(1)
145+
}
146+
}.run()
147+
148+
then:
149+
assertTraces(1) {
150+
sortSpansByStart()
151+
trace(3) {
152+
span {
153+
operationName "parent"
154+
}
155+
span {
156+
childOfPrevious()
157+
operationName "child"
158+
}
159+
span {
160+
childOfPrevious()
161+
operationName "great-child"
162+
}
163+
}
164+
}
165+
}
166+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import datadog.trace.api.Trace;
2+
import java.util.concurrent.Callable;
3+
import java.util.concurrent.atomic.AtomicBoolean;
4+
5+
public class JavaAsyncChild implements Runnable, Callable<Void> {
6+
private final AtomicBoolean blockThread;
7+
private final boolean doTraceableWork;
8+
9+
public JavaAsyncChild() {
10+
this(true, false);
11+
}
12+
13+
public JavaAsyncChild(final boolean doTraceableWork, final boolean blockThread) {
14+
this.doTraceableWork = doTraceableWork;
15+
this.blockThread = new AtomicBoolean(blockThread);
16+
}
17+
18+
public void unblock() {
19+
blockThread.set(false);
20+
}
21+
22+
@Override
23+
public void run() {
24+
runImpl();
25+
}
26+
27+
@Override
28+
public Void call() {
29+
runImpl();
30+
return null;
31+
}
32+
33+
private void runImpl() {
34+
while (blockThread.get()) {
35+
// busy-wait to block thread
36+
}
37+
if (doTraceableWork) {
38+
asyncChild();
39+
}
40+
}
41+
42+
@Trace(operationName = "asyncChild")
43+
private void asyncChild() {}
44+
}

settings.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ include(
384384
":dd-java-agent:instrumentation:java:java-lang:java-lang-11.0",
385385
":dd-java-agent:instrumentation:java:java-lang:java-lang-15.0",
386386
":dd-java-agent:instrumentation:java:java-lang:java-lang-17.0",
387+
":dd-java-agent:instrumentation:java:java-lang:java-lang-21.0",
387388
":dd-java-agent:instrumentation:java:java-lang:java-lang-9.0",
388389
":dd-java-agent:instrumentation:java:java-net:java-net-1.8",
389390
":dd-java-agent:instrumentation:java:java-net:java-net-11.0",

0 commit comments

Comments
 (0)