Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for InfluxDB #10850

Merged
merged 13 commits into from
May 8, 2024
1 change: 1 addition & 0 deletions docs/supported-libraries.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ These are the supported libraries and frameworks:
| [HikariCP](https://github.com/brettwooldridge/HikariCP) | 3.0+ | [opentelemetry-hikaricp-3.0](../instrumentation/hikaricp-3.0/library) | [Database Pool Metrics] |
| [HttpURLConnection](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/net/HttpURLConnection.html) | Java 8+ | N/A | [HTTP Client Spans], [HTTP Client Metrics] |
| [Hystrix](https://github.com/Netflix/Hystrix) | 1.4+ | N/A | none |
| [InfluxDB Client](https://github.com/influxdata/influxdb-java) | 2.4+ | N/A | [Database Client Spans] |
| [Java Executors](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html) | Java 8+ | N/A | Context propagation |
| [Java Http Client](https://docs.oracle.com/en/java/javase/11/docs/api/java.net.http/java/net/http/package-summary.html) | Java 11+ | [opentelemetry-java-http-client](../instrumentation/java-http-client/library) | [HTTP Client Spans], [HTTP Client Metrics] |
| [java.util.logging](https://docs.oracle.com/javase/8/docs/api/java/util/logging/package-summary.html) | Java 8+ | N/A | none |
Expand Down
46 changes: 46 additions & 0 deletions instrumentation/influxdb-2.4/javaagent/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("org.influxdb")
module.set("influxdb-java")
versions.set("[2.4,)")
assertInverse.set(true)
}
}

dependencies {
compileOnly("org.influxdb:influxdb-java:2.4")

compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")

// we use methods that weren't present before 2.14 in tests
testLibrary("org.influxdb:influxdb-java:2.14")
}

testing {
suites {
val test24 by registering(JvmTestSuite::class) {
dependencies {
implementation(project())
implementation("org.influxdb:influxdb-java:2.4")
implementation("org.testcontainers:testcontainers")
}
}
}
}

tasks {
test {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
}

if (!(findProperty("testLatestDeps") as Boolean)) {
check {
dependsOn(testing.suites)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4;

import io.opentelemetry.api.internal.StringUtils;
import io.opentelemetry.instrumentation.api.incubator.semconv.db.DbClientAttributesGetter;
import javax.annotation.Nullable;

final class InfluxDbAttributesGetter implements DbClientAttributesGetter<InfluxDbRequest> {

@Nullable
@Override
public String getStatement(InfluxDbRequest request) {
return request.getSqlStatementInfo().getFullStatement();
}

@Nullable
@Override
public String getOperation(InfluxDbRequest request) {
if (request.getSqlStatementInfo() != null) {
String operation = request.getSqlStatementInfo().getOperation();
return StringUtils.isNullOrEmpty(operation) ? request.getSql() : operation;
}
return null;
}

@Nullable
@Override
public String getSystem(InfluxDbRequest request) {
return "influxdb";
steverao marked this conversation as resolved.
Show resolved Hide resolved
}

@Nullable
@Override
public String getUser(InfluxDbRequest request) {
return null;
}

@Nullable
@Override
public String getName(InfluxDbRequest request) {
String dbName = request.getDbName();
return StringUtils.isNullOrEmpty(dbName) ? null : dbName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4;

final class InfluxDbConstants {

private InfluxDbConstants() {}

public static final String CREATE_DATABASE_STATEMENT_NEW = "CREATE DATABASE \"%s\"";

/** In influxDB 0.x version, it uses below statement format to create a database. */
public static final String CREATE_DATABASE_STATEMENT_OLD = "CREATE DATABASE IF NOT EXISTS \"%s\"";

public static final String DELETE_DATABASE_STATEMENT = "DROP DATABASE \"%s\"";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4;

import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.influxdb.v2_4.InfluxDbConstants.CREATE_DATABASE_STATEMENT_NEW;
import static io.opentelemetry.javaagent.instrumentation.influxdb.v2_4.InfluxDbConstants.CREATE_DATABASE_STATEMENT_OLD;
import static io.opentelemetry.javaagent.instrumentation.influxdb.v2_4.InfluxDbConstants.DELETE_DATABASE_STATEMENT;
import static io.opentelemetry.javaagent.instrumentation.influxdb.v2_4.InfluxDbSingletons.instrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isEnum;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.bootstrap.CallDepth;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.implementation.bytecode.assign.Assigner;
import net.bytebuddy.matcher.ElementMatcher;
import okhttp3.HttpUrl;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Query;
import org.influxdb.impl.InfluxDBImpl;
import retrofit2.Retrofit;

public class InfluxDbImplInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.influxdb.impl.InfluxDBImpl");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod().and(named("query")).and(takesArgument(0, named("org.influxdb.dto.Query"))),
this.getClass().getName() + "$InfluxDbQueryAdvice");

transformer.applyAdviceToMethod(
isMethod()
.and(named("write"))
.and(
takesArguments(1)
.and(takesArgument(0, named("org.influxdb.dto.BatchPoints")))
.or(takesArguments(2).and(takesArgument(0, int.class)))
.or(
takesArguments(4)
.and(takesArgument(0, String.class))
.and(takesArgument(1, String.class))
.and(takesArgument(2, isEnum())))
.or(
takesArguments(5)
.and(takesArgument(0, String.class))
.and(takesArgument(1, String.class))
.and(takesArgument(2, isEnum()))
.and(takesArgument(3, named("java.util.concurrent.TimeUnit"))))),
this.getClass().getName() + "$InfluxDbModifyAdvice");
transformer.applyAdviceToMethod(
isMethod().and(namedOneOf("createDatabase", "deleteDatabase")),
this.getClass().getName() + "$InfluxDbModifyAdvice");
}

@SuppressWarnings("unused")
public static class InfluxDbQueryAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(0) Query query,
@Advice.AllArguments(readOnly = false, typing = Assigner.Typing.DYNAMIC) Object[] arguments,
@Advice.FieldValue(value = "retrofit") Retrofit retrofit,
@Advice.Local("otelCallDepth") CallDepth callDepth,
@Advice.Local("otelRequest") InfluxDbRequest influxDbRequest,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
callDepth = CallDepth.forClass(InfluxDBImpl.class);
if (callDepth.getAndIncrement() > 0) {
return;
}

if (query == null) {
return;
}
Context parentContext = currentContext();

HttpUrl httpUrl = retrofit.baseUrl();
influxDbRequest =
InfluxDbRequest.create(
httpUrl.host(), httpUrl.port(), query.getDatabase(), query.getCommand());

if (!instrumenter().shouldStart(parentContext, influxDbRequest)) {
return;
}

// wrap callbacks so they'd run in the context of the parent span
Object[] newArguments = new Object[arguments.length];
boolean hasChangedArgument = false;
for (int i = 0; i < arguments.length; i++) {
newArguments[i] = InfluxDbObjetWrapper.wrap(arguments[i], parentContext);
hasChangedArgument |= newArguments[i] != arguments[i];
}
if (hasChangedArgument) {
arguments = newArguments;
}

context = instrumenter().start(parentContext, influxDbRequest);
scope = context.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelCallDepth") CallDepth callDepth,
@Advice.Local("otelRequest") InfluxDbRequest influxDbRequest,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {

if (callDepth.decrementAndGet() > 0) {
return;
}

if (scope == null) {
return;
}

scope.close();

instrumenter().end(context, influxDbRequest, null, throwable);
}
}

@SuppressWarnings("unused")
public static class InfluxDbModifyAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.This InfluxDBImpl influxDbImpl,
@Advice.Origin("#m") String methodName,
@Advice.Argument(0) Object arg0,
@Advice.FieldValue(value = "retrofit") Retrofit retrofit,
@Advice.Local("otelCallDepth") CallDepth callDepth,
@Advice.Local("otelRequest") InfluxDbRequest influxDbRequest,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
callDepth = CallDepth.forClass(InfluxDBImpl.class);
if (callDepth.getAndIncrement() > 0) {
return;
}

if (arg0 == null) {
return;
}

Context parentContext = currentContext();

HttpUrl httpUrl = retrofit.baseUrl();
String database =
(arg0 instanceof BatchPoints)
? ((BatchPoints) arg0).getDatabase()
// write data by UDP protocol, in this way, can't get database name.
: arg0 instanceof Integer ? "" : String.valueOf(arg0);

String sql = methodName;
if ("createDatabase".equals(methodName)) {
sql =
influxDbImpl.version().startsWith("0.")
? String.format(CREATE_DATABASE_STATEMENT_OLD, database)
: String.format(CREATE_DATABASE_STATEMENT_NEW, database);
} else if ("deleteDatabase".equals(methodName)) {
sql = String.format(DELETE_DATABASE_STATEMENT, database);
}

influxDbRequest = InfluxDbRequest.create(httpUrl.host(), httpUrl.port(), database, sql);

if (!instrumenter().shouldStart(parentContext, influxDbRequest)) {
return;
}

context = instrumenter().start(parentContext, influxDbRequest);
scope = context.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelCallDepth") CallDepth callDepth,
@Advice.Local("otelRequest") InfluxDbRequest influxDbRequest,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {

if (callDepth.decrementAndGet() > 0) {
return;
}

if (scope == null) {
return;
}
scope.close();

instrumenter().end(context, influxDbRequest, null, throwable);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4;

import static java.util.Collections.singletonList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class InfluxDbInstrumentationModule extends InstrumentationModule {

public InfluxDbInstrumentationModule() {
super("influxdb", "influxdb-2.4");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new InfluxDbImplInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.influxdb.v2_4;

import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesGetter;

final class InfluxDbNetworkAttributesGetter implements ServerAttributesGetter<InfluxDbRequest> {

@Override
public String getServerAddress(InfluxDbRequest request) {
return request.getHost();
}

@Override
public Integer getServerPort(InfluxDbRequest request) {
return request.getPort();
}
}
Loading
Loading