Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to: durable queue subscriber #95

Open
lorenzoisidori21 opened this issue Feb 22, 2019 · 3 comments
Open

How to: durable queue subscriber #95

lorenzoisidori21 opened this issue Feb 22, 2019 · 3 comments
Labels

Comments

@lorenzoisidori21
Copy link

lorenzoisidori21 commented Feb 22, 2019

I'm trying to use a durable queue subscriber. I just download the example and modified that.
In the StanSubscriber, I set the "DurableName" in the "StanSubscriptionOptions" and passed the "qGroup" parameter in the "Subcribe" method.
It doesn't work.

After many attempts, I removed the using that dispose the "IStanSubscription" and the durable feature worked.

I wonder to know if this is the way or not.
And if it is the way: if I never dispose a "IStanSubscription", will I get troubles?

@ColinSullivan1
Copy link
Member

I'm not aware of any specific issues - can you provide some test code that reproduces this?

If you don't dispose, you should close the subscriber when you are finished with it to recover resources and let the streaming server clean up.

@lorenzoisidori21
Copy link
Author

Thank you @ColinSullivan1 for the answer. I use this nats streaming server.
The code is from the example in this project, just few changes.
Here is my Producer Code:

`using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using STAN.Client;
using System.Threading;

namespace STAN.Example.Publish
{
class StanPublisher
{
static readonly string usageText =
@"Usage: stan-pub
-url NATS Streaming server URL(s)
-cluster NATS Streaming cluster name
-clientid NATS Streaming client ID
-subject subject to publish on, defaults to foo.
-message Text to send in the messages.
-async Asynchronous publish mode
-verbose verbose mode (affects performance).
";
Dictionary<string, string> parsedArgs = new Dictionary<string, string>();

    int count = 1;
    string url = StanConsts.DefaultNatsURL;
    string subject = "foo";
    string clientID = "cs-publisher";
    string clusterID = "test-cluster";
    byte[] payload = Encoding.UTF8.GetBytes("hello");
    bool verbose = false;
    bool async = true;

    StanOptions cOpts = StanOptions.GetDefaultOptions();

    public void Run(string[] args)
    {
        Stopwatch sw = null;
        long acksProcessed = 0;

        parseArgs(args);
        banner();

        cOpts.NatsURL = url;
        using (var c = new StanConnectionFactory().CreateConnection(clusterID, clientID, cOpts))
        {
            sw = Stopwatch.StartNew();

            if (async)
            {
                AutoResetEvent ev = new AutoResetEvent(false);

                for (int i = 0; i < count; i++)
                {
                    string guid = c.Publish(subject, payload, (obj, pubArgs) =>
                    {
                        if (verbose)
                        {
                            Console.WriteLine("Recieved ack for message {0}", pubArgs.GUID);
                        }
                        if (!string.IsNullOrEmpty(pubArgs.Error))
                        {
                            Console.WriteLine("Error processing message {0}", pubArgs.GUID);
                        }

                        if (Interlocked.Increment(ref acksProcessed) == count)
                            ev.Set();
                    });

                    if (verbose)
                        Console.WriteLine("Published message with guid: {0}", guid);
                }

                ev.WaitOne();

            }
            else
            {
                for (int i = 0; i < count; i++)
                {
                    c.Publish(subject, payload);
                    if (verbose)
                        Console.WriteLine("Published message.");
                }
            }

            sw.Stop();

            Console.Write("Published {0} msgs with acknowldegements in {1} seconds ", count, sw.Elapsed.TotalSeconds);
            Console.WriteLine("({0} msgs/second).",
                (int)(count / sw.Elapsed.TotalSeconds));
        }
    }

    private void usage()
    {
        Console.Error.WriteLine(usageText);
        Environment.Exit(-1);
    }

    private void parseArgs(string[] args)
    {
        if (args == null)
            return;

        for (int i = 0; i < args.Length; i++)
        {
            if (args[i].Equals("-verbose"))
            {
                parsedArgs.Add(args[i], "true");
            }
            else
            {
                if (i + 1 == args.Length)
                    usage();

                parsedArgs.Add(args[i], args[i + 1]);
                i++;
            }
        }

        if (parsedArgs.ContainsKey("-cluster"))
            clusterID = parsedArgs["-cluster"];

        if (parsedArgs.ContainsKey("-clientid"))
            clientID = parsedArgs["-clientid"];

        if (parsedArgs.ContainsKey("-count"))
            count = Convert.ToInt32(parsedArgs["-count"]);

        if (parsedArgs.ContainsKey("-url"))
            url = parsedArgs["-url"];

        if (parsedArgs.ContainsKey("-subject"))
            subject = parsedArgs["-subject"];

        if (parsedArgs.ContainsKey("-message"))
            payload = Encoding.UTF8.GetBytes(parsedArgs["-message"]);

        if (parsedArgs.ContainsKey("-verbose"))
            verbose = true;
    }

    private void banner()
    {
        Console.WriteLine("Connecting to cluster '{0}' as client '{1}'.",
            clusterID, clientID);
        Console.WriteLine("Publishing {0} messages on subject {1}",
            count, subject);
        Console.WriteLine("  Url: {0}", url);
        Console.WriteLine("  Payload is {0} bytes.",
            payload != null ? payload.Length : 0);
    }

    public static void Main(string[] args)
    {
        try
        {
            new StanPublisher().Run(args);
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine("Exception: " + ex.Message);
            if (ex.InnerException != null)
                Console.Error.WriteLine("Inner Exception: " + ex.InnerException.Message);
        }
    }
}

}`

