Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cannot reproduce interest-based Streams example with Java SDK #203

Open
thomasdarimont opened this issue Jun 12, 2024 · 4 comments
Open
Assignees
Labels
defect Suspected defect such as a bug or regression

Comments

@thomasdarimont
Copy link

thomasdarimont commented Jun 12, 2024

Observed behavior

The Java SDK currently lacks some functionality like "double ack" to reproduce the example.

Expected behavior

The Java SDK should offer the same functionality as the other language integrations.

Server and client version

Nats Server Version: 2.10.14
Java SDK:

<dependency>
    <groupId>io.nats</groupId>
    <artifactId>jnats</artifactId>
    <version>2.17.6</version>
</dependency>

Host environment

No response

Steps to reproduce

Inspect and compare the following program with https://natsbyexample.com/examples/jetstream/interest-stream/rust

package demo.streams;

import demo.support.Serde;
import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.PullSubscribeOptions;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.PublishAck;
import io.nats.client.api.RetentionPolicy;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.impl.NatsJetStreamMetaData;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class InterestBasedStream {

    public static void main(String[] args) throws Exception {

        Options options = Options.builder() //
                .connectionName("jugsaar") //
                .userInfo("jugsaar", "jugsaar") //
                .server("nats://localhost:4222") //
                .build();

        try (Connection conn = Nats.connect(options)) {

            JetStreamManagement jsm = conn.jetStreamManagement();

            jsm.deleteStream("EVENTS");

            StreamConfiguration sc = StreamConfiguration.builder() //
                    .name("EVENTS") //
                    .retentionPolicy(RetentionPolicy.Interest) //
                    .subjects("events.>") //
                    .build();

            StreamInfo si = jsm.addStream(sc);
            System.out.printf("Created stream %s%n", sc.getName());

            JetStream js = conn.jetStream();

            js.publish("events.page_loaded", null);
            js.publish("events.mouse_clicked", null);
            PublishAck publishAck = js.publish("events.input_focused", null);
            System.out.println("Published 3 messages");

            System.out.printf("last message seq: %d%n", publishAck.getSeqno());

            System.out.println("# Stream info without any consumers\n");
            printStreamState(jsm, sc.getName());

            ConsumerInfo c1 = jsm.createConsumer(sc.getName(), ConsumerConfiguration.builder() //
                    .durable("processor-1") //
                    .ackPolicy(AckPolicy.Explicit) //
                    .build());

            js.publish("events.mouse_clicked", null);
            js.publishAsync("events.input_focused", null);

            System.out.println("# Stream info with one consumer\n");
            printStreamState(jsm, sc.getName());

            PullSubscribeOptions o1 = PullSubscribeOptions.bind("EVENTS", c1.getName());
            JetStreamSubscription sub1 = js.subscribe(null, o1);

            List<Message> fetched = sub1.fetch(2, Duration.ofSeconds(1));
            for (var msg : fetched) {
                msg.ackSync(Duration.ofSeconds(1)); // wait for broker to confirm
            }

            System.out.println("# Stream info with one consumer and acked messages\n");
            printStreamState(jsm, sc.getName());

            ConsumerInfo c2 = jsm.createConsumer(sc.getName(), ConsumerConfiguration.builder() //
                    .durable("processor-2") //
                    .ackPolicy(AckPolicy.Explicit) //
                    .build());

            PullSubscribeOptions o2 = PullSubscribeOptions.bind("EVENTS", c2.getName());
            JetStreamSubscription sub2 = js.subscribe(null, o2);

            js.publish("events.input_focused", null);
            js.publish("events.mouse_clicked", null);

            fetched = sub2.fetch(2, Duration.ofSeconds(1));
            List<NatsJetStreamMetaData> messageMeta = new ArrayList<>();
            for (var msg : fetched) {
                msg.ackSync(Duration.ofSeconds(1));
                messageMeta.add(msg.metaData());
            }
            System.out.printf("msg seqs %d and %d%n", messageMeta.get(0).streamSequence(), messageMeta.get(1).streamSequence());

            System.out.println("# Stream info with two consumers, but only one set of acked messages\n");
            printStreamState(jsm, sc.getName());

            fetched = sub2.fetch(2, Duration.ofSeconds(1));
            for (var msg : fetched) {
                // double ack feature missing here!
//                msg.ackSync(Duration.ofSeconds(1));
                msg.ack();
            }

            System.out.println("# Stream info with two consumers having both acked\n");
            printStreamState(jsm, sc.getName());


            ConsumerInfo c3 = jsm.createConsumer(sc.getName(), ConsumerConfiguration.builder() //
                    .durable("processor-3") //
                    .ackPolicy(AckPolicy.Explicit) //
                    .filterSubject("events.mouse_clicked") // not interested in input_focussed
                    .build());

            PullSubscribeOptions o3 = PullSubscribeOptions.bind("EVENTS", c3.getName());
            JetStreamSubscription sub3 = js.subscribe(null, o3);

            js.publish("events.input_focused", null);

            var msgs = sub1.fetch(1, Duration.ofSeconds(1));
            var msg = msgs.get(0);
            msg.term();

            msgs = sub2.fetch(1, Duration.ofSeconds(1));
            msg = msgs.get(0);
            msg.ackSync(Duration.ofSeconds(1));

            System.out.println("# Stream info with three consumers with interest from two\n");
            printStreamState(jsm, sc.getName());
        }
    }

    public static void printStreamState(JetStreamManagement jsm, String streamName) throws Exception {

        StreamInfo info = jsm.getStreamInfo(streamName);
        System.out.printf("inspecting stream info%n%s%n", Serde.json(info.getStreamState()));
    }
}

