Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package datadog.trace.bootstrap.instrumentation.java.lang;

public final class VirtualThreadHelper {
public static final String VIRTUAL_THREAD_CLASS_NAME = "java.lang.VirtualThread";

/**
* {@link datadog.trace.bootstrap.instrumentation.api.AgentScope} class name as string literal.
* This is mandatory for {@link datadog.trace.bootstrap.ContextStore} API call.
*/
public static final String AGENT_SCOPE_CLASS_NAME =
"datadog.trace.bootstrap.instrumentation.api.AgentScope";
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
0 java.lang.ProcessImpl
# allow Runtime instrumentation for RASP
0 java.lang.Runtime
# allow context tracking for VirtualThread
0 java.lang.VirtualThread
0 java.net.http.*
0 java.net.HttpURLConnection
0 java.net.Socket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.OnMethodEnter;
import net.bytebuddy.asm.Advice.OnMethodExit;

/**
* Instruments {@code TaskRunner}, internal runnable for {@code ThreadPerTaskExecutor} (JDK 19+ as
* preview, 21+ as stable), the executor with default virtual thread factory.
*/
@SuppressWarnings("unused")
@AutoService(InstrumenterModule.class)
public final class TaskRunnerInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForBootstrap, Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
Expand Down Expand Up @@ -51,19 +54,19 @@ public void methodAdvice(MethodTransformer transformer) {
}

public static final class Construct {
@Advice.OnMethodExit
@OnMethodExit(suppress = Throwable.class)
public static void captureScope(@Advice.This Runnable task) {
capture(InstrumentationContext.get(Runnable.class, State.class), task);
}
}

public static final class Run {
@Advice.OnMethodEnter
@OnMethodEnter(suppress = Throwable.class)
public static AgentScope activate(@Advice.This Runnable task) {
return startTaskScope(InstrumentationContext.get(Runnable.class, State.class), task);
}

@Advice.OnMethodExit(onThrowable = Throwable.class)
@OnMethodExit(suppress = Throwable.class)
public static void close(@Advice.Enter AgentScope scope) {
endTaskScope(scope);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import datadog.trace.agent.test.InstrumentationSpecification
import datadog.trace.api.Trace
import datadog.trace.core.DDSpan
import spock.lang.Shared

import java.util.concurrent.Callable
import java.util.concurrent.ExecutorCompletionService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import spock.lang.Shared

class VirtualThreadTest extends InstrumentationSpecification {
class VirtualThreadPerTaskExecutorTest extends InstrumentationSpecification {
@Shared
def executeRunnable = { e, c -> e.execute((Runnable) c) }
@Shared
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
plugins {
id 'idea'
}

apply from: "$rootDir/gradle/java.gradle"
// Use slf4j-simple as default; logback has a high chance of getting stuck in a deadlock on CI.
apply from: "$rootDir/gradle/slf4j-simple.gradle"

testJvmConstraints {
minJavaVersion = JavaVersion.VERSION_21
}

muzzle {
pass {
coreJdk('21')
}
}

idea {
module {
jdkName = '21'
}
}

// Set all compile tasks to use JDK21 but let instrumentation code targets 1.8 compatibility
tasks.withType(AbstractCompile).configureEach {
configureCompiler(it, 21, JavaVersion.VERSION_1_8)
}

dependencies {
testImplementation project(':dd-java-agent:instrumentation:trace-annotation')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package datadog.trace.instrumentation.java.lang.jdk21;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.endTaskScope;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.startTaskScope;
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.AGENT_SCOPE_CLASS_NAME;
import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.VIRTUAL_THREAD_CLASS_NAME;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.environment.JavaVirtualMachine;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.OnMethodEnter;
import net.bytebuddy.asm.Advice.OnMethodExit;

/**
* Instruments {@code VirtualThread} to capture active state at creation, activate it on
* continuation mount, and close the scope from activation on continuation unmount.
*
* <p>The instrumentation uses two context stores. The first from {@link Runnable} (as {@code
* VirtualThread} inherits from {@link Runnable}) to store the captured {@link State} to restore
* later. It additionally stores the {@link AgentScope} to be able to close it later as activation /
* close is not done around the same method (so passing the scope from {@link OnMethodEnter} /
* {@link OnMethodExit} using advice return value is not possible).
*
* <p>Instrumenting the internal {@code VirtualThread.runContinuation()} method does not work as the
* current thread is still the carrier thread and not a virtual thread. Activating the state when on
* the carrier thread (ie a platform thread) would store the active context into ThreadLocal using
* the platform thread as key, making the tracer unable to retrieve the stored context from the
* current virtual thread (ThreadLocal will not return the value associated to the underlying
* platform thread as they are considered to be different).
*/
@SuppressWarnings("unused")
@AutoService(InstrumenterModule.class)
public final class VirtualThreadInstrumentation extends InstrumenterModule.Tracing
Copy link
Contributor

@ygree ygree Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes TaskRunnerInstrumentation unnecessary. Perhaps we could just remove it now and move its test VirtualThreadPerTaskExecutorTest into this project.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TaskRunnerInstrumentation is still needed. Its unit tests end with disconnected traces and unfinished spans without it.
I also included the (direct) virtual thread API into the concurrent smoke test (21) to make sure both instrumentations don't mess with each other.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just double-checked, and the VirtualThreadPerTaskExecutorTest in TaskRunnerInstrumentation passes when TaskRunnerInstrumentation is disabled. Remember to add testImplementation project(':dd-java-agent:instrumentation:java:java-lang:java-lang-21.0') when running ./gradlew :dd-java-agent:instrumentation:java:java-concurrent:java-concurrent-21.0:test -PtestJvm=21

implements Instrumenter.ForBootstrap, Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

public VirtualThreadInstrumentation() {
super("java-lang", "java-lang-21", "virtual-thread");
}

@Override
public String instrumentedType() {
return VIRTUAL_THREAD_CLASS_NAME;
}

@Override
public boolean isEnabled() {
return JavaVirtualMachine.isJavaVersionAtLeast(21) && super.isEnabled();
}

@Override
public Map<String, String> contextStore() {
Map<String, String> contextStore = new HashMap<>();
contextStore.put(Runnable.class.getName(), State.class.getName());
contextStore.put(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
return contextStore;
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$Construct");
transformer.applyAdvice(isMethod().and(named("mount")), getClass().getName() + "$Activate");
transformer.applyAdvice(isMethod().and(named("unmount")), getClass().getName() + "$Close");
}

public static final class Construct {
@OnMethodExit(suppress = Throwable.class)
public static void captureScope(@Advice.This Object virtualThread) {
capture(InstrumentationContext.get(Runnable.class, State.class), (Runnable) virtualThread);
}
}

public static final class Activate {
@OnMethodExit(suppress = Throwable.class)
public static void activate(@Advice.This Object virtualThread) {
ContextStore<Runnable, State> stateStore =
InstrumentationContext.get(Runnable.class, State.class);
ContextStore<Object, Object> scopeStore =
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
AgentScope agentScope = startTaskScope(stateStore, (Runnable) virtualThread);
scopeStore.put(virtualThread, agentScope);
}
}

public static final class Close {
@OnMethodEnter(suppress = Throwable.class)
public static void close(@Advice.This Object virtualThread) {
ContextStore<Object, Object> scopeStore =
InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME);
Object agentScope = scopeStore.get(virtualThread);
if (agentScope instanceof AgentScope) {
endTaskScope((AgentScope) agentScope);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import datadog.trace.agent.test.InstrumentationSpecification
import datadog.trace.api.Trace
import datadog.trace.core.DDSpan

// Note: test builder x2 + test factory can be refactored but are kept simple to ease with debugging.
class VirtualThreadApiTest extends InstrumentationSpecification {
def "test Thread.Builder.OfVirtual - start()"() {
setup:
def threadBuilder = Thread.ofVirtual().name("builder - started")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also add a test for Thread.startVirtualThread(). It works, I checked, but since it's another public method, it'd be nice to have a test just to be sure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought of it in the first place but as the Javadoc explicitly states it is equivalent to Thread.ofVirtual().start(task);, I skipped it.
image

But you're right, better add it by safety as it's another public API 😉


when:
new Runnable() {
@Override
@Trace(operationName = "parent")
void run() {
// this child will have a span
threadBuilder.start(new JavaAsyncChild())
// this child won't
threadBuilder.start(new JavaAsyncChild(false, false))
blockUntilChildSpansFinished(1)
}
}.run()

then:
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)

expect:
TEST_WRITER.size() == 1
trace.size() == 2
trace.get(0).operationName == "parent"
trace.get(1).operationName == "asyncChild"
trace.get(1).parentId == trace.get(0).spanId
}

def "test Thread.Builder.OfVirtual - unstarted()"() {
setup:
def threadBuilder = Thread.ofVirtual().name("builder - unstarted")

when:
new Runnable() {
@Override
@Trace(operationName = "parent")
void run() {
// this child will have a span
threadBuilder.unstarted(new JavaAsyncChild()).start()
// this child won't
threadBuilder.unstarted(new JavaAsyncChild(false, false)).start()
blockUntilChildSpansFinished(1)
}
}.run()


then:
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)

expect:
TEST_WRITER.size() == 1
trace.size() == 2
trace.get(0).operationName == "parent"
trace.get(1).operationName == "asyncChild"
trace.get(1).parentId == trace.get(0).spanId
}

def "test Thread.startVirtual()"() {
when:
new Runnable() {
@Override
@Trace(operationName = "parent")
void run() {
// this child will have a span
Thread.startVirtualThread(new JavaAsyncChild())
// this child won't
Thread.startVirtualThread(new JavaAsyncChild(false, false))
blockUntilChildSpansFinished(1)
}
}.run()

then:
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)

expect:
TEST_WRITER.size() == 1
trace.size() == 2
trace.get(0).operationName == "parent"
trace.get(1).operationName == "asyncChild"
trace.get(1).parentId == trace.get(0).spanId
}

def "test virtual ThreadFactory"() {
setup:
def threadFactory = Thread.ofVirtual().factory()

when:
new Runnable() {
@Override
@Trace(operationName = "parent")
void run() {
// this child will have a span
threadFactory.newThread(new JavaAsyncChild()).start()
// this child won't
threadFactory.newThread(new JavaAsyncChild(false, false)).start()
blockUntilChildSpansFinished(1)
}
}.run()

then:
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)

expect:
TEST_WRITER.size() == 1
trace.size() == 2
trace.get(0).operationName == "parent"
trace.get(1).operationName == "asyncChild"
trace.get(1).parentId == trace.get(0).spanId
}

def "test nested virtual threads"() {
setup:
def threadBuilder = Thread.ofVirtual()

when:
new Runnable() {
@Trace(operationName = "parent")
@Override
void run() {
threadBuilder.start(new Runnable() {
@Trace(operationName = "child")
@Override
void run() {
threadBuilder.start(new Runnable() {
@Trace(operationName = "great-child")
@Override
void run() {
println "complete"
}
})
blockUntilChildSpansFinished(1)
}
})
blockUntilChildSpansFinished(1)
}
}.run()

then:
assertTraces(1) {
sortSpansByStart()
trace(3) {
span {
operationName "parent"
}
span {
childOfPrevious()
operationName "child"
}
span {
childOfPrevious()
operationName "great-child"
}
}
}
}
}
Loading