
Best practice regarding subscriptions #26

Open
mtryfoss opened this issue Mar 13, 2020 · 44 comments

@mtryfoss
Contributor

Hello, and thanks for a really nice lib!

I have a question regarding subscriptions to channel events. It seems like a normal channelhandle.Subscribe() adds a subscription to "ari.event.." against NATS.

On a heavily used Asterisk node, that will cause a lot of duplicate MSGs from NATS, which are then discarded by the client (as not belonging to it..).

Is it possible to make ari-proxy publish channel messages on a subject for that channel only?

@Ulexus
Member

Ulexus commented Mar 13, 2020

It is true that there will be duplicate messages, but that is not nearly as heavy as you might think at first glance.

First, we never copy the data for the duplicates; they are always just references.

Next, we only decode each message once, and unless otherwise required, we only decode it enough to determine the type of Event.

Between these two things, there is actually very little overhead in having a large number of duplicates.

@Ulexus
Member

Ulexus commented Mar 13, 2020

Please do let me know if you find a bottleneck here, but we have run some quite large installations without any constraints of this type.

@Ulexus Ulexus closed this as completed Mar 13, 2020
@mtryfoss
Contributor Author

The current setup is about 700 concurrent calls (many long-lasting), and there's some indication of exponential growth in CPU usage relative to call count. Six processes are running the app, and that is using about 80% of one CPU core. However, there might be other parts of the code causing the load.

That said, I see you're stating "we only decode it enough to determine the type of Event". I have one subscription to ari.Events.All and filter what I need in a switch. That might be bad.

I'll have to dig more into it. Just wanted some quick feedback :)

@Ulexus Ulexus reopened this Mar 14, 2020
@Ulexus
Member

Ulexus commented Mar 14, 2020

I would definitely avoid using ari.Events.All. Asterisk produces a lot of events, many of them closely clustered. Subscribe() accepts multiple event types, and I'd make use of that. Also, because subscriptions on handles are scoped to those handles, I can often get away with not decoding the complete events at all and just use the channel as a trigger.
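
As a minimal sketch of a narrowed, handle-scoped subscription (the event list, h, and ctx here are illustrative; adapt them to your handler):

sub := h.Subscribe(ari.Events.StasisEnd, ari.Events.ChannelDtmfReceived)
defer sub.Cancel()

for {
	select {
	case <-sub.Events():
		// The channel is already known from the handle's scope, so the
		// event itself often only needs to act as a trigger.
	case <-ctx.Done():
		return
	}
}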

Another common issue is goroutine leaks. There are various strategies to reduce the likelihood of these, but the first thing, really, is to figure out where your load is coming from.

Are you familiar with pprof? That's a great tool for profiling your app and seeing where it is spending most of its time.

https://golang.org/pkg/net/http/pprof/
https://blog.golang.org/profiling-go-programs
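
For a quick start, exposing the profiling endpoints is just one import and one listener (a minimal sketch; the port is arbitrary):

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof/ handlers on the default mux
)

func main() {
	// Serve the profiling endpoints on localhost only; never expose them publicly.
	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	// ... the rest of the application
	select {}
}

Then, for example: go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30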

@Ulexus
Member

Ulexus commented Mar 14, 2020

I should also mention that we support a concept called "dialogs." The naming comes from a previous intention of retaining compatibility with nvisibleinc's ari-proxy. In our case, a dialog is really a tag applied to certain resources that causes the server to send events for those resources on a separate NATS subject. This reduces client traffic load on heavily-loaded machines. To use them, just set the Dialog parameter (an arbitrary string) on any resource key before sending that key to the proxy server. The server will automatically register the dialog and tag events related to it, and the client will automatically switch its subscription over to the NATS subject for that dialog.

I have never actually had to use this in production, but the feature is there, and it is basically what you asked about at the beginning. I'd still profile your app before diving into dialogs (which are much less tested than anything else in the package).
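
As a rough sketch of that flow (cl and h are assumed to be an existing client and channel handle, and the dialog name is arbitrary):

// Copy the channel's key and tag it with a dialog; the proxy server
// then publishes events for that resource on a dialog-specific NATS
// subject, to which the client subscribes automatically.
k := *h.Key()
k.Dialog = "call-" + k.ID

dh := cl.Channel().Get(&k)
sub := dh.Subscribe(ari.Events.StasisEnd)
defer sub.Cancel()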

@mtryfoss
Contributor Author

mtryfoss commented Mar 15, 2020

I'm pretty sure the goroutines are under control. I did a lot of QA on that during development.

I ran pprof now, but while the system is relatively idle.

Top 10:
      flat  flat%   sum%        cum   cum%
     910ms 11.43% 11.43%      960ms 12.06%  encoding/json.stateInString
     650ms  8.17% 19.60%     1530ms 19.22%  encoding/json.checkValid
     530ms  6.66% 26.26%     1020ms 12.81%  encoding/json.(*decodeState).scanWhile
     330ms  4.15% 30.40%      450ms  5.65%  runtime.scanobject
     270ms  3.39% 33.79%      610ms  7.66%  encoding/json.(*decodeState).skip
     250ms  3.14% 36.93%     4460ms 56.03%  encoding/json.(*decodeState).object
     240ms  3.02% 39.95%      240ms  3.02%  encoding/json.unquoteBytes
     200ms  2.51% 42.46%      260ms  3.27%  encoding/json.stateEndValue
     170ms  2.14% 44.60%      910ms 11.43%  runtime.mallocgc
     160ms  2.01% 46.61%      160ms  2.01%  runtime.futex

So, it seems like narrowing the subscriptions is probably the first step.

I will do some more testing and try an improved version in production during the coming week.

I'm using SetVariable() in several places during each call, mainly to manipulate SIP headers. I guess this will wait for a ChannelVarset event before returning? It seems like this is one of the causes of the massive event generation.

@Ulexus
Member

Ulexus commented Mar 15, 2020

Since it's mostly decoding load, I would definitely trim the subscription scopes, yes.
Right, ChannelVarset is the event you want.

I'm curious, though: do you actually need something in a different goroutine or program to wait for something else to set the variable, or are you using ChannelVarset as a confirmation of success? channel.SetVar() does provide (functional) failure indication (which is, admittedly, not universal amongst Asterisk's ARI routines).

@mtryfoss
Contributor Author

I guess I was a bit unclear. I do not need a specific confirmation in another goroutine.

I just wondered about the SetVariable() function.
Is the ChannelVarset event used to determine success or not?
If not, I cannot currently see any case where I actually need a notification every time a variable is changed.

I already use a heavily modified version of Asterisk, so those events could also be removed, to possibly reduce some load, if changing the subscriptions does not work.

@Ulexus
Member

Ulexus commented Mar 15, 2020

No, you do not need to watch for the event to confirm success. Asterisk provides confirmation in its response to the underlying REST call, which means we return an error if it fails.

The event is useful when you have one thing setting a variable and another, unrelated thing watching for the change.
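
For reference, that means checking the returned error is sufficient (a sketch; the handle and variable name are illustrative):

// The underlying REST call reports success or failure directly,
// so the returned error is the confirmation.
if err := h.SetVariable("X_CALL_TAG", "inbound"); err != nil {
	log.Printf("SetVariable failed: %v", err)
}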

@mtryfoss
Contributor Author

Ah, thank you!

The confirmation from the REST call drowned in all the other data.
I assume, then, that this is that feedback from the proxy to NATS/the client:

PUB _INBOX.ZAbjZi4QmaQzzIkwCcLsWF.QAh7Wfs3 12.
{"error":""}.

@mtryfoss
Contributor Author

...added a simple "return" at the beginning of ast_channel_publish_varset(). That reduced a lot of traffic in my case.

I will first try running with an adjusted subscription during higher load tomorrow and see how that works. Otherwise, we could probably add some sort of filter in ari-proxy to achieve the same effect as modifying Asterisk.

@Ulexus
Member

Ulexus commented Mar 15, 2020

That's it, yes (assuming it is the response to a SetVar request, anyway).

Are you using a different client, or reading the NATS data directly? If you are using the Go client in this repo, you shouldn't need to read the NATS data; the client interprets it and returns a nil error.

@mtryfoss
Contributor Author

I was just ngrep'ing to get a better understanding of how the communication between the proxy and the client works.

A normal call will set around 10 variables. Combined with all the native stuff Asterisk sets during call setup, that means a lot of events to be handled in total when call volumes go up.

I'm just thinking out loud...
500 concurrent calls. Each means at least two call legs, and each leg has one subscription, for a total of 1,000 subscriptions. Between NATS and the Go client, that would be something like 10,000-15,000 events published from NATS to Go for every new call. Combine that with me decoding all of them :)

@Ulexus
Member

Ulexus commented Mar 15, 2020

Decoding them is likely the problem. The rest should really be fine; NATS is quite efficient. We have systems using this with dozens of nodes running thousands of calls, also setting lots of variables.

@Ulexus
Member

Ulexus commented Mar 15, 2020

One of these days, I'm going to convert the encoding over to something more efficient, like protocol buffers (at least optionally). In most cases nowadays, I'm sending a lot of ARI stuff over gRPC anyway... after it has been adapted from ari-proxy's JSON, that is. That's why you'll find protobuf definitions for the most critical components (such as resource keys) in the main ari package.

@mtryfoss
Contributor Author

Yes, I'll see how the change behaves tomorrow. It doesn't seem to make any difference now, but that is probably because there are very few calls.

Btw, is there currently no way to subscribe to a set of events for multiple channels in one single subscription?

Thanks for all the help so far!

@Ulexus
Member

Ulexus commented Mar 15, 2020

Sort of. You have only a single resource key per subscription, but that resource key can be anything, so you could have a subscription to events on a particular node, a particular dialog, etc. Also, common uses, such as bridge subscriptions, include association with any number of channels (the channels that are joined to the bridge).

In general, though, it is one resource per subscription, yes.
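
For example (a sketch, assuming the ari package's key helpers; the node ID is illustrative):

// One subscription covering StasisStart events for all channels on a
// given Asterisk node, rather than one subscription per channel.
nodeKey := ari.NewKey(ari.ChannelKey, "", ari.WithNode("asterisk-1"))

sub := cl.Bus().Subscribe(nodeKey, ari.Events.StasisStart)
defer sub.Cancel()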

@Ulexus
Member

Ulexus commented Mar 15, 2020

Just for your reference, in case you didn't look into the structure of an ari.Key:

message Key {
	// Kind indicates the type of resource the Key points to, e.g. "channel",
	// "bridge", etc.
	string kind = 1;

	// ID indicates the unique identifier of the resource.
	string id = 2 [(gogoproto.customname) = "ID"];

	// Node indicates the unique identifier of the Asterisk node on which the
	// resource exists or will be created.
	string node = 3;

	// Dialog indicates a named scope of the resource, for receiving events.
	string dialog = 4;

	// App indicates the ARI application to which this key is bound.
	string app = 5;
}

@Ulexus
Member

Ulexus commented Mar 15, 2020

I really wouldn't go around trying to optimize subscription count unless you know for certain that that is the culprit... and if it is, I'd like to know about it, so that I can fix the issue structurally.

@mtryfoss
Contributor Author

Initially, I tried some dialog-related stuff, but ended up keeping it as simple as possible.

I will do some more debugging, and let you know if I think there might be some room for improvement.

@mtryfoss
Contributor Author

Changing the subscription type did not seem to help CPU usage. I compared a patched and an unpatched process during higher load.

Did some ngrep'ing and found that, over a span of a few seconds, around 300k events were received. Of those, around 200k were ChannelVarset.

I'm investigating a bit more. Just wanted to let you know.

@Ulexus
Member

Ulexus commented Mar 16, 2020

That seems like an absurdly high percentage of ChannelVarset events. That's at approximately what calls-per-second rate?

@mtryfoss
Contributor Author

mtryfoss commented Mar 16, 2020

13,899 StasisStart (not unique) and 192,975 ChannelVarset in the dump, from a 20-second capture.

A single event was received 67 times (distributed by NATS to every goroutine listening for "something"). I've found a couple of places where there are two subscriptions per channel, but even if you divide the numbers by two, it's relatively high.

I can't really see how this can be avoided as long as the NATS subject contains only the node ID and no other identifier. I know changing this is probably difficult because of backward compatibility.

@Ulexus
Copy link
Member

Ulexus commented Mar 16, 2020

I wonder... are you creating a new from-scratch client connection for each call? Generally, you would create a root client and then call client.New() from there to create subsequently-scoped derived clients. Also, you are using subscriptions derived from the channel handles, right?

@mtryfoss
Contributor Author

Nope. Only one client.New() for the whole project.
All messages from NATS are delivered through the same connection, as confirmed by a trace.

It was based on one of your examples, like this:
func channelHandler(h *ari.ChannelHandle, startEvent *ari.StasisStart) {
...
subs := h.Subscribe(ari.Events.StasisEnd)

However, have there been any major bugs fixed here in the last few months? I might be using an older version.

@Ulexus
Member

Ulexus commented Mar 16, 2020

I can't recall any bugs; mostly new features.

I don't see how what you are saying reconciles with what you are measuring; that is, I'm missing something obvious. It shouldn't matter how many subscriptions you have locally; you should not see duplicated messages over NATS, either on the network or in the NATS trace logs.

@Ulexus
Member

Ulexus commented Mar 16, 2020

Except... I am apparently lying. We are creating new NATS subscriptions for each. That's going to be a problem for you, yes!

@Ulexus
Member

Ulexus commented Mar 16, 2020

Okay, let me see if I can get this fixed.

@mtryfoss
Contributor Author

I really appreciate your help :) And hope you'll also benefit from this.

Ulexus added a commit that referenced this issue Mar 18, 2020
Previously, we created a new NATS subscription for each ARI
subscription, which is inefficient for large numbers of similar
subscriptions.  (See Issue #26)

With this change, we now retain a single NATS subscription for any
number of ARI subscriptions for which the same NATS subscription is
sufficient, regardless of where in the tree of ARI clients it may be.

To do this, we create a root-level (core) back-to-back event bus, with
the NATS subscription being bound to a stdbus ARI bus.  All ari-proxy
clients subtend a new SubBus, which forwards all requests on to the core
ARI bus, registering them along the way.  When the SubBus is closed, the
bound subscriptions will be closed, but the root-level bus itself will
stay in existence.

There is a remaining issue wherein long-running root-level ari-proxy
clients will accumulate NATS subscriptions and not have them terminate
even after all SubBuses are closed.  This will need to be fixed in a
later patch, but it should not affect most uses, for now.

Fixes #26
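
For readers following along, the shape of that change is roughly this (a simplified sketch, not the committed code; the import path may differ by version):

package proxybus

import (
	"sync"

	"github.com/CyCoreSystems/ari"
)

// SubBus forwards subscriptions to a shared core bus while tracking
// them, so that closing one SubBus cancels only its own subscriptions.
type SubBus struct {
	core ari.Bus // shared, NATS-backed root bus

	mu   sync.Mutex
	subs []ari.Subscription
}

func (b *SubBus) Subscribe(key *ari.Key, n ...string) ari.Subscription {
	sub := b.core.Subscribe(key, n...)

	b.mu.Lock()
	b.subs = append(b.subs, sub)
	b.mu.Unlock()

	return sub
}

// Close cancels this SubBus's subscriptions; the core bus (and its
// single NATS subscription) stays alive for the other clients.
func (b *SubBus) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()

	for _, sub := range b.subs {
		sub.Cancel()
	}
	b.subs = nil
}
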
@Ulexus
Member

Ulexus commented Mar 18, 2020

@mtryfoss When you get a chance, can you try building off of the issue-26 branch to see if this alleviates your problem?

@mtryfoss
Contributor Author

Thank you! Will try tomorrow morning.

@mtryfoss
Contributor Author

I experienced an application crash when doing the first Subscribe():

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x83190c]
goroutine 33 [running]:
github.com/nats-io/nats%2ego.(*EncodedConn).subscribe(0x0, 0xc000084a50, 0x22, 0x0, 0x0, 0xb6c240, 0xc00034a220, 0xbe3880, 0x1, 0xc00034a220)
/root/go/src/github.com/nats-io/nats.go/enc.go:237 +0x1bc
github.com/nats-io/nats%2ego.(*EncodedConn).Subscribe(0x0, 0xc000084a50, 0x22, 0xb6c240, 0xc00034a220, 0x0, 0x11, 0xc0000bd4a0)
/root/go/src/github.com/nats-io/nats.go/enc.go:174 +0x62

The nc pointer is nil in the newBusWrapper() function in bus.go.

Executing c.core.Start() before bus.New() in client.New() seems to fix the issue for me, but I don't know whether that is problematic.

I suspect this might be related to using "WithURI" instead of "WithNATS" for a new connection:
client.New(ctx, client.WithApplication(ariApp), client.WithURI(natsuri))

@Ulexus
Member

Ulexus commented Mar 19, 2020

That's what I get for making a last-minute clean-up. Should be fixed.

@mtryfoss
Contributor Author

Reverted my changes and pulled yours. Still crashing.

@Ulexus
Member

Ulexus commented Mar 19, 2020

Oh, sorry. Missed the core bus, too. I don't know how I managed to reverse those.

@mtryfoss
Contributor Author

No problem :) Seems to work now.

ngrep shows that two subscriptions are receiving events on the same subject, but the count seems to be constant regardless of concurrent calls.

Will do some more extended testing and get back to you.

@mtryfoss
Contributor Author

It's been running on a low-volume node since Friday. No issues so far. I'll wait a few more days before testing it with higher volume.

@mtryfoss
Contributor Author

Hi again!

Seems like there's a new memory leak introduced:

      flat  flat%   sum%        cum   cum%
  261.41MB 88.10% 88.10%   261.41MB 88.10%  github.com/CyCoreSystems/ari/stdbus.newSubscription (inline)

Total: 296.72MB
ROUTINE ======================== github.com/CyCoreSystems/ari/stdbus.newSubscription in /root/go/src/github.com/CyCoreSystems/ari/stdbus/bus.go
  261.41MB   261.41MB (flat, cum) 88.10% of Total
         .          .    124:	}
         .          .    125:}
         .          .    126:
         .          .    127:// Events returns the events channel
         .          .    128:func (s *subscription) Events() <-chan ari.Event {
  261.41MB   261.41MB    129:	return s.C
         .          .    130:}
         .          .    131:
         .          .    132:// Cancel cancels the subscription and removes it from
         .          .    133:// the event bus.
         .          .    134:func (s *subscription) Cancel() {

@Ulexus
Member

Ulexus commented Mar 25, 2020

I'm not surprised. I knew it was going to be a bit leaky in the current implementation. I mainly wanted to see if that fixed your scale-out problem. I'll see about getting the leaks (the ones I know about, anyway) fixed.

@mtryfoss
Contributor Author

CPU seems to be fine now. The updated version uses only 5-10% as much CPU as the unpatched one. And that is after I temporarily removed some unused events from Asterisk.

@mtryfoss
Contributor Author

Hi again!

I completely forgot about this case. Did you get time to have a look at the leaks? Or can you at least give me a hint on what to check?

@Ulexus
Member

Ulexus commented Jun 10, 2020

I'm afraid I forgot about it as well. The cleanup issue (commented in the code) remains, and I don't have a solution for it right away. Probably some sort of subbus tracking, but I don't know right now.

@mtryfoss
Contributor Author

mtryfoss commented Jun 30, 2020

I've been looking a bit at this one myself.

It seems like you're using stdbus from the ari package for subscriptions.
Because of that, the Cancel() in the ari package's stdbus is used, and there is no way to remove the entry appended on this line (line 107 in bus.go):
b.subs = append(b.subs, sub)

To me, it seems like the only way would be to extract what is needed from stdbus and implement the code locally, with a specific Cancel() function that does the necessary cleanup? Something like the sketch below.
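
A rough sketch with hypothetical local types (not the actual stdbus code):

package localbus

import (
	"sync"

	"github.com/CyCoreSystems/ari"
)

type bus struct {
	mu   sync.Mutex
	subs []*subscription
}

type subscription struct {
	b *bus
	C chan ari.Event
}

// Cancel removes the subscription from its bus so that the slice entry
// (and its buffered event channel) can be garbage collected.
func (s *subscription) Cancel() {
	s.b.mu.Lock()
	defer s.b.mu.Unlock()

	for i, sub := range s.b.subs {
		if sub == s {
			// Delete without preserving order.
			s.b.subs[i] = s.b.subs[len(s.b.subs)-1]
			s.b.subs[len(s.b.subs)-1] = nil
			s.b.subs = s.b.subs[:len(s.b.subs)-1]
			break
		}
	}
}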

edit:
Is it safe to append to "b.subs" without locking?

@Ulexus
Member

Ulexus commented Jun 30, 2020

Yeah, that's one of the leaks I was referring to above. The current PR is definitely more a proof of concept than a good solution, which is why it hasn't been merged.

It should be safe to append to b.subs without locking because:

  • there is no preallocation of the slice
  • the slice is (as you point out) never otherwise modified, so appends always allocate rather than mutate shared backing storage