Here is my Consumer code:

`using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using STAN.Client;

namespace STAN.Example.Subscribe
{
class StanSubscriber
{
static readonly string usageText =
@"
Usage: stan-sub [options]
Options:
-url < url > NATS Streaming server URL(s)
-cluster < cluster name > NATS Streaming cluster name
-clientid < client ID > NATS Streaming client ID
-verbose Verbose mode (affects performance).
Subscription Options:
-count < num > # of msgs to receieve
-qgroup < name > Queue group
-seq < seqno > Start at seqno
-all Deliver all available messages
-last Deliver starting with last published message
-since < duration > Deliver messages in last interval(e.g. 1s, 1hr)
(for more information: see .NET TimeSpan.Parse documentation)
--durable < name > Durable subscriber name
--unsubscribe Unsubscribe the durable on exit";

    Dictionary<string, string> parsedArgs = new Dictionary<string, string>();

    int count = 1;
    string url = StanConsts.DefaultNatsURL;
    string subject = "foo";
    int received = 0;
    bool verbose = false;
    string clientID = Guid.NewGuid().ToString();
    string clusterID = "test-cluster";
    string qGroup = "group1";
    bool unsubscribe = false;

    StanSubscriptionOptions sOpts = StanSubscriptionOptions.GetDefaultOptions();
    StanOptions cOpts = StanOptions.GetDefaultOptions();

    public void Run(string[] args)
    {
        parseArgs(args);
        banner();

        var opts = StanOptions.GetDefaultOptions();
        opts.NatsURL = url;

        using (var c = new StanConnectionFactory().CreateConnection(clusterID, clientID, opts))
        {
            TimeSpan elapsed = receiveAsyncSubscriber(c);

            Console.Write("Received {0} msgs in {1} seconds ", received, elapsed.TotalSeconds);
            Console.WriteLine("({0} msgs/second).",
                (int)(received / elapsed.TotalSeconds));

        }
    }

    private TimeSpan receiveAsyncSubscriber(IStanConnection c)
    {
        Stopwatch sw = new Stopwatch();
        AutoResetEvent ev = new AutoResetEvent(false);
        sOpts.AckWait = int.MaxValue;
        EventHandler<StanMsgHandlerArgs> msgHandler = (sender, args) =>
        {

            if (received == 0)
                sw.Start();
            Console.WriteLine("Receiving: redelivered: {0}, sequence : {1} ", args.Message.Redelivered, args.Message.Sequence);
            received++;

            if (verbose)
            {
                Console.WriteLine("Received seq # {0}: {1}",
                    args.Message.Sequence,
                    System.Text.Encoding.UTF8.GetString(args.Message.Data));
            }

            if (received >= count)
            {
                sw.Stop();
                ev.Set();
            }
        };
        sOpts.DurableName = "my-durable";
        using ( var s = c.Subscribe(subject, qGroup, sOpts, msgHandler))
        {
            ev.WaitOne();
        }

        return sw.Elapsed;
    }

    private void usage()
    {
        Console.Error.WriteLine(usageText);
        Environment.Exit(-1);
    }

    private void parseArgs(string[] args)
    {
        if (args == null)
            return;

        for (int i = 0; i < args.Length; i++)
        {
            if (args[i].Equals("-verbose") ||
                args[i].Equals("-all") ||
                args[i].Equals("-last"))
            {
                parsedArgs.Add(args[i], "true");
            }
            else
            {
                if (i + 1 == args.Length)
                    usage();

                parsedArgs.Add(args[i], args[i + 1]);
                i++;
            }
        }

        if (parsedArgs.ContainsKey("-clientid"))
            clientID = parsedArgs["-clientid"];

        if (parsedArgs.ContainsKey("-cluster"))
            clusterID = parsedArgs["-cluster"];

        if (parsedArgs.ContainsKey("-count"))
            count = Convert.ToInt32(parsedArgs["-count"]);

        if (parsedArgs.ContainsKey("-url"))
            url = parsedArgs["-url"];

        if (parsedArgs.ContainsKey("-subject"))
            subject = parsedArgs["-subject"];

        if (parsedArgs.ContainsKey("-qgroup"))
            qGroup = parsedArgs["-qgroup"];

        if (parsedArgs.ContainsKey("-seq"))
        {
            sOpts.StartAt(Convert.ToUInt64(parsedArgs["-seq"]));
        }

        if (parsedArgs.ContainsKey("-all"))
        {
            Console.WriteLine("Requesting all messages.");
            sOpts.DeliverAllAvailable();
        }

        if (parsedArgs.ContainsKey("-last"))
        {
            Console.WriteLine("Requesting last message.");
            sOpts.StartWithLastReceived();
        }

        if (parsedArgs.ContainsKey("-since"))
        {
            TimeSpan ts = TimeSpan.Parse(parsedArgs["-since"]);
            Console.WriteLine("Request messages starting from {0} ago.", ts);
            sOpts.StartAt(ts);
        }

        if (parsedArgs.ContainsKey("-durable"))
        {
            sOpts.DurableName = parsedArgs["-durable"];
            Console.WriteLine("Request messages on durable subscription {0}.",
                sOpts.DurableName);
        }
        if (parsedArgs.ContainsKey("-unsubscribe"))
        {
            Console.WriteLine("Will unsubscribe before exit.");
            unsubscribe = Convert.ToBoolean(parsedArgs["-unsubscribe"]);
        }

        if (parsedArgs.ContainsKey("-verbose"))
            verbose = true;
    }

    private void banner()
    {
        Console.WriteLine("Connecting to cluster '{0}' as client '{1}'.",
            clusterID, clientID);
        Console.WriteLine("Receiving {0} messages on subject {1}",
            count, subject);
        Console.WriteLine("  Url: {0}", url);
    }

    public static void Main(string[] args)
    {
        try
        {
            new StanSubscriber().Run(args);
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine("Exception: " + ex.Message);
            if (ex.InnerException != null)
                Console.Error.WriteLine("Inner Exception: " + ex.InnerException.Message);
        }
    }
}

}`

Here the steps to reproduce:

  1. Start nats streaming server
  2. Start Consumer
  3. Start Producer
  4. Both Consumer and Producer finish their work
  5. Start Producer
  6. Start Consumer => the Consumer doesn't receive the message

I noticed if I don't call the Consumer's dispose, it works.
If i call only the Close metho, it works.
I just wonder what is the best practice.

I hope to have been clear this time.
Thank you again for your time.

@paul42
Copy link

paul42 commented Sep 14, 2020

I think there might be some logic around the StanSubscriptionOptions.LeaveOpen property - I was trying to simulate a 'durable subscription' with a console app that would make a new connection and put in a using and loop so it would "disconnect" every few moments, but what I didn't realize is that my using() closure was calling dispose and because I didn't set leave open on that subscription, I was not properly simulating a disconnect, the server received an unsubscribe. you might want to verify your LeaveOpen options

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

4 participants