-
Notifications
You must be signed in to change notification settings - Fork 319
Add context tracking support for virtual threads #10040
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| package datadog.trace.bootstrap.instrumentation.java.lang; | ||
|
|
||
| public final class VirtualThreadHelper { | ||
| public static final String VIRTUAL_THREAD_CLASS_NAME = "java.lang.VirtualThread"; | ||
|
|
||
| /** | ||
| * {@link datadog.trace.bootstrap.instrumentation.api.AgentScope} class name as string literal. | ||
| * This is mandatory for {@link datadog.trace.bootstrap.ContextStore} API call. | ||
| */ | ||
| public static final String AGENT_SCOPE_CLASS_NAME = | ||
| "datadog.trace.bootstrap.instrumentation.api.AgentScope"; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| plugins { | ||
| id 'idea' | ||
| } | ||
|
|
||
| apply from: "$rootDir/gradle/java.gradle" | ||
| // Use slf4j-simple as default; logback has a high chance of getting stuck in a deadlock on CI. | ||
| apply from: "$rootDir/gradle/slf4j-simple.gradle" | ||
|
|
||
| testJvmConstraints { | ||
| minJavaVersion = JavaVersion.VERSION_21 | ||
| } | ||
|
|
||
| muzzle { | ||
| pass { | ||
| coreJdk('21') | ||
| } | ||
| } | ||
|
|
||
| idea { | ||
| module { | ||
| jdkName = '21' | ||
| } | ||
| } | ||
|
|
||
| // Set all compile tasks to use JDK21 but let instrumentation code targets 1.8 compatibility | ||
| tasks.withType(AbstractCompile).configureEach { | ||
| configureCompiler(it, 21, JavaVersion.VERSION_1_8) | ||
| } | ||
|
|
||
| dependencies { | ||
| testImplementation project(':dd-java-agent:instrumentation:trace-annotation') | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,107 @@ | ||
| package datadog.trace.instrumentation.java.lang.jdk21; | ||
|
|
||
| import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; | ||
| import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture; | ||
| import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.endTaskScope; | ||
| import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.startTaskScope; | ||
| import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.AGENT_SCOPE_CLASS_NAME; | ||
| import static datadog.trace.bootstrap.instrumentation.java.lang.VirtualThreadHelper.VIRTUAL_THREAD_CLASS_NAME; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isConstructor; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isMethod; | ||
|
|
||
| import com.google.auto.service.AutoService; | ||
| import datadog.environment.JavaVirtualMachine; | ||
| import datadog.trace.agent.tooling.Instrumenter; | ||
| import datadog.trace.agent.tooling.InstrumenterModule; | ||
| import datadog.trace.bootstrap.ContextStore; | ||
| import datadog.trace.bootstrap.InstrumentationContext; | ||
| import datadog.trace.bootstrap.instrumentation.api.AgentScope; | ||
| import datadog.trace.bootstrap.instrumentation.java.concurrent.State; | ||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import net.bytebuddy.asm.Advice; | ||
| import net.bytebuddy.asm.Advice.OnMethodEnter; | ||
| import net.bytebuddy.asm.Advice.OnMethodExit; | ||
|
|
||
| /** | ||
| * Instruments {@code VirtualThread} to capture active state at creation, activate it on | ||
| * continuation mount, and close the scope from activation on continuation unmount. | ||
| * | ||
| * <p>The instrumentation uses two context stores. The first from {@link Runnable} (as {@code | ||
| * VirtualThread} inherits from {@link Runnable}) to store the captured {@link State} to restore | ||
| * later. It additionally stores the {@link AgentScope} to be able to close it later as activation / | ||
| * close is not done around the same method (so passing the scope from {@link OnMethodEnter} / | ||
| * {@link OnMethodExit} using advice return value is not possible). | ||
| * | ||
| * <p>Instrumenting the internal {@code VirtualThread.runContinuation()} method does not work as the | ||
| * current thread is still the carrier thread and not a virtual thread. Activating the state when on | ||
| * the carrier thread (ie a platform thread) would store the active context into ThreadLocal using | ||
| * the platform thread as key, making the tracer unable to retrieve the stored context from the | ||
| * current virtual thread (ThreadLocal will not return the value associated to the underlying | ||
| * platform thread as they are considered to be different). | ||
| */ | ||
| @SuppressWarnings("unused") | ||
| @AutoService(InstrumenterModule.class) | ||
| public final class VirtualThreadInstrumentation extends InstrumenterModule.Tracing | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This makes
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just double-checked, and the VirtualThreadPerTaskExecutorTest in TaskRunnerInstrumentation passes when TaskRunnerInstrumentation is disabled. Remember to add |
||
| implements Instrumenter.ForBootstrap, Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { | ||
|
|
||
| public VirtualThreadInstrumentation() { | ||
| super("java-lang", "java-lang-21", "virtual-thread"); | ||
| } | ||
|
|
||
| @Override | ||
| public String instrumentedType() { | ||
| return VIRTUAL_THREAD_CLASS_NAME; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isEnabled() { | ||
| return JavaVirtualMachine.isJavaVersionAtLeast(21) && super.isEnabled(); | ||
| } | ||
|
|
||
| @Override | ||
| public Map<String, String> contextStore() { | ||
| Map<String, String> contextStore = new HashMap<>(); | ||
| contextStore.put(Runnable.class.getName(), State.class.getName()); | ||
| contextStore.put(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME); | ||
| return contextStore; | ||
| } | ||
|
|
||
| @Override | ||
| public void methodAdvice(MethodTransformer transformer) { | ||
| transformer.applyAdvice(isConstructor(), getClass().getName() + "$Construct"); | ||
| transformer.applyAdvice(isMethod().and(named("mount")), getClass().getName() + "$Activate"); | ||
| transformer.applyAdvice(isMethod().and(named("unmount")), getClass().getName() + "$Close"); | ||
| } | ||
|
|
||
| public static final class Construct { | ||
| @OnMethodExit(suppress = Throwable.class) | ||
| public static void captureScope(@Advice.This Object virtualThread) { | ||
| capture(InstrumentationContext.get(Runnable.class, State.class), (Runnable) virtualThread); | ||
| } | ||
| } | ||
|
|
||
| public static final class Activate { | ||
| @OnMethodExit(suppress = Throwable.class) | ||
| public static void activate(@Advice.This Object virtualThread) { | ||
| ContextStore<Runnable, State> stateStore = | ||
| InstrumentationContext.get(Runnable.class, State.class); | ||
| ContextStore<Object, Object> scopeStore = | ||
| InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME); | ||
| AgentScope agentScope = startTaskScope(stateStore, (Runnable) virtualThread); | ||
| scopeStore.put(virtualThread, agentScope); | ||
| } | ||
| } | ||
|
|
||
| public static final class Close { | ||
| @OnMethodEnter(suppress = Throwable.class) | ||
| public static void close(@Advice.This Object virtualThread) { | ||
| ContextStore<Object, Object> scopeStore = | ||
| InstrumentationContext.get(VIRTUAL_THREAD_CLASS_NAME, AGENT_SCOPE_CLASS_NAME); | ||
| Object agentScope = scopeStore.get(virtualThread); | ||
| if (agentScope instanceof AgentScope) { | ||
| endTaskScope((AgentScope) agentScope); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,166 @@ | ||
| import datadog.trace.agent.test.InstrumentationSpecification | ||
| import datadog.trace.api.Trace | ||
| import datadog.trace.core.DDSpan | ||
|
|
||
| // Note: test builder x2 + test factory can be refactored but are kept simple to ease with debugging. | ||
| class VirtualThreadApiTest extends InstrumentationSpecification { | ||
| def "test Thread.Builder.OfVirtual - start()"() { | ||
| setup: | ||
| def threadBuilder = Thread.ofVirtual().name("builder - started") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could also add a test for
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| when: | ||
| new Runnable() { | ||
| @Override | ||
| @Trace(operationName = "parent") | ||
| void run() { | ||
| // this child will have a span | ||
| threadBuilder.start(new JavaAsyncChild()) | ||
| // this child won't | ||
| threadBuilder.start(new JavaAsyncChild(false, false)) | ||
| blockUntilChildSpansFinished(1) | ||
| } | ||
| }.run() | ||
|
|
||
| then: | ||
| TEST_WRITER.waitForTraces(1) | ||
| List<DDSpan> trace = TEST_WRITER.get(0) | ||
|
|
||
| expect: | ||
| TEST_WRITER.size() == 1 | ||
| trace.size() == 2 | ||
| trace.get(0).operationName == "parent" | ||
| trace.get(1).operationName == "asyncChild" | ||
| trace.get(1).parentId == trace.get(0).spanId | ||
| } | ||
|
|
||
| def "test Thread.Builder.OfVirtual - unstarted()"() { | ||
| setup: | ||
| def threadBuilder = Thread.ofVirtual().name("builder - unstarted") | ||
|
|
||
| when: | ||
| new Runnable() { | ||
| @Override | ||
| @Trace(operationName = "parent") | ||
| void run() { | ||
| // this child will have a span | ||
| threadBuilder.unstarted(new JavaAsyncChild()).start() | ||
| // this child won't | ||
| threadBuilder.unstarted(new JavaAsyncChild(false, false)).start() | ||
| blockUntilChildSpansFinished(1) | ||
| } | ||
| }.run() | ||
|
|
||
|
|
||
| then: | ||
| TEST_WRITER.waitForTraces(1) | ||
| List<DDSpan> trace = TEST_WRITER.get(0) | ||
|
|
||
| expect: | ||
| TEST_WRITER.size() == 1 | ||
| trace.size() == 2 | ||
| trace.get(0).operationName == "parent" | ||
| trace.get(1).operationName == "asyncChild" | ||
| trace.get(1).parentId == trace.get(0).spanId | ||
| } | ||
|
|
||
| def "test Thread.startVirtual()"() { | ||
| when: | ||
| new Runnable() { | ||
| @Override | ||
| @Trace(operationName = "parent") | ||
| void run() { | ||
| // this child will have a span | ||
| Thread.startVirtualThread(new JavaAsyncChild()) | ||
| // this child won't | ||
| Thread.startVirtualThread(new JavaAsyncChild(false, false)) | ||
| blockUntilChildSpansFinished(1) | ||
| } | ||
| }.run() | ||
|
|
||
| then: | ||
| TEST_WRITER.waitForTraces(1) | ||
| List<DDSpan> trace = TEST_WRITER.get(0) | ||
|
|
||
| expect: | ||
| TEST_WRITER.size() == 1 | ||
| trace.size() == 2 | ||
| trace.get(0).operationName == "parent" | ||
| trace.get(1).operationName == "asyncChild" | ||
| trace.get(1).parentId == trace.get(0).spanId | ||
| } | ||
|
|
||
| def "test virtual ThreadFactory"() { | ||
| setup: | ||
| def threadFactory = Thread.ofVirtual().factory() | ||
|
|
||
| when: | ||
| new Runnable() { | ||
| @Override | ||
| @Trace(operationName = "parent") | ||
| void run() { | ||
| // this child will have a span | ||
| threadFactory.newThread(new JavaAsyncChild()).start() | ||
| // this child won't | ||
| threadFactory.newThread(new JavaAsyncChild(false, false)).start() | ||
| blockUntilChildSpansFinished(1) | ||
| } | ||
| }.run() | ||
|
|
||
| then: | ||
| TEST_WRITER.waitForTraces(1) | ||
| List<DDSpan> trace = TEST_WRITER.get(0) | ||
|
|
||
| expect: | ||
| TEST_WRITER.size() == 1 | ||
| trace.size() == 2 | ||
| trace.get(0).operationName == "parent" | ||
| trace.get(1).operationName == "asyncChild" | ||
| trace.get(1).parentId == trace.get(0).spanId | ||
| } | ||
|
|
||
| def "test nested virtual threads"() { | ||
| setup: | ||
| def threadBuilder = Thread.ofVirtual() | ||
|
|
||
| when: | ||
| new Runnable() { | ||
| @Trace(operationName = "parent") | ||
| @Override | ||
| void run() { | ||
| threadBuilder.start(new Runnable() { | ||
| @Trace(operationName = "child") | ||
| @Override | ||
| void run() { | ||
| threadBuilder.start(new Runnable() { | ||
| @Trace(operationName = "great-child") | ||
| @Override | ||
| void run() { | ||
| println "complete" | ||
| } | ||
| }) | ||
| blockUntilChildSpansFinished(1) | ||
| } | ||
| }) | ||
| blockUntilChildSpansFinished(1) | ||
| } | ||
| }.run() | ||
|
|
||
| then: | ||
| assertTraces(1) { | ||
| sortSpansByStart() | ||
| trace(3) { | ||
| span { | ||
| operationName "parent" | ||
| } | ||
| span { | ||
| childOfPrevious() | ||
| operationName "child" | ||
| } | ||
| span { | ||
| childOfPrevious() | ||
| operationName "great-child" | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||

Uh oh!
There was an error while loading. Please reload this page.