Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ feature ] pull() method to receive some messages without binding handler #24

Open
6562680 opened this issue Sep 18, 2022 · 4 comments

Comments

@6562680
Copy link

6562680 commented Sep 18, 2022

Internally library has some brood code where it's working with nats socket.

There is something like "bind handler to each message"... this callbacks could kill memory limit of any web server...

Usually in any applications worker does ITS OWN while (true) with configurable limits and timing.
Method (Stream)->handle() does that, but there's no possibility to "ask for some messages" without binding handler. Even if handler is one line with

$consumer->handle(function ($message) use (&$messages) { $messages[] = $message; });

its still a handler that needs memory to store the \Closure object.

I mean that IMPOSSIBLE to wrap while (true) into own while (true) thats why you need to options like "setBatching()" and "setDelay", its not NATS stuff (but supported of course for lazy people and to debug/demonstration) - its business and server configuration.

Actually any listening for better performance uses internally Observer pattern and works like

$eventBus = new EventBus();
$eventBus->on($subject1, $handler11);
$eventBus->on($subject1, $handler12);
$eventBus->on($subject2, $handler21);
$natsEventStore = new NatsEventStore($natsClient, $stream, $consumer);
$natsEventStore->listen($eventBus);

Instead of

$consumer->handle(function ($message) use ($eventBus) { $eventBus->fire($message->subject); });

Exactly because handle method with other settings could provide COLLECTION of messages and needs foreach then.
Also eventBus can parralel call handlers inside.

====

NATS is a fastest thing. Using many memory for processing - kills NATS bonuses.

I found this issue where trying to parralel connections with pcntl_fork(), garbage collector starts to delete so many callbacks from memory then i get error message "no handler for message", in case we CANNOT get message without a handler in this library.

@oxidmod
Copy link
Contributor

oxidmod commented Jan 27, 2023

How about code like this? There is no need to create new closure every time

class TestConsumer {
    private Closure $handler;
    public function __construct(private array $messageHandlers) {
        $this->handler = (function (Payload $payload) {
            foreach ($this->messageHandlers as $handler) {
                if ($handler->supports($payload->subject)) {
                    $handler->handle($payload->subject, $payload->body);
                }
            }
        })(...);
    }
    
    public function consume(): void
    {
        $consumer = ...; // setup consumer
        $consumer->handle($this->handler /* , $this->emptyHandler */); // you can store empty handler as a property too
    }
}

@franck-grenier
Copy link

Hello,
could it be the reason why my default Laravel worker queue:work cannot get NATS messages ?

The client, the stream and the consumer are correctly connected but the handle method is never triggered though messages are pushed in the stream.

@kuudr
Copy link

kuudr commented Nov 21, 2023

Hello, could it be the reason why my default Laravel worker queue:work cannot get NATS messages ?

The client, the stream and the consumer are correctly connected but the handle method is never triggered though messages are pushed in the stream.

i have same isuue
did you find a solution to fix that ?

@franck-grenier
Copy link

Hello @kuudr ,
my Nats queue worker works fine with the client handle method. I had an issue not related to this topic's initial comment.

Here is my queue::pop method:

/**
* Pop the next job off of the queue.
*
* @param  string|null  $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
    $stream = $this->getRootStream($this->getQueueName($queue));
    $consumer = $stream
        ->getConsumer(config('queue.connections.nats.consumer'))
        ->setExpires(0)
        ->setIterations(1)
        ->setBatching(1);

    $job = null;

    $consumer->handle(
        function ($message) use (&$job, $queue) {
            // With NATS, non-filtered consumers may receive undesired messages from the stream,
            // so we check the message conformity before returning a job instance to avoid errors
            if (! $this->checkMessageIsKnownJob($message)) {
                return;
            }

            $job = new NatsJob(
                $this->container,
                $message,
                $this->connectionName,
                $this->getQueueName($queue),
            );
        }
    );

    return $job;
}

My issue was a misconfiguration of my consumer.

I fixed it by creating a "pull" consumer with "Interest" retention policy on my stream.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants