[refactoring proposal] consumer api #65

Open
withinboredom opened this issue Feb 24, 2024 · 9 comments

@withinboredom
withinboredom commented Feb 24, 2024

When working on #62, I discovered there are some issues with implementing the consumer model as-is. Further, some "newer" features of NATS are not easily available (NAKs, ordered consumers, delayed messages, etc.), so it would be nice to have them.

I took a deep look at the Go implementation to get an idea of what it might look like in PHP. Go tends to be written in such a way that things feel synchronous even though they aren't. So it has the best chance of being a compatible model for PHP (for both traditional PHP and async PHP via fibers).

Taking inspiration from that, there are a few low-level types that this depends on:

// sent to NATS, describing how we want to consume from the stream
readonly class PullRequest { public function __construct(
  public ?int $expiresSeconds,
  public ?int $batchCount,
  public ?int $maxBytes,
  public bool $noWait,
  public ?int $idleHeartbeat
) {} }

readonly class Metadata { public function __construct(
  public int $consumerSequence,
  public int $streamSequence,
  public int $numDelivered,
  public int $numPending,
  public DateTimeImmutable $timestamp,
  public string $streamName,
  public string $consumerName,
  public string $domainName,
) {} }

interface ConsumerMessage {
  public function getMetadata(): Metadata;
  public function getRawData(): string;
  public function getHeaders(): array;
  public function getSubject(): string;
  public function getReplySubject(): string;

  // acknowledge the message
  public function ack(): void;

  // ack the message and wait for the ack reply from the server. Useful for scenarios where message loss
  // is unacceptable, despite the performance impact.
  public function doubleAck(): void;

  // tell the server to redeliver the message
  public function nak(): void;

  // tell the server to redeliver the message after a delay
  public function nakWithDelay(float $millisecondsDelay): void;

  // tell the server the message is still being worked on. This resets the server redelivery timeout.
  public function inProgress(): void;

  // tell the server to never redeliver the message
  public function term(): void;

  // tell the server to never redeliver the message, with a reason that will be emitted as a server advisory.
  public function termWithReason(string $reason): void;
}

interface MessagesContext extends \Iterator {
  // gets the next message from the stream via foreach or manually. Blocks until there is a message
  public function next(): ConsumerMessage;

  // unsubscribe from the stream and immediately stops iteration. Messages may still be in the 
  // inbox and will be discarded.
  public function stop(): void;
  
  // unsubscribe from the stream but any messages still in the buffers/inbox will continue to be 
  // consumed until they are gone.
  public function drain(): void;
}

interface ConsumerContext {
  // stops consumer and any pending messages will be discarded
  public function stop(): void;

  // stops the consumer but continues to consume pending messages
  public function drain(): void;
}

interface MessageBatch {
  // @return Generator<ConsumerMessage>
  public function getMessages(): \Generator;
}
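
To make the PullRequest type a bit more concrete, here is a minimal sketch of how it might be serialized before being sent to the server. The JSON field names (expires, batch, max_bytes, no_wait, idle_heartbeat) and the use of nanoseconds for durations are assumptions based on the JetStream pull API and should be checked against the NATS documentation. pullRequestToJson() is a hypothetical helper, and the default values on the constructor were added only to keep the example short.

```php
<?php
declare(strict_types=1);

// Mirrors the PullRequest type above (PHP 8.0+ syntax, defaults added for brevity).
final class PullRequest
{
    public function __construct(
        public ?int $expiresSeconds = null,
        public ?int $batchCount = null,
        public ?int $maxBytes = null,
        public bool $noWait = false,
        public ?int $idleHeartbeat = null,
    ) {}
}

// Hypothetical helper: the field names and nanosecond units are assumptions.
function pullRequestToJson(PullRequest $request): string
{
    $nanosPerSecond = 1_000_000_000;
    $fields = [
        'expires' => $request->expiresSeconds === null ? null : $request->expiresSeconds * $nanosPerSecond,
        'batch' => $request->batchCount,
        'max_bytes' => $request->maxBytes,
        'no_wait' => $request->noWait ?: null,
        'idle_heartbeat' => $request->idleHeartbeat === null ? null : $request->idleHeartbeat * $nanosPerSecond,
    ];

    // Drop unset options so the server can apply its own defaults.
    $fields = array_filter($fields, static fn ($value) => $value !== null);

    return json_encode($fields, JSON_THROW_ON_ERROR);
}

echo pullRequestToJson(new PullRequest(expiresSeconds: 30, batchCount: 10, idleHeartbeat: 5)), "\n";
```

Leaving unset options out of the payload keeps the request small and avoids sending zero values that the server could interpret as explicit limits.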

Here's the interface inspired by the Go consumer:

interface Consumer {
  // receives up to $batchCount messages from a stream, or until $maxWait seconds pass,
  // whichever is sooner. Note that $idleHeartbeat is 5s by default (for $maxWait values longer
  // than 10s; it is disabled for shorter waits), and if the client hasn't received a heartbeat
  // within 2x $idleHeartbeat, an exception should be thrown. This method is non-blocking, but
  // returns a MessageBatch that can be iterated on.
  public function fetch(int $batchCount, int $maxWait = 30, int $idleHeartbeat = 5): MessageBatch;

  // exactly the same as fetch(), but counts by bytes instead of the number of messages.
  public function fetchBytes(int $maxBytes, int $maxWait = 30, int $idleHeartbeat = 5): MessageBatch;

  // exactly like fetch(), but if there are no messages available in the stream, then the generator 
  // will return immediately, regardless of the number of messages requested.
  public function fetchNoWait(int $batchCount): MessageBatch;

  // continuously consumes from a stream using the provided consumer function. The callback can 
  // accept up to two arguments: fn(ConsumerMessage $message, ConsumerContext $context)
  public function consume(\Closure $consumer): ConsumerContext;

  // Allow continuously iterating over a stream.
  public function messages(): MessagesContext;

  // receive the next message from the stream. Note that this is a blocking call.
  public function next(int $maxWait = 30, int $idleHeartbeat = 5): ConsumerMessage;

  // get the current consumer configuration from the stream
  public function info(): Configuration;
}
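
The heartbeat rule described in the fetch() comment can be sketched as two small functions. Both names are hypothetical, and the thresholds (5s default, enabled only for waits longer than 10s, failure after 2x the interval) are taken directly from the comment above rather than from any existing implementation.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: default idle heartbeat in seconds for a given $maxWait.
// Returns null when heartbeats are disabled (waits of 10s or less).
function defaultIdleHeartbeat(int $maxWait): ?int
{
    return $maxWait > 10 ? 5 : null;
}

// Hypothetical helper: true when more than 2x the heartbeat interval has
// passed since the client last heard from the server.
function heartbeatMissed(?int $idleHeartbeat, float $lastActivity, float $now): bool
{
    if ($idleHeartbeat === null) {
        return false; // heartbeats disabled, nothing to miss
    }

    return ($now - $lastActivity) > 2 * $idleHeartbeat;
}

$heartbeat = defaultIdleHeartbeat(30);
var_dump(heartbeatMissed($heartbeat, 100.0, 111.0));
```

An implementation of fetch() or next() would presumably call something like heartbeatMissed() inside its receive loop and throw when it returns true.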

What are your thoughts?

@nekufa
Member

nekufa commented Mar 4, 2024

Hi @withinboredom, I feel that the current API is a bit outdated and we need to reinvent it.
I'm not sure the client dependency is good: we need to pass the client to every message so it can use the connection to perform calls. It looks like the active record pattern, and maybe that's okay. Your solution looks good to me!

@withinboredom
Author

> we need to pass client to any message so it can use connection to perform calls. Looks like active record pattern, and maybe it's okay

Yes, I think that is a good thing though. The code that processes a message might be quite far from a client (such as deep in a job worker), and keeping the (n)ack'ing near where an error or success is actually determined should result in more maintainable code. Passing a message (or id) back up the call stack to be handled may be far more complex.
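
A minimal sketch of that idea, assuming a message that carries its own way of responding. SketchMessage, handleJob() and the $respond closure are all hypothetical stand-ins; a real message would hold a client or connection instead of a logging closure.

```php
<?php
declare(strict_types=1);

// Hypothetical message: holds a closure that sends the ack/nak on its behalf.
final class SketchMessage
{
    public function __construct(
        public string $payload,
        private \Closure $respond,
    ) {}

    public function ack(): void
    {
        ($this->respond)('ack');
    }

    public function nak(): void
    {
        ($this->respond)('nak');
    }
}

// Deep in a job worker: no client in sight, the outcome is reported where it is known.
function handleJob(SketchMessage $message): void
{
    try {
        if ($message->payload === '') {
            throw new \InvalidArgumentException('empty payload');
        }
        // ... the actual work would happen here ...
        $message->ack();
    } catch (\Throwable $e) {
        $message->nak();
    }
}

// Stand-in for the connection: records what would have been sent.
$log = [];
$respond = function (string $kind) use (&$log): void {
    $log[] = $kind;
};

handleJob(new SketchMessage('send-email', $respond));
handleJob(new SketchMessage('', $respond));
print_r($log);
```

The worker never sees the client, yet the success or failure is acknowledged at the exact point where it is determined.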

@nekufa
Member

nekufa commented Mar 4, 2024

Would it be better to rethink the whole API in terms of channel publishing and subscription?
I mean something like this:

$client->getChannel('tester')->publish('hello world');

@ro0NL
Contributor

ro0NL commented Mar 11, 2024

currently i'm quite happy with the client, but i had some issues discovering features like ACK/NAK as well.

Also confusing is Stream::publish vs Client::publish + Stream::put vs Client::dispatch

we settled on using the client directly

our flow is

SUB
NEXT
ACK/NACK
NEXT
ACK/NACK
...
UNSUB

So perhaps simply add Client::next/nextBatch/ack/nak and KISS :')
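
The flow above could look roughly like this with a flat client API. InMemoryClient is a hypothetical in-memory stand-in, not the library's Client, and its method names only illustrate the suggested next/ack/nak shape. Here nak() merely records the call; a real server would redeliver the message.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for a client exposing next/ack/nak directly.
final class InMemoryClient
{
    /** @var list<string> payloads waiting to be delivered */
    private array $queue;

    /** @var list<string> protocol steps, recorded for illustration */
    public array $log = [];

    public function __construct(array $payloads)
    {
        $this->queue = array_values($payloads);
    }

    public function subscribe(string $subject): void
    {
        $this->log[] = "SUB $subject";
    }

    public function next(): ?string
    {
        $payload = array_shift($this->queue);
        if ($payload !== null) {
            $this->log[] = "NEXT $payload";
        }

        return $payload;
    }

    public function ack(string $payload): void
    {
        $this->log[] = "ACK $payload";
    }

    public function nak(string $payload): void
    {
        $this->log[] = "NAK $payload";
    }

    public function unsubscribe(string $subject): void
    {
        $this->log[] = "UNSUB $subject";
    }
}

$client = new InMemoryClient(['a', 'bad', 'c']);
$client->subscribe('jobs');
while (($payload = $client->next()) !== null) {
    if ($payload === 'bad') {
        $client->nak($payload);
    } else {
        $client->ack($payload);
    }
}
$client->unsubscribe('jobs');
print_r($client->log);
```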

@nekufa
Member

nekufa commented Mar 14, 2024

@ro0NL @withinboredom please check the queue implementation in the latest release.
i'm happy with it, looks good to me! the readme is also updated (pub/sub and jetstream parts)

@ro0NL
Contributor

ro0NL commented Mar 14, 2024

no BC breaks 👍

the new API doesn't work for our case, since we fetch/ack/nack in an abstraction layer.

So we need to subscribe once, and keep reusing that subscription for subsequent fetches

Also only payload+replyTo leaves the abstraction layer, so we can't use $msg->ack() later, which is an active-record issue generally :)
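
One way such an abstraction layer could still acknowledge later, sketched under assumptions: JetStream acknowledgements are published to the message's reply subject, with bodies such as +ACK and -NAK. Envelope and ReplyAcknowledger are hypothetical names, and $publish stands for any callable(string $subject, string $body) wrapping the real client.

```php
<?php
declare(strict_types=1);

// Hypothetical DTO: the only data that leaves the abstraction layer.
final class Envelope
{
    public function __construct(
        public string $payload,
        public string $replyTo,
    ) {}
}

// Hypothetical acknowledger: acks by publishing to the stored reply subject.
// The '+ACK' / '-NAK' bodies are an assumption about the JetStream ack protocol.
final class ReplyAcknowledger
{
    public function __construct(private \Closure $publish) {}

    public function ack(Envelope $envelope): void
    {
        ($this->publish)($envelope->replyTo, '+ACK');
    }

    public function nak(Envelope $envelope): void
    {
        ($this->publish)($envelope->replyTo, '-NAK');
    }
}

// Stand-in for the client's publish method: records what would be sent.
$sent = [];
$publish = function (string $subject, string $body) use (&$sent): void {
    $sent[] = [$subject, $body];
};

$acknowledger = new ReplyAcknowledger($publish);
$acknowledger->ack(new Envelope('job-1', 'reply.subject.1'));
$acknowledger->nak(new Envelope('job-2', 'reply.subject.2'));
print_r($sent);
```

This keeps the message object itself free of any client reference, at the cost of the caller having to carry the reply subject around.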

@nekufa
Member

nekufa commented Mar 14, 2024

@ro0NL yep, the idea was to preserve the existing api as much as possible)
the subscribe method returns a queue instance that you can persist in a transport, and you can call fetch on it to get messages one by one.
you can also get a queue instance for a consumer, there is an example in the jetstream part of the readme:

...
// consumer can be used via queue interface
$queue = $consumer->getQueue();
while ($message = $queue->next()) {
    if (rand(1, 10) % 2 == 0) {
        mail($message->payload, "See you later");
        $message->ack();
    } else {
        // negative ack with a 1 second timeout
        $message->nack(1);
    }
    // stop processing
    if (rand(1, 10) == 10) {
        // don't forget to unsubscribe
        $client->unsubscribe($queue);
        break;
    }
}

// use fetchAll method to batch process messages
// let's set batch size to 50
$queue = $goodbyer->setBatching(50)->create()->getQueue();

// fetching 100 messages takes 2 stream requests (batch size is 50)
// limit message fetching to 1 second
// it means no more than 100 messages will be fetched
$messages = $queue->setTimeout(1)->fetchAll(100);

$recipients = [];
foreach ($messages as $message) {
    $recipients[] = (string) $message->payload;
}

mail_to_all($recipients, "See you later");

// ack all messages
foreach ($messages as $message) {
    $message->ack();
}
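
The "2 stream requests" figure in the comments above is a ceiling division of the fetch limit by the batch size. A tiny sketch, assuming every request returns a full batch; requestsNeeded() is a hypothetical helper, not part of the library.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: how many batch requests are needed to fetch $limit
// messages when each request asks for at most $batchSize messages.
function requestsNeeded(int $limit, int $batchSize): int
{
    if ($limit < 0 || $batchSize < 1) {
        throw new \InvalidArgumentException('limit must be >= 0 and batch size >= 1');
    }

    // Integer ceiling division without floating point.
    return intdiv($limit + $batchSize - 1, $batchSize);
}

echo requestsNeeded(100, 50), "\n";
```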

@ro0NL
Contributor

ro0NL commented Mar 14, 2024

it's a fair API for standard usage, i agree 👍

edit: actually i can reuse Queue:

$this->queue ??= $this->getConsumer()->getQueue();
$data = $this->queue->next();

@ro0NL
Contributor

ro0NL commented Mar 15, 2024

@nekufa etrias-nl/php-toolkit@eeb68a9 👌

on a side note, perhaps tag 1.0.0 soonish 👼

3 participants