Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure kafka metrics reporter as class #10855

Merged
merged 5 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.metrics;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.kafka.internal.MetricsReporterList;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
Expand All @@ -34,21 +34,24 @@ public static void enhanceConfig(Map<? super String, Object> config) {
}
config.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
OpenTelemetryMetricsReporter.class.getName(),
MetricsReporterList.singletonList(OpenTelemetryMetricsReporter.class),
(class1, class2) -> {
// class1 is either a class name or List of class names or classes
if (class1 instanceof List) {
List<Object> result = new ArrayList<>();
List<Object> result = new MetricsReporterList<>();
result.addAll((List<Object>) class1);
result.add(class2);
result.addAll((List<Object>) class2);
return result;
} else if (class1 instanceof String) {
String className1 = (String) class1;
if (className1.isEmpty()) {
return class2;
}
}
return class1 + "," + class2;
List<Object> result = new MetricsReporterList<>();
result.add(class1);
result.addAll((List<Object>) class2);
return result;
});
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ protected Map<String, Object> producerConfig() {
producerConfig.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
TestMetricsReporter.class.getName(),
(o, o2) -> o + "," + o2);
AbstractOpenTelemetryMetricsReporterTest::mergeValue);
return producerConfig;
}

Expand All @@ -134,10 +134,18 @@ protected Map<String, Object> consumerConfig() {
consumerConfig.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
TestMetricsReporter.class.getName(),
(o, o2) -> o + "," + o2);
AbstractOpenTelemetryMetricsReporterTest::mergeValue);
return consumerConfig;
}

@SuppressWarnings("unchecked")
private static Object mergeValue(Object o1, Object o2) {
List<Object> result = new MetricsReporterList<>();
result.addAll((List<Object>) o1);
result.add(o2);
return result;
}
Comment on lines +141 to +147
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't follow why test needs to use the internal MetricsReporterList?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we have a test that tries to serialize the configuration

and that test gets the configuration from this method. If we just use an ArrayList here then the configuration will contain the list that includes OpenTelemetryMetricsReporter.class which will fail in This is a bit convoluted, but the current behavior that depends on the thread context class loader being able to load the OpenTelemetryMetricsReporterTest isn't ideal either.


@Test
void noDuplicateMetricsReporter() {
List<MetricsReporter> producerMetricsReporters = getMetricsReporters(producer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest;
import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest;
import io.opentelemetry.instrumentation.kafka.internal.KafkaUtil;
import io.opentelemetry.instrumentation.kafka.internal.MetricsReporterList;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
import io.opentelemetry.instrumentation.kafka.internal.TracingList;
Expand Down Expand Up @@ -196,7 +197,7 @@ <K, V> ConsumerRecords<K, V> addTracing(
Map<String, Object> config = new HashMap<>();
config.put(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
OpenTelemetryMetricsReporter.class.getName());
MetricsReporterList.singletonList(OpenTelemetryMetricsReporter.class));
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
new OpenTelemetrySupplier(openTelemetry));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafka.internal;

import java.util.ArrayList;
import java.util.List;

/**
* List implementation that can be used to hold metrics reporters in kafka configuration without
* breaking serialization. When this list is serialized it removes OpenTelemetryMetricsReporter to
* ensure that the configuration can be deserialized even when the instrumentation is not present.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public class MetricsReporterList<T> extends ArrayList<T> {
private static final long serialVersionUID = 1L;

public static <T> List<T> singletonList(T o) {
List<T> list = new MetricsReporterList<>();
list.add(o);
return list;
}

private Object writeReplace() {
// serialize to plain ArrayList that does not contain OpenTelemetryMetricsReporter
List<Object> result = new ArrayList<>();
this.stream().filter(x -> x != OpenTelemetryMetricsReporter.class).forEach(result::add);
return result;
}
}
Loading