[question] Unnamed stream workaround #23

Open
6562680 opened this issue Sep 13, 2022 · 3 comments
Labels
enhancement New feature or request

Comments


6562680 commented Sep 13, 2022

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// Connect to NATS
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// Create JetStream Context
	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
	if err != nil {
		log.Fatal(err)
	}

	// Simple Stream Publisher (only a subject is given, no stream name)
	if _, err := js.Publish("ORDERS.scratch", []byte("hello")); err != nil {
		log.Fatal(err)
	}

	// Simple Async Stream Publisher
	for i := 0; i < 500; i++ {
		js.PublishAsync("ORDERS.scratch", []byte("hello"))
	}
	select {
	case <-js.PublishAsyncComplete():
	case <-time.After(5 * time.Second):
		fmt.Println("Did not resolve in time")
	}
}

I am trying to do the same thing in PHP. The PHP API only offers ->getStream() by name, but my lead said that NATS can resolve the stream name automatically from the subject. Could you please suggest a solution in PHP?

My guess is that the stream name could be part of DefaultURL and my lead just didn't mention it?
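
For reference, `nats.DefaultURL` in nats.go is `nats://127.0.0.1:4222`, which carries only a host and a port. A minimal sketch using just the Go standard library shows that nothing follows the authority part, so there is no place for a stream name. The helper name `describeURL` is hypothetical and exists only for this illustration.

```go
package main

import (
	"fmt"
	"net/url"
)

// defaultURL mirrors the value of nats.DefaultURL from nats.go.
const defaultURL = "nats://127.0.0.1:4222"

// describeURL is a hypothetical helper for this example. It splits a server
// URL into host, port, and whatever follows the authority (path plus query).
func describeURL(raw string) (string, string, string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", "", err
	}
	return u.Hostname(), u.Port(), u.Path + u.RawQuery, nil
}

func main() {
	host, port, rest, err := describeURL(defaultURL)
	if err != nil {
		panic(err)
	}
	// rest is empty: the URL only says where the server is.
	fmt.Printf("host=%s port=%s rest=%q\n", host, port, rest)
}
```

The stream is therefore chosen somewhere else, namely on the server, from the subject of the published message.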

Member

nekufa commented Sep 14, 2022

Hello!

Can you find an example of getting a stream name by subject in JavaScript?
I've checked the stream API and did not find any lookup API:
https://docs.nats.io/reference/reference-protocols/nats_api_reference#streams
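
For what it's worth, the JetStream API does seem to support this lookup: a request to `$JS.API.STREAM.NAMES` may carry a `subject` field, and the reply lists the streams that capture that subject. In nats.go this is exposed as `js.StreamNameBySubject`. Below is a sketch of the wire format in Go that runs without a server. The type and function names are made up for the example, and the sample reply is hand-written, not captured from a real server.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// apiStreamNames is the JetStream API subject for listing stream names.
const apiStreamNames = "$JS.API.STREAM.NAMES"

// streamNamesRequest is the request body. Subject limits the result to
// streams that capture that subject.
type streamNamesRequest struct {
	Subject string `json:"subject,omitempty"`
}

// streamNamesResponse holds only the part of the reply used here.
type streamNamesResponse struct {
	Streams []string `json:"streams"`
}

// buildLookup returns the JSON payload for a lookup by subject.
func buildLookup(subject string) ([]byte, error) {
	return json.Marshal(streamNamesRequest{Subject: subject})
}

// parseLookup extracts the stream names from a reply payload.
func parseLookup(reply []byte) ([]string, error) {
	var resp streamNamesResponse
	if err := json.Unmarshal(reply, &resp); err != nil {
		return nil, err
	}
	return resp.Streams, nil
}

func main() {
	payload, err := buildLookup("ORDERS.scratch")
	if err != nil {
		panic(err)
	}
	fmt.Printf("request %s %s\n", apiStreamNames, payload)

	// Hand-written sample reply (assumption, for illustration only).
	sample := []byte(`{"total":1,"offset":0,"limit":1024,"streams":["ORDERS"]}`)
	names, err := parseLookup(sample)
	if err != nil {
		panic(err)
	}
	fmt.Println("streams:", names)
}
```

A PHP client could send the same JSON payload as a plain request to that subject and read `streams` from the reply, assuming the library exposes a generic request call.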

nekufa added the enhancement label Sep 14, 2022
Author

6562680 commented Sep 14, 2022

The idea is to "configure the broker to resolve the stream name from the subject automatically". That's why it is "JetStream" and not just "stream".

It should be possible to configure streams, consumers, filters, and modes OUTSIDE the application code (unlike Redis, where you MUST name the key to store under, much like a BUCKET in NATS). Plain pub/sub already covers the case of known stream/queue names; JetStream should automate the rest.

As you can see in the Go code, it is possible to create an "unnamed stream" and still publish to it; you don't need to create the stream in the application. Actually, the unnamed stream is not a "stream" at all, it's just an engine that allows publishing messages. The Go code in that library is not clean enough to follow, and PHP is a "cleaner" language for this kind of thing, but the NATS docs show that in most cases streams can be configured even at the "account" level, and the dot separators do half of the job.

The Composer package basis-company/nats CURRENTLY requires you to address a stream or bucket directly and forces the programmer to create it if it does not exist.

I'm not sure NATS returns error code 1 if the stream does not exist...

For now I don't fully understand the flow, but judging by YouTube videos about NATS, the core concept is the "SUBJECT", not the "STREAM". Even then, we don't need to split any strings in the application: NATS has to work out from the subject which streams should copy the incoming message to their consumers.

You send "commands.booking.doSome.123" and NATS somehow understands that this message should be delivered to a certain stream; consumers can then use a subject filter to select incoming messages...

The reason I opened this issue: my team lead corrected me that "it is not Redis".
I think it should work like this:

  1. Go to the config/bash and create a stream with the allowed subject wildcards. Set the mode (limit/queue/etc.) and the storage (memory/etc.).
  2. Go to the config/bash and create consumers; set a subject filter for each consumer.
  3. Send a message from the application with a subject and a payload.
  4. NATS reads the subject, checks it against the subjects of all streams, and marks the matching streams as "in use" for that message.
  5. NATS copies the incoming message (or maybe a reference to it, internally) to all streams marked in (4).
  6. The consumers of those streams are notified that their streams received messages.
  7. Consumers filter the message and store it until it is processed (depending on the selected mode).
  8. Once we subscribe to a consumer, the consumer shift()s/copy()s the stored message to our callback.
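
Steps 4 to 7 can be sketched as a small in-memory model in Go. This only illustrates the subject matching rules and is not how the NATS server is implemented: `*` matches exactly one dot-separated token and `>` matches one or more trailing tokens. All type and function names here are made up. As far as I know, the real server also rejects streams whose subjects overlap, so in practice at most one stream captures a given subject.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches a NATS-style pattern.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last token and needs at least one token to match.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) || (tok != "*" && tok != s[i]) {
			return false
		}
	}
	return len(p) == len(s)
}

// consumer keeps the subjects of pending messages that passed its filter.
type consumer struct {
	name   string
	filter string
	queue  []string
}

// stream captures messages whose subject matches one of its subjects.
type stream struct {
	name      string
	subjects  []string
	consumers []*consumer
}

// captures reports whether the stream is bound to the given subject.
func (st *stream) captures(subject string) bool {
	for _, pattern := range st.subjects {
		if subjectMatches(pattern, subject) {
			return true
		}
	}
	return false
}

// publish routes one message: every capturing stream hands it to each of its
// consumers whose filter matches. It returns the names of capturing streams.
func publish(streams []*stream, subject string) []string {
	var captured []string
	for _, st := range streams {
		if !st.captures(subject) {
			continue
		}
		captured = append(captured, st.name)
		for _, c := range st.consumers {
			if subjectMatches(c.filter, subject) {
				c.queue = append(c.queue, subject)
			}
		}
	}
	return captured
}

func main() {
	booking := &consumer{name: "booking-worker", filter: "commands.booking.>"}
	streams := []*stream{
		{name: "stream1", subjects: []string{"commands.>"}, consumers: []*consumer{booking}},
		{name: "events", subjects: []string{"events.*"}},
	}
	// The publisher names only the subject, never the stream.
	fmt.Println(publish(streams, "commands.booking.doSome.123"))
	fmt.Println(publish(streams, "commands.billing.pay"))
	fmt.Println("pending for booking-worker:", booking.queue)
}
```

Note that `commands.*` would not capture `commands.booking.doSome.123`, because `*` stands for a single token; a multi-level subject needs `commands.>`.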

In that case the "Stream" object is not a concrete stream but a "JetStreamEngine" that allows publishing any subject to NATS, and the application does not hardcode allowed stream and subject names anywhere. You just call something like:

JetStream::publish($subject, $payload)
JetStream::subscribe($consumerName, $function)

Personally, I find this behavior a little unexpected, but that's how it works.

Author

6562680 commented Sep 27, 2022

I just got it working, and it was unexpectedly easy:

// $nats is an already connected client instance

// server configuring...
$streamName = 'stream1';
$stream = $nats->getApi()->getStream($streamName);
$stream->getConfiguration()
  ->setRetentionPolicy(RetentionPolicy::WORK_QUEUE)
  ->setStorage(Storage::MEMORY)
  // '>' matches one or more trailing tokens; 'commands.*' matches exactly
  // one token and would miss 'commands.service.doSome'
  ->setSubjects([ 'commands.>' ]);
$stream->create();

$consumerName = 'stream1-commands';
$consumer = $stream->getConsumer($consumerName);
$consumer->getConfiguration()
  ->setSubjectFilter('commands.>'); // the filter is a single subject string
$consumer->create();

// client publishing...
$subject = 'commands.service.doSome';
$payload = '{}';
$jetStreamEngine = new Stream($nats, ''); // unnamed stream
$jetStreamEngine->publish($subject, $payload);

// worker subscribing...
$nats->getApi()
  ->getStream($streamName)
  ->getConsumer($consumerName)
  ->handle(function ($message) { /** doSome **/ });