Maven POM excerpt:

<properties>
        <jnats.version>2.17.6</jnats.version>
        <jackson-databind.version>2.17.0</jackson-databind.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.nats</groupId>
            <artifactId>jnats</artifactId>
            <version>${jnats.version}</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson-databind.version}</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>${jackson-databind.version}</version>
        </dependency>
    </dependencies>

Serde helper class:

package demo.support;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

public final class Serde {

    public static final ObjectMapper OM;

    static {
        OM = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT).findAndRegisterModules();
    }

    public static <T> byte[] jsonBytes(T data) throws Exception {
        return OM.writeValueAsBytes(data);
    }

    public static <T> String json(T data) throws Exception {
        return OM.writeValueAsString(data);
    }

    public static <T> T fromJsonBytes(byte[] data, Class<T> type) throws Exception {
        return OM.readValue(data, type);
    }
}
@thomasdarimont thomasdarimont added the defect Suspected defect such as a bug or regression label Jun 12, 2024
@thomasdarimont thomasdarimont changed the title Cannot reproduce interest-based Streams with Java SDK Cannot reproduce interest-based Streams example with Java SDK Jun 12, 2024
@scottf
Copy link
Collaborator

scottf commented Jun 15, 2024

@thomasdarimont As far as double ack, do you mean acking and waiting for the server to respond to ensure the ack was received? If that is the case you are looking for

void ackSync(Duration timeout) throws TimeoutException, InterruptedException;

@scottf scottf self-assigned this Jun 15, 2024
@scottf
Copy link
Collaborator

scottf commented Jun 15, 2024

@thomasdarimont What do you expect the behavior of double ack to be? ackSync behavior waits for the server to respond for the length of the timeout. ackSync blocks until it gets a response or it times out. A response from the server indicates it has processed the ack.

for (var msg : fetched) {
  // double ack feature missing here!
  // msg.ackSync(Duration.ofSeconds(1));
  // msg.ack();
}

@scottf
Copy link
Collaborator

scottf commented Jun 28, 2024

@thomasdarimont I'm planning on closing this issue unless I hear back from you.

@thomasdarimont
Copy link
Author

thomasdarimont commented Jun 28, 2024

Hello @scottf sorry for the late response.

I'm trying to replicate the go/rust examples as close as possible for a talk at our local Java User Group. Since not all NATs use-cases have Java examples, I tried to add a Java example for every NATs use-case.
I was a bit puzzled, because I couldn't get the application to produce the same output sequence with the Java SDK as the go / rust examples did.

I expected that there is an explicit equivalent for Message#DoubleAck as used here:
https://github.com/ConnectEverything/nats-by-example/blob/main/examples/jetstream/interest-stream/go/main.go#L140

Even with msg.ackSync(Duration.ofSeconds(1)); I couldn't replicate the example output, but perhaps that.

If msg.ackSync(...) is the best way to model msg.DoubleAck in the other SDKs I think it would be helpful to highlight this in the documentation somewhere.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
defect Suspected defect such as a bug or regression
Projects
None yet
Development

No branches or pull requests

2 participants