Skip to content

Commit

Permalink
Add async operation end strategy for kotlin coroutines flow (#11168)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit committed Apr 30, 2024
1 parent acef37c commit ac2e8e1
Show file tree
Hide file tree
Showing 26 changed files with 487 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ fun createLanguageTask(
classFileVersion = ClassFileVersion.JAVA_V8
var transformationClassPath = inputClasspath
val compileTask = compileTaskProvider.get()
// this does not work for kotlin as compile task does not extend AbstractCompile
if (compileTask is AbstractCompile) {
val classesDirectory = compileTask.destinationDirectory.asFile.get()
val rawClassesDirectory: File = File(classesDirectory.parent, "${classesDirectory.name}raw")
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ dependencies {
testImplementation(project(":instrumentation:reactor:reactor-3.1:library"))
testImplementation(project(":instrumentation-annotations"))

// Use first version with flow support since we have tests for it.
testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0")
testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.0")
testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.0.0")
testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.0.0")
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

public final class AnnotationSingletons {

private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kotlinx-coroutines";
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kotlinx-coroutines-1.0";

private static final Instrumenter<MethodRequest, Object> INSTRUMENTER = createInstrumenter();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.extension.kotlin.getOpenTelemetryContext
import io.opentelemetry.instrumentation.annotations.SpanAttribute
import io.opentelemetry.instrumentation.annotations.WithSpan
import io.opentelemetry.instrumentation.reactor.v3_1.ContextPropagationOperator
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension
import io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo
Expand All @@ -31,16 +30,9 @@ import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.collect
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.reactor.flux
import kotlinx.coroutines.reactor.mono
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
Expand Down Expand Up @@ -84,58 +76,6 @@ class KotlinCoroutinesInstrumentationTest {

val tracer = testing.openTelemetry.getTracer("test")

@ParameterizedTest
@ArgumentsSource(DispatchersSource::class)
fun `traced across channels`(dispatcher: DispatcherWrapper) {
runTest(dispatcher) {
val producer = produce {
repeat(3) {
tracedChild("produce_$it")
send(it)
}
}

producer.consumeAsFlow().onEach {
tracedChild("consume_$it")
}.collect()
}

testing.waitAndAssertTraces(
{ trace ->
trace.hasSpansSatisfyingExactlyInAnyOrder(
{
it.hasName("parent")
.hasNoParent()
},
{
it.hasName("produce_0")
.hasParent(trace.getSpan(0))
},
{
it.hasName("consume_0")
.hasParent(trace.getSpan(0))
},
{
it.hasName("produce_1")
.hasParent(trace.getSpan(0))
},
{
it.hasName("consume_1")
.hasParent(trace.getSpan(0))
},
{
it.hasName("produce_2")
.hasParent(trace.getSpan(0))
},
{
it.hasName("consume_2")
.hasParent(trace.getSpan(0))
},
)
},
)
}

@ParameterizedTest
@ArgumentsSource(DispatchersSource::class)
fun `cancellation prevents trace`(dispatcher: DispatcherWrapper) {
Expand Down Expand Up @@ -388,78 +328,6 @@ class KotlinCoroutinesInstrumentationTest {
)
}

@ParameterizedTest
@ArgumentsSource(DispatchersSource::class)
fun `traced mono with context propagation operator`(dispatcherWrapper: DispatcherWrapper) {
runTest(dispatcherWrapper) {
val currentContext = Context.current()
// clear current context to ensure that ContextPropagationOperator is used for context propagation
withContext(Context.root().asContextElement()) {
val mono = mono(dispatcherWrapper.dispatcher) {
// extract context from reactor and propagate it into coroutine
val reactorContext = coroutineContext[ReactorContext.Key]?.context
val otelContext = ContextPropagationOperator.getOpenTelemetryContext(reactorContext, Context.current())
withContext(otelContext.asContextElement()) {
tracedChild("child")
}
}
ContextPropagationOperator.runWithContext(mono, currentContext).awaitSingle()
}
}

testing.waitAndAssertTraces(
{ trace ->
trace.hasSpansSatisfyingExactly(
{
it.hasName("parent")
.hasNoParent()
},
{
it.hasName("child")
.hasParent(trace.getSpan(0))
},
)
},
)
}

@ParameterizedTest
@ArgumentsSource(DispatchersSource::class)
fun `traced flux`(dispatcherWrapper: DispatcherWrapper) {
runTest(dispatcherWrapper) {
flux(dispatcherWrapper.dispatcher) {
repeat(3) {
tracedChild("child_$it")
send(it)
}
}.collect {
}
}

testing.waitAndAssertTraces(
{ trace ->
trace.hasSpansSatisfyingExactly(
{
it.hasName("parent")
.hasNoParent()
},
{
it.hasName("child_0")
.hasParent(trace.getSpan(0))
},
{
it.hasName("child_1")
.hasParent(trace.getSpan(0))
},
{
it.hasName("child_2")
.hasParent(trace.getSpan(0))
},
)
},
)
}

private val animalKey: ContextKey<String> = ContextKey.named("animal")

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

// We are using a separate module for kotlin source instead of placing them in
// instrumentation/kotlinx-coroutines/kotlinx-coroutines-flow-1.3/javaagent because muzzle
// generation plugin currently doesn't handle kotlin sources correctly.
plugins {
id("org.jetbrains.kotlin.jvm")
id("otel.java-conventions")
}

dependencies {
compileOnly("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0")
compileOnly("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
compileOnly(project(":instrumentation-api"))
}

tasks {
withType(KotlinCompile::class).configureEach {
kotlinOptions {
jvmTarget = "1.8"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.flow

import io.opentelemetry.context.Context
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.onCompletion

fun <REQUEST, RESPONSE> onComplete(flow: Flow<*>, instrumenter: Instrumenter<REQUEST, RESPONSE>, context: Context, request: REQUEST): Flow<*> {
return flow.onCompletion { cause: Throwable? ->
instrumenter.end(context, request, null, cause)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
id("org.jetbrains.kotlin.jvm")
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("org.jetbrains.kotlinx")
module.set("kotlinx-coroutines-core")
versions.set("[1.3.0,1.3.8)")
}
// 1.3.9 (and beyond?) have changed how artifact names are resolved due to multiplatform variants
pass {
group.set("org.jetbrains.kotlinx")
module.set("kotlinx-coroutines-core-jvm")
versions.set("[1.3.9,)")
}
}

dependencies {
library("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0")
compileOnly(project(":instrumentation-annotations-support"))
implementation(project(":instrumentation:kotlinx-coroutines:kotlinx-coroutines-flow-1.3:javaagent-kotlin"))

testInstrumentation(project(":instrumentation:kotlinx-coroutines:kotlinx-coroutines-1.0:javaagent"))
testInstrumentation(project(":instrumentation:opentelemetry-extension-kotlin-1.0:javaagent"))
testInstrumentation(project(":instrumentation:reactor:reactor-3.1:javaagent"))

testImplementation("io.opentelemetry:opentelemetry-extension-kotlin")
testImplementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
testImplementation(project(":instrumentation:reactor:reactor-3.1:library"))
testImplementation(project(":instrumentation-annotations"))

testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.0")
}

tasks {
withType(KotlinCompile::class).configureEach {
kotlinOptions {
jvmTarget = "1.8"
}
}
withType<Test>().configureEach {
jvmArgs("-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.flow;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class AbstractFlowInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return namedOneOf("kotlinx.coroutines.flow.AbstractFlow", "kotlinx.coroutines.flow.SafeFlow");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor(), this.getClass().getName() + "$ConstructorAdvice");
}

@SuppressWarnings("unused")
public static class ConstructorAdvice {

@Advice.OnMethodEnter
public static void enter() {
FlowInstrumentationHelper.initialize();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.flow;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategies;
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategy;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import kotlinx.coroutines.flow.Flow;

public final class FlowInstrumentationHelper {
private static final FlowAsyncOperationEndStrategy asyncOperationEndStrategy =
new FlowAsyncOperationEndStrategy();

static {
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
}

public static void initialize() {}

private FlowInstrumentationHelper() {}

private static final class FlowAsyncOperationEndStrategy implements AsyncOperationEndStrategy {

@Override
public boolean supports(Class<?> returnType) {
return Flow.class.isAssignableFrom(returnType);
}

@Override
public <REQUEST, RESPONSE> Object end(
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
Object asyncValue,
Class<RESPONSE> responseType) {
Flow<?> flow = (Flow<?>) asyncValue;
return FlowUtilKt.onComplete(flow, instrumenter, context, request);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.flow;

import static java.util.Collections.singletonList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class KotlinCoroutinesFlowInstrumentationModule extends InstrumentationModule {

public KotlinCoroutinesFlowInstrumentationModule() {
super("kotlinx-coroutines", "kotlinx-coroutines-flow");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new AbstractFlowInstrumentation());
}
}
Loading

0 comments on commit ac2e8e1

Please sign in to comment.