Skip to content

Commit

Permalink
Stop kotlin coroutine dispatcher from propagating context (#11500)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit committed Jun 5, 2024
1 parent a47c406 commit 0ea05a8
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies {

testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.0.0")
testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.0.0")
testLibrary("io.vertx:vertx-lang-kotlin-coroutines:3.6.0")
}

kotlin {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines;

import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.extendsClass;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class KotlinCoroutineDispatcherInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<ClassLoader> classLoaderOptimization() {
return hasClassesNamed("kotlinx.coroutines.CoroutineDispatcher");
}

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return extendsClass(named("kotlinx.coroutines.CoroutineDispatcher"));
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("dispatch").and(takesArgument(1, Runnable.class)),
this.getClass().getName() + "$StopContextPropagationAdvice");
}

@SuppressWarnings("unused")
public static class StopContextPropagationAdvice {

@Advice.OnMethodEnter
public static void enter(@Advice.Argument(value = 1, readOnly = false) Runnable runnable) {
if (runnable != null) {
runnable = RunnableWrapper.stopPropagation(runnable);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines;

import static java.util.Collections.singletonList;
import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
Expand Down Expand Up @@ -35,6 +35,7 @@ public boolean isIndyModule() {

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new KotlinCoroutinesInstrumentation());
return asList(
new KotlinCoroutinesInstrumentation(), new KotlinCoroutineDispatcherInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;

public final class RunnableWrapper {

public static Runnable stopPropagation(Runnable runnable) {
return () -> {
try (Scope ignored = Context.root().makeCurrent()) {
runnable.run();
}
};
}

private RunnableWrapper() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRo
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo
import io.opentelemetry.sdk.testing.assertj.TraceAssert
import io.opentelemetry.semconv.incubating.CodeIncubatingAttributes
import io.vertx.core.Vertx
import io.vertx.kotlin.coroutines.dispatcher
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
Expand All @@ -41,6 +43,7 @@ import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.yield
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.Assumptions
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.extension.ExtensionContext
Expand All @@ -63,17 +66,20 @@ class KotlinCoroutinesInstrumentationTest {
companion object {
val threadPool = Executors.newFixedThreadPool(2)
val singleThread = Executors.newSingleThreadExecutor()
val vertx = Vertx.vertx()

@JvmStatic
@RegisterExtension
val testing = AgentInstrumentationExtension.create()
}

@AfterAll
fun shutdown() {
threadPool.shutdown()
singleThread.shutdown()
vertx.close()
}

@RegisterExtension
val testing = AgentInstrumentationExtension.create()

val tracer = testing.openTelemetry.getTracer("test")

@ParameterizedTest
Expand Down Expand Up @@ -517,6 +523,7 @@ class KotlinCoroutinesInstrumentationTest {
arguments(DispatcherWrapper(Dispatchers.Unconfined)),
arguments(DispatcherWrapper(threadPool.asCoroutineDispatcher())),
arguments(DispatcherWrapper(singleThread.asCoroutineDispatcher())),
arguments(DispatcherWrapper(vertx.dispatcher()))
)
}

Expand Down Expand Up @@ -559,4 +566,33 @@ class KotlinCoroutinesInstrumentationTest {
return otelContext.makeCurrent()
}
}

// regression test for
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/11411
@ParameterizedTest
@ArgumentsSource(DispatchersSource::class)
fun `dispatch does not propagate context`(dispatcher: DispatcherWrapper) {
Assumptions.assumeTrue(dispatcher.dispatcher != Dispatchers.Unconfined)

runTest(dispatcher) {
dispatcher.dispatcher.dispatch(coroutineContext) {
tracer.spanBuilder("dispatched").startSpan().end()
}
}

testing.waitAndAssertTraces(
{ trace ->
trace.hasSpansSatisfyingExactly({
it.hasName("parent")
.hasNoParent()
})
},
{ trace ->
trace.hasSpansSatisfyingExactly({
it.hasName("dispatched")
.hasNoParent()
})
}
)
}
}

0 comments on commit 0ea05a8

Please sign in to comment.