Skip to content

Commit

Permalink
Disable context propagation when virtual thread is switched to the ca…
Browse files Browse the repository at this point in the history
…rrier thread (#10854)
  • Loading branch information
laurit committed Mar 14, 2024
1 parent 61ca213 commit 628136e
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,35 @@
*/
public final class ExecutorAdviceHelper {

private static final ThreadLocal<Boolean> propagationDisabled = new ThreadLocal<>();

/**
* Temporarily disable context propagation for current thread. Call {@link #enablePropagation()}
* to re-enable the propagation.
*/
public static void disablePropagation() {
propagationDisabled.set(Boolean.TRUE);
}

/**
* Enable context propagation for current thread after it was disabled by calling {@link
* #disablePropagation()}.
*/
public static void enablePropagation() {
propagationDisabled.remove();
}

private static boolean isPropagationDisabled() {
return propagationDisabled.get() != null;
}

/**
* Check if {@code context} should be propagated to the passed {@code task}. This method must be
* called before each {@link #attachContextToTask(Context, VirtualField, Object)} call to ensure
* that unwanted tasks are not instrumented.
*/
public static boolean shouldPropagateContext(Context context, @Nullable Object task) {
if (task == null) {
if (task == null || isPropagationDisabled()) {
return false;
}

Expand Down Expand Up @@ -89,6 +111,10 @@ public static void cleanUpAfterSubmit(
/** Clean context attached to the given task. */
public static <T> void cleanPropagatedContext(
VirtualField<T, PropagatedContext> virtualField, T task) {
if (isPropagationDisabled()) {
return;
}

PropagatedContext propagatedContext = virtualField.get(task);
if (propagatedContext != null) {
propagatedContext.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
new JavaExecutorInstrumentation(),
new JavaForkJoinTaskInstrumentation(),
new RunnableInstrumentation(),
new ThreadPoolExtendingExecutorInstrumentation());
new ThreadPoolExtendingExecutorInstrumentation(),
new VirtualThreadInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.executors;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.javaagent.bootstrap.executors.ExecutorAdviceHelper;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class VirtualThreadInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("java.lang.VirtualThread");
}

@Override
public void transform(TypeTransformer transformer) {
// Disable context propagation when virtual thread is switched to the carrier thread. We should
// not propagate context on the carrier thread. Also, context propagation code can cause the
// carrier thread to park when it normally does not park, which may be unexpected for the jvm.
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/10747
transformer.applyAdviceToMethod(
named("switchToCarrierThread").and(takesArguments(0)),
this.getClass().getName() + "$SwitchToCarrierAdvice");
transformer.applyAdviceToMethod(
named("switchToVirtualThread").and(takesArguments(1)),
this.getClass().getName() + "$SwitchToVirtualAdvice");
}

@SuppressWarnings("unused")
public static class SwitchToCarrierAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void exit() {
ExecutorAdviceHelper.disablePropagation();
}
}

@SuppressWarnings("unused")
public static class SwitchToVirtualAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void enter() {
ExecutorAdviceHelper.enablePropagation();
}
}
}

0 comments on commit 628136e

Please sign in to comment.