Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Cassandra target #10357

Merged
merged 31 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
b8043a4
Fix server.address for Cassandra
heyams Jan 30, 2024
d482ba4
Create experimental attributes extractor for cassandra
heyams Jan 31, 2024
127d2ad
Update
heyams Jan 31, 2024
be52518
Rename
heyams Jan 31, 2024
79c2d76
Fix tests
heyams Jan 31, 2024
6c16f6d
Update
heyams Jan 31, 2024
3774f3d
Rename back to CassandraAttributesExtractor
heyams Jan 31, 2024
02594ca
Consider SniEndpoint
heyams Feb 1, 2024
1858849
Fix tests
heyams Feb 1, 2024
7e1dd2a
Fix spotless
heyams Feb 1, 2024
415fc22
Use reflection to access proxyAddress
heyams Feb 5, 2024
c254dd7
Revert
heyams Feb 5, 2024
e18e666
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
heyams Feb 6, 2024
4e3c230
Cache proxyAddress field
heyams Feb 7, 2024
5e9a16f
Comment
heyams Feb 7, 2024
ea7f2a1
Merge remote-tracking branch 'upstream/main' into heya/fix-cassandra-…
heyams Feb 8, 2024
da7b1c9
Fix errorprone
heyams Feb 8, 2024
4ac21ca
Fix
heyams Feb 8, 2024
a1bca4e
Update
heyams Feb 8, 2024
d2af7b2
Address comments
heyams Feb 12, 2024
c66b91f
Comments
heyams Feb 13, 2024
447c362
empty
heyams Feb 14, 2024
7b9f479
empty
heyams Feb 14, 2024
f185da0
Use a diff data center to fix sporadic 'could not reach any contact p…
heyams Feb 14, 2024
bcb1cf2
Revert
heyams Feb 15, 2024
57424b3
Merge remote-tracking branch 'upstream/main' into heya/fix-cassandra-…
trask Feb 15, 2024
ccdb872
Use DefaultDriverConfigLoader.builder()
trask Feb 15, 2024
bc6ab22
Remove SniEndpoint testing
trask Feb 15, 2024
814290e
Fix test
heyams Feb 15, 2024
bce1ca4
Revert "Remove SniEndpoint testing"
trask Feb 15, 2024
a3694b3
One more try
trask Feb 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0;

import com.datastax.driver.core.ExecutionInfo;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.semconv.SemanticAttributes;
import javax.annotation.Nullable;

