Skip to content

Commit

Permalink
Configure kafka metrics reporter as class (#10855)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit committed Mar 14, 2024
1 parent dcf859a commit 61ca213
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.metrics;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.kafka.internal.MetricsReporterList;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
Expand All @@ -34,21 +34,24 @@ public static void enhanceConfig(Map<? super String, Object> config) {
}
config.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
OpenTelemetryMetricsReporter.class.getName(),
MetricsReporterList.singletonList(OpenTelemetryMetricsReporter.class),
(class1, class2) -> {
// class1 is either a class name or List of class names or classes
if (class1 instanceof List) {
List<Object> result = new ArrayList<>();
List<Object> result = new MetricsReporterList<>();
result.addAll((List<Object>) class1);
result.add(class2);
result.addAll((List<Object>) class2);
return result;
} else if (class1 instanceof String) {
String className1 = (String) class1;
if (className1.isEmpty()) {
return class2;
}
}
return class1 + "," + class2;
List<Object> result = new MetricsReporterList<>();
result.add(class1);
result.addAll((List<Object>) class2);
return result;
});
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ protected Map<String, Object> producerConfig() {
producerConfig.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
TestMetricsReporter.class.getName(),
(o, o2) -> o + "," + o2);
AbstractOpenTelemetryMetricsReporterTest::mergeValue);
return producerConfig;
}

Expand All @@ -134,10 +134,18 @@ protected Map<String, Object> consumerConfig() {
consumerConfig.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
TestMetricsReporter.class.getName(),
(o, o2) -> o + "," + o2);
AbstractOpenTelemetryMetricsReporterTest::mergeValue);
return consumerConfig;
}

@SuppressWarnings("unchecked")
private static Object mergeValue(Object o1, Object o2) {
List<Object> result = new MetricsReporterList<>();
result.addAll((List<Object>) o1);
result.add(o2);
return result;
}

@Test
void noDuplicateMetricsReporter() {
List<MetricsReporter> producerMetricsReporters = getMetricsReporters(producer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest;
import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest;
import io.opentelemetry.instrumentation.kafka.internal.KafkaUtil;
import io.opentelemetry.instrumentation.kafka.internal.MetricsReporterList;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
import io.opentelemetry.instrumentation.kafka.internal.TracingList;
Expand Down Expand Up @@ -196,7 +197,7 @@ <K, V> ConsumerRecords<K, V> addTracing(
Map<String, Object> config = new HashMap<>();
config.put(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
OpenTelemetryMetricsReporter.class.getName());
MetricsReporterList.singletonList(OpenTelemetryMetricsReporter.class));
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
new OpenTelemetrySupplier(openTelemetry));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafka.internal;

import java.util.ArrayList;
import java.util.List;

/**
* List implementation that can be used to hold metrics reporters in kafka configuration without
* breaking serialization. When this list is serialized it removes OpenTelemetryMetricsReporter to
* ensure that the configuration can be deserialized even when the instrumentation is not present.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public class MetricsReporterList<T> extends ArrayList<T> {
private static final long serialVersionUID = 1L;

public static <T> List<T> singletonList(T o) {
List<T> list = new MetricsReporterList<>();
list.add(o);
return list;
}

private Object writeReplace() {
// serialize to plain ArrayList that does not contain OpenTelemetryMetricsReporter
List<Object> result = new ArrayList<>();
this.stream().filter(x -> x != OpenTelemetryMetricsReporter.class).forEach(result::add);
return result;
}
}

0 comments on commit 61ca213

Please sign in to comment.