Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async operation end strategy for kotlin coroutines flow #11168

Merged
merged 2 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ fun createLanguageTask(
classFileVersion = ClassFileVersion.JAVA_V8
var transformationClassPath = inputClasspath
val compileTask = compileTaskProvider.get()
// this does not work for kotlin as compile task does not extend AbstractCompile
if (compileTask is AbstractCompile) {
val classesDirectory = compileTask.destinationDirectory.asFile.get()
val rawClassesDirectory: File = File(classesDirectory.parent, "${classesDirectory.name}raw")
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ dependencies {
testImplementation(project(":instrumentation:reactor:reactor-3.1:library"))
testImplementation(project(":instrumentation-annotations"))

// Use first version with flow support since we have tests for it.
testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0")
testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.0")
testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.0.0")
testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.0.0")
Comment on lines -43 to +44
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

tasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

public final class AnnotationSingletons {

private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kotlinx-coroutines";
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kotlinx-coroutines-1.0";

private static final Instrumenter<MethodRequest, Object> INSTRUMENTER = createInstrumenter();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.extension.kotlin.getOpenTelemetryContext
import io.opentelemetry.instrumentation.annotations.SpanAttribute
import io.opentelemetry.instrumentation.annotations.WithSpan
import io.opentelemetry.instrumentation.reactor.v3_1.ContextPropagationOperator
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension
import io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo
Expand All @@ -31,16 +30,9 @@ import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.collect
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.reactor.flux
import kotlinx.coroutines.reactor.mono
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
Expand Down Expand Up @@ -84,58 +76,6 @@ class KotlinCoroutinesInstrumentationTest {

val tracer = testing.openTelemetry.getTracer("test")

@ParameterizedTest
@ArgumentsSource(DispatchersSource::class)
fun `traced across channels`(dispatcher: DispatcherWrapper) {
runTest(dispatcher) {
val producer = produce {
repeat(3) {
tracedChild("produce_$it")
send(it)
}
}

producer.consumeAsFlow().onEach {
tracedChild("consume_$it")
}.collect()
}

testing.waitAndAssertTraces(
{ trace ->
trace.hasSpansSatisfyingExactlyInAnyOrder(
{
it.hasName("parent")
.hasNoParent()
},
{
it.hasName("produce_0")
.hasParent(trace.getSpan(0))
},
{
it.hasName("consume_0")
.hasParent(trace.getSpan(0))
},
{
it.hasName("produce_1")
.hasParent(trace.getSpan(0))
},
{
it.hasName("consume_1")
.hasParent(trace.getSpan(0))
},
{
it.hasName("produce_2")
.hasParent(trace.getSpan(0))
},
{
it.hasName("consume_2")
.hasParent(trace.getSpan(0))
},
)
},
)
}

@ParameterizedTest
@ArgumentsSource(DispatchersSource::class)
fun `cancellation prevents trace`(dispatcher: DispatcherWrapper) {
Expand Down Expand Up @@ -388,78 +328,6 @@ class KotlinCoroutinesInstrumentationTest {
)
}

@ParameterizedTest
@ArgumentsSource(DispatchersSource::class)
fun `traced mono with context propagation operator`(dispatcherWrapper: DispatcherWrapper) {
runTest(dispatcherWrapper) {
val currentContext = Context.current()
// clear current context to ensure that ContextPropagationOperator is used for context propagation
withContext(Context.root().asContextElement()) {
val mono = mono(dispatcherWrapper.dispatcher) {
// extract context from reactor and propagate it into coroutine
val reactorContext = coroutineContext[ReactorContext.Key]?.context
val otelContext = ContextPropagationOperator.getOpenTelemetryContext(reactorContext, Context.current())
withContext(otelContext.asContextElement()) {
tracedChild("child")
}
}
ContextPropagationOperator.runWithContext(mono, currentContext).awaitSingle()
}
}

testing.waitAndAssertTraces(
{ trace ->
trace.hasSpansSatisfyingExactly(
{
it.hasName("parent")
.hasNoParent()
},
{
it.hasName("child")
.hasParent(trace.getSpan(0))
},
)
},
)
}

@ParameterizedTest
@ArgumentsSource(DispatchersSource::class)
fun `traced flux`(dispatcherWrapper: DispatcherWrapper) {
runTest(dispatcherWrapper) {
flux(dispatcherWrapper.dispatcher) {
repeat(3) {
tracedChild("child_$it")
send(it)
}
}.collect {
}
}

testing.waitAndAssertTraces(
{ trace ->
trace.hasSpansSatisfyingExactly(
{
it.hasName("parent")
.hasNoParent()
},
{
it.hasName("child_0")
.hasParent(trace.getSpan(0))
},
{
it.hasName("child_1")
.hasParent(trace.getSpan(0))
},
{
it.hasName("child_2")
.hasParent(trace.getSpan(0))
},
)
},
)
}

private val animalKey: ContextKey<String> = ContextKey.named("animal")

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

// We are using a separate module for kotlin source instead of placing them in
// instrumentation/kotlinx-coroutines/kotlinx-coroutines-flow-1.3/javaagent because muzzle
// generation plugin currently doesn't handle kotlin sources correctly.
plugins {
id("org.jetbrains.kotlin.jvm")
id("otel.java-conventions")
}

dependencies {
compileOnly("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0")
compileOnly("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
compileOnly(project(":instrumentation-api"))
}

tasks {
withType(KotlinCompile::class).configureEach {
kotlinOptions {
jvmTarget = "1.8"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.flow

import io.opentelemetry.context.Context
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.onCompletion

fun <REQUEST, RESPONSE> onComplete(flow: Flow<*>, instrumenter: Instrumenter<REQUEST, RESPONSE>, context: Context, request: REQUEST): Flow<*> {
return flow.onCompletion { cause: Throwable? ->
instrumenter.end(context, request, null, cause)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
id("org.jetbrains.kotlin.jvm")
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("org.jetbrains.kotlinx")
module.set("kotlinx-coroutines-core")
versions.set("[1.3.0,1.3.8)")
}
// 1.3.9 (and beyond?) have changed how artifact names are resolved due to multiplatform variants
pass {
group.set("org.jetbrains.kotlinx")
module.set("kotlinx-coroutines-core-jvm")
versions.set("[1.3.9,)")
}
}

dependencies {
library("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0")
compileOnly(project(":instrumentation-annotations-support"))
implementation(project(":instrumentation:kotlinx-coroutines:kotlinx-coroutines-flow-1.3:javaagent-kotlin"))

testInstrumentation(project(":instrumentation:kotlinx-coroutines:kotlinx-coroutines-1.0:javaagent"))
testInstrumentation(project(":instrumentation:opentelemetry-extension-kotlin-1.0:javaagent"))
testInstrumentation(project(":instrumentation:reactor:reactor-3.1:javaagent"))

testImplementation("io.opentelemetry:opentelemetry-extension-kotlin")
testImplementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
testImplementation(project(":instrumentation:reactor:reactor-3.1:library"))
testImplementation(project(":instrumentation-annotations"))

testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.0")
}

tasks {
withType(KotlinCompile::class).configureEach {
kotlinOptions {
jvmTarget = "1.8"
}
}
withType<Test>().configureEach {
jvmArgs("-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥲

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.flow;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class AbstractFlowInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return namedOneOf("kotlinx.coroutines.flow.AbstractFlow", "kotlinx.coroutines.flow.SafeFlow");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor(), this.getClass().getName() + "$ConstructorAdvice");
}

@SuppressWarnings("unused")
public static class ConstructorAdvice {

@Advice.OnMethodEnter
public static void enter() {
FlowInstrumentationHelper.initialize();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.flow;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategies;
import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategy;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import kotlinx.coroutines.flow.Flow;

public final class FlowInstrumentationHelper {
private static final FlowAsyncOperationEndStrategy asyncOperationEndStrategy =
new FlowAsyncOperationEndStrategy();

static {
AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy);
}

public static void initialize() {}

private FlowInstrumentationHelper() {}

private static final class FlowAsyncOperationEndStrategy implements AsyncOperationEndStrategy {

@Override
public boolean supports(Class<?> returnType) {
return Flow.class.isAssignableFrom(returnType);
}

@Override
public <REQUEST, RESPONSE> Object end(
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
Object asyncValue,
Class<RESPONSE> responseType) {
Flow<?> flow = (Flow<?>) asyncValue;
return FlowUtilKt.onComplete(flow, instrumenter, context, request);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.flow;

import static java.util.Collections.singletonList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class KotlinCoroutinesFlowInstrumentationModule extends InstrumentationModule {

public KotlinCoroutinesFlowInstrumentationModule() {
super("kotlinx-coroutines", "kotlinx-coroutines-flow");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new AbstractFlowInstrumentation());
}
}
Loading
Loading