public class CassandraAttributesExtractor
heyams marked this conversation as resolved.
Show resolved Hide resolved
implements AttributesExtractor<CassandraRequest, ExecutionInfo> {
@Override
public void onStart(AttributesBuilder attributes, Context context, CassandraRequest request) {}

@Override
public void onEnd(
AttributesBuilder attributes,
Context context,
CassandraRequest request,
@Nullable ExecutionInfo executionInfo,
@Nullable Throwable error) {
if (executionInfo == null) {
return;
}
attributes.put(
SemanticAttributes.SERVER_ADDRESS,
executionInfo.getQueriedHost().getSocketAddress().getHostName());
attributes.put(
SemanticAttributes.SERVER_PORT,
executionInfo.getQueriedHost().getSocketAddress().getPort());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public final class CassandraSingletons {
.build())
.addAttributesExtractor(
NetworkAttributesExtractor.create(new CassandraNetworkAttributesGetter()))
.addAttributesExtractor(new CassandraAttributesExtractor())
.buildInstrumenter(SpanKindExtractor.alwaysClient());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ void syncTest(Parameter parameter) {
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, cassandraPort),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
equalTo(SemanticAttributes.DB_SYSTEM, "cassandra"),
Expand All @@ -99,6 +101,8 @@ void syncTest(Parameter parameter) {
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, cassandraPort),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
equalTo(SemanticAttributes.DB_SYSTEM, "cassandra"),
Expand All @@ -116,6 +120,8 @@ void syncTest(Parameter parameter) {
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, cassandraPort),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
equalTo(SemanticAttributes.DB_SYSTEM, "cassandra"),
Expand Down Expand Up @@ -153,6 +159,8 @@ void asyncTest(Parameter parameter) {
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, cassandraPort),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
equalTo(SemanticAttributes.DB_SYSTEM, "cassandra"),
Expand All @@ -167,6 +175,8 @@ void asyncTest(Parameter parameter) {
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, cassandraPort),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
equalTo(SemanticAttributes.DB_SYSTEM, "cassandra"),
Expand All @@ -189,6 +199,8 @@ void asyncTest(Parameter parameter) {
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, cassandraPort),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
equalTo(SemanticAttributes.DB_SYSTEM, "cassandra"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import static io.opentelemetry.semconv.SemanticAttributes.DB_STATEMENT;
import static io.opentelemetry.semconv.SemanticAttributes.DB_SYSTEM;
import static io.opentelemetry.semconv.SemanticAttributes.NETWORK_TYPE;
import static io.opentelemetry.semconv.SemanticAttributes.SERVER_ADDRESS;
import static io.opentelemetry.semconv.SemanticAttributes.SERVER_PORT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Named.named;

import com.datastax.oss.driver.api.core.CqlSession;
Expand Down Expand Up @@ -90,8 +93,20 @@ void syncTest(Parameter parameter) {
.hasKind(SpanKind.CLIENT)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(NETWORK_TYPE, "ipv4"),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
satisfies(
NETWORK_TYPE,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isEqualTo("ipv4"),
v -> assertThat(v).isEqualTo("ipv6"))),
equalTo(SERVER_ADDRESS, "localhost"),
equalTo(SERVER_PORT, cassandraPort),
satisfies(
NetworkAttributes.NETWORK_PEER_ADDRESS,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isEqualTo("127.0.0.1"),
v -> assertThat(v).isEqualTo("0:0:0:0:0:0:0:1"))),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
equalTo(DB_SYSTEM, "cassandra"),
equalTo(DB_NAME, parameter.keyspace),
Expand Down Expand Up @@ -137,8 +152,20 @@ void asyncTest(Parameter parameter) throws Exception {
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(NETWORK_TYPE, "ipv4"),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
satisfies(
NETWORK_TYPE,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isEqualTo("ipv4"),
v -> assertThat(v).isEqualTo("ipv6"))),
equalTo(SERVER_ADDRESS, "localhost"),
equalTo(SERVER_PORT, cassandraPort),
satisfies(
NetworkAttributes.NETWORK_PEER_ADDRESS,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isEqualTo("127.0.0.1"),
v -> assertThat(v).isEqualTo("0:0:0:0:0:0:0:1"))),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
equalTo(DB_SYSTEM, "cassandra"),
equalTo(DB_NAME, parameter.keyspace),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.semconv.SemanticAttributes;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import javax.annotation.Nullable;

final class CassandraAttributesExtractor
heyams marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -36,6 +38,12 @@ public void onEnd(

Node coordinator = executionInfo.getCoordinator();
if (coordinator != null) {
SocketAddress address = coordinator.getEndPoint().resolve();
if (address instanceof InetSocketAddress) {
attributes.put(
SemanticAttributes.SERVER_ADDRESS, ((InetSocketAddress) address).getHostName());
heyams marked this conversation as resolved.
Show resolved Hide resolved
attributes.put(SemanticAttributes.SERVER_PORT, ((InetSocketAddress) address).getPort());
}
if (coordinator.getDatacenter() != null) {
attributes.put(SemanticAttributes.DB_CASSANDRA_COORDINATOR_DC, coordinator.getDatacenter());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,25 @@
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
import com.datastax.oss.driver.internal.core.metadata.SniEndPoint;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.semconv.SemanticAttributes;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class CassandraAttributesExtractor
implements AttributesExtractor<CassandraRequest, ExecutionInfo> {

private static final Logger logger = LoggerFactory.getLogger(CassandraAttributesExtractor.class);
heyams marked this conversation as resolved.
Show resolved Hide resolved

@Override
public void onStart(
AttributesBuilder attributes, Context parentContext, CassandraRequest request) {}
Expand All @@ -36,6 +45,12 @@ public void onEnd(

Node coordinator = executionInfo.getCoordinator();
if (coordinator != null) {
try {
updateServerAddressAndPort(attributes, coordinator);
} catch (NoSuchFieldException | IllegalAccessException e) {
logger.error("Error while extracting server address and port", e);
heyams marked this conversation as resolved.
Show resolved Hide resolved
}

if (coordinator.getDatacenter() != null) {
attributes.put(SemanticAttributes.DB_CASSANDRA_COORDINATOR_DC, coordinator.getDatacenter());
}
Expand Down Expand Up @@ -74,4 +89,24 @@ public void onEnd(
}
attributes.put(SemanticAttributes.DB_CASSANDRA_IDEMPOTENCE, idempotent);
}

private static void updateServerAddressAndPort(AttributesBuilder attributes, Node coordinator)
throws NoSuchFieldException, IllegalAccessException {
EndPoint endPoint = coordinator.getEndPoint();
if (endPoint instanceof DefaultEndPoint) {
InetSocketAddress address = ((DefaultEndPoint) endPoint).resolve();
attributes.put(SemanticAttributes.SERVER_ADDRESS, address.getHostName());
attributes.put(SemanticAttributes.SERVER_PORT, address.getPort());
} else if (endPoint instanceof SniEndPoint) {
SniEndPoint sniEndPoint = (SniEndPoint) endPoint;
Field privateField = sniEndPoint.getClass().getDeclaredField("proxyAddress");
privateField.setAccessible(true);
heyams marked this conversation as resolved.
Show resolved Hide resolved
Object object = privateField.get(sniEndPoint);
if (object instanceof InetSocketAddress) {
InetSocketAddress address = (InetSocketAddress) object;
attributes.put(SemanticAttributes.SERVER_ADDRESS, address.getHostName());
attributes.put(SemanticAttributes.SERVER_PORT, address.getPort());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
package io.opentelemetry.instrumentation.cassandra.v4_4;

import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
import com.datastax.oss.driver.internal.core.metadata.SniEndPoint;
import io.opentelemetry.instrumentation.api.semconv.network.NetworkAttributesGetter;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import javax.annotation.Nullable;

final class CassandraNetworkAttributesGetter
Expand All @@ -27,8 +29,12 @@ public InetSocketAddress getNetworkPeerInetSocketAddress(
return null;
}
// resolve() returns an existing InetSocketAddress, it does not do a dns resolve,
// at least in the only current EndPoint implementation (DefaultEndPoint)
SocketAddress address = coordinator.getEndPoint().resolve();
return address instanceof InetSocketAddress ? (InetSocketAddress) address : null;
EndPoint endPoint = coordinator.getEndPoint();
if (endPoint instanceof DefaultEndPoint) {
return (InetSocketAddress) coordinator.getEndPoint().resolve();
} else if (endPoint instanceof SniEndPoint) {
return ((SniEndPoint) endPoint).resolve();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@
import static io.opentelemetry.semconv.SemanticAttributes.DB_STATEMENT;
import static io.opentelemetry.semconv.SemanticAttributes.DB_SYSTEM;
import static io.opentelemetry.semconv.SemanticAttributes.NETWORK_TYPE;
import static io.opentelemetry.semconv.SemanticAttributes.SERVER_ADDRESS;
import static io.opentelemetry.semconv.SemanticAttributes.SERVER_PORT;
import static org.junit.jupiter.api.Named.named;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.internal.core.metadata.SniEndPoint;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.cassandra.v4.common.AbstractCassandraTest;
import io.opentelemetry.instrumentation.api.semconv.network.internal.NetworkAttributes;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand Down Expand Up @@ -57,7 +64,9 @@ void reactiveTest(Parameter parameter) {
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(NETWORK_TYPE, "ipv4"),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
equalTo(SERVER_ADDRESS, "localhost"),
equalTo(SERVER_PORT, cassandraPort),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "0:0:0:0:0:0:0:1"),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
equalTo(DB_SYSTEM, "cassandra"),
equalTo(DB_NAME, parameter.keyspace),
Expand Down Expand Up @@ -135,4 +144,22 @@ private static Stream<Arguments> provideReactiveParameters() {
"SELECT",
"users"))));
}

@Override
protected CqlSession getSession(String keyspace) {
DriverConfigLoader configLoader =
DriverConfigLoader.programmaticBuilder()
.withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(0))
.withDuration(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, Duration.ofSeconds(10))
.build();

InetSocketAddress address = new InetSocketAddress("localhost", cassandraPort);
return wrap(
CqlSession.builder()
.addContactEndPoint(new SniEndPoint(address, "localhost"))
heyams marked this conversation as resolved.
Show resolved Hide resolved
.withConfigLoader(configLoader)
.withLocalDatacenter("datacenter1")
.withKeyspace(keyspace)
.build());
}
}
Loading