Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace messaging.kafka.destination.partition with messaging.destination.partition.id #11086

Merged
merged 1 commit into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ public String getDestination(Exchange exchange, Endpoint endpoint) {
return topic != null ? topic : super.getDestination(exchange, endpoint);
}

@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
@Override
public void pre(
AttributesBuilder attributes,
Expand All @@ -67,7 +65,7 @@ public void pre(
Integer partition = exchange.getIn().getHeader(PARTITION, Integer.class);
if (partition != null) {
attributes.put(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, partition);
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, partition.toString());
}

if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;
Expand Down Expand Up @@ -152,8 +153,6 @@ public void awaitUntilConsumerIsReady() throws InterruptedException {
consumer.seekToBeginning(Collections.emptyList());
}

@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
protected static List<AttributeAssertion> sendAttributes(
String messageKey, String messageValue, boolean testHeaders) {
List<AttributeAssertion> assertions =
Expand All @@ -166,8 +165,8 @@ protected static List<AttributeAssertion> sendAttributes(
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative)));
Expand Down Expand Up @@ -214,8 +213,6 @@ protected static List<AttributeAssertion> receiveAttributes(boolean testHeaders)
return assertions;
}

@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
protected static List<AttributeAssertion> processAttributes(
String messageKey, String messageValue, boolean testHeaders) {
List<AttributeAssertion> assertions =
Expand All @@ -228,8 +225,8 @@ protected static List<AttributeAssertion> processAttributes(
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("consumer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
import java.nio.charset.StandardCharsets;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;

class InterceptorsSuppressReceiveSpansTest extends AbstractInterceptorsTest {

@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
@Override
void assertTraces() {
testing.waitAndAssertTraces(
Expand Down Expand Up @@ -50,8 +49,8 @@ void assertTraces() {
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
greeting.getBytes(StandardCharsets.UTF_8).length),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;

class InterceptorsTest extends AbstractInterceptorsTest {

@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
@Override
void assertTraces() {
AtomicReference<SpanContext> producerSpanContext = new AtomicReference<>();
Expand Down Expand Up @@ -87,8 +86,8 @@ void assertTraces() {
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
greeting.getBytes(StandardCharsets.UTF_8).length),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Collections;
import java.util.List;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;

class WrapperSuppressReceiveSpansTest extends AbstractWrapperTest {

Expand Down Expand Up @@ -52,8 +53,6 @@ void assertTraces(boolean testHeaders) {
.hasParent(trace.getSpan(0))));
}

@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
protected static List<AttributeAssertion> sendAttributes(boolean testHeaders) {
List<AttributeAssertion> assertions =
new ArrayList<>(
Expand All @@ -65,8 +64,8 @@ protected static List<AttributeAssertion> sendAttributes(boolean testHeaders) {
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative)));
Expand All @@ -79,8 +78,6 @@ protected static List<AttributeAssertion> sendAttributes(boolean testHeaders) {
return assertions;
}

@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
private static List<AttributeAssertion> processAttributes(String greeting, boolean testHeaders) {
List<AttributeAssertion> assertions =
new ArrayList<>(
Expand All @@ -92,8 +89,8 @@ private static List<AttributeAssertion> processAttributes(String greeting, boole
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
greeting.getBytes(StandardCharsets.UTF_8).length),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;

class WrapperTest extends AbstractWrapperTest {

Expand Down Expand Up @@ -74,8 +75,6 @@ void assertTraces(boolean testHeaders) {
.hasParent(trace.getSpan(1))));
}

@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
protected static List<AttributeAssertion> sendAttributes(boolean testHeaders) {
List<AttributeAssertion> assertions =
new ArrayList<>(
Expand All @@ -87,8 +86,8 @@ protected static List<AttributeAssertion> sendAttributes(boolean testHeaders) {
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative)));
Expand All @@ -101,8 +100,6 @@ protected static List<AttributeAssertion> sendAttributes(boolean testHeaders) {
return assertions;
}

@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
private static List<AttributeAssertion> processAttributes(String greeting, boolean testHeaders) {
List<AttributeAssertion> assertions =
new ArrayList<>(
Expand All @@ -114,8 +111,8 @@ private static List<AttributeAssertion> processAttributes(String greeting, boole
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
greeting.getBytes(StandardCharsets.UTF_8).length),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@
final class KafkaConsumerAttributesExtractor
implements AttributesExtractor<KafkaProcessRequest, Void> {

@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
@Override
public void onStart(
AttributesBuilder attributes, Context parentContext, KafkaProcessRequest request) {

ConsumerRecord<?, ?> record = request.getRecord();

attributes.put(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
(long) record.partition());
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
String.valueOf(record.partition()));
attributes.put(MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, record.offset());

Object key = record.key();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ private static boolean canSerialize(Class<?> keyClass) {
return !(keyClass.isArray() || keyClass == ByteBuffer.class);
}

@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
@Override
public void onEnd(
AttributesBuilder attributes,
Expand All @@ -47,8 +45,8 @@ public void onEnd(

if (recordMetadata != null) {
attributes.put(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
recordMetadata.partition());
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
String.valueOf(recordMetadata.partition()));
attributes.put(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, recordMetadata.offset());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import static io.opentelemetry.api.trace.SpanKind.PRODUCER

class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {

@SuppressWarnings("deprecation") // TODO MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
def "test kafka produce and consume with streams in-between"() {
setup:
def config = new Properties()
Expand Down Expand Up @@ -102,7 +101,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.startsWith("producer") }
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
}
Expand Down Expand Up @@ -139,7 +138,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.endsWith("consumer") }
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
"kafka.record.queue_time_ms" { it >= 0 }
Expand All @@ -159,7 +158,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.endsWith("producer") }
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
}
}
Expand Down Expand Up @@ -195,7 +194,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.startsWith("consumer") }
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
if (Boolean.getBoolean("testLatestDeps")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import static io.opentelemetry.api.trace.SpanKind.PRODUCER

class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {

@SuppressWarnings("deprecation") // TODO MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
def "test kafka produce and consume with streams in-between"() {
setup:
def config = new Properties()
Expand Down Expand Up @@ -97,7 +96,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" "producer-1"
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
}
Expand All @@ -113,7 +112,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.endsWith("consumer") }
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
"kafka.record.queue_time_ms" { it >= 0 }
Expand All @@ -136,7 +135,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
}
}
Expand All @@ -151,7 +150,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.startsWith("consumer") }
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
if (Boolean.getBoolean("testLatestDeps")) {
Expand Down
Loading
Loading