Skip to content

Commit

Permalink
Spring-kafka single record instrumentation (#5904) (#5907)
Browse files Browse the repository at this point in the history
Co-authored-by: Mateusz Rzeszutek <mrzeszutek@splunk.com>
  • Loading branch information
github-actions[bot] and Mateusz Rzeszutek committed Apr 22, 2022
1 parent d018791 commit 74f16c0
Show file tree
Hide file tree
Showing 20 changed files with 504 additions and 226 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@ public final class KafkaClientsConsumerProcessTracing {

private KafkaClientsConsumerProcessTracing() {}

public static void enableWrapping() {
wrappingEnabled.set(true);
}

public static void disableWrapping() {
wrappingEnabled.set(false);
public static boolean setEnabled(boolean enabled) {
boolean previous = wrappingEnabled.get();
wrappingEnabled.set(enabled);
return previous;
}

public static boolean wrappingEnabled() {
return wrappingEnabled.get() == true;
return wrappingEnabled.get();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.opentelemetry.instrumentation.kafka.internal.ReceivedRecords;
import io.opentelemetry.instrumentation.kafka.internal.Timer;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.time.Duration;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class KafkaConsumerInstrumentation implements TypeInstrumentation {
Expand Down Expand Up @@ -74,6 +76,18 @@ public static void onExit(
VirtualField<ConsumerRecords<?, ?>, Context> consumerRecordsContext =
VirtualField.find(ConsumerRecords.class, Context.class);
consumerRecordsContext.set(records, context);

// disable process tracing and store the receive span for each individual record too
boolean previousValue = KafkaClientsConsumerProcessTracing.setEnabled(false);
try {
VirtualField<ConsumerRecord<?, ?>, Context> consumerRecordContext =
VirtualField.find(ConsumerRecord.class, Context.class);
for (ConsumerRecord<?, ?> record : records) {
consumerRecordContext.set(record, context);
}
} finally {
KafkaClientsConsumerProcessTracing.setEnabled(previousValue);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,11 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessWrapper;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class TracingIterable<K, V>
implements Iterable<ConsumerRecord<K, V>>,
KafkaClientsConsumerProcessWrapper<Iterable<ConsumerRecord<K, V>>> {
public class TracingIterable<K, V> implements Iterable<ConsumerRecord<K, V>> {
private final Iterable<ConsumerRecord<K, V>> delegate;
@Nullable private final Context receiveContext;
private boolean firstIterator = true;
Expand Down Expand Up @@ -48,9 +45,4 @@ public Iterator<ConsumerRecord<K, V>> iterator() {

return it;
}

@Override
public Iterable<ConsumerRecord<K, V>> unwrap() {
return delegate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,11 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessWrapper;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class TracingIterator<K, V>
implements Iterator<ConsumerRecord<K, V>>,
KafkaClientsConsumerProcessWrapper<Iterator<ConsumerRecord<K, V>>> {
public class TracingIterator<K, V> implements Iterator<ConsumerRecord<K, V>> {
private final Iterator<ConsumerRecord<K, V>> delegateIterator;
private final Context parentContext;

Expand Down Expand Up @@ -79,9 +76,4 @@ private void closeScopeAndEndSpan() {
public void remove() {
delegateIterator.remove();
}

@Override
public Iterator<ConsumerRecord<K, V>> unwrap() {
return delegateIterator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.config.ExperimentalConfig;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
Expand Down Expand Up @@ -81,14 +82,35 @@ public static Instrumenter<ReceivedRecords, Void> createConsumerReceiveInstrumen
instrumentationName,
GlobalOpenTelemetry.get(),
MessageOperation.PROCESS,
Collections.emptyList());
Collections.emptyList(),
ErrorCauseExtractor.jdk());
}

public static Instrumenter<ConsumerRecord<?, ?>, Void> createConsumerProcessInstrumenter(
String instrumentationName, ErrorCauseExtractor errorCauseExtractor) {
return createConsumerOperationInstrumenter(
instrumentationName,
GlobalOpenTelemetry.get(),
MessageOperation.PROCESS,
Collections.emptyList(),
errorCauseExtractor);
}

public static Instrumenter<ConsumerRecord<?, ?>, Void> createConsumerOperationInstrumenter(
String instrumentationName,
OpenTelemetry openTelemetry,
MessageOperation operation,
Iterable<AttributesExtractor<ConsumerRecord<?, ?>, Void>> extractors) {
return createConsumerOperationInstrumenter(
instrumentationName, openTelemetry, operation, extractors, ErrorCauseExtractor.jdk());
}

private static Instrumenter<ConsumerRecord<?, ?>, Void> createConsumerOperationInstrumenter(
String instrumentationName,
OpenTelemetry openTelemetry,
MessageOperation operation,
Iterable<AttributesExtractor<ConsumerRecord<?, ?>, Void>> extractors,
ErrorCauseExtractor errorCauseExtractor) {

KafkaConsumerAttributesGetter getter = KafkaConsumerAttributesGetter.INSTANCE;

Expand All @@ -99,7 +121,8 @@ public static Instrumenter<ReceivedRecords, Void> createConsumerReceiveInstrumen
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
.addAttributesExtractor(new KafkaConsumerAdditionalAttributesExtractor())
.addAttributesExtractors(extractors);
.addAttributesExtractors(extractors)
.setErrorCauseExtractor(errorCauseExtractor);
if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,15 @@

package io.opentelemetry.javaagent.instrumentation.kafkastreams;

import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// This instrumentation copies the receive CONSUMER span context from the ConsumerRecords aggregate
// object to each individual record
public class StreamThreadInstrumentation implements TypeInstrumentation {

@Override
Expand All @@ -31,47 +23,20 @@ public ElementMatcher<TypeDescription> typeMatcher() {

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("pollRequests")
.and(isPrivate())
.and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))),
this.getClass().getName() + "$PollRecordsAdvice");
transformer.applyAdviceToMethod(named("runLoop"), this.getClass().getName() + "$RunLoopAdvice");
}

@SuppressWarnings("unused")
public static class PollRecordsAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return ConsumerRecords<?, ?> records) {
if (records.isEmpty()) {
return;
}

Context receiveContext = VirtualField.find(ConsumerRecords.class, Context.class).get(records);
if (receiveContext == null) {
return;
}

VirtualField<ConsumerRecord<?, ?>, Context> singleRecordReceiveContext =
VirtualField.find(ConsumerRecord.class, Context.class);

for (ConsumerRecord<?, ?> record : records) {
singleRecordReceiveContext.set(record, receiveContext);
}
}
}

// this advice suppresses the CONSUMER spans created by the kafka-clients instrumentation
@SuppressWarnings("unused")
public static class RunLoopAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter() {
KafkaClientsConsumerProcessTracing.disableWrapping();
public static boolean onEnter() {
return KafkaClientsConsumerProcessTracing.setEnabled(false);
}

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit() {
KafkaClientsConsumerProcessTracing.enableWrapping();
public static void onExit(@Advice.Enter boolean previousValue) {
KafkaClientsConsumerProcessTracing.setEnabled(previousValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.RecordInterceptor;

public class AbstractMessageListenerContainerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.springframework.kafka.listener.AbstractMessageListenerContainer");
Expand All @@ -36,20 +39,48 @@ public void transform(TypeTransformer transformer) {
.and(takesArguments(0))
.and(returns(named("org.springframework.kafka.listener.BatchInterceptor"))),
this.getClass().getName() + "$GetBatchInterceptorAdvice");
// getRecordInterceptor() is called internally by AbstractMessageListenerContainer
// implementations
transformer.applyAdviceToMethod(
named("getRecordInterceptor")
.and(isProtected())
.and(takesArguments(0))
.and(returns(named("org.springframework.kafka.listener.RecordInterceptor"))),
this.getClass().getName() + "$GetRecordInterceptorAdvice");
}

@SuppressWarnings("unused")
public static class GetBatchInterceptorAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static <K, V> void onExit(
@Advice.Return(readOnly = false) BatchInterceptor<K, V> interceptor) {

if (!(interceptor instanceof InstrumentedBatchInterceptor)) {
VirtualField<ConsumerRecords<K, V>, Context> receiveContextVirtualField =
VirtualField<ConsumerRecords<K, V>, Context> receiveContextField =
VirtualField.find(ConsumerRecords.class, Context.class);
VirtualField<ConsumerRecords<K, V>, State<K, V>> stateStore =
VirtualField<ConsumerRecords<K, V>, State<ConsumerRecords<K, V>>> stateField =
VirtualField.find(ConsumerRecords.class, State.class);
interceptor =
new InstrumentedBatchInterceptor<>(receiveContextVirtualField, stateStore, interceptor);
new InstrumentedBatchInterceptor<>(receiveContextField, stateField, interceptor);
}
}
}

@SuppressWarnings("unused")
public static class GetRecordInterceptorAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static <K, V> void onExit(
@Advice.Return(readOnly = false) RecordInterceptor<K, V> interceptor) {

if (!(interceptor instanceof InstrumentedRecordInterceptor)) {
VirtualField<ConsumerRecord<K, V>, Context> receiveContextField =
VirtualField.find(ConsumerRecord.class, Context.class);
VirtualField<ConsumerRecord<K, V>, State<ConsumerRecord<K, V>>> stateField =
VirtualField.find(ConsumerRecord.class, State.class);
interceptor =
new InstrumentedRecordInterceptor<>(receiveContextField, stateField, interceptor);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.spring.kafka;

import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.processInstrumenter;
import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.batchProcessInstrumenter;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
Expand All @@ -16,34 +16,35 @@
import org.springframework.kafka.listener.BatchInterceptor;

public final class InstrumentedBatchInterceptor<K, V> implements BatchInterceptor<K, V> {
private final VirtualField<ConsumerRecords<K, V>, Context> receiveContextVirtualField;
private final VirtualField<ConsumerRecords<K, V>, State<K, V>> stateStore;

private final VirtualField<ConsumerRecords<K, V>, Context> receiveContextField;
private final VirtualField<ConsumerRecords<K, V>, State<ConsumerRecords<K, V>>> stateField;
@Nullable private final BatchInterceptor<K, V> decorated;

public InstrumentedBatchInterceptor(
VirtualField<ConsumerRecords<K, V>, Context> receiveContextVirtualField,
VirtualField<ConsumerRecords<K, V>, State<K, V>> stateStore,
VirtualField<ConsumerRecords<K, V>, Context> receiveContextField,
VirtualField<ConsumerRecords<K, V>, State<ConsumerRecords<K, V>>> stateField,
@Nullable BatchInterceptor<K, V> decorated) {
this.receiveContextVirtualField = receiveContextVirtualField;
this.stateStore = stateStore;
this.receiveContextField = receiveContextField;
this.stateField = stateField;
this.decorated = decorated;
}

@Override
public ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
Context parentContext = getParentContext(records);

if (processInstrumenter().shouldStart(parentContext, records)) {
Context context = processInstrumenter().start(parentContext, records);
if (batchProcessInstrumenter().shouldStart(parentContext, records)) {
Context context = batchProcessInstrumenter().start(parentContext, records);
Scope scope = context.makeCurrent();
stateStore.set(records, State.create(records, context, scope));
stateField.set(records, State.create(records, context, scope));
}

return decorated == null ? records : decorated.intercept(records, consumer);
}

private Context getParentContext(ConsumerRecords<K, V> records) {
Context receiveContext = receiveContextVirtualField.get(records);
Context receiveContext = receiveContextField.get(records);

// use the receive CONSUMER span as parent if it's available
return receiveContext != null ? receiveContext : Context.current();
Expand All @@ -66,11 +67,11 @@ public void failure(ConsumerRecords<K, V> records, Exception exception, Consumer
}

private void end(ConsumerRecords<K, V> records, @Nullable Throwable error) {
State<K, V> state = stateStore.get(records);
stateStore.set(records, null);
State<ConsumerRecords<K, V>> state = stateField.get(records);
stateField.set(records, null);
if (state != null) {
state.scope().close();
processInstrumenter().end(state.context(), state.request(), null, error);
batchProcessInstrumenter().end(state.context(), state.request(), null, error);
}
}
}
Loading

0 comments on commit 74f16c0

Please sign in to comment.