Skip to content

Commit

Permalink
support otel.instrumentation.kafka.experimental-span-attributes in sp…
Browse files Browse the repository at this point in the history
…ring starter (#11263)
  • Loading branch information
zeitlinger committed May 15, 2024
1 parent 8cc79c8 commit 5be203a
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,16 @@
import io.opentelemetry.sdk.autoconfigure.internal.AutoConfigureUtil;
import io.opentelemetry.sdk.autoconfigure.internal.ComponentLoader;
import io.opentelemetry.sdk.autoconfigure.internal.SpiHelper;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.autoconfigure.spi.ResourceProvider;
import io.opentelemetry.sdk.autoconfigure.spi.internal.DefaultConfigProperties;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationPropertiesBinding;
Expand Down Expand Up @@ -80,31 +86,47 @@ public ResourceProvider otelDistroVersionResourceProvider() {
}

@Bean
// If you change the bean name, also change it in the OpenTelemetryJdbcDriverAutoConfiguration
// class
public OpenTelemetry openTelemetry(
public AutoConfiguredOpenTelemetrySdk autoConfiguredOpenTelemetrySdk(
Environment env,
OtlpExporterProperties otlpExporterProperties,
OtelResourceProperties resourceProperties,
PropagationProperties propagationProperties,
OpenTelemetrySdkComponentLoader componentLoader) {

OpenTelemetry openTelemetry =
AutoConfigureUtil.setComponentLoader(
AutoConfigureUtil.setConfigPropertiesCustomizer(
AutoConfiguredOpenTelemetrySdk.builder(),
c ->
SpringConfigProperties.create(
env,
otlpExporterProperties,
resourceProperties,
propagationProperties,
c)),
componentLoader)
.build()
.getOpenTelemetrySdk();

return openTelemetry;
return AutoConfigureUtil.setComponentLoader(
AutoConfigureUtil.setConfigPropertiesCustomizer(
AutoConfiguredOpenTelemetrySdk.builder(),
c ->
SpringConfigProperties.create(
env,
otlpExporterProperties,
resourceProperties,
propagationProperties,
c)),
componentLoader)
.build();
}

@Bean
public OpenTelemetry openTelemetry(
AutoConfiguredOpenTelemetrySdk autoConfiguredOpenTelemetrySdk) {
return autoConfiguredOpenTelemetrySdk.getOpenTelemetrySdk();
}

/**
* Expose the {@link ConfigProperties} bean for use in other auto-configurations.
*
* <p>Why not use spring boot properties directly? <br>
* 1. issues with older spring boot versions <br>
* 2. support for {@link
* io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizer#addPropertiesCustomizer(Function)}
* and {@link
* io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizer#addPropertiesSupplier(Supplier)}
*/
@Bean
public ConfigProperties otelProperties(
AutoConfiguredOpenTelemetrySdk autoConfiguredOpenTelemetrySdk) {
return AutoConfigureUtil.getConfig(autoConfiguredOpenTelemetrySdk);
}
}

Expand All @@ -117,6 +139,21 @@ public static class DisabledOpenTelemetrySdkConfig {
public OpenTelemetry openTelemetry() {
return OpenTelemetry.noop();
}

@Bean
public ConfigProperties otelProperties() {
return DefaultConfigProperties.createFromMap(Collections.emptyMap());
}
}

@Configuration
@ConditionalOnBean(OpenTelemetry.class)
@ConditionalOnMissingBean({ConfigProperties.class})
public static class FallbackConfigProperties {
@Bean
public ConfigProperties otelProperties() {
return DefaultConfigProperties.create(Collections.emptyMap());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,21 @@

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

class ConcurrentKafkaListenerContainerFactoryPostProcessor implements BeanPostProcessor {

private final ObjectProvider<OpenTelemetry> openTelemetryProvider;
private final ObjectProvider<ConfigProperties> configPropertiesProvider;

ConcurrentKafkaListenerContainerFactoryPostProcessor(
ObjectProvider<OpenTelemetry> openTelemetryProvider) {
ObjectProvider<OpenTelemetry> openTelemetryProvider,
ObjectProvider<ConfigProperties> configPropertiesProvider) {
this.openTelemetryProvider = openTelemetryProvider;
this.configPropertiesProvider = configPropertiesProvider;
}

@Override
Expand All @@ -29,7 +33,12 @@ public Object postProcessAfterInitialization(Object bean, String beanName) {
ConcurrentKafkaListenerContainerFactory<?, ?> listenerContainerFactory =
(ConcurrentKafkaListenerContainerFactory<?, ?>) bean;
SpringKafkaTelemetry springKafkaTelemetry =
SpringKafkaTelemetry.create(openTelemetryProvider.getObject());
SpringKafkaTelemetry.builder(openTelemetryProvider.getObject())
.setCaptureExperimentalSpanAttributes(
configPropertiesProvider
.getObject()
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
.build();
listenerContainerFactory.setBatchInterceptor(springKafkaTelemetry.createBatchInterceptor());
listenerContainerFactory.setRecordInterceptor(springKafkaTelemetry.createRecordInterceptor());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry;
import io.opentelemetry.instrumentation.spring.autoconfigure.internal.ConditionalOnEnabledInstrumentation;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
Expand All @@ -32,7 +33,9 @@ DefaultKafkaProducerFactoryCustomizer otelKafkaProducerFactoryCustomizer(
@Bean
static ConcurrentKafkaListenerContainerFactoryPostProcessor
otelKafkaListenerContainerFactoryBeanPostProcessor(
ObjectProvider<OpenTelemetry> openTelemetryProvider) {
return new ConcurrentKafkaListenerContainerFactoryPostProcessor(openTelemetryProvider);
ObjectProvider<OpenTelemetry> openTelemetryProvider,
ObjectProvider<ConfigProperties> configPropertiesProvider) {
return new ConcurrentKafkaListenerContainerFactoryPostProcessor(
openTelemetryProvider, configPropertiesProvider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,12 @@
"description": "Enable the Kafka instrumentation.",
"defaultValue": true
},
{
"name": "otel.instrumentation.kafka.experimental-span-attributes",
"type": "java.lang.Boolean",
"description": "Enable the capture of experimental Kafka span attributes.",
"defaultValue": false
},
{
"name": "otel.instrumentation.log4j-appender.enabled",
"type": "java.lang.Boolean",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ void customOpenTelemetry() {
context ->
assertThat(context)
.hasBean("customOpenTelemetry")
.doesNotHaveBean("openTelemetry"));
.doesNotHaveBean("openTelemetry")
.hasBean("otelProperties"));
}

@Test
Expand All @@ -57,7 +58,7 @@ void customOpenTelemetry() {
void initializeProvidersAndOpenTelemetry() {
this.contextRunner
.withConfiguration(AutoConfigurations.of(OpenTelemetryAutoConfiguration.class))
.run(context -> assertThat(context).hasBean("openTelemetry"));
.run(context -> assertThat(context).hasBean("openTelemetry").hasBean("otelProperties"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ void setUpContext() {
KafkaInstrumentationAutoConfiguration.class,
KafkaConfig.class))
.withPropertyValues(
"otel.instrumentation.kafka.experimental-span-attributes=true",
"spring.kafka.bootstrap-servers=" + kafka.getBootstrapServers(),
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.linger-ms=10",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
import org.apache.kafka.clients.admin.NewTopic;
Expand Down Expand Up @@ -97,6 +98,9 @@ void shouldInstrumentProducerAndConsumer() {
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP,
"testListener"),
satisfies(
AttributeKey.longKey("kafka.record.queue_time_ms"),
AbstractLongAssert::isNotNegative),
satisfies(
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("consumer"))),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
otel:
instrumentation:
kafka:
experimental-span-attributes: true
logback-appender:
experimental:
capture-code-attributes: true
Expand Down

0 comments on commit 5be203a

Please sign in to comment.